Merged fullstack_lsm_staging upto r3336
git-svn-id: https://hyracks.googlecode.com/svn/trunk/fullstack@3339 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix/pom.xml b/pregelix/pom.xml
index 0d2bdba..5c01e31 100644
--- a/pregelix/pom.xml
+++ b/pregelix/pom.xml
@@ -44,7 +44,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
- <version>2.13</version>
+ <version>2.12</version>
<configuration>
<forkMode>pertest</forkMode>
<argLine>-enableassertions -Djava.util.logging.config.file=${user.home}/logging.properties -Xdebug -Xrunjdwp:transport=dt_socket,server=y,address=8000,suspend=n ${jvm.extraargs}</argLine>
@@ -53,16 +53,6 @@
</plugins>
</build>
- <reporting>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-changelog-plugin</artifactId>
- <version>2.2</version>
- </plugin>
- </plugins>
- </reporting>
-
<distributionManagement>
<repository>
<id>hyracks-releases</id>
diff --git a/pregelix/pregelix-api/pom.xml b/pregelix/pregelix-api/pom.xml
index b8bfce9..85f6ea2 100644
--- a/pregelix/pregelix-api/pom.xml
+++ b/pregelix/pregelix-api/pom.xml
@@ -41,9 +41,10 @@
</configuration>
</plugin>
<plugin>
- <artifactId>maven-clean-plugin</artifactId>
- <version>2.5</version>
- <configuration>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-clean-plugin</artifactId>
+ <version>2.4.1</version>
+ <configuration>
<filesets>
<fileset>
<directory>.</directory>
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
index cd49184..e51d4bc 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
@@ -49,525 +49,524 @@
*/
@SuppressWarnings("rawtypes")
public abstract class Vertex<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
- implements Writable {
- private static long superstep = 0;
- /** Class-wide number of vertices */
- private static long numVertices = -1;
- /** Class-wide number of edges */
- private static long numEdges = -1;
- /** Vertex id */
- private I vertexId = null;
- /** Vertex value */
- private V vertexValue = null;
- /** Map of destination vertices and their edge values */
- private final List<Edge<I, E>> destEdgeList = new ArrayList<Edge<I, E>>();
- /** If true, do not do anymore computation on this vertex. */
- boolean halt = false;
- /** List of incoming messages from the previous superstep */
- private final List<M> msgList = new ArrayList<M>();
- /** map context */
- private static TaskAttemptContext context = null;
- /** a delegate for hyracks stuff */
- private VertexDelegate<I, V, E, M> delegate = new VertexDelegate<I, V, E, M>(
- this);
- /** this vertex is updated or not */
- private boolean updated = false;
- /** has outgoing messages */
- private boolean hasMessage = false;
- /** created new vertex */
- private boolean createdNewLiveVertex = false;
+ implements Writable {
+ private static long superstep = 0;
+ /** Class-wide number of vertices */
+ private static long numVertices = -1;
+ /** Class-wide number of edges */
+ private static long numEdges = -1;
+ /** Vertex id */
+ private I vertexId = null;
+ /** Vertex value */
+ private V vertexValue = null;
+ /** Map of destination vertices and their edge values */
+ private final List<Edge<I, E>> destEdgeList = new ArrayList<Edge<I, E>>();
+ /** If true, do not do anymore computation on this vertex. */
+ boolean halt = false;
+ /** List of incoming messages from the previous superstep */
+ private final List<M> msgList = new ArrayList<M>();
+ /** map context */
+ private static TaskAttemptContext context = null;
+ /** a delegate for hyracks stuff */
+ private VertexDelegate<I, V, E, M> delegate = new VertexDelegate<I, V, E, M>(this);
+ /** this vertex is updated or not */
+ private boolean updated = false;
+ /** has outgoing messages */
+ private boolean hasMessage = false;
+ /** created new vertex */
+ private boolean createdNewLiveVertex = false;
- /**
- * use object pool for re-using objects
- */
- private List<Edge<I, E>> edgePool = new ArrayList<Edge<I, E>>();
- private List<M> msgPool = new ArrayList<M>();
- private List<V> valuePool = new ArrayList<V>();
- private int usedEdge = 0;
- private int usedMessage = 0;
- private int usedValue = 0;
+ /**
+ * use object pool for re-using objects
+ */
+ private List<Edge<I, E>> edgePool = new ArrayList<Edge<I, E>>();
+ private List<M> msgPool = new ArrayList<M>();
+ private List<V> valuePool = new ArrayList<V>();
+ private int usedEdge = 0;
+ private int usedMessage = 0;
+ private int usedValue = 0;
- /**
- * The key method that users need to implement
- *
- * @param msgIterator
- * an iterator of incoming messages
- */
- public abstract void compute(Iterator<M> msgIterator);
+ /**
+ * The key method that users need to implement
+ *
+ * @param msgIterator
+ * an iterator of incoming messages
+ */
+ public abstract void compute(Iterator<M> msgIterator);
- /**
- * Add an edge for the vertex.
- *
- * @param targetVertexId
- * @param edgeValue
- * @return successful or not
- */
- public final boolean addEdge(I targetVertexId, E edgeValue) {
- Edge<I, E> edge = this.allocateEdge();
- edge.setDestVertexId(targetVertexId);
- edge.setEdgeValue(edgeValue);
- destEdgeList.add(edge);
- updated = true;
- return true;
- }
+ /**
+ * Add an edge for the vertex.
+ *
+ * @param targetVertexId
+ * @param edgeValue
+ * @return successful or not
+ */
+ public final boolean addEdge(I targetVertexId, E edgeValue) {
+ Edge<I, E> edge = this.allocateEdge();
+ edge.setDestVertexId(targetVertexId);
+ edge.setEdgeValue(edgeValue);
+ destEdgeList.add(edge);
+ updated = true;
+ return true;
+ }
- /**
- * Initialize a new vertex
- *
- * @param vertexId
- * @param vertexValue
- * @param edges
- * @param messages
- */
- public void initialize(I vertexId, V vertexValue, Map<I, E> edges,
- List<M> messages) {
- if (vertexId != null) {
- setVertexId(vertexId);
- }
- if (vertexValue != null) {
- setVertexValue(vertexValue);
- }
- destEdgeList.clear();
- if (edges != null && !edges.isEmpty()) {
- for (Map.Entry<I, E> entry : edges.entrySet()) {
- destEdgeList.add(new Edge<I, E>(entry.getKey(), entry
- .getValue()));
- }
- }
- if (messages != null && !messages.isEmpty()) {
- msgList.addAll(messages);
- }
- }
+ /**
+ * Initialize a new vertex
+ *
+ * @param vertexId
+ * @param vertexValue
+ * @param edges
+ * @param messages
+ */
+ public void initialize(I vertexId, V vertexValue, Map<I, E> edges, List<M> messages) {
+ if (vertexId != null) {
+ setVertexId(vertexId);
+ }
+ if (vertexValue != null) {
+ setVertexValue(vertexValue);
+ }
+ destEdgeList.clear();
+ if (edges != null && !edges.isEmpty()) {
+ for (Map.Entry<I, E> entry : edges.entrySet()) {
+ destEdgeList.add(new Edge<I, E>(entry.getKey(), entry.getValue()));
+ }
+ }
+ if (messages != null && !messages.isEmpty()) {
+ msgList.addAll(messages);
+ }
+ }
- /**
- * reset a vertex object: clear its internal states
- */
- public void reset() {
- usedEdge = 0;
- usedMessage = 0;
- usedValue = 0;
- updated = false;
- }
+ /**
+ * reset a vertex object: clear its internal states
+ */
+ public void reset() {
+ usedEdge = 0;
+ usedMessage = 0;
+ usedValue = 0;
+ updated = false;
+ }
- /**
- * Set the vertex id
- *
- * @param vertexId
- */
- public final void setVertexId(I vertexId) {
- this.vertexId = vertexId;
- delegate.setVertexId(vertexId);
- }
+ /**
+ * Set the vertex id
+ *
+ * @param vertexId
+ */
+ public final void setVertexId(I vertexId) {
+ this.vertexId = vertexId;
+ delegate.setVertexId(vertexId);
+ }
- /**
- * Get the vertex id
- *
- * @return vertex id
- */
- public final I getVertexId() {
- return vertexId;
- }
+ /**
+ * Get the vertex id
+ *
+ * @return vertex id
+ */
+ public final I getVertexId() {
+ return vertexId;
+ }
- /**
- * Get the vertex value
- *
- * @return the vertex value
- */
- public final V getVertexValue() {
- return vertexValue;
- }
+ /**
+ * Get the vertex value
+ *
+ * @return the vertex value
+ */
+ public final V getVertexValue() {
+ return vertexValue;
+ }
- /**
- * Set the vertex value
- *
- * @param vertexValue
- */
- public final void setVertexValue(V vertexValue) {
- this.vertexValue = vertexValue;
- this.updated = true;
- }
+ /**
+ * Set the vertex value
+ *
+ * @param vertexValue
+ */
+ public final void setVertexValue(V vertexValue) {
+ this.vertexValue = vertexValue;
+ this.updated = true;
+ }
- /***
- * Send a message to a specific vertex
- *
- * @param id
- * the receiver vertex id
- * @param msg
- * the message
- */
- public final void sendMsg(I id, M msg) {
- if (msg == null) {
- throw new IllegalArgumentException(
- "sendMsg: Cannot send null message to " + id);
- }
- delegate.sendMsg(id, msg);
- this.hasMessage = true;
- }
+ /***
+ * Send a message to a specific vertex
+ *
+ * @param id
+ * the receiver vertex id
+ * @param msg
+ * the message
+ */
+ public final void sendMsg(I id, M msg) {
+ if (msg == null) {
+ throw new IllegalArgumentException("sendMsg: Cannot send null message to " + id);
+ }
+ delegate.sendMsg(id, msg);
+ this.hasMessage = true;
+ }
- /**
- * Send a message to all direct outgoing neighbors
- *
- * @param msg
- * the message
- */
- public final void sendMsgToAllEdges(M msg) {
- if (msg == null) {
- throw new IllegalArgumentException(
- "sendMsgToAllEdges: Cannot send null message to all edges");
- }
- for (Edge<I, E> edge : destEdgeList) {
- sendMsg(edge.getDestVertexId(), msg);
- }
- }
+ /**
+ * Send a message to all direct outgoing neighbors
+ *
+ * @param msg
+ * the message
+ */
+ public final void sendMsgToAllEdges(M msg) {
+ if (msg == null) {
+ throw new IllegalArgumentException("sendMsgToAllEdges: Cannot send null message to all edges");
+ }
+ for (Edge<I, E> edge : destEdgeList) {
+ sendMsg(edge.getDestVertexId(), msg);
+ }
+ }
- /**
- * Vote to halt. Once all vertex vote to halt and no more messages, a
- * Pregelix job will terminate.
- */
- public final void voteToHalt() {
- halt = true;
- updated = true;
- }
+ /**
+ * Vote to halt. Once all vertex vote to halt and no more messages, a
+ * Pregelix job will terminate.
+ */
+ public final void voteToHalt() {
+ halt = true;
+ updated = true;
+ }
- /**
- * @return the vertex is halted (true) or not (false)
- */
- public final boolean isHalted() {
- return halt;
- }
+ /**
+ * Activate a halted vertex such that it is alive again.
+ */
+ public final void activate() {
+ halt = false;
+ updated = true;
+ }
- @Override
- final public void readFields(DataInput in) throws IOException {
- reset();
- if (vertexId == null)
- vertexId = BspUtils.<I> createVertexIndex(getContext()
- .getConfiguration());
- vertexId.readFields(in);
- delegate.setVertexId(vertexId);
- boolean hasVertexValue = in.readBoolean();
+ /**
+ * @return the vertex is halted (true) or not (false)
+ */
+ public final boolean isHalted() {
+ return halt;
+ }
- if (hasVertexValue) {
- vertexValue = allocateValue();
- vertexValue.readFields(in);
- delegate.setVertex(this);
- }
- destEdgeList.clear();
- long edgeMapSize = SerDeUtils.readVLong(in);
- for (long i = 0; i < edgeMapSize; ++i) {
- Edge<I, E> edge = allocateEdge();
- edge.setConf(getContext().getConfiguration());
- edge.readFields(in);
- addEdge(edge);
- }
- msgList.clear();
- long msgListSize = SerDeUtils.readVLong(in);
- for (long i = 0; i < msgListSize; ++i) {
- M msg = allocateMessage();
- msg.readFields(in);
- msgList.add(msg);
- }
- halt = in.readBoolean();
- updated = false;
- hasMessage = false;
- createdNewLiveVertex = false;
- }
+ @Override
+ final public void readFields(DataInput in) throws IOException {
+ reset();
+ if (vertexId == null)
+ vertexId = BspUtils.<I> createVertexIndex(getContext().getConfiguration());
+ vertexId.readFields(in);
+ delegate.setVertexId(vertexId);
+ boolean hasVertexValue = in.readBoolean();
- @Override
- public void write(DataOutput out) throws IOException {
- vertexId.write(out);
- out.writeBoolean(vertexValue != null);
- if (vertexValue != null) {
- vertexValue.write(out);
- }
- SerDeUtils.writeVLong(out, destEdgeList.size());
- for (Edge<I, E> edge : destEdgeList) {
- edge.write(out);
- }
- SerDeUtils.writeVLong(out, msgList.size());
- for (M msg : msgList) {
- msg.write(out);
- }
- out.writeBoolean(halt);
- }
+ if (hasVertexValue) {
+ vertexValue = allocateValue();
+ vertexValue.readFields(in);
+ delegate.setVertex(this);
+ }
+ destEdgeList.clear();
+ long edgeMapSize = SerDeUtils.readVLong(in);
+ for (long i = 0; i < edgeMapSize; ++i) {
+ Edge<I, E> edge = allocateEdge();
+ edge.setConf(getContext().getConfiguration());
+ edge.readFields(in);
+ addEdge(edge);
+ }
+ msgList.clear();
+ long msgListSize = SerDeUtils.readVLong(in);
+ for (long i = 0; i < msgListSize; ++i) {
+ M msg = allocateMessage();
+ msg.readFields(in);
+ msgList.add(msg);
+ }
+ halt = in.readBoolean();
+ updated = false;
+ hasMessage = false;
+ createdNewLiveVertex = false;
+ }
- /**
- * Get the list of incoming messages
- *
- * @return the list of messages
- */
- public List<M> getMsgList() {
- return msgList;
- }
+ @Override
+ public void write(DataOutput out) throws IOException {
+ vertexId.write(out);
+ out.writeBoolean(vertexValue != null);
+ if (vertexValue != null) {
+ vertexValue.write(out);
+ }
+ SerDeUtils.writeVLong(out, destEdgeList.size());
+ for (Edge<I, E> edge : destEdgeList) {
+ edge.write(out);
+ }
+ SerDeUtils.writeVLong(out, msgList.size());
+ for (M msg : msgList) {
+ msg.write(out);
+ }
+ out.writeBoolean(halt);
+ }
- /**
- * Get outgoing edge list
- *
- * @return a list of outgoing edges
- */
- public List<Edge<I, E>> getEdges() {
- return this.destEdgeList;
- }
+ /**
+ * Get the list of incoming messages
+ *
+ * @return the list of messages
+ */
+ public List<M> getMsgList() {
+ return msgList;
+ }
- @Override
- @SuppressWarnings("unchecked")
- public String toString() {
- Collections.sort(destEdgeList);
- StringBuffer edgeBuffer = new StringBuffer();
- edgeBuffer.append("(");
- for (Edge<I, E> edge : destEdgeList) {
- edgeBuffer.append(edge.getDestVertexId()).append(",");
- }
- edgeBuffer.append(")");
- return "Vertex(id=" + getVertexId() + ",value=" + getVertexValue()
- + ", edges=" + edgeBuffer + ")";
- }
+ /**
+ * Get outgoing edge list
+ *
+ * @return a list of outgoing edges
+ */
+ public List<Edge<I, E>> getEdges() {
+ return this.destEdgeList;
+ }
- /**
- * Get the number of outgoing edges
- *
- * @return the number of outging edges
- */
- public int getNumOutEdges() {
- return destEdgeList.size();
- }
+ @Override
+ @SuppressWarnings("unchecked")
+ public String toString() {
+ Collections.sort(destEdgeList);
+ StringBuffer edgeBuffer = new StringBuffer();
+ edgeBuffer.append("(");
+ for (Edge<I, E> edge : destEdgeList) {
+ edgeBuffer.append(edge.getDestVertexId()).append(",");
+ }
+ edgeBuffer.append(")");
+ return "Vertex(id=" + getVertexId() + ",value=" + getVertexValue() + ", edges=" + edgeBuffer + ")";
+ }
- /**
- * Pregelix internal use only
- *
- * @param writers
- */
- public void setOutputWriters(List<IFrameWriter> writers) {
- delegate.setOutputWriters(writers);
- }
+ /**
+ * Get the number of outgoing edges
+ *
+ * @return the number of outging edges
+ */
+ public int getNumOutEdges() {
+ return destEdgeList.size();
+ }
- /**
- * Pregelix internal use only
- *
- * @param writers
- */
- public void setOutputAppenders(List<FrameTupleAppender> appenders) {
- delegate.setOutputAppenders(appenders);
- }
+ /**
+ * Pregelix internal use only
+ *
+ * @param writers
+ */
+ public void setOutputWriters(List<IFrameWriter> writers) {
+ delegate.setOutputWriters(writers);
+ }
- /**
- * Pregelix internal use only
- *
- * @param writers
- */
- public void setOutputTupleBuilders(List<ArrayTupleBuilder> tbs) {
- delegate.setOutputTupleBuilders(tbs);
- }
+ /**
+ * Pregelix internal use only
+ *
+ * @param writers
+ */
+ public void setOutputAppenders(List<FrameTupleAppender> appenders) {
+ delegate.setOutputAppenders(appenders);
+ }
- /**
- * Pregelix internal use only
- *
- * @param writers
- */
- public void finishCompute() throws IOException {
- delegate.finishCompute();
- }
+ /**
+ * Pregelix internal use only
+ *
+ * @param writers
+ */
+ public void setOutputTupleBuilders(List<ArrayTupleBuilder> tbs) {
+ delegate.setOutputTupleBuilders(tbs);
+ }
- /**
- * Pregelix internal use only
- */
- public boolean hasUpdate() {
- return this.updated;
- }
+ /**
+ * Pregelix internal use only
+ *
+ * @param writers
+ */
+ public void finishCompute() throws IOException {
+ delegate.finishCompute();
+ }
- /**
- * Pregelix internal use only
- */
- public boolean hasMessage() {
- return this.hasMessage;
- }
+ /**
+ * Pregelix internal use only
+ */
+ public boolean hasUpdate() {
+ return this.updated;
+ }
- /**
- * Pregelix internal use only
- */
- public boolean createdNewLiveVertex() {
- return this.createdNewLiveVertex;
- }
+ /**
+ * Pregelix internal use only
+ */
+ public boolean hasMessage() {
+ return this.hasMessage;
+ }
- /**
- * sort the edges
- */
- @SuppressWarnings("unchecked")
- public void sortEdges() {
- updated = true;
- Collections.sort(destEdgeList);
- }
+ /**
+ * Pregelix internal use only
+ */
+ public boolean createdNewLiveVertex() {
+ return this.createdNewLiveVertex;
+ }
- /**
- * Allocate a new edge from the edge pool
- */
- private Edge<I, E> allocateEdge() {
- Edge<I, E> edge;
- if (usedEdge < edgePool.size()) {
- edge = edgePool.get(usedEdge);
- usedEdge++;
- } else {
- edge = new Edge<I, E>();
- edgePool.add(edge);
- usedEdge++;
- }
- return edge;
- }
+ /**
+ * sort the edges
+ */
+ @SuppressWarnings("unchecked")
+ public void sortEdges() {
+ updated = true;
+ Collections.sort(destEdgeList);
+ }
- /**
- * Allocate a new message from the message pool
- */
- private M allocateMessage() {
- M message;
- if (usedMessage < msgPool.size()) {
- message = msgPool.get(usedEdge);
- usedMessage++;
- } else {
- message = BspUtils.<M> createMessageValue(getContext()
- .getConfiguration());
- msgPool.add(message);
- usedMessage++;
- }
- return message;
- }
+ /**
+ * Allocate a new edge from the edge pool
+ */
+ private Edge<I, E> allocateEdge() {
+ Edge<I, E> edge;
+ if (usedEdge < edgePool.size()) {
+ edge = edgePool.get(usedEdge);
+ usedEdge++;
+ } else {
+ edge = new Edge<I, E>();
+ edgePool.add(edge);
+ usedEdge++;
+ }
+ return edge;
+ }
- /**
- * Set the global superstep for all the vertices (internal use)
- *
- * @param superstep
- * New superstep
- */
- public static final void setSuperstep(long superstep) {
- Vertex.superstep = superstep;
- }
+ /**
+ * Allocate a new message from the message pool
+ */
+ private M allocateMessage() {
+ M message;
+ if (usedMessage < msgPool.size()) {
+ message = msgPool.get(usedEdge);
+ usedMessage++;
+ } else {
+ message = BspUtils.<M> createMessageValue(getContext().getConfiguration());
+ msgPool.add(message);
+ usedMessage++;
+ }
+ return message;
+ }
- /**
- * Add an outgoing edge into the vertex
- *
- * @param edge
- * the edge to be added
- * @return true if the edge list changed as a result of this call
- */
- public boolean addEdge(Edge<I, E> edge) {
- edge.setConf(getContext().getConfiguration());
- updated = true;
- return destEdgeList.add(edge);
- }
+ /**
+ * Set the global superstep for all the vertices (internal use)
+ *
+ * @param superstep
+ * New superstep
+ */
+ public static final void setSuperstep(long superstep) {
+ Vertex.superstep = superstep;
+ }
- /**
- * remove an outgoing edge in the graph
- *
- * @param edge
- * the edge to be removed
- * @return true if the edge is in the edge list of the vertex
- */
- public boolean removeEdge(Edge<I, E> edge) {
- updated = true;
- return destEdgeList.remove(edge);
- }
+ /**
+ * Add an outgoing edge into the vertex
+ *
+ * @param edge
+ * the edge to be added
+ * @return true if the edge list changed as a result of this call
+ */
+ public boolean addEdge(Edge<I, E> edge) {
+ edge.setConf(getContext().getConfiguration());
+ updated = true;
+ return destEdgeList.add(edge);
+ }
- /**
- * Add a new vertex into the graph
- *
- * @param vertexId
- * the vertex id
- * @param vertex
- * the vertex
- */
- public final void addVertex(I vertexId, Vertex vertex) {
- createdNewLiveVertex |= !vertex.isHalted();
- delegate.addVertex(vertexId, vertex);
- }
+ /**
+ * remove an outgoing edge in the graph
+ *
+ * @param edge
+ * the edge to be removed
+ * @return true if the edge is in the edge list of the vertex
+ */
+ public boolean removeEdge(Edge<I, E> edge) {
+ updated = true;
+ return destEdgeList.remove(edge);
+ }
- /**
- * Delete a vertex from id
- *
- * @param vertexId
- * the vertex id
- */
- public final void deleteVertex(I vertexId) {
- delegate.deleteVertex(vertexId);
- }
+ /**
+ * Add a new vertex into the graph
+ *
+ * @param vertexId
+ * the vertex id
+ * @param vertex
+ * the vertex
+ */
+ public final void addVertex(I vertexId, Vertex vertex) {
+ createdNewLiveVertex |= !vertex.isHalted();
+ delegate.addVertex(vertexId, vertex);
+ }
- /**
- * Allocate a vertex value from the object pool
- *
- * @return a vertex value instance
- */
- private V allocateValue() {
- V value;
- if (usedValue < valuePool.size()) {
- value = valuePool.get(usedValue);
- usedValue++;
- } else {
- value = BspUtils.<V> createVertexValue(getContext()
- .getConfiguration());
- valuePool.add(value);
- usedValue++;
- }
- return value;
- }
+ /**
+ * Delete a vertex from id
+ *
+ * @param vertexId
+ * the vertex id
+ */
+ public final void deleteVertex(I vertexId) {
+ delegate.deleteVertex(vertexId);
+ }
- /**
- * Get the current global superstep number
- *
- * @return the current superstep number
- */
- public static final long getSuperstep() {
- return superstep;
- }
+ /**
+ * Allocate a vertex value from the object pool
+ *
+ * @return a vertex value instance
+ */
+ private V allocateValue() {
+ V value;
+ if (usedValue < valuePool.size()) {
+ value = valuePool.get(usedValue);
+ usedValue++;
+ } else {
+ value = BspUtils.<V> createVertexValue(getContext().getConfiguration());
+ valuePool.add(value);
+ usedValue++;
+ }
+ return value;
+ }
- /**
- * Set the total number of vertices from the last superstep.
- *
- * @param numVertices
- * Aggregate vertices in the last superstep
- */
- public static final void setNumVertices(long numVertices) {
- Vertex.numVertices = numVertices;
- }
+ /**
+ * Get the current global superstep number
+ *
+ * @return the current superstep number
+ */
+ public static final long getSuperstep() {
+ return superstep;
+ }
- /**
- * Get the number of vertexes in the graph
- *
- * @return the number of vertexes in the graph
- */
- public static final long getNumVertices() {
- return numVertices;
- }
+ /**
+ * Set the total number of vertices from the last superstep.
+ *
+ * @param numVertices
+ * Aggregate vertices in the last superstep
+ */
+ public static final void setNumVertices(long numVertices) {
+ Vertex.numVertices = numVertices;
+ }
- /**
- * Set the total number of edges from the last superstep.
- *
- * @param numEdges
- * Aggregate edges in the last superstep
- */
- public static void setNumEdges(long numEdges) {
- Vertex.numEdges = numEdges;
- }
+ /**
+ * Get the number of vertexes in the graph
+ *
+ * @return the number of vertexes in the graph
+ */
+ public static final long getNumVertices() {
+ return numVertices;
+ }
- /**
- * Get the number of edges from this graph
- *
- * @return the number of edges in the graph
- */
- public static final long getNumEdges() {
- return numEdges;
- }
+ /**
+ * Set the total number of edges from the last superstep.
+ *
+ * @param numEdges
+ * Aggregate edges in the last superstep
+ */
+ public static void setNumEdges(long numEdges) {
+ Vertex.numEdges = numEdges;
+ }
- /**
- * Pregelix internal use only
- */
- public static final TaskAttemptContext getContext() {
- return context;
- }
+ /**
+ * Get the number of edges from this graph
+ *
+ * @return the number of edges in the graph
+ */
+ public static final long getNumEdges() {
+ return numEdges;
+ }
- /**
- * Pregelix internal use only
- *
- * @param context
- */
- public static final void setContext(TaskAttemptContext context) {
- Vertex.context = context;
- }
+ /**
+ * Pregelix internal use only
+ */
+ public static final TaskAttemptContext getContext() {
+ return context;
+ }
+
+ /**
+ * Pregelix internal use only
+ *
+ * @param context
+ */
+ public static final void setContext(TaskAttemptContext context) {
+ Vertex.context = context;
+ }
}
diff --git a/pregelix/pregelix-core/pom.xml b/pregelix/pregelix-core/pom.xml
index 576758b..7b247a8 100644
--- a/pregelix/pregelix-core/pom.xml
+++ b/pregelix/pregelix-core/pom.xml
@@ -18,8 +18,9 @@
<build>
<plugins>
<plugin>
+ <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
- <version>2.4</version>
+ <version>2.3.2</version>
<executions>
<execution>
<id>balancer</id>
@@ -72,7 +73,7 @@
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>appassembler-maven-plugin</artifactId>
- <version>1.3</version>
+ <version>1.3</version>
<executions>
<execution>
<configuration>
@@ -164,7 +165,7 @@
</plugin>
<plugin>
<artifactId>maven-clean-plugin</artifactId>
- <version>2.5</version>
+ <version>2.4.1</version>
<configuration>
<filesets>
<fileset>
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index 3a4c41b..72256f9 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -147,22 +147,11 @@
start = System.currentTimeMillis();
runHDFSWRite(jobGen);
runCleanup(jobGen);
- destroyApplication(applicationName);
end = System.currentTimeMillis();
time = end - start;
LOG.info("result writing finished " + time + "ms");
LOG.info("job finished");
} catch (Exception e) {
- try {
- /**
- * destroy application if there is any exception
- */
- if (hcc != null) {
- destroyApplication(applicationName);
- }
- } catch (Exception e2) {
- throw new HyracksException(e2);
- }
throw new HyracksException(e);
}
}
@@ -220,8 +209,8 @@
private void execute(JobSpecification job) throws Exception {
job.setUseConnectorPolicyForScheduling(false);
- JobId jobId = hcc.startJob(applicationName, job,
- profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
+ JobId jobId = hcc
+ .startJob(job, profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
hcc.waitForCompletion(jobId);
}
@@ -236,15 +225,11 @@
LOG.info("jar packing finished " + (end - start) + "ms");
start = System.currentTimeMillis();
- hcc.createApplication(applicationName, appZip);
+ // TODO: Fix this step to use Yarn
+ //hcc.createApplication(applicationName, appZip);
end = System.currentTimeMillis();
LOG.info("jar deployment finished " + (end - start) + "ms");
}
-
- public void destroyApplication(String appName) throws Exception {
- hcc.destroyApplication(appName);
- }
-
}
class FileFilter implements FilenameFilter {
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index 0b1be61..77fd1a7 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -52,13 +52,13 @@
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDropOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.hyracks.storage.common.file.TransientLocalResourceFactoryProvider;
import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
import edu.uci.ics.pregelix.api.graph.MessageCombiner;
import edu.uci.ics.pregelix.api.graph.Vertex;
@@ -80,8 +80,8 @@
import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.bootstrap.IndexLifeCycleManagerProvider;
import edu.uci.ics.pregelix.runtime.bootstrap.StorageManagerInterface;
-import edu.uci.ics.pregelix.runtime.bootstrap.TreeIndexRegistryProvider;
import edu.uci.ics.pregelix.runtime.touchpoint.RuntimeHookFactory;
import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
@@ -93,7 +93,7 @@
protected static final String PRIMARY_INDEX = "primary";
protected final Configuration conf;
protected final PregelixJob giraphJob;
- protected IIndexRegistryProvider<IIndex> treeRegistryProvider = TreeIndexRegistryProvider.INSTANCE;
+ protected IIndexLifecycleManagerProvider lcManagerProvider = IndexLifeCycleManagerProvider.INSTANCE;
protected IStorageManagerInterface storageManagerInterface = StorageManagerInterface.INSTANCE;
protected String jobId = new UUID(System.currentTimeMillis(), System.nanoTime()).toString();
protected int frameSize = ClusterConfig.getFrameSize();
@@ -169,8 +169,9 @@
IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
TreeIndexCreateOperatorDescriptor btreeCreate = new TreeIndexCreateOperatorDescriptor(spec,
- storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
- new BTreeDataflowHelperFactory(), NoOpOperationCallbackProvider.INSTANCE);
+ storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories, null,
+ new BTreeDataflowHelperFactory(), new TransientLocalResourceFactoryProvider(),
+ NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, btreeCreate);
return spec;
}
@@ -229,9 +230,9 @@
typeTraits[0] = new TypeTraits(false);
typeTraits[1] = new TypeTraits(false);
TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
- storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
- fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory(),
- NoOpOperationCallbackProvider.INSTANCE);
+ storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories, null,
+ fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, false, 0, new BTreeDataflowHelperFactory(),
+ NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
/**
@@ -356,8 +357,8 @@
typeTraits[0] = new TypeTraits(false);
typeTraits[1] = new TypeTraits(false);
BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(spec, recordDescriptor,
- storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
- null, null, true, true, new BTreeDataflowHelperFactory(), false, NoOpOperationCallbackProvider.INSTANCE);
+ storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories, null,
+ null, null, true, true, new BTreeDataflowHelperFactory(), false, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, scanner);
/**
@@ -424,9 +425,10 @@
ITypeTraits[] typeTraits = new ITypeTraits[2];
typeTraits[0] = new TypeTraits(false);
typeTraits[1] = new TypeTraits(false);
+
BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(spec, recordDescriptor,
- storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
- null, null, true, true, new BTreeDataflowHelperFactory(), false, NoOpOperationCallbackProvider.INSTANCE);
+ storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories, null,
+ null, null, true, true, new BTreeDataflowHelperFactory(), false, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, scanner);
/**
@@ -455,8 +457,8 @@
JobSpecification spec = new JobSpecification();
IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, indexName);
- TreeIndexDropOperatorDescriptor drop = new TreeIndexDropOperatorDescriptor(spec, storageManagerInterface,
- treeRegistryProvider, fileSplitProvider);
+ IndexDropOperatorDescriptor drop = new IndexDropOperatorDescriptor(spec, storageManagerInterface,
+ lcManagerProvider, fileSplitProvider, new BTreeDataflowHelperFactory());
ClusterConfig.setLocationConstraint(spec, drop);
spec.addRoot(drop);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index 9de4c04..fe2fcac 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -40,8 +40,8 @@
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
-import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
import edu.uci.ics.pregelix.api.graph.MsgList;
import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -135,7 +135,7 @@
RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
- recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+ recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
new BTreeDataflowHelperFactory(), inputRdFactory, 6,
new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
@@ -166,8 +166,8 @@
indexCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration + 1,
WritableComparator.get(vertexIdClass).getClass());
TreeIndexBulkReLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkReLoadOperatorDescriptor(spec,
- storageManagerInterface, treeRegistryProvider, secondaryFileSplitProvider, typeTraits,
- indexCmpFactories, fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory());
+ storageManagerInterface, lcManagerProvider, secondaryFileSplitProvider, typeTraits, indexCmpFactories,
+ fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory());
ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
/**
@@ -222,18 +222,18 @@
* add the insert operator to insert vertexes
*/
TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, insertOp);
/**
* add the delete operator to delete vertexes
*/
TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
@@ -343,7 +343,7 @@
ITreeIndexFrameFactory leafFrameFactory = new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(
typeTraits));
IndexNestedLoopJoinOperatorDescriptor setUnion = new IndexNestedLoopJoinOperatorDescriptor(spec, rdFinal,
- storageManagerInterface, treeRegistryProvider, secondaryFileSplitProviderRead, interiorFrameFactory,
+ storageManagerInterface, lcManagerProvider, secondaryFileSplitProviderRead, interiorFrameFactory,
leafFrameFactory, typeTraits, comparatorFactories, true, keyFields, keyFields, true, true,
new BTreeDataflowHelperFactory(), true);
ClusterConfig.setLocationConstraint(spec, setUnion);
@@ -361,8 +361,8 @@
vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
- spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields, keyFields, true, true,
+ spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
+ JobGenUtil.getForwardScan(iteration), keyFields, keyFields, true, true,
new BTreeDataflowHelperFactory(), inputRdFactory, 6, new ComputeUpdateFunctionFactory(confFactory),
preHookFactory, null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete, rdFinal);
ClusterConfig.setLocationConstraint(spec, join);
@@ -377,7 +377,7 @@
String writeFile = iteration % 2 == 0 ? SECONDARY_INDEX_EVEN : SECONDARY_INDEX_ODD;
IFileSplitProvider secondaryFileSplitProviderWrite = ClusterConfig.getFileSplitProvider(jobId, writeFile);
TreeIndexBulkReLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkReLoadOperatorDescriptor(spec,
- storageManagerInterface, treeRegistryProvider, secondaryFileSplitProviderWrite, typeTraits,
+ storageManagerInterface, lcManagerProvider, secondaryFileSplitProviderWrite, typeTraits,
indexCmpFactories, fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory());
ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
@@ -446,18 +446,18 @@
* add the insert operator to insert vertexes
*/
TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, insertOp);
/**
* add the delete operator to delete vertexes
*/
TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index 91c15b2..f1eceb7 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -40,8 +40,8 @@
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
-import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
import edu.uci.ics.pregelix.api.graph.MsgList;
import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -129,7 +129,7 @@
RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
- recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+ recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
new BTreeDataflowHelperFactory(), inputRdFactory, 5,
new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
@@ -205,18 +205,18 @@
*/
int[] fieldPermutation = new int[] { 0, 1 };
TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, insertOp);
/**
* add the delete operator to delete vertexes
*/
TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
@@ -335,7 +335,7 @@
vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
- spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, interiorFrameFactory,
+ spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, interiorFrameFactory,
leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
@@ -408,18 +408,18 @@
*/
int[] fieldPermutation = new int[] { 0, 1 };
TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, insertOp);
/**
* add the delete operator to delete vertexes
*/
TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
index ee1fd0f..314c393 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -39,8 +39,8 @@
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
-import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
import edu.uci.ics.pregelix.api.graph.MsgList;
import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -131,7 +131,7 @@
RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
- recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+ recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
new BTreeDataflowHelperFactory(), inputRdFactory, 5,
new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
@@ -197,18 +197,18 @@
*/
int[] fieldPermutation = new int[] { 0, 1 };
TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, insertOp);
/**
* add the delete operator to delete vertexes
*/
TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
@@ -323,7 +323,7 @@
vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
- spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, interiorFrameFactory,
+ spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, interiorFrameFactory,
leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
@@ -384,18 +384,18 @@
int[] fieldPermutation = new int[] { 0, 1 };
TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, insertOp);
/**
* add the delete operator to delete vertexes
*/
TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
index 628e9ce..0c3db38 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
@@ -39,8 +39,8 @@
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
-import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
import edu.uci.ics.pregelix.api.graph.MsgList;
import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -128,7 +128,7 @@
RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
- recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+ recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
new BTreeDataflowHelperFactory(), inputRdFactory, 5,
new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
@@ -211,18 +211,18 @@
*/
int[] fieldPermutation = new int[] { 0, 1 };
TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, insertOp);
/**
* add the delete operator to delete vertexes
*/
TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
@@ -337,7 +337,7 @@
vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
- spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, interiorFrameFactory,
+ spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, interiorFrameFactory,
leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
@@ -417,18 +417,18 @@
*/
int[] fieldPermutation = new int[] { 0, 1 };
TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, insertOp);
/**
* add the delete operator to delete vertexes
*/
TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
- spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
- NoOpOperationCallbackProvider.INSTANCE);
+ spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, fieldPermutation, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
+ null, NoOpOperationCallbackFactory.INSTANCE);
ClusterConfig.setLocationConstraint(spec, deleteOp);
/** construct empty sink operator */
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
index cd2a864..d099645 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
@@ -14,7 +14,6 @@
*/
package edu.uci.ics.pregelix.core.util;
-import java.io.File;
import java.util.EnumSet;
import edu.uci.ics.hyracks.api.client.HyracksConnection;
@@ -27,6 +26,7 @@
import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
import edu.uci.ics.hyracks.control.nc.NodeControllerService;
import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.runtime.bootstrap.NCApplicationEntryPoint;
public class PregelixHyracksIntegrationUtil {
@@ -45,7 +45,7 @@
private static NodeControllerService nc2;
private static IHyracksClientConnection hcc;
- public static void init(String topologyFilePath) throws Exception {
+ public static void init() throws Exception {
CCConfig ccConfig = new CCConfig();
ccConfig.clientNetIpAddress = CC_HOST;
ccConfig.clusterNetIpAddress = CC_HOST;
@@ -54,7 +54,6 @@
ccConfig.defaultMaxJobAttempts = 0;
ccConfig.jobHistorySize = 0;
ccConfig.profileDumpPeriod = -1;
- ccConfig.clusterTopologyDefinition = new File(topologyFilePath);
// cluster controller
cc = new ClusterControllerService(ccConfig);
@@ -68,6 +67,7 @@
ncConfig1.dataIPAddress = "127.0.0.1";
ncConfig1.datasetIPAddress = "127.0.0.1";
ncConfig1.nodeId = NC1_ID;
+ ncConfig1.appNCMainClass = NCApplicationEntryPoint.class.getName();
nc1 = new NodeControllerService(ncConfig1);
nc1.start();
@@ -78,6 +78,7 @@
ncConfig2.dataIPAddress = "127.0.0.1";
ncConfig2.datasetIPAddress = "127.0.0.1";
ncConfig2.nodeId = NC2_ID;
+ ncConfig2.appNCMainClass = NCApplicationEntryPoint.class.getName();
nc2 = new NodeControllerService(ncConfig2);
nc2.start();
@@ -86,14 +87,6 @@
ClusterConfig.loadClusterConfig(CC_HOST, TEST_HYRACKS_CC_CLIENT_PORT);
}
- public static void destroyApp(String hyracksAppName) throws Exception {
- hcc.destroyApplication(hyracksAppName);
- }
-
- public static void createApp(String hyracksAppName) throws Exception {
- hcc.createApplication(hyracksAppName, null);
- }
-
public static void deinit() throws Exception {
nc2.stop();
nc1.stop();
@@ -102,7 +95,7 @@
public static void runJob(JobSpecification spec, String appName) throws Exception {
spec.setFrameSize(FRAME_SIZE);
- JobId jobId = hcc.startJob(appName, spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
+ JobId jobId = hcc.startJob(spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
hcc.waitForCompletion(jobId);
}
diff --git a/pregelix/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java b/pregelix/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
index f7cadf6..3c00cad 100644
--- a/pregelix/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
+++ b/pregelix/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
@@ -50,14 +50,14 @@
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.hyracks.storage.common.file.TransientLocalResourceFactoryProvider;
import edu.uci.ics.pregelix.core.data.TypeTraits;
import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
@@ -65,8 +65,8 @@
import edu.uci.ics.pregelix.dataflow.VertexWriteOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinOperatorDescriptor;
import edu.uci.ics.pregelix.dataflow.std.ProjectOperatorDescriptor;
+import edu.uci.ics.pregelix.runtime.bootstrap.IndexLifeCycleManagerProvider;
import edu.uci.ics.pregelix.runtime.bootstrap.StorageManagerInterface;
-import edu.uci.ics.pregelix.runtime.bootstrap.TreeIndexRegistryProvider;
public class JoinTest {
private final static String ACTUAL_RESULT_DIR = "actual";
@@ -82,7 +82,7 @@
private static final String PATH_TO_CLUSTER_PROPERTIES = "src/test/resources/cluster/cluster.properties";
private static final float DEFAULT_BTREE_FILL_FACTOR = 1.00f;
- private IIndexRegistryProvider<IIndex> treeRegistry = TreeIndexRegistryProvider.INSTANCE;
+ private IIndexLifecycleManagerProvider lcManagerProvider = IndexLifeCycleManagerProvider.INSTANCE;
private IStorageManagerInterface storageManagerInterface = StorageManagerInterface.INSTANCE;
private IBinaryHashFunctionFactory stringHashFactory = new PointableBinaryHashFunctionFactory(
@@ -102,8 +102,7 @@
ClusterConfig.setStorePath(PATH_TO_CLUSTER_STORE);
ClusterConfig.setClusterPropertiesPath(PATH_TO_CLUSTER_PROPERTIES);
cleanupStores();
- PregelixHyracksIntegrationUtil.init("src/test/resources/topology.xml");
- PregelixHyracksIntegrationUtil.createApp(HYRACKS_APP_NAME);
+ PregelixHyracksIntegrationUtil.init();
FileUtils.forceMkdir(new File(EXPECT_RESULT_DIR));
FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
@@ -121,7 +120,6 @@
runIndexRightOuterJoin();
TestUtils.compareWithResult(new File(EXPECTED_RESULT_FILE), new File(ACTUAL_RESULT_FILE));
- PregelixHyracksIntegrationUtil.destroyApp(HYRACKS_APP_NAME);
PregelixHyracksIntegrationUtil.deinit();
}
@@ -195,8 +193,8 @@
FileSplit[] results = new FileSplit[1];
results[0] = resultFile;
IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
- VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
- null);
+ VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider,
+ null, null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { NC1_ID });
PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
@@ -234,8 +232,9 @@
for (int i = 0; i < typeTraits.length; i++)
typeTraits[i] = new TypeTraits(false);
TreeIndexCreateOperatorDescriptor writer = new TreeIndexCreateOperatorDescriptor(spec, storageManagerInterface,
- treeRegistry, fileSplitProvider, typeTraits, comparatorFactories, new BTreeDataflowHelperFactory(),
- NoOpOperationCallbackProvider.INSTANCE);
+ lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories, null,
+ new BTreeDataflowHelperFactory(), new TransientLocalResourceFactoryProvider(),
+ NoOpOperationCallbackFactory.INSTANCE);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, NC1_ID, NC2_ID);
spec.addRoot(writer);
runTest(spec);
@@ -278,9 +277,9 @@
for (int i = 0; i < typeTraits.length; i++)
typeTraits[i] = new TypeTraits(false);
TreeIndexBulkLoadOperatorDescriptor writer = new TreeIndexBulkLoadOperatorDescriptor(spec,
- storageManagerInterface, treeRegistry, fileSplitProvider, typeTraits, comparatorFactories,
- fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory(),
- NoOpOperationCallbackProvider.INSTANCE);
+ storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories, null,
+ fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, false, 0, new BTreeDataflowHelperFactory(),
+ NoOpOperationCallbackFactory.INSTANCE);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, NC1_ID, NC2_ID);
spec.connect(new OneToOneConnectorDescriptor(spec), custScanner, 0, sorter, 0);
@@ -353,8 +352,8 @@
for (int i = 0; i < typeTraits.length; i++)
typeTraits[i] = new TypeTraits(false);
IndexNestedLoopJoinOperatorDescriptor join = new IndexNestedLoopJoinOperatorDescriptor(spec, custOrderJoinDesc,
- storageManagerInterface, treeRegistry, fileSplitProvider, typeTraits, keyComparatorFactories, true,
- keyFields, keyFields, true, true, new BTreeDataflowHelperFactory());
+ storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, keyComparatorFactories,
+ true, keyFields, keyFields, true, true, new BTreeDataflowHelperFactory());
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
/** results (already in sorted order) */
@@ -362,8 +361,8 @@
FileSplit[] results = new FileSplit[1];
results[0] = resultFile;
IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
- VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
- null);
+ VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider,
+ null, null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { NC1_ID });
PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
@@ -459,8 +458,8 @@
FileSplit[] results = new FileSplit[1];
results[0] = resultFile;
IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
- VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
- null);
+ VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider,
+ null, null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { NC1_ID });
PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
@@ -556,7 +555,7 @@
ITreeIndexFrameFactory leafFrameFactory = new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(
typeTraits));
IndexNestedLoopJoinOperatorDescriptor join = new IndexNestedLoopJoinOperatorDescriptor(spec, custOrderJoinDesc,
- storageManagerInterface, treeRegistry, fileSplitProvider, interiorFrameFactory, leafFrameFactory,
+ storageManagerInterface, lcManagerProvider, fileSplitProvider, interiorFrameFactory, leafFrameFactory,
typeTraits, keyComparatorFactories, true, keyFields, keyFields, true, true,
new BTreeDataflowHelperFactory(), true, nullWriterFactories);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
@@ -566,8 +565,8 @@
FileSplit[] results = new FileSplit[1];
results[0] = resultFile;
IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
- VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
- null);
+ VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider,
+ null, null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { NC1_ID });
PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
diff --git a/pregelix/pregelix-dataflow-std-base/pom.xml b/pregelix/pregelix-dataflow-std-base/pom.xml
index eeaa6c9..6222a04 100644
--- a/pregelix/pregelix-dataflow-std-base/pom.xml
+++ b/pregelix/pregelix-dataflow-std-base/pom.xml
@@ -42,8 +42,9 @@
</configuration>
</plugin>
<plugin>
+ <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-clean-plugin</artifactId>
- <version>2.5</version>
+ <version>2.4.1</version>
<configuration>
<filesets>
<fileset>
diff --git a/pregelix/pregelix-dataflow-std/pom.xml b/pregelix/pregelix-dataflow-std/pom.xml
index 889c876..aa77ad3 100644
--- a/pregelix/pregelix-dataflow-std/pom.xml
+++ b/pregelix/pregelix-dataflow-std/pom.xml
@@ -42,8 +42,9 @@
</configuration>
</plugin>
<plugin>
+ <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-clean-plugin</artifactId>
- <version>2.5</version>
+ <version>2.4.1</version>
<configuration>
<filesets>
<fileset>
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorDescriptor.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorDescriptor.java
index 99e55f1..c9f3fe7 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorDescriptor.java
@@ -23,12 +23,12 @@
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.hyracks.storage.common.file.TransientLocalResourceFactoryProvider;
import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
@@ -53,14 +53,16 @@
private final int outputArity;
public BTreeSearchFunctionUpdateOperatorDescriptor(JobSpecification spec, RecordDescriptor recDesc,
- IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> indexRegistryProvider,
+ IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lcManagerProvider,
IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
IBinaryComparatorFactory[] comparatorFactories, boolean isForward, int[] lowKeyFields, int[] highKeyFields,
boolean lowKeyInclusive, boolean highKeyInclusive, IIndexDataflowHelperFactory dataflowHelperFactory,
IRecordDescriptorFactory inputRdFactory, int outputArity, IUpdateFunctionFactory functionFactory,
IRuntimeHookFactory preHookFactory, IRuntimeHookFactory postHookFactory, RecordDescriptor... rDescs) {
- super(spec, 1, outputArity, recDesc, storageManager, indexRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, dataflowHelperFactory, null, false, NoOpOperationCallbackProvider.INSTANCE);
+ super(spec, 1, outputArity, recDesc, storageManager, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, dataflowHelperFactory, null, false,
+ new TransientLocalResourceFactoryProvider(), NoOpOperationCallbackFactory.INSTANCE,
+ NoOpOperationCallbackFactory.INSTANCE);
this.isForward = isForward;
this.lowKeyFields = lowKeyFields;
this.highKeyFields = highKeyFields;
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorNodePushable.java
index 3938613..ff95e52 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorNodePushable.java
@@ -36,9 +36,10 @@
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
@@ -106,11 +107,11 @@
* open the function
*/
functionProxy.functionOpen();
- accessor = new FrameTupleAccessor(treeIndexHelper.getHyracksTaskContext().getFrameSize(), recDesc);
+ accessor = new FrameTupleAccessor(treeIndexHelper.getTaskContext().getFrameSize(), recDesc);
try {
- treeIndexHelper.init(false);
- btree = (BTree) treeIndexHelper.getIndex();
+ treeIndexHelper.open();
+ btree = (BTree) treeIndexHelper.getIndexInstance();
cursorFrame = btree.getLeafFrameFactory().createFrame();
setCursor();
@@ -120,17 +121,17 @@
rangePred = new RangePredicate(null, null, lowKeyInclusive, highKeyInclusive, lowKeySearchCmp,
highKeySearchCmp);
- writeBuffer = treeIndexHelper.getHyracksTaskContext().allocateFrame();
+ writeBuffer = treeIndexHelper.getTaskContext().allocateFrame();
tb = new ArrayTupleBuilder(btree.getFieldCount());
dos = tb.getDataOutput();
- appender = new FrameTupleAppender(treeIndexHelper.getHyracksTaskContext().getFrameSize());
+ appender = new FrameTupleAppender(treeIndexHelper.getTaskContext().getFrameSize());
appender.reset(writeBuffer, true);
- indexAccessor = btree.createAccessor();
+ indexAccessor = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
cloneUpdateTb = new ArrayTupleBuilder(btree.getFieldCount());
updateBuffer.setFieldCount(btree.getFieldCount());
} catch (Exception e) {
- treeIndexHelper.deinit();
+ treeIndexHelper.close();
throw new HyracksDataException(e);
}
}
@@ -203,7 +204,7 @@
*/
functionProxy.functionClose();
} finally {
- treeIndexHelper.deinit();
+ treeIndexHelper.close();
}
}
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorDescriptor.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorDescriptor.java
index 60559e8..7662aa8 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorDescriptor.java
@@ -25,13 +25,13 @@
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.hyracks.storage.common.file.TransientLocalResourceFactoryProvider;
import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
@@ -61,14 +61,16 @@
private final int outputArity;
public IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(JobSpecification spec,
- IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> treeIndexRegistryProvider,
+ IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lcManagerProvider,
IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
IBinaryComparatorFactory[] comparatorFactories, boolean isForward, int[] lowKeyFields, int[] highKeyFields,
boolean lowKeyInclusive, boolean highKeyInclusive, IIndexDataflowHelperFactory opHelperFactory,
IRecordDescriptorFactory inputRdFactory, int outputArity, IUpdateFunctionFactory functionFactory,
IRuntimeHookFactory preHookFactory, IRuntimeHookFactory postHookFactory, RecordDescriptor... rDescs) {
- super(spec, 1, outputArity, rDescs[0], storageManager, treeIndexRegistryProvider, fileSplitProvider,
- typeTraits, comparatorFactories, opHelperFactory, null, false, NoOpOperationCallbackProvider.INSTANCE);
+ super(spec, 1, outputArity, rDescs[0], storageManager, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, highKeyFields, opHelperFactory, null, false,
+ new TransientLocalResourceFactoryProvider(), NoOpOperationCallbackFactory.INSTANCE,
+ NoOpOperationCallbackFactory.INSTANCE);
this.isForward = isForward;
this.lowKeyFields = lowKeyFields;
this.highKeyFields = highKeyFields;
@@ -88,7 +90,7 @@
}
public IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(JobSpecification spec,
- IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> treeIndexRegistryProvider,
+ IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lcManagerProvider,
IFileSplitProvider fileSplitProvider, ITreeIndexFrameFactory interiorFrameFactory,
ITreeIndexFrameFactory leafFrameFactory, ITypeTraits[] typeTraits,
IBinaryComparatorFactory[] comparatorFactories, boolean isForward, int[] lowKeyFields, int[] highKeyFields,
@@ -96,8 +98,10 @@
boolean isRightOuter, INullWriterFactory[] nullWriterFactories, IRecordDescriptorFactory inputRdFactory,
int outputArity, IUpdateFunctionFactory functionFactory, IRuntimeHookFactory preHookFactory,
IRuntimeHookFactory postHookFactory, RecordDescriptor... rDescs) {
- super(spec, 1, outputArity, rDescs[0], storageManager, treeIndexRegistryProvider, fileSplitProvider,
- typeTraits, comparatorFactories, opHelperFactory, null, false, NoOpOperationCallbackProvider.INSTANCE);
+ super(spec, 1, outputArity, rDescs[0], storageManager, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, highKeyFields, opHelperFactory, null, false,
+ new TransientLocalResourceFactoryProvider(), NoOpOperationCallbackFactory.INSTANCE,
+ NoOpOperationCallbackFactory.INSTANCE);
this.isForward = isForward;
this.lowKeyFields = lowKeyFields;
this.highKeyFields = highKeyFields;
@@ -120,7 +124,7 @@
}
public IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(JobSpecification spec,
- IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> treeIndexRegistryProvider,
+ IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lcManagerProvider,
IFileSplitProvider fileSplitProvider, ITreeIndexFrameFactory interiorFrameFactory,
ITreeIndexFrameFactory leafFrameFactory, ITypeTraits[] typeTraits,
IBinaryComparatorFactory[] comparatorFactories, boolean isForward, int[] lowKeyFields, int[] highKeyFields,
@@ -128,8 +132,10 @@
boolean isSetUnion, IRecordDescriptorFactory inputRdFactory, int outputArity,
IUpdateFunctionFactory functionFactory, IRuntimeHookFactory preHookFactory,
IRuntimeHookFactory postHookFactory, RecordDescriptor... rDescs) {
- super(spec, 1, outputArity, rDescs[0], storageManager, treeIndexRegistryProvider, fileSplitProvider,
- typeTraits, comparatorFactories, opHelperFactory, null, false, NoOpOperationCallbackProvider.INSTANCE);
+ super(spec, 1, outputArity, rDescs[0], storageManager, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, highKeyFields, opHelperFactory, null, false,
+ new TransientLocalResourceFactoryProvider(), NoOpOperationCallbackFactory.INSTANCE,
+ NoOpOperationCallbackFactory.INSTANCE);
this.isForward = isForward;
this.lowKeyFields = lowKeyFields;
this.highKeyFields = highKeyFields;
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
index 37029f3..61e4649 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
@@ -36,9 +36,10 @@
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
@@ -104,12 +105,11 @@
* open the function
*/
functionProxy.functionOpen();
- accessor = new FrameTupleAccessor(treeIndexOpHelper.getHyracksTaskContext().getFrameSize(), recDesc);
+ accessor = new FrameTupleAccessor(treeIndexOpHelper.getTaskContext().getFrameSize(), recDesc);
try {
- treeIndexOpHelper.init(false);
- btree = (BTree) treeIndexOpHelper.getIndex();
- btree.open(treeIndexOpHelper.getIndexFileId());
+ treeIndexOpHelper.open();
+ btree = (BTree) treeIndexOpHelper.getIndexInstance();
cursorFrame = btree.getLeafFrameFactory().createFrame();
setCursor();
@@ -140,15 +140,15 @@
rangePred = new RangePredicate(null, null, lowKeyInclusive, highKeyInclusive, lowKeySearchCmp,
highKeySearchCmp);
- writeBuffer = treeIndexOpHelper.getHyracksTaskContext().allocateFrame();
- appender = new FrameTupleAppender(treeIndexOpHelper.getHyracksTaskContext().getFrameSize());
+ writeBuffer = treeIndexOpHelper.getTaskContext().allocateFrame();
+ appender = new FrameTupleAppender(treeIndexOpHelper.getTaskContext().getFrameSize());
appender.reset(writeBuffer, true);
- indexAccessor = btree.createAccessor();
+ indexAccessor = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
cloneUpdateTb = new ArrayTupleBuilder(btree.getFieldCount());
updateBuffer.setFieldCount(btree.getFieldCount());
} catch (Exception e) {
- treeIndexOpHelper.deinit();
+ treeIndexOpHelper.close();
throw new HyracksDataException(e);
}
}
@@ -220,7 +220,7 @@
*/
functionProxy.functionClose();
} finally {
- treeIndexOpHelper.deinit();
+ treeIndexOpHelper.close();
}
}
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorDescriptor.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorDescriptor.java
index ed177e3..d237761 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorDescriptor.java
@@ -24,13 +24,13 @@
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.hyracks.storage.common.file.TransientLocalResourceFactoryProvider;
public class IndexNestedLoopJoinOperatorDescriptor extends AbstractTreeIndexOperatorDescriptor {
@@ -51,12 +51,13 @@
private boolean isSetUnion = false;
public IndexNestedLoopJoinOperatorDescriptor(JobSpecification spec, RecordDescriptor recDesc,
- IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> treeIndexRegistryProvider,
+ IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lcManagerProvider,
IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
IBinaryComparatorFactory[] comparatorFactories, boolean isForward, int[] lowKeyFields, int[] highKeyFields,
boolean lowKeyInclusive, boolean highKeyInclusive, IIndexDataflowHelperFactory opHelperFactory) {
- super(spec, 1, 1, recDesc, storageManager, treeIndexRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, opHelperFactory, null, false, NoOpOperationCallbackProvider.INSTANCE);
+ super(spec, 1, 1, recDesc, storageManager, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, opHelperFactory, null, false, new TransientLocalResourceFactoryProvider(),
+ NoOpOperationCallbackFactory.INSTANCE, NoOpOperationCallbackFactory.INSTANCE);
this.isForward = isForward;
this.lowKeyFields = lowKeyFields;
this.highKeyFields = highKeyFields;
@@ -65,14 +66,15 @@
}
public IndexNestedLoopJoinOperatorDescriptor(JobSpecification spec, RecordDescriptor recDesc,
- IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> treeIndexRegistryProvider,
+ IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lcManagerProvider,
IFileSplitProvider fileSplitProvider, ITreeIndexFrameFactory interiorFrameFactory,
ITreeIndexFrameFactory leafFrameFactory, ITypeTraits[] typeTraits,
IBinaryComparatorFactory[] comparatorFactories, boolean isForward, int[] lowKeyFields, int[] highKeyFields,
boolean lowKeyInclusive, boolean highKeyInclusive, IIndexDataflowHelperFactory opHelperFactory,
boolean isRightOuter, INullWriterFactory[] nullWriterFactories) {
- super(spec, 1, 1, recDesc, storageManager, treeIndexRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, opHelperFactory, null, false, NoOpOperationCallbackProvider.INSTANCE);
+ super(spec, 1, 1, recDesc, storageManager, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, opHelperFactory, null, false, new TransientLocalResourceFactoryProvider(),
+ NoOpOperationCallbackFactory.INSTANCE, NoOpOperationCallbackFactory.INSTANCE);
this.isForward = isForward;
this.lowKeyFields = lowKeyFields;
this.highKeyFields = highKeyFields;
@@ -84,14 +86,15 @@
}
public IndexNestedLoopJoinOperatorDescriptor(JobSpecification spec, RecordDescriptor recDesc,
- IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> treeIndexRegistryProvider,
+ IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lcManagerProvider,
IFileSplitProvider fileSplitProvider, ITreeIndexFrameFactory interiorFrameFactory,
ITreeIndexFrameFactory leafFrameFactory, ITypeTraits[] typeTraits,
IBinaryComparatorFactory[] comparatorFactories, boolean isForward, int[] lowKeyFields, int[] highKeyFields,
boolean lowKeyInclusive, boolean highKeyInclusive, IIndexDataflowHelperFactory opHelperFactory,
boolean isSetUnion) {
- super(spec, 1, 1, recDesc, storageManager, treeIndexRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, opHelperFactory, null, false, NoOpOperationCallbackProvider.INSTANCE);
+ super(spec, 1, 1, recDesc, storageManager, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, null, opHelperFactory, null, false, new TransientLocalResourceFactoryProvider(),
+ NoOpOperationCallbackFactory.INSTANCE, NoOpOperationCallbackFactory.INSTANCE);
this.isForward = isForward;
this.lowKeyFields = lowKeyFields;
this.highKeyFields = highKeyFields;
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorNodePushable.java
index bd076d3..8b9bfc2 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorNodePushable.java
@@ -34,9 +34,10 @@
import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
public class IndexNestedLoopJoinOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
private TreeIndexDataflowHelper treeIndexOpHelper;
@@ -86,11 +87,11 @@
@Override
public void open() throws HyracksDataException {
- accessor = new FrameTupleAccessor(treeIndexOpHelper.getHyracksTaskContext().getFrameSize(), recDesc);
+ accessor = new FrameTupleAccessor(treeIndexOpHelper.getTaskContext().getFrameSize(), recDesc);
try {
- treeIndexOpHelper.init(false);
- btree = (BTree) treeIndexOpHelper.getIndex();
+ treeIndexOpHelper.open();
+ btree = (BTree) treeIndexOpHelper.getIndexInstance();
writer.open();
int lowKeySearchFields = btree.getComparatorFactories().length;
@@ -118,15 +119,15 @@
rangePred = new RangePredicate(null, null, lowKeyInclusive, highKeyInclusive, lowKeySearchCmp,
highKeySearchCmp);
- writeBuffer = treeIndexOpHelper.getHyracksTaskContext().allocateFrame();
+ writeBuffer = treeIndexOpHelper.getTaskContext().allocateFrame();
tb = new ArrayTupleBuilder(inputRecDesc.getFields().length + btree.getFieldCount());
dos = tb.getDataOutput();
- appender = new FrameTupleAppender(treeIndexOpHelper.getHyracksTaskContext().getFrameSize());
+ appender = new FrameTupleAppender(treeIndexOpHelper.getTaskContext().getFrameSize());
appender.reset(writeBuffer, true);
- indexAccessor = btree.createAccessor();
+ indexAccessor = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
setCursor();
} catch (Exception e) {
- treeIndexOpHelper.deinit();
+ treeIndexOpHelper.close();
throw new HyracksDataException(e);
}
}
@@ -196,7 +197,7 @@
throw new HyracksDataException(e);
}
} finally {
- treeIndexOpHelper.deinit();
+ treeIndexOpHelper.close();
}
}
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
index f7b3d62..5ca5382 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
@@ -38,9 +38,10 @@
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
@@ -116,11 +117,11 @@
* function open
*/
functionProxy.functionOpen();
- accessor = new FrameTupleAccessor(treeIndexOpHelper.getHyracksTaskContext().getFrameSize(), recDesc);
+ accessor = new FrameTupleAccessor(treeIndexOpHelper.getTaskContext().getFrameSize(), recDesc);
try {
- treeIndexOpHelper.init(false);
- btree = (BTree) treeIndexOpHelper.getIndex();
+ treeIndexOpHelper.open();
+ btree = (BTree) treeIndexOpHelper.getIndexInstance();
cursorFrame = btree.getLeafFrameFactory().createFrame();
setCursor();
@@ -147,7 +148,7 @@
rangePred = new RangePredicate(null, null, true, true, lowKeySearchCmp, highKeySearchCmp);
- writeBuffer = treeIndexOpHelper.getHyracksTaskContext().allocateFrame();
+ writeBuffer = treeIndexOpHelper.getTaskContext().allocateFrame();
nullTupleBuilder = new ArrayTupleBuilder(inputRecDesc.getFields().length);
dos = nullTupleBuilder.getDataOutput();
@@ -157,10 +158,10 @@
nullTupleBuilder.addFieldEndOffset();
}
- appender = new FrameTupleAppender(treeIndexOpHelper.getHyracksTaskContext().getFrameSize());
+ appender = new FrameTupleAppender(treeIndexOpHelper.getTaskContext().getFrameSize());
appender.reset(writeBuffer, true);
- indexAccessor = btree.createAccessor();
+ indexAccessor = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
/** set the search cursor */
rangePred.setLowKey(null, true);
@@ -178,7 +179,7 @@
cloneUpdateTb = new ArrayTupleBuilder(btree.getFieldCount());
updateBuffer.setFieldCount(btree.getFieldCount());
} catch (Exception e) {
- treeIndexOpHelper.deinit();
+ treeIndexOpHelper.close();
throw new HyracksDataException(e);
}
}
@@ -273,7 +274,7 @@
} catch (Exception e) {
throw new HyracksDataException(e);
} finally {
- treeIndexOpHelper.deinit();
+ treeIndexOpHelper.close();
}
}
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinOperatorNodePushable.java
index 9f1e1ad..d7c5d1f 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinOperatorNodePushable.java
@@ -38,9 +38,10 @@
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
public class IndexNestedLoopRightOuterJoinOperatorNodePushable extends
AbstractUnaryInputUnaryOutputOperatorNodePushable {
@@ -97,10 +98,10 @@
@Override
public void open() throws HyracksDataException {
- accessor = new FrameTupleAccessor(treeIndexOpHelper.getHyracksTaskContext().getFrameSize(), recDesc);
+ accessor = new FrameTupleAccessor(treeIndexOpHelper.getTaskContext().getFrameSize(), recDesc);
try {
- treeIndexOpHelper.init(false);
- btree = (BTree) treeIndexOpHelper.getIndex();
+ treeIndexOpHelper.open();
+ btree = (BTree) treeIndexOpHelper.getIndexInstance();
cursorFrame = btree.getLeafFrameFactory().createFrame();
setCursor();
writer.open();
@@ -129,13 +130,13 @@
rangePred = new RangePredicate(null, null, true, true, lowKeySearchCmp, highKeySearchCmp);
- writeBuffer = treeIndexOpHelper.getHyracksTaskContext().allocateFrame();
+ writeBuffer = treeIndexOpHelper.getTaskContext().allocateFrame();
tb = new ArrayTupleBuilder(inputRecDesc.getFields().length + btree.getFieldCount());
dos = tb.getDataOutput();
- appender = new FrameTupleAppender(treeIndexOpHelper.getHyracksTaskContext().getFrameSize());
+ appender = new FrameTupleAppender(treeIndexOpHelper.getTaskContext().getFrameSize());
appender.reset(writeBuffer, true);
- indexAccessor = btree.createAccessor();
+ indexAccessor = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
/** set the search cursor */
rangePred.setLowKey(null, true);
@@ -151,7 +152,7 @@
}
} catch (Exception e) {
- treeIndexOpHelper.deinit();
+ treeIndexOpHelper.close();
throw new HyracksDataException(e);
}
}
@@ -243,7 +244,7 @@
} catch (Exception e) {
throw new HyracksDataException(e);
} finally {
- treeIndexOpHelper.deinit();
+ treeIndexOpHelper.close();
}
}
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
index 6af60a8..160324e 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
@@ -36,9 +36,10 @@
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
@@ -104,11 +105,11 @@
@Override
public void open() throws HyracksDataException {
functionProxy.functionOpen();
- accessor = new FrameTupleAccessor(treeIndexOpHelper.getHyracksTaskContext().getFrameSize(), recDesc);
+ accessor = new FrameTupleAccessor(treeIndexOpHelper.getTaskContext().getFrameSize(), recDesc);
try {
- treeIndexOpHelper.init(false);
- btree = (BTree) treeIndexOpHelper.getIndex();
+ treeIndexOpHelper.open();
+ btree = (BTree) treeIndexOpHelper.getIndexInstance();
cursorFrame = btree.getLeafFrameFactory().createFrame();
setCursor();
@@ -120,11 +121,11 @@
}
lowKeySearchCmp = new MultiComparator(lowKeySearchComparators);
- writeBuffer = treeIndexOpHelper.getHyracksTaskContext().allocateFrame();
- appender = new FrameTupleAppender(treeIndexOpHelper.getHyracksTaskContext().getFrameSize());
+ writeBuffer = treeIndexOpHelper.getTaskContext().allocateFrame();
+ appender = new FrameTupleAppender(treeIndexOpHelper.getTaskContext().getFrameSize());
appender.reset(writeBuffer, true);
- indexAccessor = btree.createAccessor();
+ indexAccessor = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
/** set the search cursor */
rangePred.setLowKey(null, true);
@@ -141,7 +142,7 @@
cloneUpdateTb = new ArrayTupleBuilder(btree.getFieldCount());
updateBuffer.setFieldCount(btree.getFieldCount());
} catch (Exception e) {
- treeIndexOpHelper.deinit();
+ treeIndexOpHelper.close();
throw new HyracksDataException(e);
}
}
@@ -214,7 +215,7 @@
} catch (Exception e) {
throw new HyracksDataException(e);
} finally {
- treeIndexOpHelper.deinit();
+ treeIndexOpHelper.close();
}
}
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionOperatorNodePushable.java
index 615a25b..579935b 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionOperatorNodePushable.java
@@ -37,9 +37,10 @@
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
public class IndexNestedLoopSetUnionOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
private TreeIndexDataflowHelper treeIndexOpHelper;
@@ -90,11 +91,11 @@
@Override
public void open() throws HyracksDataException {
- accessor = new FrameTupleAccessor(treeIndexOpHelper.getHyracksTaskContext().getFrameSize(), recDesc);
+ accessor = new FrameTupleAccessor(treeIndexOpHelper.getTaskContext().getFrameSize(), recDesc);
try {
- treeIndexOpHelper.init(false);
- btree = (BTree) treeIndexOpHelper.getIndex();
+ treeIndexOpHelper.open();
+ btree = (BTree) treeIndexOpHelper.getIndexInstance();
cursorFrame = btree.getLeafFrameFactory().createFrame();
setCursor();
writer.open();
@@ -107,13 +108,13 @@
}
lowKeySearchCmp = new MultiComparator(lowKeySearchComparators);
- writeBuffer = treeIndexOpHelper.getHyracksTaskContext().allocateFrame();
+ writeBuffer = treeIndexOpHelper.getTaskContext().allocateFrame();
tb = new ArrayTupleBuilder(btree.getFieldCount());
dos = tb.getDataOutput();
- appender = new FrameTupleAppender(treeIndexOpHelper.getHyracksTaskContext().getFrameSize());
+ appender = new FrameTupleAppender(treeIndexOpHelper.getTaskContext().getFrameSize());
appender.reset(writeBuffer, true);
- indexAccessor = btree.createAccessor();
+ indexAccessor = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
/** set the search cursor */
rangePred.setLowKey(null, true);
@@ -129,7 +130,7 @@
}
} catch (Exception e) {
- treeIndexOpHelper.deinit();
+ treeIndexOpHelper.close();
throw new HyracksDataException(e);
}
}
@@ -203,7 +204,7 @@
} catch (Exception e) {
throw new HyracksDataException(e);
} finally {
- treeIndexOpHelper.deinit();
+ treeIndexOpHelper.close();
}
}
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorDescriptor.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorDescriptor.java
index 126fcb8..eb5ece6 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorDescriptor.java
@@ -22,12 +22,12 @@
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.hyracks.storage.common.file.TransientLocalResourceFactoryProvider;
public class TreeIndexBulkReLoadOperatorDescriptor extends AbstractTreeIndexOperatorDescriptor {
@@ -35,20 +35,21 @@
private final int[] fieldPermutation;
private final IStorageManagerInterface storageManager;
- private final IIndexRegistryProvider<IIndex> treeIndexRegistryProvider;
+ private final IIndexLifecycleManagerProvider lcManagerProvider;
private final IFileSplitProvider fileSplitProvider;
private final float fillFactor;
public TreeIndexBulkReLoadOperatorDescriptor(JobSpecification spec, IStorageManagerInterface storageManager,
- IIndexRegistryProvider<IIndex> treeIndexRegistryProvider, IFileSplitProvider fileSplitProvider,
+ IIndexLifecycleManagerProvider lcManagerProvider, IFileSplitProvider fileSplitProvider,
ITypeTraits[] typeTraits, IBinaryComparatorFactory[] comparatorFactories, int[] fieldPermutation,
float fillFactor, IIndexDataflowHelperFactory opHelperFactory) {
- super(spec, 1, 0, null, storageManager, treeIndexRegistryProvider, fileSplitProvider, typeTraits,
- comparatorFactories, opHelperFactory, null, false, NoOpOperationCallbackProvider.INSTANCE);
+ super(spec, 1, 0, null, storageManager, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
+ fieldPermutation, opHelperFactory, null, false, new TransientLocalResourceFactoryProvider(),
+ NoOpOperationCallbackFactory.INSTANCE, NoOpOperationCallbackFactory.INSTANCE);
this.fieldPermutation = fieldPermutation;
this.storageManager = storageManager;
- this.treeIndexRegistryProvider = treeIndexRegistryProvider;
+ this.lcManagerProvider = lcManagerProvider;
this.fileSplitProvider = fileSplitProvider;
this.fillFactor = fillFactor;
}
@@ -57,6 +58,6 @@
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
return new TreeIndexBulkReLoadOperatorNodePushable(this, ctx, partition, fieldPermutation, fillFactor,
- recordDescProvider, storageManager, treeIndexRegistryProvider, fileSplitProvider);
+ recordDescProvider, storageManager, lcManagerProvider, fileSplitProvider);
}
}
\ No newline at end of file
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorNodePushable.java
index 883fef4..5e089a5 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorNodePushable.java
@@ -20,111 +20,54 @@
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoadContext;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoader;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexRegistry;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
-import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
-import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
public class TreeIndexBulkReLoadOperatorNodePushable extends AbstractUnaryInputSinkOperatorNodePushable {
- private final TreeIndexDataflowHelper treeIndexOpHelper;
- private FrameTupleAccessor accessor;
- private IIndexBulkLoadContext bulkLoadCtx;
-
- private IRecordDescriptorProvider recordDescProvider;
- private PermutingFrameTupleReference tuple = new PermutingFrameTupleReference();
-
- private final IStorageManagerInterface storageManager;
- private final IIndexRegistryProvider<IIndex> treeIndexRegistryProvider;
- private final IFileSplitProvider fileSplitProvider;
- private final int partition;
private final float fillFactor;
- private IHyracksTaskContext ctx;
+ private final TreeIndexDataflowHelper treeIndexOpHelper;
+ private final IIndexOperatorDescriptor opDesc;
+ private final IRecordDescriptorProvider recordDescProvider;
+ private final PermutingFrameTupleReference tuple = new PermutingFrameTupleReference();
+
private ITreeIndex index;
+ private FrameTupleAccessor accessor;
+ private IIndexBulkLoader bulkLoader;
public TreeIndexBulkReLoadOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
int partition, int[] fieldPermutation, float fillFactor, IRecordDescriptorProvider recordDescProvider,
- IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> treeIndexRegistryProvider,
+ IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lcManagerProvider,
IFileSplitProvider fileSplitProvider) {
+ this.fillFactor = fillFactor;
treeIndexOpHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
opDesc, ctx, partition);
+ this.opDesc = opDesc;
this.recordDescProvider = recordDescProvider;
tuple.setFieldPermutation(fieldPermutation);
-
- this.storageManager = storageManager;
- this.treeIndexRegistryProvider = treeIndexRegistryProvider;
- this.fileSplitProvider = fileSplitProvider;
- this.partition = partition;
- this.ctx = ctx;
- this.fillFactor = fillFactor;
}
@Override
public void open() throws HyracksDataException {
- initDrop();
- init();
- }
-
- private void initDrop() throws HyracksDataException {
- try {
- IndexRegistry<IIndex> treeIndexRegistry = treeIndexRegistryProvider.getRegistry(ctx);
- IBufferCache bufferCache = storageManager.getBufferCache(ctx);
- IFileMapProvider fileMapProvider = storageManager.getFileMapProvider(ctx);
-
- FileReference f = fileSplitProvider.getFileSplits()[partition].getLocalFile();
- int indexFileId = -1;
- boolean fileIsMapped = false;
- synchronized (fileMapProvider) {
- fileIsMapped = fileMapProvider.isMapped(f);
- if (fileIsMapped)
- indexFileId = fileMapProvider.lookupFileId(f);
- }
-
- /**
- * delete the file if it is mapped
- */
- if (fileIsMapped) {
- // Unregister tree instance.
- synchronized (treeIndexRegistry) {
- treeIndexRegistry.unregister(indexFileId);
- }
-
- // remove name to id mapping
- bufferCache.deleteFile(indexFileId, false);
- }
- }
- // TODO: for the time being we don't throw,
- // with proper exception handling (no hanging job problem) we should
- // throw
- catch (Exception e) {
- throw new HyracksDataException(e);
- }
- }
-
- private void init() throws HyracksDataException {
- AbstractTreeIndexOperatorDescriptor opDesc = (AbstractTreeIndexOperatorDescriptor) treeIndexOpHelper
- .getOperatorDescriptor();
RecordDescriptor recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
- accessor = new FrameTupleAccessor(treeIndexOpHelper.getHyracksTaskContext().getFrameSize(), recDesc);
+ accessor = new FrameTupleAccessor(treeIndexOpHelper.getTaskContext().getFrameSize(), recDesc);
+ treeIndexOpHelper.create();
+ treeIndexOpHelper.open();
try {
- treeIndexOpHelper.init(true);
- treeIndexOpHelper.getIndex().open(treeIndexOpHelper.getIndexFileId());
- index = (ITreeIndex) treeIndexOpHelper.getIndex();
- index.open(treeIndexOpHelper.getIndexFileId());
- bulkLoadCtx = index.beginBulkLoad(fillFactor);
+ index = (ITreeIndex) treeIndexOpHelper.getIndexInstance();
+ bulkLoader = index.createBulkLoader(fillFactor, false, 0);
} catch (Exception e) {
// cleanup in case of failure
- treeIndexOpHelper.deinit();
+ treeIndexOpHelper.close();
throw new HyracksDataException(e);
}
}
@@ -135,16 +78,22 @@
int tupleCount = accessor.getTupleCount();
for (int i = 0; i < tupleCount; i++) {
tuple.reset(accessor, i);
- index.bulkLoadAddTuple(tuple, bulkLoadCtx);
+ try {
+ bulkLoader.add(tuple);
+ } catch (IndexException e) {
+ throw new HyracksDataException(e);
+ }
}
}
@Override
public void close() throws HyracksDataException {
try {
- index.endBulkLoad(bulkLoadCtx);
+ bulkLoader.end();
+ } catch (IndexException e) {
+ throw new HyracksDataException(e);
} finally {
- treeIndexOpHelper.deinit();
+ treeIndexOpHelper.close();
}
}
diff --git a/pregelix/pregelix-dataflow/pom.xml b/pregelix/pregelix-dataflow/pom.xml
index ffc000b..37abe57 100644
--- a/pregelix/pregelix-dataflow/pom.xml
+++ b/pregelix/pregelix-dataflow/pom.xml
@@ -42,8 +42,9 @@
</configuration>
</plugin>
<plugin>
+ <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-clean-plugin</artifactId>
- <version>2.5</version>
+ <version>2.4.1</version>
<configuration>
<filesets>
<fileset>
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
index 567e220..8d6ab38 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
@@ -28,8 +28,8 @@
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
import edu.uci.ics.hyracks.control.nc.io.IOManager;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexRegistry;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexLifecycleManager;
import edu.uci.ics.hyracks.storage.common.buffercache.BufferCache;
import edu.uci.ics.hyracks.storage.common.buffercache.ClockPageReplacementStrategy;
import edu.uci.ics.hyracks.storage.common.buffercache.HeapBufferAllocator;
@@ -38,20 +38,25 @@
import edu.uci.ics.hyracks.storage.common.buffercache.IPageReplacementStrategy;
import edu.uci.ics.hyracks.storage.common.file.IFileMapManager;
import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
-import edu.uci.ics.hyracks.storage.common.smi.TransientFileMapManager;
+import edu.uci.ics.hyracks.storage.common.file.ILocalResourceRepository;
+import edu.uci.ics.hyracks.storage.common.file.ResourceIdFactory;
+import edu.uci.ics.hyracks.storage.common.file.TransientFileMapManager;
+import edu.uci.ics.hyracks.storage.common.file.TransientLocalResourceRepository;
import edu.uci.ics.pregelix.api.graph.Vertex;
public class RuntimeContext implements IWorkspaceFileFactory {
private static final Logger LOGGER = Logger.getLogger(RuntimeContext.class.getName());
- private IndexRegistry<IIndex> treeIndexRegistry;
- private IBufferCache bufferCache;
- private IFileMapManager fileMapManager;
- private Map<StateKey, IStateObject> appStateMap = new ConcurrentHashMap<StateKey, IStateObject>();
- private Map<String, Long> giraphJobIdToSuperStep = new ConcurrentHashMap<String, Long>();
- private Map<String, Boolean> giraphJobIdToMove = new ConcurrentHashMap<String, Boolean>();
- private IOManager ioManager;
- private Map<Long, List<FileReference>> iterationToFiles = new ConcurrentHashMap<Long, List<FileReference>>();
+ private final IIndexLifecycleManager lcManager;
+ private final ILocalResourceRepository localResourceRepository;
+ private final ResourceIdFactory resourceIdFactory;
+ private final IBufferCache bufferCache;
+ private final IFileMapManager fileMapManager;
+ private final Map<StateKey, IStateObject> appStateMap = new ConcurrentHashMap<StateKey, IStateObject>();
+ private final Map<String, Long> giraphJobIdToSuperStep = new ConcurrentHashMap<String, Long>();
+ private final Map<String, Boolean> giraphJobIdToMove = new ConcurrentHashMap<String, Boolean>();
+ private final IOManager ioManager;
+ private final Map<Long, List<FileReference>> iterationToFiles = new ConcurrentHashMap<Long, List<FileReference>>();
public RuntimeContext(INCApplicationContext appCtx) {
fileMapManager = new TransientFileMapManager();
@@ -64,8 +69,10 @@
/** let the buffer cache never flush dirty pages */
bufferCache = new BufferCache(appCtx.getRootContext().getIOManager(), allocator, prs,
new PreDelayPageCleanerPolicy(Long.MAX_VALUE), fileMapManager, pageSize, numPages, 1000000);
- treeIndexRegistry = new IndexRegistry<IIndex>();
ioManager = (IOManager) appCtx.getRootContext().getIOManager();
+ lcManager = new IndexLifecycleManager();
+ localResourceRepository = new TransientLocalResourceRepository();
+ resourceIdFactory = new ResourceIdFactory(0);
}
public void close() {
@@ -80,6 +87,18 @@
System.gc();
}
+ public ILocalResourceRepository getLocalResourceRepository() {
+ return localResourceRepository;
+ }
+
+ public ResourceIdFactory getResourceIdFactory() {
+ return resourceIdFactory;
+ }
+
+ public IIndexLifecycleManager getIndexLifecycleManager() {
+ return lcManager;
+ }
+
public IBufferCache getBufferCache() {
return bufferCache;
}
@@ -88,10 +107,6 @@
return fileMapManager;
}
- public IndexRegistry<IIndex> getTreeIndexRegistry() {
- return treeIndexRegistry;
- }
-
public Map<StateKey, IStateObject> getAppStateStore() {
return appStateMap;
}
diff --git a/pregelix/pregelix-example/pom.xml b/pregelix/pregelix-example/pom.xml
index beefee2..20c45ec 100644
--- a/pregelix/pregelix-example/pom.xml
+++ b/pregelix/pregelix-example/pom.xml
@@ -24,6 +24,7 @@
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
+ <version>2.2-beta-5</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
@@ -42,7 +43,7 @@
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>appassembler-maven-plugin</artifactId>
- <version>1.3</version>
+ <version>1.3</version>
<executions>
<execution>
<configuration>
@@ -79,7 +80,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-clean-plugin</artifactId>
- <version>2.5</version>
+ <version>2.4.1</version>
<configuration>
<filesets>
<fileset>
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/dataload/DataLoadTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/dataload/DataLoadTest.java
index 321b5b2..da6d564 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/dataload/DataLoadTest.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/dataload/DataLoadTest.java
@@ -76,8 +76,7 @@
ClusterConfig.setStorePath(PATH_TO_CLUSTER_STORE);
ClusterConfig.setClusterPropertiesPath(PATH_TO_CLUSTER_PROPERTIES);
cleanupStores();
- PregelixHyracksIntegrationUtil.init("src/test/resources/topology.xml");
- PregelixHyracksIntegrationUtil.createApp(HYRACKS_APP_NAME);
+ PregelixHyracksIntegrationUtil.init();
LOGGER.info("Hyracks mini-cluster started");
startHDFS();
FileUtils.forceMkdir(new File(EXPECT_RESULT_DIR));
@@ -112,7 +111,6 @@
}
public void tearDown() throws Exception {
- PregelixHyracksIntegrationUtil.destroyApp(HYRACKS_APP_NAME);
PregelixHyracksIntegrationUtil.deinit();
LOGGER.info("Hyracks mini-cluster shut down");
cleanupHDFS();
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestSuite.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestSuite.java
index fa98ebd..4bf83e6 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestSuite.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestSuite.java
@@ -41,176 +41,166 @@
@SuppressWarnings("deprecation")
public class RunJobTestSuite extends TestSuite {
- private static final Logger LOGGER = Logger.getLogger(RunJobTestSuite.class
- .getName());
+ private static final Logger LOGGER = Logger.getLogger(RunJobTestSuite.class.getName());
- private static final String ACTUAL_RESULT_DIR = "actual";
- private static final String EXPECTED_RESULT_DIR = "src/test/resources/expected";
- private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
- private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/stores.properties";
- private static final String PATH_TO_CLUSTER_PROPERTIES = "src/test/resources/cluster/cluster.properties";
- private static final String PATH_TO_JOBS = "src/test/resources/jobs/";
- private static final String PATH_TO_IGNORE = "src/test/resources/ignore.txt";
- private static final String PATH_TO_ONLY = "src/test/resources/only.txt";
- private static final String FILE_EXTENSION_OF_RESULTS = "result";
+ private static final String ACTUAL_RESULT_DIR = "actual";
+ private static final String EXPECTED_RESULT_DIR = "src/test/resources/expected";
+ private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
+ private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/stores.properties";
+ private static final String PATH_TO_CLUSTER_PROPERTIES = "src/test/resources/cluster/cluster.properties";
+ private static final String PATH_TO_JOBS = "src/test/resources/jobs/";
+ private static final String PATH_TO_IGNORE = "src/test/resources/ignore.txt";
+ private static final String PATH_TO_ONLY = "src/test/resources/only.txt";
+ private static final String FILE_EXTENSION_OF_RESULTS = "result";
- private static final String DATA_PATH = "data/webmap/webmap_link.txt";
- private static final String HDFS_PATH = "/webmap/";
+ private static final String DATA_PATH = "data/webmap/webmap_link.txt";
+ private static final String HDFS_PATH = "/webmap/";
- private static final String DATA_PATH2 = "data/webmapcomplex/webmap_link.txt";
- private static final String HDFS_PATH2 = "/webmapcomplex/";
+ private static final String DATA_PATH2 = "data/webmapcomplex/webmap_link.txt";
+ private static final String HDFS_PATH2 = "/webmapcomplex/";
- private static final String DATA_PATH3 = "data/clique/clique.txt";
- private static final String HDFS_PATH3 = "/clique/";
+ private static final String DATA_PATH3 = "data/clique/clique.txt";
+ private static final String HDFS_PATH3 = "/clique/";
- private static final String HYRACKS_APP_NAME = "pregelix";
- private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR
- + File.separator + "conf.xml";
- private MiniDFSCluster dfsCluster;
+ private static final String HYRACKS_APP_NAME = "pregelix";
+ private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
+ private MiniDFSCluster dfsCluster;
- private JobConf conf = new JobConf();
- private int numberOfNC = 2;
+ private JobConf conf = new JobConf();
+ private int numberOfNC = 2;
- public void setUp() throws Exception {
- ClusterConfig.setStorePath(PATH_TO_CLUSTER_STORE);
- ClusterConfig.setClusterPropertiesPath(PATH_TO_CLUSTER_PROPERTIES);
- cleanupStores();
- PregelixHyracksIntegrationUtil.init("src/test/resources/topology.xml");
- PregelixHyracksIntegrationUtil.createApp(HYRACKS_APP_NAME);
- LOGGER.info("Hyracks mini-cluster started");
- FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
- FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
- startHDFS();
- }
+ public void setUp() throws Exception {
+ ClusterConfig.setStorePath(PATH_TO_CLUSTER_STORE);
+ ClusterConfig.setClusterPropertiesPath(PATH_TO_CLUSTER_PROPERTIES);
+ cleanupStores();
+ PregelixHyracksIntegrationUtil.init();
+ LOGGER.info("Hyracks mini-cluster started");
+ FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
+ FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
+ startHDFS();
+ }
- private void cleanupStores() throws IOException {
- FileUtils.forceMkdir(new File("teststore"));
- FileUtils.forceMkdir(new File("build"));
- FileUtils.cleanDirectory(new File("teststore"));
- FileUtils.cleanDirectory(new File("build"));
- }
+ private void cleanupStores() throws IOException {
+ FileUtils.forceMkdir(new File("teststore"));
+ FileUtils.forceMkdir(new File("build"));
+ FileUtils.cleanDirectory(new File("teststore"));
+ FileUtils.cleanDirectory(new File("build"));
+ }
- private void startHDFS() throws IOException {
- conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
- conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
- conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
- FileSystem lfs = FileSystem.getLocal(new Configuration());
- lfs.delete(new Path("build"), true);
- System.setProperty("hadoop.log.dir", "logs");
- dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
- FileSystem dfs = FileSystem.get(conf);
- Path src = new Path(DATA_PATH);
- Path dest = new Path(HDFS_PATH);
- dfs.mkdirs(dest);
- dfs.copyFromLocalFile(src, dest);
+ private void startHDFS() throws IOException {
+ conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
+ conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
+ conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
+ FileSystem lfs = FileSystem.getLocal(new Configuration());
+ lfs.delete(new Path("build"), true);
+ System.setProperty("hadoop.log.dir", "logs");
+ dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
+ FileSystem dfs = FileSystem.get(conf);
+ Path src = new Path(DATA_PATH);
+ Path dest = new Path(HDFS_PATH);
+ dfs.mkdirs(dest);
+ dfs.copyFromLocalFile(src, dest);
- src = new Path(DATA_PATH2);
- dest = new Path(HDFS_PATH2);
- dfs.mkdirs(dest);
- dfs.copyFromLocalFile(src, dest);
+ src = new Path(DATA_PATH2);
+ dest = new Path(HDFS_PATH2);
+ dfs.mkdirs(dest);
+ dfs.copyFromLocalFile(src, dest);
- src = new Path(DATA_PATH3);
- dest = new Path(HDFS_PATH3);
- dfs.mkdirs(dest);
- dfs.copyFromLocalFile(src, dest);
+ src = new Path(DATA_PATH3);
+ dest = new Path(HDFS_PATH3);
+ dfs.mkdirs(dest);
+ dfs.copyFromLocalFile(src, dest);
- DataOutputStream confOutput = new DataOutputStream(
- new FileOutputStream(new File(HADOOP_CONF_PATH)));
- conf.writeXml(confOutput);
- confOutput.flush();
- confOutput.close();
- }
+ DataOutputStream confOutput = new DataOutputStream(new FileOutputStream(new File(HADOOP_CONF_PATH)));
+ conf.writeXml(confOutput);
+ confOutput.flush();
+ confOutput.close();
+ }
- /**
- * cleanup hdfs cluster
- */
- private void cleanupHDFS() throws Exception {
- dfsCluster.shutdown();
- }
+ /**
+ * cleanup hdfs cluster
+ */
+ private void cleanupHDFS() throws Exception {
+ dfsCluster.shutdown();
+ }
- public void tearDown() throws Exception {
- PregelixHyracksIntegrationUtil.destroyApp(HYRACKS_APP_NAME);
- PregelixHyracksIntegrationUtil.deinit();
- LOGGER.info("Hyracks mini-cluster shut down");
- cleanupHDFS();
- }
+ public void tearDown() throws Exception {
+ PregelixHyracksIntegrationUtil.deinit();
+ LOGGER.info("Hyracks mini-cluster shut down");
+ cleanupHDFS();
+ }
- public static Test suite() throws Exception {
- List<String> ignores = getFileList(PATH_TO_IGNORE);
- List<String> onlys = getFileList(PATH_TO_ONLY);
- File testData = new File(PATH_TO_JOBS);
- File[] queries = testData.listFiles();
- RunJobTestSuite testSuite = new RunJobTestSuite();
- testSuite.setUp();
- boolean onlyEnabled = false;
+ public static Test suite() throws Exception {
+ List<String> ignores = getFileList(PATH_TO_IGNORE);
+ List<String> onlys = getFileList(PATH_TO_ONLY);
+ File testData = new File(PATH_TO_JOBS);
+ File[] queries = testData.listFiles();
+ RunJobTestSuite testSuite = new RunJobTestSuite();
+ testSuite.setUp();
+ boolean onlyEnabled = false;
- if (onlys.size() > 0) {
- onlyEnabled = true;
- }
- for (File qFile : queries) {
- if (isInList(ignores, qFile.getName()))
- continue;
+ if (onlys.size() > 0) {
+ onlyEnabled = true;
+ }
+ for (File qFile : queries) {
+ if (isInList(ignores, qFile.getName()))
+ continue;
- if (qFile.isFile()) {
- if (onlyEnabled && !isInList(onlys, qFile.getName())) {
- continue;
- } else {
- String resultFileName = ACTUAL_RESULT_DIR + File.separator
- + jobExtToResExt(qFile.getName());
- String expectedFileName = EXPECTED_RESULT_DIR
- + File.separator + jobExtToResExt(qFile.getName());
- testSuite.addTest(new RunJobTestCase(HADOOP_CONF_PATH,
- qFile.getName(),
- qFile.getAbsolutePath().toString(), resultFileName,
- expectedFileName));
- }
- }
- }
- return testSuite;
- }
+ if (qFile.isFile()) {
+ if (onlyEnabled && !isInList(onlys, qFile.getName())) {
+ continue;
+ } else {
+ String resultFileName = ACTUAL_RESULT_DIR + File.separator + jobExtToResExt(qFile.getName());
+ String expectedFileName = EXPECTED_RESULT_DIR + File.separator + jobExtToResExt(qFile.getName());
+ testSuite.addTest(new RunJobTestCase(HADOOP_CONF_PATH, qFile.getName(), qFile.getAbsolutePath()
+ .toString(), resultFileName, expectedFileName));
+ }
+ }
+ }
+ return testSuite;
+ }
- /**
- * Runs the tests and collects their result in a TestResult.
- */
- @Override
- public void run(TestResult result) {
- try {
- int testCount = countTestCases();
- for (int i = 0; i < testCount; i++) {
- // cleanupStores();
- Test each = this.testAt(i);
- if (result.shouldStop())
- break;
- runTest(each, result);
- }
- tearDown();
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
- }
+ /**
+ * Runs the tests and collects their result in a TestResult.
+ */
+ @Override
+ public void run(TestResult result) {
+ try {
+ int testCount = countTestCases();
+ for (int i = 0; i < testCount; i++) {
+ // cleanupStores();
+ Test each = this.testAt(i);
+ if (result.shouldStop())
+ break;
+ runTest(each, result);
+ }
+ tearDown();
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
- protected static List<String> getFileList(String ignorePath)
- throws FileNotFoundException, IOException {
- BufferedReader reader = new BufferedReader(new FileReader(ignorePath));
- String s = null;
- List<String> ignores = new ArrayList<String>();
- while ((s = reader.readLine()) != null) {
- ignores.add(s);
- }
- reader.close();
- return ignores;
- }
+ protected static List<String> getFileList(String ignorePath) throws FileNotFoundException, IOException {
+ BufferedReader reader = new BufferedReader(new FileReader(ignorePath));
+ String s = null;
+ List<String> ignores = new ArrayList<String>();
+ while ((s = reader.readLine()) != null) {
+ ignores.add(s);
+ }
+ reader.close();
+ return ignores;
+ }
- private static String jobExtToResExt(String fname) {
- int dot = fname.lastIndexOf('.');
- return fname.substring(0, dot + 1) + FILE_EXTENSION_OF_RESULTS;
- }
+ private static String jobExtToResExt(String fname) {
+ int dot = fname.lastIndexOf('.');
+ return fname.substring(0, dot + 1) + FILE_EXTENSION_OF_RESULTS;
+ }
- private static boolean isInList(List<String> onlys, String name) {
- for (String only : onlys)
- if (name.indexOf(only) >= 0)
- return true;
- return false;
- }
+ private static boolean isInList(List<String> onlys, String name) {
+ for (String only : onlys)
+ if (name.indexOf(only) >= 0)
+ return true;
+ return false;
+ }
}
diff --git a/pregelix/pregelix-runtime/pom.xml b/pregelix/pregelix-runtime/pom.xml
index bce7b12..29b6ba7 100644
--- a/pregelix/pregelix-runtime/pom.xml
+++ b/pregelix/pregelix-runtime/pom.xml
@@ -42,8 +42,9 @@
</configuration>
</plugin>
<plugin>
+ <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-clean-plugin</artifactId>
- <version>2.5</version>
+ <version>2.4.1</version>
<configuration>
<filesets>
<fileset>
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/IndexLifeCycleManagerProvider.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/IndexLifeCycleManagerProvider.java
new file mode 100644
index 0000000..4fce6b3
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/IndexLifeCycleManagerProvider.java
@@ -0,0 +1,22 @@
+package edu.uci.ics.pregelix.runtime.bootstrap;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
+import edu.uci.ics.pregelix.dataflow.context.RuntimeContext;
+
+public class IndexLifeCycleManagerProvider implements IIndexLifecycleManagerProvider {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final IIndexLifecycleManagerProvider INSTANCE = new IndexLifeCycleManagerProvider();
+
+ private IndexLifeCycleManagerProvider() {
+ }
+
+ @Override
+ public IIndexLifecycleManager getLifecycleManager(IHyracksTaskContext ctx) {
+ return RuntimeContext.get(ctx).getIndexLifecycleManager();
+ }
+
+}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/NCApplicationEntryPoint.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/NCApplicationEntryPoint.java
new file mode 100644
index 0000000..fbebc66
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/NCApplicationEntryPoint.java
@@ -0,0 +1,23 @@
+package edu.uci.ics.pregelix.runtime.bootstrap;
+
+import edu.uci.ics.hyracks.api.application.INCApplicationContext;
+import edu.uci.ics.hyracks.api.application.INCApplicationEntryPoint;
+import edu.uci.ics.pregelix.dataflow.context.RuntimeContext;
+
+public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
+ @Override
+ public void start(INCApplicationContext ncAppCtx, String[] args) throws Exception {
+ RuntimeContext rCtx = new RuntimeContext(ncAppCtx);
+ ncAppCtx.setApplicationObject(rCtx);
+ }
+
+ @Override
+ public void notifyStartupComplete() throws Exception {
+
+ }
+
+ @Override
+ public void stop() throws Exception {
+
+ }
+}
\ No newline at end of file
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/NCBootstrapImpl.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/NCBootstrapImpl.java
deleted file mode 100644
index 76c725e..0000000
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/NCBootstrapImpl.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.pregelix.runtime.bootstrap;
-
-import java.util.logging.Logger;
-
-import edu.uci.ics.hyracks.api.application.INCApplicationContext;
-import edu.uci.ics.hyracks.api.application.INCBootstrap;
-import edu.uci.ics.pregelix.dataflow.context.RuntimeContext;
-
-public class NCBootstrapImpl implements INCBootstrap {
- private static final Logger LOGGER = Logger.getLogger(NCBootstrapImpl.class.getName());
- private INCApplicationContext appCtx;
-
- @Override
- public void start() throws Exception {
- LOGGER.info("Starting NC Bootstrap");
- RuntimeContext rCtx = new RuntimeContext(appCtx);
- appCtx.setApplicationObject(rCtx);
- LOGGER.info("Initialized RuntimeContext: " + rCtx);
- }
-
- @Override
- public void stop() throws Exception {
- LOGGER.info("Stopping NC Bootstrap");
- RuntimeContext rCtx = (RuntimeContext) appCtx.getApplicationObject();
- rCtx.close();
- }
-
- @Override
- public void setApplicationContext(INCApplicationContext appCtx) {
- this.appCtx = appCtx;
- }
-}
\ No newline at end of file
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/StorageManagerInterface.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/StorageManagerInterface.java
index 57bbfbe..0cce59d 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/StorageManagerInterface.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/StorageManagerInterface.java
@@ -18,6 +18,8 @@
import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+import edu.uci.ics.hyracks.storage.common.file.ILocalResourceRepository;
+import edu.uci.ics.hyracks.storage.common.file.ResourceIdFactory;
import edu.uci.ics.pregelix.dataflow.context.RuntimeContext;
public class StorageManagerInterface implements IStorageManagerInterface {
@@ -37,4 +39,14 @@
public IFileMapProvider getFileMapProvider(IHyracksTaskContext ctx) {
return RuntimeContext.get(ctx).getFileMapManager();
}
+
+ @Override
+ public ILocalResourceRepository getLocalResourceRepository(IHyracksTaskContext ctx) {
+ return RuntimeContext.get(ctx).getLocalResourceRepository();
+ }
+
+ @Override
+ public ResourceIdFactory getResourceIdFactory(IHyracksTaskContext ctx) {
+ return RuntimeContext.get(ctx).getResourceIdFactory();
+ }
}
\ No newline at end of file
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/TreeIndexRegistryProvider.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/TreeIndexRegistryProvider.java
deleted file mode 100644
index 7d66422..0000000
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/TreeIndexRegistryProvider.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.pregelix.runtime.bootstrap;
-
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexRegistry;
-import edu.uci.ics.pregelix.dataflow.context.RuntimeContext;
-
-public class TreeIndexRegistryProvider implements IIndexRegistryProvider<IIndex> {
- private static final long serialVersionUID = 1L;
-
- public static final TreeIndexRegistryProvider INSTANCE = new TreeIndexRegistryProvider();
-
- private TreeIndexRegistryProvider() {
- }
-
- @Override
- public IndexRegistry<IIndex> getRegistry(IHyracksTaskContext ctx) {
- return RuntimeContext.get(ctx).getTreeIndexRegistry();
- }
-}
\ No newline at end of file
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
index f7958d9..0c09757 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
@@ -175,8 +175,12 @@
ArrayListWritable msgContentList = (ArrayListWritable) tuple[1];
msgContentList.reset(msgIterator);
- if (!msgIterator.hasNext() && vertex.isHalted())
+ if (!msgIterator.hasNext() && vertex.isHalted()) {
return;
+ }
+ if (vertex.isHalted()) {
+ vertex.activate();
+ }
try {
vertex.compute(msgIterator);
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
index 0cf64a0..1bf6a2b 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
@@ -176,8 +176,12 @@
vertex.setOutputAppenders(appenders);
vertex.setOutputTupleBuilders(tbs);
- if (!msgIterator.hasNext() && vertex.isHalted())
+ if (!msgIterator.hasNext() && vertex.isHalted()) {
return;
+ }
+ if (vertex.isHalted()) {
+ vertex.activate();
+ }
try {
vertex.compute(msgIterator);