Merged fullstack_lsm_staging upto r3336

git-svn-id: https://hyracks.googlecode.com/svn/trunk/fullstack@3339 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix/pom.xml b/pregelix/pom.xml
index 0d2bdba..5c01e31 100644
--- a/pregelix/pom.xml
+++ b/pregelix/pom.xml
@@ -44,7 +44,7 @@
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-surefire-plugin</artifactId>
-        <version>2.13</version>
+        <version>2.12</version>
         <configuration>
             <forkMode>pertest</forkMode>
             <argLine>-enableassertions -Djava.util.logging.config.file=${user.home}/logging.properties -Xdebug -Xrunjdwp:transport=dt_socket,server=y,address=8000,suspend=n ${jvm.extraargs}</argLine>
@@ -53,16 +53,6 @@
     </plugins>
   </build>
 
-  <reporting>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-changelog-plugin</artifactId>
-        <version>2.2</version>
-      </plugin>
-    </plugins>
-  </reporting>
-
   <distributionManagement>
     <repository>
       <id>hyracks-releases</id>
diff --git a/pregelix/pregelix-api/pom.xml b/pregelix/pregelix-api/pom.xml
index b8bfce9..85f6ea2 100644
--- a/pregelix/pregelix-api/pom.xml
+++ b/pregelix/pregelix-api/pom.xml
@@ -41,9 +41,10 @@
 				</configuration>
 			</plugin>
 			<plugin>
-				<artifactId>maven-clean-plugin</artifactId>
-				<version>2.5</version>
-				<configuration>
+				<groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-clean-plugin</artifactId>
+                <version>2.4.1</version>
+                <configuration>
 					<filesets>
 						<fileset>
 							<directory>.</directory>
diff --git a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
index cd49184..e51d4bc 100644
--- a/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
+++ b/pregelix/pregelix-api/src/main/java/edu/uci/ics/pregelix/api/graph/Vertex.java
@@ -49,525 +49,524 @@
  */
 @SuppressWarnings("rawtypes")
 public abstract class Vertex<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable>
-		implements Writable {
-	private static long superstep = 0;
-	/** Class-wide number of vertices */
-	private static long numVertices = -1;
-	/** Class-wide number of edges */
-	private static long numEdges = -1;
-	/** Vertex id */
-	private I vertexId = null;
-	/** Vertex value */
-	private V vertexValue = null;
-	/** Map of destination vertices and their edge values */
-	private final List<Edge<I, E>> destEdgeList = new ArrayList<Edge<I, E>>();
-	/** If true, do not do anymore computation on this vertex. */
-	boolean halt = false;
-	/** List of incoming messages from the previous superstep */
-	private final List<M> msgList = new ArrayList<M>();
-	/** map context */
-	private static TaskAttemptContext context = null;
-	/** a delegate for hyracks stuff */
-	private VertexDelegate<I, V, E, M> delegate = new VertexDelegate<I, V, E, M>(
-			this);
-	/** this vertex is updated or not */
-	private boolean updated = false;
-	/** has outgoing messages */
-	private boolean hasMessage = false;
-	/** created new vertex */
-	private boolean createdNewLiveVertex = false;
+        implements Writable {
+    private static long superstep = 0;
+    /** Class-wide number of vertices */
+    private static long numVertices = -1;
+    /** Class-wide number of edges */
+    private static long numEdges = -1;
+    /** Vertex id */
+    private I vertexId = null;
+    /** Vertex value */
+    private V vertexValue = null;
+    /** Map of destination vertices and their edge values */
+    private final List<Edge<I, E>> destEdgeList = new ArrayList<Edge<I, E>>();
+    /** If true, do not do anymore computation on this vertex. */
+    boolean halt = false;
+    /** List of incoming messages from the previous superstep */
+    private final List<M> msgList = new ArrayList<M>();
+    /** map context */
+    private static TaskAttemptContext context = null;
+    /** a delegate for hyracks stuff */
+    private VertexDelegate<I, V, E, M> delegate = new VertexDelegate<I, V, E, M>(this);
+    /** this vertex is updated or not */
+    private boolean updated = false;
+    /** has outgoing messages */
+    private boolean hasMessage = false;
+    /** created new vertex */
+    private boolean createdNewLiveVertex = false;
 
-	/**
-	 * use object pool for re-using objects
-	 */
-	private List<Edge<I, E>> edgePool = new ArrayList<Edge<I, E>>();
-	private List<M> msgPool = new ArrayList<M>();
-	private List<V> valuePool = new ArrayList<V>();
-	private int usedEdge = 0;
-	private int usedMessage = 0;
-	private int usedValue = 0;
+    /**
+     * use object pool for re-using objects
+     */
+    private List<Edge<I, E>> edgePool = new ArrayList<Edge<I, E>>();
+    private List<M> msgPool = new ArrayList<M>();
+    private List<V> valuePool = new ArrayList<V>();
+    private int usedEdge = 0;
+    private int usedMessage = 0;
+    private int usedValue = 0;
 
-	/**
-	 * The key method that users need to implement
-	 * 
-	 * @param msgIterator
-	 *            an iterator of incoming messages
-	 */
-	public abstract void compute(Iterator<M> msgIterator);
+    /**
+     * The key method that users need to implement
+     * 
+     * @param msgIterator
+     *            an iterator of incoming messages
+     */
+    public abstract void compute(Iterator<M> msgIterator);
 
-	/**
-	 * Add an edge for the vertex.
-	 * 
-	 * @param targetVertexId
-	 * @param edgeValue
-	 * @return successful or not
-	 */
-	public final boolean addEdge(I targetVertexId, E edgeValue) {
-		Edge<I, E> edge = this.allocateEdge();
-		edge.setDestVertexId(targetVertexId);
-		edge.setEdgeValue(edgeValue);
-		destEdgeList.add(edge);
-		updated = true;
-		return true;
-	}
+    /**
+     * Add an edge for the vertex.
+     * 
+     * @param targetVertexId
+     * @param edgeValue
+     * @return successful or not
+     */
+    public final boolean addEdge(I targetVertexId, E edgeValue) {
+        Edge<I, E> edge = this.allocateEdge();
+        edge.setDestVertexId(targetVertexId);
+        edge.setEdgeValue(edgeValue);
+        destEdgeList.add(edge);
+        updated = true;
+        return true;
+    }
 
-	/**
-	 * Initialize a new vertex
-	 * 
-	 * @param vertexId
-	 * @param vertexValue
-	 * @param edges
-	 * @param messages
-	 */
-	public void initialize(I vertexId, V vertexValue, Map<I, E> edges,
-			List<M> messages) {
-		if (vertexId != null) {
-			setVertexId(vertexId);
-		}
-		if (vertexValue != null) {
-			setVertexValue(vertexValue);
-		}
-		destEdgeList.clear();
-		if (edges != null && !edges.isEmpty()) {
-			for (Map.Entry<I, E> entry : edges.entrySet()) {
-				destEdgeList.add(new Edge<I, E>(entry.getKey(), entry
-						.getValue()));
-			}
-		}
-		if (messages != null && !messages.isEmpty()) {
-			msgList.addAll(messages);
-		}
-	}
+    /**
+     * Initialize a new vertex
+     * 
+     * @param vertexId
+     * @param vertexValue
+     * @param edges
+     * @param messages
+     */
+    public void initialize(I vertexId, V vertexValue, Map<I, E> edges, List<M> messages) {
+        if (vertexId != null) {
+            setVertexId(vertexId);
+        }
+        if (vertexValue != null) {
+            setVertexValue(vertexValue);
+        }
+        destEdgeList.clear();
+        if (edges != null && !edges.isEmpty()) {
+            for (Map.Entry<I, E> entry : edges.entrySet()) {
+                destEdgeList.add(new Edge<I, E>(entry.getKey(), entry.getValue()));
+            }
+        }
+        if (messages != null && !messages.isEmpty()) {
+            msgList.addAll(messages);
+        }
+    }
 
-	/**
-	 * reset a vertex object: clear its internal states
-	 */
-	public void reset() {
-		usedEdge = 0;
-		usedMessage = 0;
-		usedValue = 0;
-		updated = false;
-	}
+    /**
+     * reset a vertex object: clear its internal states
+     */
+    public void reset() {
+        usedEdge = 0;
+        usedMessage = 0;
+        usedValue = 0;
+        updated = false;
+    }
 
-	/**
-	 * Set the vertex id
-	 * 
-	 * @param vertexId
-	 */
-	public final void setVertexId(I vertexId) {
-		this.vertexId = vertexId;
-		delegate.setVertexId(vertexId);
-	}
+    /**
+     * Set the vertex id
+     * 
+     * @param vertexId
+     */
+    public final void setVertexId(I vertexId) {
+        this.vertexId = vertexId;
+        delegate.setVertexId(vertexId);
+    }
 
-	/**
-	 * Get the vertex id
-	 * 
-	 * @return vertex id
-	 */
-	public final I getVertexId() {
-		return vertexId;
-	}
+    /**
+     * Get the vertex id
+     * 
+     * @return vertex id
+     */
+    public final I getVertexId() {
+        return vertexId;
+    }
 
-	/**
-	 * Get the vertex value
-	 * 
-	 * @return the vertex value
-	 */
-	public final V getVertexValue() {
-		return vertexValue;
-	}
+    /**
+     * Get the vertex value
+     * 
+     * @return the vertex value
+     */
+    public final V getVertexValue() {
+        return vertexValue;
+    }
 
-	/**
-	 * Set the vertex value
-	 * 
-	 * @param vertexValue
-	 */
-	public final void setVertexValue(V vertexValue) {
-		this.vertexValue = vertexValue;
-		this.updated = true;
-	}
+    /**
+     * Set the vertex value
+     * 
+     * @param vertexValue
+     */
+    public final void setVertexValue(V vertexValue) {
+        this.vertexValue = vertexValue;
+        this.updated = true;
+    }
 
-	/***
-	 * Send a message to a specific vertex
-	 * 
-	 * @param id
-	 *            the receiver vertex id
-	 * @param msg
-	 *            the message
-	 */
-	public final void sendMsg(I id, M msg) {
-		if (msg == null) {
-			throw new IllegalArgumentException(
-					"sendMsg: Cannot send null message to " + id);
-		}
-		delegate.sendMsg(id, msg);
-		this.hasMessage = true;
-	}
+    /***
+     * Send a message to a specific vertex
+     * 
+     * @param id
+     *            the receiver vertex id
+     * @param msg
+     *            the message
+     */
+    public final void sendMsg(I id, M msg) {
+        if (msg == null) {
+            throw new IllegalArgumentException("sendMsg: Cannot send null message to " + id);
+        }
+        delegate.sendMsg(id, msg);
+        this.hasMessage = true;
+    }
 
-	/**
-	 * Send a message to all direct outgoing neighbors
-	 * 
-	 * @param msg
-	 *            the message
-	 */
-	public final void sendMsgToAllEdges(M msg) {
-		if (msg == null) {
-			throw new IllegalArgumentException(
-					"sendMsgToAllEdges: Cannot send null message to all edges");
-		}
-		for (Edge<I, E> edge : destEdgeList) {
-			sendMsg(edge.getDestVertexId(), msg);
-		}
-	}
+    /**
+     * Send a message to all direct outgoing neighbors
+     * 
+     * @param msg
+     *            the message
+     */
+    public final void sendMsgToAllEdges(M msg) {
+        if (msg == null) {
+            throw new IllegalArgumentException("sendMsgToAllEdges: Cannot send null message to all edges");
+        }
+        for (Edge<I, E> edge : destEdgeList) {
+            sendMsg(edge.getDestVertexId(), msg);
+        }
+    }
 
-	/**
-	 * Vote to halt. Once all vertex vote to halt and no more messages, a
-	 * Pregelix job will terminate.
-	 */
-	public final void voteToHalt() {
-		halt = true;
-		updated = true;
-	}
+    /**
+     * Vote to halt. Once all vertex vote to halt and no more messages, a
+     * Pregelix job will terminate.
+     */
+    public final void voteToHalt() {
+        halt = true;
+        updated = true;
+    }
 
-	/**
-	 * @return the vertex is halted (true) or not (false)
-	 */
-	public final boolean isHalted() {
-		return halt;
-	}
+    /**
+     * Activate a halted vertex such that it is alive again.
+     */
+    public final void activate() {
+        halt = false;
+        updated = true;
+    }
 
-	@Override
-	final public void readFields(DataInput in) throws IOException {
-		reset();
-		if (vertexId == null)
-			vertexId = BspUtils.<I> createVertexIndex(getContext()
-					.getConfiguration());
-		vertexId.readFields(in);
-		delegate.setVertexId(vertexId);
-		boolean hasVertexValue = in.readBoolean();
+    /**
+     * @return the vertex is halted (true) or not (false)
+     */
+    public final boolean isHalted() {
+        return halt;
+    }
 
-		if (hasVertexValue) {
-			vertexValue = allocateValue();
-			vertexValue.readFields(in);
-			delegate.setVertex(this);
-		}
-		destEdgeList.clear();
-		long edgeMapSize = SerDeUtils.readVLong(in);
-		for (long i = 0; i < edgeMapSize; ++i) {
-			Edge<I, E> edge = allocateEdge();
-			edge.setConf(getContext().getConfiguration());
-			edge.readFields(in);
-			addEdge(edge);
-		}
-		msgList.clear();
-		long msgListSize = SerDeUtils.readVLong(in);
-		for (long i = 0; i < msgListSize; ++i) {
-			M msg = allocateMessage();
-			msg.readFields(in);
-			msgList.add(msg);
-		}
-		halt = in.readBoolean();
-		updated = false;
-		hasMessage = false;
-		createdNewLiveVertex = false;
-	}
+    @Override
+    final public void readFields(DataInput in) throws IOException {
+        reset();
+        if (vertexId == null)
+            vertexId = BspUtils.<I> createVertexIndex(getContext().getConfiguration());
+        vertexId.readFields(in);
+        delegate.setVertexId(vertexId);
+        boolean hasVertexValue = in.readBoolean();
 
-	@Override
-	public void write(DataOutput out) throws IOException {
-		vertexId.write(out);
-		out.writeBoolean(vertexValue != null);
-		if (vertexValue != null) {
-			vertexValue.write(out);
-		}
-		SerDeUtils.writeVLong(out, destEdgeList.size());
-		for (Edge<I, E> edge : destEdgeList) {
-			edge.write(out);
-		}
-		SerDeUtils.writeVLong(out, msgList.size());
-		for (M msg : msgList) {
-			msg.write(out);
-		}
-		out.writeBoolean(halt);
-	}
+        if (hasVertexValue) {
+            vertexValue = allocateValue();
+            vertexValue.readFields(in);
+            delegate.setVertex(this);
+        }
+        destEdgeList.clear();
+        long edgeMapSize = SerDeUtils.readVLong(in);
+        for (long i = 0; i < edgeMapSize; ++i) {
+            Edge<I, E> edge = allocateEdge();
+            edge.setConf(getContext().getConfiguration());
+            edge.readFields(in);
+            addEdge(edge);
+        }
+        msgList.clear();
+        long msgListSize = SerDeUtils.readVLong(in);
+        for (long i = 0; i < msgListSize; ++i) {
+            M msg = allocateMessage();
+            msg.readFields(in);
+            msgList.add(msg);
+        }
+        halt = in.readBoolean();
+        updated = false;
+        hasMessage = false;
+        createdNewLiveVertex = false;
+    }
 
-	/**
-	 * Get the list of incoming messages
-	 * 
-	 * @return the list of messages
-	 */
-	public List<M> getMsgList() {
-		return msgList;
-	}
+    @Override
+    public void write(DataOutput out) throws IOException {
+        vertexId.write(out);
+        out.writeBoolean(vertexValue != null);
+        if (vertexValue != null) {
+            vertexValue.write(out);
+        }
+        SerDeUtils.writeVLong(out, destEdgeList.size());
+        for (Edge<I, E> edge : destEdgeList) {
+            edge.write(out);
+        }
+        SerDeUtils.writeVLong(out, msgList.size());
+        for (M msg : msgList) {
+            msg.write(out);
+        }
+        out.writeBoolean(halt);
+    }
 
-	/**
-	 * Get outgoing edge list
-	 * 
-	 * @return a list of outgoing edges
-	 */
-	public List<Edge<I, E>> getEdges() {
-		return this.destEdgeList;
-	}
+    /**
+     * Get the list of incoming messages
+     * 
+     * @return the list of messages
+     */
+    public List<M> getMsgList() {
+        return msgList;
+    }
 
-	@Override
-	@SuppressWarnings("unchecked")
-	public String toString() {
-		Collections.sort(destEdgeList);
-		StringBuffer edgeBuffer = new StringBuffer();
-		edgeBuffer.append("(");
-		for (Edge<I, E> edge : destEdgeList) {
-			edgeBuffer.append(edge.getDestVertexId()).append(",");
-		}
-		edgeBuffer.append(")");
-		return "Vertex(id=" + getVertexId() + ",value=" + getVertexValue()
-				+ ", edges=" + edgeBuffer + ")";
-	}
+    /**
+     * Get outgoing edge list
+     * 
+     * @return a list of outgoing edges
+     */
+    public List<Edge<I, E>> getEdges() {
+        return this.destEdgeList;
+    }
 
-	/**
-	 * Get the number of outgoing edges
-	 * 
-	 * @return the number of outging edges
-	 */
-	public int getNumOutEdges() {
-		return destEdgeList.size();
-	}
+    @Override
+    @SuppressWarnings("unchecked")
+    public String toString() {
+        Collections.sort(destEdgeList);
+        StringBuffer edgeBuffer = new StringBuffer();
+        edgeBuffer.append("(");
+        for (Edge<I, E> edge : destEdgeList) {
+            edgeBuffer.append(edge.getDestVertexId()).append(",");
+        }
+        edgeBuffer.append(")");
+        return "Vertex(id=" + getVertexId() + ",value=" + getVertexValue() + ", edges=" + edgeBuffer + ")";
+    }
 
-	/**
-	 * Pregelix internal use only
-	 * 
-	 * @param writers
-	 */
-	public void setOutputWriters(List<IFrameWriter> writers) {
-		delegate.setOutputWriters(writers);
-	}
+    /**
+     * Get the number of outgoing edges
+     * 
+     * @return the number of outging edges
+     */
+    public int getNumOutEdges() {
+        return destEdgeList.size();
+    }
 
-	/**
-	 * Pregelix internal use only
-	 * 
-	 * @param writers
-	 */
-	public void setOutputAppenders(List<FrameTupleAppender> appenders) {
-		delegate.setOutputAppenders(appenders);
-	}
+    /**
+     * Pregelix internal use only
+     * 
+     * @param writers
+     */
+    public void setOutputWriters(List<IFrameWriter> writers) {
+        delegate.setOutputWriters(writers);
+    }
 
-	/**
-	 * Pregelix internal use only
-	 * 
-	 * @param writers
-	 */
-	public void setOutputTupleBuilders(List<ArrayTupleBuilder> tbs) {
-		delegate.setOutputTupleBuilders(tbs);
-	}
+    /**
+     * Pregelix internal use only
+     * 
+     * @param writers
+     */
+    public void setOutputAppenders(List<FrameTupleAppender> appenders) {
+        delegate.setOutputAppenders(appenders);
+    }
 
-	/**
-	 * Pregelix internal use only
-	 * 
-	 * @param writers
-	 */
-	public void finishCompute() throws IOException {
-		delegate.finishCompute();
-	}
+    /**
+     * Pregelix internal use only
+     * 
+     * @param writers
+     */
+    public void setOutputTupleBuilders(List<ArrayTupleBuilder> tbs) {
+        delegate.setOutputTupleBuilders(tbs);
+    }
 
-	/**
-	 * Pregelix internal use only
-	 */
-	public boolean hasUpdate() {
-		return this.updated;
-	}
+    /**
+     * Pregelix internal use only
+     * 
+     * @param writers
+     */
+    public void finishCompute() throws IOException {
+        delegate.finishCompute();
+    }
 
-	/**
-	 * Pregelix internal use only
-	 */
-	public boolean hasMessage() {
-		return this.hasMessage;
-	}
+    /**
+     * Pregelix internal use only
+     */
+    public boolean hasUpdate() {
+        return this.updated;
+    }
 
-	/**
-	 * Pregelix internal use only
-	 */
-	public boolean createdNewLiveVertex() {
-		return this.createdNewLiveVertex;
-	}
+    /**
+     * Pregelix internal use only
+     */
+    public boolean hasMessage() {
+        return this.hasMessage;
+    }
 
-	/**
-	 * sort the edges
-	 */
-	@SuppressWarnings("unchecked")
-	public void sortEdges() {
-		updated = true;
-		Collections.sort(destEdgeList);
-	}
+    /**
+     * Pregelix internal use only
+     */
+    public boolean createdNewLiveVertex() {
+        return this.createdNewLiveVertex;
+    }
 
-	/**
-	 * Allocate a new edge from the edge pool
-	 */
-	private Edge<I, E> allocateEdge() {
-		Edge<I, E> edge;
-		if (usedEdge < edgePool.size()) {
-			edge = edgePool.get(usedEdge);
-			usedEdge++;
-		} else {
-			edge = new Edge<I, E>();
-			edgePool.add(edge);
-			usedEdge++;
-		}
-		return edge;
-	}
+    /**
+     * sort the edges
+     */
+    @SuppressWarnings("unchecked")
+    public void sortEdges() {
+        updated = true;
+        Collections.sort(destEdgeList);
+    }
 
-	/**
-	 * Allocate a new message from the message pool
-	 */
-	private M allocateMessage() {
-		M message;
-		if (usedMessage < msgPool.size()) {
-			message = msgPool.get(usedEdge);
-			usedMessage++;
-		} else {
-			message = BspUtils.<M> createMessageValue(getContext()
-					.getConfiguration());
-			msgPool.add(message);
-			usedMessage++;
-		}
-		return message;
-	}
+    /**
+     * Allocate a new edge from the edge pool
+     */
+    private Edge<I, E> allocateEdge() {
+        Edge<I, E> edge;
+        if (usedEdge < edgePool.size()) {
+            edge = edgePool.get(usedEdge);
+            usedEdge++;
+        } else {
+            edge = new Edge<I, E>();
+            edgePool.add(edge);
+            usedEdge++;
+        }
+        return edge;
+    }
 
-	/**
-	 * Set the global superstep for all the vertices (internal use)
-	 * 
-	 * @param superstep
-	 *            New superstep
-	 */
-	public static final void setSuperstep(long superstep) {
-		Vertex.superstep = superstep;
-	}
+    /**
+     * Allocate a new message from the message pool
+     */
+    private M allocateMessage() {
+        M message;
+        if (usedMessage < msgPool.size()) {
+            message = msgPool.get(usedEdge);
+            usedMessage++;
+        } else {
+            message = BspUtils.<M> createMessageValue(getContext().getConfiguration());
+            msgPool.add(message);
+            usedMessage++;
+        }
+        return message;
+    }
 
-	/**
-	 * Add an outgoing edge into the vertex
-	 * 
-	 * @param edge
-	 *            the edge to be added
-	 * @return true if the edge list changed as a result of this call
-	 */
-	public boolean addEdge(Edge<I, E> edge) {
-		edge.setConf(getContext().getConfiguration());
-		updated = true;
-		return destEdgeList.add(edge);
-	}
+    /**
+     * Set the global superstep for all the vertices (internal use)
+     * 
+     * @param superstep
+     *            New superstep
+     */
+    public static final void setSuperstep(long superstep) {
+        Vertex.superstep = superstep;
+    }
 
-	/**
-	 * remove an outgoing edge in the graph
-	 * 
-	 * @param edge
-	 *            the edge to be removed
-	 * @return true if the edge is in the edge list of the vertex
-	 */
-	public boolean removeEdge(Edge<I, E> edge) {
-		updated = true;
-		return destEdgeList.remove(edge);
-	}
+    /**
+     * Add an outgoing edge into the vertex
+     * 
+     * @param edge
+     *            the edge to be added
+     * @return true if the edge list changed as a result of this call
+     */
+    public boolean addEdge(Edge<I, E> edge) {
+        edge.setConf(getContext().getConfiguration());
+        updated = true;
+        return destEdgeList.add(edge);
+    }
 
-	/**
-	 * Add a new vertex into the graph
-	 * 
-	 * @param vertexId
-	 *            the vertex id
-	 * @param vertex
-	 *            the vertex
-	 */
-	public final void addVertex(I vertexId, Vertex vertex) {
-		createdNewLiveVertex |= !vertex.isHalted();
-		delegate.addVertex(vertexId, vertex);
-	}
+    /**
+     * remove an outgoing edge in the graph
+     * 
+     * @param edge
+     *            the edge to be removed
+     * @return true if the edge is in the edge list of the vertex
+     */
+    public boolean removeEdge(Edge<I, E> edge) {
+        updated = true;
+        return destEdgeList.remove(edge);
+    }
 
-	/**
-	 * Delete a vertex from id
-	 * 
-	 * @param vertexId
-	 *            the vertex id
-	 */
-	public final void deleteVertex(I vertexId) {
-		delegate.deleteVertex(vertexId);
-	}
+    /**
+     * Add a new vertex into the graph
+     * 
+     * @param vertexId
+     *            the vertex id
+     * @param vertex
+     *            the vertex
+     */
+    public final void addVertex(I vertexId, Vertex vertex) {
+        createdNewLiveVertex |= !vertex.isHalted();
+        delegate.addVertex(vertexId, vertex);
+    }
 
-	/**
-	 * Allocate a vertex value from the object pool
-	 * 
-	 * @return a vertex value instance
-	 */
-	private V allocateValue() {
-		V value;
-		if (usedValue < valuePool.size()) {
-			value = valuePool.get(usedValue);
-			usedValue++;
-		} else {
-			value = BspUtils.<V> createVertexValue(getContext()
-					.getConfiguration());
-			valuePool.add(value);
-			usedValue++;
-		}
-		return value;
-	}
+    /**
+     * Delete a vertex from id
+     * 
+     * @param vertexId
+     *            the vertex id
+     */
+    public final void deleteVertex(I vertexId) {
+        delegate.deleteVertex(vertexId);
+    }
 
-	/**
-	 * Get the current global superstep number
-	 * 
-	 * @return the current superstep number
-	 */
-	public static final long getSuperstep() {
-		return superstep;
-	}
+    /**
+     * Allocate a vertex value from the object pool
+     * 
+     * @return a vertex value instance
+     */
+    private V allocateValue() {
+        V value;
+        if (usedValue < valuePool.size()) {
+            value = valuePool.get(usedValue);
+            usedValue++;
+        } else {
+            value = BspUtils.<V> createVertexValue(getContext().getConfiguration());
+            valuePool.add(value);
+            usedValue++;
+        }
+        return value;
+    }
 
-	/**
-	 * Set the total number of vertices from the last superstep.
-	 * 
-	 * @param numVertices
-	 *            Aggregate vertices in the last superstep
-	 */
-	public static final void setNumVertices(long numVertices) {
-		Vertex.numVertices = numVertices;
-	}
+    /**
+     * Get the current global superstep number
+     * 
+     * @return the current superstep number
+     */
+    public static final long getSuperstep() {
+        return superstep;
+    }
 
-	/**
-	 * Get the number of vertexes in the graph
-	 * 
-	 * @return the number of vertexes in the graph
-	 */
-	public static final long getNumVertices() {
-		return numVertices;
-	}
+    /**
+     * Set the total number of vertices from the last superstep.
+     * 
+     * @param numVertices
+     *            Aggregate vertices in the last superstep
+     */
+    public static final void setNumVertices(long numVertices) {
+        Vertex.numVertices = numVertices;
+    }
 
-	/**
-	 * Set the total number of edges from the last superstep.
-	 * 
-	 * @param numEdges
-	 *            Aggregate edges in the last superstep
-	 */
-	public static void setNumEdges(long numEdges) {
-		Vertex.numEdges = numEdges;
-	}
+    /**
+     * Get the number of vertexes in the graph
+     * 
+     * @return the number of vertexes in the graph
+     */
+    public static final long getNumVertices() {
+        return numVertices;
+    }
 
-	/**
-	 * Get the number of edges from this graph
-	 * 
-	 * @return the number of edges in the graph
-	 */
-	public static final long getNumEdges() {
-		return numEdges;
-	}
+    /**
+     * Set the total number of edges from the last superstep.
+     * 
+     * @param numEdges
+     *            Aggregate edges in the last superstep
+     */
+    public static void setNumEdges(long numEdges) {
+        Vertex.numEdges = numEdges;
+    }
 
-	/**
-	 * Pregelix internal use only
-	 */
-	public static final TaskAttemptContext getContext() {
-		return context;
-	}
+    /**
+     * Get the number of edges from this graph
+     * 
+     * @return the number of edges in the graph
+     */
+    public static final long getNumEdges() {
+        return numEdges;
+    }
 
-	/**
-	 * Pregelix internal use only
-	 * 
-	 * @param context
-	 */
-	public static final void setContext(TaskAttemptContext context) {
-		Vertex.context = context;
-	}
+    /**
+     * Pregelix internal use only
+     */
+    public static final TaskAttemptContext getContext() {
+        return context;
+    }
+
+    /**
+     * Pregelix internal use only
+     * 
+     * @param context
+     */
+    public static final void setContext(TaskAttemptContext context) {
+        Vertex.context = context;
+    }
 
 }
diff --git a/pregelix/pregelix-core/pom.xml b/pregelix/pregelix-core/pom.xml
index 576758b..7b247a8 100644
--- a/pregelix/pregelix-core/pom.xml
+++ b/pregelix/pregelix-core/pom.xml
@@ -18,8 +18,9 @@
 	<build>
 		<plugins>
 			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-jar-plugin</artifactId>
-				<version>2.4</version>
+				<version>2.3.2</version>
 				<executions>
 					<execution>
 						<id>balancer</id>
@@ -72,7 +73,7 @@
 			<plugin>
 				<groupId>org.codehaus.mojo</groupId>
 				<artifactId>appassembler-maven-plugin</artifactId>
-				<version>1.3</version>
+                <version>1.3</version>
 				<executions>
 					<execution>
 						<configuration>
@@ -164,7 +165,7 @@
 			</plugin>
 			<plugin>
 				<artifactId>maven-clean-plugin</artifactId>
-				<version>2.5</version>
+                <version>2.4.1</version>
 				<configuration>
 					<filesets>
 						<fileset>
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index 3a4c41b..72256f9 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -147,22 +147,11 @@
             start = System.currentTimeMillis();
             runHDFSWRite(jobGen);
             runCleanup(jobGen);
-            destroyApplication(applicationName);
             end = System.currentTimeMillis();
             time = end - start;
             LOG.info("result writing finished " + time + "ms");
             LOG.info("job finished");
         } catch (Exception e) {
-            try {
-                /**
-                 * destroy application if there is any exception
-                 */
-                if (hcc != null) {
-                    destroyApplication(applicationName);
-                }
-            } catch (Exception e2) {
-                throw new HyracksException(e2);
-            }
             throw new HyracksException(e);
         }
     }
@@ -220,8 +209,8 @@
 
     private void execute(JobSpecification job) throws Exception {
         job.setUseConnectorPolicyForScheduling(false);
-        JobId jobId = hcc.startJob(applicationName, job,
-                profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
+        JobId jobId = hcc
+                .startJob(job, profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
         hcc.waitForCompletion(jobId);
     }
 
@@ -236,15 +225,11 @@
         LOG.info("jar packing finished " + (end - start) + "ms");
 
         start = System.currentTimeMillis();
-        hcc.createApplication(applicationName, appZip);
+        // TODO: Fix this step to use Yarn
+        //hcc.createApplication(applicationName, appZip);
         end = System.currentTimeMillis();
         LOG.info("jar deployment finished " + (end - start) + "ms");
     }
-
-    public void destroyApplication(String appName) throws Exception {
-        hcc.destroyApplication(appName);
-    }
-
 }
 
 class FileFilter implements FilenameFilter {
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index 0b1be61..77fd1a7 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -52,13 +52,13 @@
 import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDropOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.hyracks.storage.common.file.TransientLocalResourceFactoryProvider;
 import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
 import edu.uci.ics.pregelix.api.graph.MessageCombiner;
 import edu.uci.ics.pregelix.api.graph.Vertex;
@@ -80,8 +80,8 @@
 import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.bootstrap.IndexLifeCycleManagerProvider;
 import edu.uci.ics.pregelix.runtime.bootstrap.StorageManagerInterface;
-import edu.uci.ics.pregelix.runtime.bootstrap.TreeIndexRegistryProvider;
 import edu.uci.ics.pregelix.runtime.touchpoint.RuntimeHookFactory;
 import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
 
@@ -93,7 +93,7 @@
     protected static final String PRIMARY_INDEX = "primary";
     protected final Configuration conf;
     protected final PregelixJob giraphJob;
-    protected IIndexRegistryProvider<IIndex> treeRegistryProvider = TreeIndexRegistryProvider.INSTANCE;
+    protected IIndexLifecycleManagerProvider lcManagerProvider = IndexLifeCycleManagerProvider.INSTANCE;
     protected IStorageManagerInterface storageManagerInterface = StorageManagerInterface.INSTANCE;
     protected String jobId = new UUID(System.currentTimeMillis(), System.nanoTime()).toString();
     protected int frameSize = ClusterConfig.getFrameSize();
@@ -169,8 +169,9 @@
 
         IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
         TreeIndexCreateOperatorDescriptor btreeCreate = new TreeIndexCreateOperatorDescriptor(spec,
-                storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
-                new BTreeDataflowHelperFactory(), NoOpOperationCallbackProvider.INSTANCE);
+                storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories, null,
+                new BTreeDataflowHelperFactory(), new TransientLocalResourceFactoryProvider(),
+                NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, btreeCreate);
         return spec;
     }
@@ -229,9 +230,9 @@
         typeTraits[0] = new TypeTraits(false);
         typeTraits[1] = new TypeTraits(false);
         TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
-                storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
-                fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory(),
-                NoOpOperationCallbackProvider.INSTANCE);
+                storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories, null,
+                fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, false, 0, new BTreeDataflowHelperFactory(),
+                NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
 
         /**
@@ -356,8 +357,8 @@
         typeTraits[0] = new TypeTraits(false);
         typeTraits[1] = new TypeTraits(false);
         BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(spec, recordDescriptor,
-                storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
-                null, null, true, true, new BTreeDataflowHelperFactory(), false, NoOpOperationCallbackProvider.INSTANCE);
+                storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories, null,
+                null, null, true, true, new BTreeDataflowHelperFactory(), false, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, scanner);
 
         /**
@@ -424,9 +425,10 @@
         ITypeTraits[] typeTraits = new ITypeTraits[2];
         typeTraits[0] = new TypeTraits(false);
         typeTraits[1] = new TypeTraits(false);
+
         BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(spec, recordDescriptor,
-                storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
-                null, null, true, true, new BTreeDataflowHelperFactory(), false, NoOpOperationCallbackProvider.INSTANCE);
+                storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories, null,
+                null, null, true, true, new BTreeDataflowHelperFactory(), false, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, scanner);
 
         /**
@@ -455,8 +457,8 @@
         JobSpecification spec = new JobSpecification();
 
         IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, indexName);
-        TreeIndexDropOperatorDescriptor drop = new TreeIndexDropOperatorDescriptor(spec, storageManagerInterface,
-                treeRegistryProvider, fileSplitProvider);
+        IndexDropOperatorDescriptor drop = new IndexDropOperatorDescriptor(spec, storageManagerInterface,
+                lcManagerProvider, fileSplitProvider, new BTreeDataflowHelperFactory());
 
         ClusterConfig.setLocationConstraint(spec, drop);
         spec.addRoot(drop);
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index 9de4c04..fe2fcac 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -40,8 +40,8 @@
 import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
-import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
 import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
 import edu.uci.ics.pregelix.api.graph.MsgList;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -135,7 +135,7 @@
         RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
 
         BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
-                recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+                recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
                 new BTreeDataflowHelperFactory(), inputRdFactory, 6,
                 new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
@@ -166,8 +166,8 @@
         indexCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration + 1,
                 WritableComparator.get(vertexIdClass).getClass());
         TreeIndexBulkReLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkReLoadOperatorDescriptor(spec,
-                storageManagerInterface, treeRegistryProvider, secondaryFileSplitProvider, typeTraits,
-                indexCmpFactories, fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory());
+                storageManagerInterface, lcManagerProvider, secondaryFileSplitProvider, typeTraits, indexCmpFactories,
+                fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory());
         ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
 
         /**
@@ -222,18 +222,18 @@
          * add the insert operator to insert vertexes
          */
         TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
-                spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
-                NoOpOperationCallbackProvider.INSTANCE);
+                spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
+                null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, insertOp);
 
         /**
          * add the delete operator to delete vertexes
          */
         TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
-                spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
-                NoOpOperationCallbackProvider.INSTANCE);
+                spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, null, fieldPermutation, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
+                null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
@@ -343,7 +343,7 @@
         ITreeIndexFrameFactory leafFrameFactory = new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(
                 typeTraits));
         IndexNestedLoopJoinOperatorDescriptor setUnion = new IndexNestedLoopJoinOperatorDescriptor(spec, rdFinal,
-                storageManagerInterface, treeRegistryProvider, secondaryFileSplitProviderRead, interiorFrameFactory,
+                storageManagerInterface, lcManagerProvider, secondaryFileSplitProviderRead, interiorFrameFactory,
                 leafFrameFactory, typeTraits, comparatorFactories, true, keyFields, keyFields, true, true,
                 new BTreeDataflowHelperFactory(), true);
         ClusterConfig.setLocationConstraint(spec, setUnion);
@@ -361,8 +361,8 @@
                 vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
 
         IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
-                spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields, keyFields, true, true,
+                spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
+                JobGenUtil.getForwardScan(iteration), keyFields, keyFields, true, true,
                 new BTreeDataflowHelperFactory(), inputRdFactory, 6, new ComputeUpdateFunctionFactory(confFactory),
                 preHookFactory, null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete, rdFinal);
         ClusterConfig.setLocationConstraint(spec, join);
@@ -377,7 +377,7 @@
         String writeFile = iteration % 2 == 0 ? SECONDARY_INDEX_EVEN : SECONDARY_INDEX_ODD;
         IFileSplitProvider secondaryFileSplitProviderWrite = ClusterConfig.getFileSplitProvider(jobId, writeFile);
         TreeIndexBulkReLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkReLoadOperatorDescriptor(spec,
-                storageManagerInterface, treeRegistryProvider, secondaryFileSplitProviderWrite, typeTraits,
+                storageManagerInterface, lcManagerProvider, secondaryFileSplitProviderWrite, typeTraits,
                 indexCmpFactories, fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory());
         ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
 
@@ -446,18 +446,18 @@
          * add the insert operator to insert vertexes
          */
         TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
-                spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
-                NoOpOperationCallbackProvider.INSTANCE);
+                spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
+                null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, insertOp);
 
         /**
          * add the delete operator to delete vertexes
          */
         TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
-                spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
-                NoOpOperationCallbackProvider.INSTANCE);
+                spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, null, fieldPermutation, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
+                null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index 91c15b2..f1eceb7 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -40,8 +40,8 @@
 import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
-import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
 import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
 import edu.uci.ics.pregelix.api.graph.MsgList;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -129,7 +129,7 @@
         RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
 
         BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
-                recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+                recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
                 new BTreeDataflowHelperFactory(), inputRdFactory, 5,
                 new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
@@ -205,18 +205,18 @@
          */
         int[] fieldPermutation = new int[] { 0, 1 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
-                spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
-                NoOpOperationCallbackProvider.INSTANCE);
+                spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
+                null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, insertOp);
 
         /**
          * add the delete operator to delete vertexes
          */
         TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
-                spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
-                NoOpOperationCallbackProvider.INSTANCE);
+                spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, null, fieldPermutation, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
+                null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
@@ -335,7 +335,7 @@
                 vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
 
         IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
-                spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, interiorFrameFactory,
+                spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, interiorFrameFactory,
                 leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
                 keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
                 new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
@@ -408,18 +408,18 @@
          */
         int[] fieldPermutation = new int[] { 0, 1 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
-                spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
-                NoOpOperationCallbackProvider.INSTANCE);
+                spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
+                null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, insertOp);
 
         /**
          * add the delete operator to delete vertexes
          */
         TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
-                spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
-                NoOpOperationCallbackProvider.INSTANCE);
+                spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, null, fieldPermutation, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
+                null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
index ee1fd0f..314c393 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -39,8 +39,8 @@
 import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
-import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
 import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
 import edu.uci.ics.pregelix.api.graph.MsgList;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -131,7 +131,7 @@
         RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
 
         BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
-                recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+                recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
                 new BTreeDataflowHelperFactory(), inputRdFactory, 5,
                 new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
@@ -197,18 +197,18 @@
          */
         int[] fieldPermutation = new int[] { 0, 1 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
-                spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
-                NoOpOperationCallbackProvider.INSTANCE);
+                spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
+                null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, insertOp);
 
         /**
          * add the delete operator to delete vertexes
          */
         TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
-                spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
-                NoOpOperationCallbackProvider.INSTANCE);
+                spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, null, fieldPermutation, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
+                null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
@@ -323,7 +323,7 @@
                 vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
 
         IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
-                spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, interiorFrameFactory,
+                spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, interiorFrameFactory,
                 leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
                 keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
                 new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
@@ -384,18 +384,18 @@
 
         int[] fieldPermutation = new int[] { 0, 1 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
-                spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
-                NoOpOperationCallbackProvider.INSTANCE);
+                spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
+                null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, insertOp);
 
         /**
          * add the delete operator to delete vertexes
          */
         TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
-                spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
-                NoOpOperationCallbackProvider.INSTANCE);
+                spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, null, fieldPermutation, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
+                null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
index 628e9ce..0c3db38 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
@@ -39,8 +39,8 @@
 import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
-import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
 import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
 import edu.uci.ics.pregelix.api.graph.MsgList;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -128,7 +128,7 @@
         RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
 
         BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
-                recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+                recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
                 new BTreeDataflowHelperFactory(), inputRdFactory, 5,
                 new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
@@ -211,18 +211,18 @@
          */
         int[] fieldPermutation = new int[] { 0, 1 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
-                spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
-                NoOpOperationCallbackProvider.INSTANCE);
+                spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
+                null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, insertOp);
 
         /**
          * add the delete operator to delete vertexes
          */
         TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
-                spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
-                NoOpOperationCallbackProvider.INSTANCE);
+                spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, null, fieldPermutation, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
+                null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
@@ -337,7 +337,7 @@
                 vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
 
         IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
-                spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, interiorFrameFactory,
+                spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, interiorFrameFactory,
                 leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
                 keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
                 new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
@@ -417,18 +417,18 @@
          */
         int[] fieldPermutation = new int[] { 0, 1 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
-                spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
-                NoOpOperationCallbackProvider.INSTANCE);
+                spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
+                null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, insertOp);
 
         /**
          * add the delete operator to delete vertexes
          */
         TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
-                spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
-                NoOpOperationCallbackProvider.INSTANCE);
+                spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, null, fieldPermutation, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
+                null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
index cd2a864..d099645 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
@@ -14,7 +14,6 @@
  */
 package edu.uci.ics.pregelix.core.util;
 
-import java.io.File;
 import java.util.EnumSet;
 
 import edu.uci.ics.hyracks.api.client.HyracksConnection;
@@ -27,6 +26,7 @@
 import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
 import edu.uci.ics.hyracks.control.nc.NodeControllerService;
 import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.runtime.bootstrap.NCApplicationEntryPoint;
 
 public class PregelixHyracksIntegrationUtil {
 
@@ -45,7 +45,7 @@
     private static NodeControllerService nc2;
     private static IHyracksClientConnection hcc;
 
-    public static void init(String topologyFilePath) throws Exception {
+    public static void init() throws Exception {
         CCConfig ccConfig = new CCConfig();
         ccConfig.clientNetIpAddress = CC_HOST;
         ccConfig.clusterNetIpAddress = CC_HOST;
@@ -54,7 +54,6 @@
         ccConfig.defaultMaxJobAttempts = 0;
         ccConfig.jobHistorySize = 0;
         ccConfig.profileDumpPeriod = -1;
-        ccConfig.clusterTopologyDefinition = new File(topologyFilePath);
 
         // cluster controller
         cc = new ClusterControllerService(ccConfig);
@@ -68,6 +67,7 @@
         ncConfig1.dataIPAddress = "127.0.0.1";
         ncConfig1.datasetIPAddress = "127.0.0.1";
         ncConfig1.nodeId = NC1_ID;
+        ncConfig1.appNCMainClass = NCApplicationEntryPoint.class.getName();
         nc1 = new NodeControllerService(ncConfig1);
         nc1.start();
 
@@ -78,6 +78,7 @@
         ncConfig2.dataIPAddress = "127.0.0.1";
         ncConfig2.datasetIPAddress = "127.0.0.1";
         ncConfig2.nodeId = NC2_ID;
+        ncConfig2.appNCMainClass = NCApplicationEntryPoint.class.getName();
         nc2 = new NodeControllerService(ncConfig2);
         nc2.start();
 
@@ -86,14 +87,6 @@
         ClusterConfig.loadClusterConfig(CC_HOST, TEST_HYRACKS_CC_CLIENT_PORT);
     }
 
-    public static void destroyApp(String hyracksAppName) throws Exception {
-        hcc.destroyApplication(hyracksAppName);
-    }
-
-    public static void createApp(String hyracksAppName) throws Exception {
-        hcc.createApplication(hyracksAppName, null);
-    }
-
     public static void deinit() throws Exception {
         nc2.stop();
         nc1.stop();
@@ -102,7 +95,7 @@
 
     public static void runJob(JobSpecification spec, String appName) throws Exception {
         spec.setFrameSize(FRAME_SIZE);
-        JobId jobId = hcc.startJob(appName, spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
+        JobId jobId = hcc.startJob(spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
         hcc.waitForCompletion(jobId);
     }
 
diff --git a/pregelix/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java b/pregelix/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
index f7cadf6..3c00cad 100644
--- a/pregelix/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
+++ b/pregelix/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
@@ -50,14 +50,14 @@
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
 import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
 import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.hyracks.storage.common.file.TransientLocalResourceFactoryProvider;
 import edu.uci.ics.pregelix.core.data.TypeTraits;
 import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
 import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
@@ -65,8 +65,8 @@
 import edu.uci.ics.pregelix.dataflow.VertexWriteOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.std.ProjectOperatorDescriptor;
+import edu.uci.ics.pregelix.runtime.bootstrap.IndexLifeCycleManagerProvider;
 import edu.uci.ics.pregelix.runtime.bootstrap.StorageManagerInterface;
-import edu.uci.ics.pregelix.runtime.bootstrap.TreeIndexRegistryProvider;
 
 public class JoinTest {
     private final static String ACTUAL_RESULT_DIR = "actual";
@@ -82,7 +82,7 @@
     private static final String PATH_TO_CLUSTER_PROPERTIES = "src/test/resources/cluster/cluster.properties";
 
     private static final float DEFAULT_BTREE_FILL_FACTOR = 1.00f;
-    private IIndexRegistryProvider<IIndex> treeRegistry = TreeIndexRegistryProvider.INSTANCE;
+    private IIndexLifecycleManagerProvider lcManagerProvider = IndexLifeCycleManagerProvider.INSTANCE;
     private IStorageManagerInterface storageManagerInterface = StorageManagerInterface.INSTANCE;
 
     private IBinaryHashFunctionFactory stringHashFactory = new PointableBinaryHashFunctionFactory(
@@ -102,8 +102,7 @@
         ClusterConfig.setStorePath(PATH_TO_CLUSTER_STORE);
         ClusterConfig.setClusterPropertiesPath(PATH_TO_CLUSTER_PROPERTIES);
         cleanupStores();
-        PregelixHyracksIntegrationUtil.init("src/test/resources/topology.xml");
-        PregelixHyracksIntegrationUtil.createApp(HYRACKS_APP_NAME);
+        PregelixHyracksIntegrationUtil.init();
 
         FileUtils.forceMkdir(new File(EXPECT_RESULT_DIR));
         FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
@@ -121,7 +120,6 @@
         runIndexRightOuterJoin();
         TestUtils.compareWithResult(new File(EXPECTED_RESULT_FILE), new File(ACTUAL_RESULT_FILE));
 
-        PregelixHyracksIntegrationUtil.destroyApp(HYRACKS_APP_NAME);
         PregelixHyracksIntegrationUtil.deinit();
     }
 
@@ -195,8 +193,8 @@
         FileSplit[] results = new FileSplit[1];
         results[0] = resultFile;
         IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
-        VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
-                null);
+        VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider,
+                null, null);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { NC1_ID });
         PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
 
@@ -234,8 +232,9 @@
         for (int i = 0; i < typeTraits.length; i++)
             typeTraits[i] = new TypeTraits(false);
         TreeIndexCreateOperatorDescriptor writer = new TreeIndexCreateOperatorDescriptor(spec, storageManagerInterface,
-                treeRegistry, fileSplitProvider, typeTraits, comparatorFactories, new BTreeDataflowHelperFactory(),
-                NoOpOperationCallbackProvider.INSTANCE);
+                lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories, null,
+                new BTreeDataflowHelperFactory(), new TransientLocalResourceFactoryProvider(),
+                NoOpOperationCallbackFactory.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, NC1_ID, NC2_ID);
         spec.addRoot(writer);
         runTest(spec);
@@ -278,9 +277,9 @@
         for (int i = 0; i < typeTraits.length; i++)
             typeTraits[i] = new TypeTraits(false);
         TreeIndexBulkLoadOperatorDescriptor writer = new TreeIndexBulkLoadOperatorDescriptor(spec,
-                storageManagerInterface, treeRegistry, fileSplitProvider, typeTraits, comparatorFactories,
-                fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory(),
-                NoOpOperationCallbackProvider.INSTANCE);
+                storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories, null,
+                fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, false, 0, new BTreeDataflowHelperFactory(),
+                NoOpOperationCallbackFactory.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, NC1_ID, NC2_ID);
 
         spec.connect(new OneToOneConnectorDescriptor(spec), custScanner, 0, sorter, 0);
@@ -353,8 +352,8 @@
         for (int i = 0; i < typeTraits.length; i++)
             typeTraits[i] = new TypeTraits(false);
         IndexNestedLoopJoinOperatorDescriptor join = new IndexNestedLoopJoinOperatorDescriptor(spec, custOrderJoinDesc,
-                storageManagerInterface, treeRegistry, fileSplitProvider, typeTraits, keyComparatorFactories, true,
-                keyFields, keyFields, true, true, new BTreeDataflowHelperFactory());
+                storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, keyComparatorFactories,
+                true, keyFields, keyFields, true, true, new BTreeDataflowHelperFactory());
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
 
         /** results (already in sorted order) */
@@ -362,8 +361,8 @@
         FileSplit[] results = new FileSplit[1];
         results[0] = resultFile;
         IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
-        VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
-                null);
+        VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider,
+                null, null);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { NC1_ID });
         PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
 
@@ -459,8 +458,8 @@
         FileSplit[] results = new FileSplit[1];
         results[0] = resultFile;
         IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
-        VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
-                null);
+        VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider,
+                null, null);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { NC1_ID });
         PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
 
@@ -556,7 +555,7 @@
         ITreeIndexFrameFactory leafFrameFactory = new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(
                 typeTraits));
         IndexNestedLoopJoinOperatorDescriptor join = new IndexNestedLoopJoinOperatorDescriptor(spec, custOrderJoinDesc,
-                storageManagerInterface, treeRegistry, fileSplitProvider, interiorFrameFactory, leafFrameFactory,
+                storageManagerInterface, lcManagerProvider, fileSplitProvider, interiorFrameFactory, leafFrameFactory,
                 typeTraits, keyComparatorFactories, true, keyFields, keyFields, true, true,
                 new BTreeDataflowHelperFactory(), true, nullWriterFactories);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
@@ -566,8 +565,8 @@
         FileSplit[] results = new FileSplit[1];
         results[0] = resultFile;
         IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
-        VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
-                null);
+        VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider,
+                null, null);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { NC1_ID });
         PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
 
diff --git a/pregelix/pregelix-dataflow-std-base/pom.xml b/pregelix/pregelix-dataflow-std-base/pom.xml
index eeaa6c9..6222a04 100644
--- a/pregelix/pregelix-dataflow-std-base/pom.xml
+++ b/pregelix/pregelix-dataflow-std-base/pom.xml
@@ -42,8 +42,9 @@
 				</configuration>
 			</plugin>
 			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-clean-plugin</artifactId>
-				<version>2.5</version>
+                <version>2.4.1</version>
 				<configuration>
 					<filesets>
 						<fileset>
diff --git a/pregelix/pregelix-dataflow-std/pom.xml b/pregelix/pregelix-dataflow-std/pom.xml
index 889c876..aa77ad3 100644
--- a/pregelix/pregelix-dataflow-std/pom.xml
+++ b/pregelix/pregelix-dataflow-std/pom.xml
@@ -42,8 +42,9 @@
 				</configuration>
 			</plugin>
 			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-clean-plugin</artifactId>
-				<version>2.5</version>
+				<version>2.4.1</version>
 				<configuration>
 					<filesets>
 						<fileset>
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorDescriptor.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorDescriptor.java
index 99e55f1..c9f3fe7 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorDescriptor.java
@@ -23,12 +23,12 @@
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.hyracks.storage.common.file.TransientLocalResourceFactoryProvider;
 import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
@@ -53,14 +53,16 @@
     private final int outputArity;
 
     public BTreeSearchFunctionUpdateOperatorDescriptor(JobSpecification spec, RecordDescriptor recDesc,
-            IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> indexRegistryProvider,
+            IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lcManagerProvider,
             IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
             IBinaryComparatorFactory[] comparatorFactories, boolean isForward, int[] lowKeyFields, int[] highKeyFields,
             boolean lowKeyInclusive, boolean highKeyInclusive, IIndexDataflowHelperFactory dataflowHelperFactory,
             IRecordDescriptorFactory inputRdFactory, int outputArity, IUpdateFunctionFactory functionFactory,
             IRuntimeHookFactory preHookFactory, IRuntimeHookFactory postHookFactory, RecordDescriptor... rDescs) {
-        super(spec, 1, outputArity, recDesc, storageManager, indexRegistryProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, dataflowHelperFactory, null, false, NoOpOperationCallbackProvider.INSTANCE);
+        super(spec, 1, outputArity, recDesc, storageManager, lcManagerProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, null, dataflowHelperFactory, null, false,
+                new TransientLocalResourceFactoryProvider(), NoOpOperationCallbackFactory.INSTANCE,
+                NoOpOperationCallbackFactory.INSTANCE);
         this.isForward = isForward;
         this.lowKeyFields = lowKeyFields;
         this.highKeyFields = highKeyFields;
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorNodePushable.java
index 3938613..ff95e52 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/BTreeSearchFunctionUpdateOperatorNodePushable.java
@@ -36,9 +36,10 @@
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
 import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
@@ -106,11 +107,11 @@
          * open the function
          */
         functionProxy.functionOpen();
-        accessor = new FrameTupleAccessor(treeIndexHelper.getHyracksTaskContext().getFrameSize(), recDesc);
+        accessor = new FrameTupleAccessor(treeIndexHelper.getTaskContext().getFrameSize(), recDesc);
 
         try {
-            treeIndexHelper.init(false);
-            btree = (BTree) treeIndexHelper.getIndex();
+            treeIndexHelper.open();
+            btree = (BTree) treeIndexHelper.getIndexInstance();
             cursorFrame = btree.getLeafFrameFactory().createFrame();
             setCursor();
 
@@ -120,17 +121,17 @@
             rangePred = new RangePredicate(null, null, lowKeyInclusive, highKeyInclusive, lowKeySearchCmp,
                     highKeySearchCmp);
 
-            writeBuffer = treeIndexHelper.getHyracksTaskContext().allocateFrame();
+            writeBuffer = treeIndexHelper.getTaskContext().allocateFrame();
             tb = new ArrayTupleBuilder(btree.getFieldCount());
             dos = tb.getDataOutput();
-            appender = new FrameTupleAppender(treeIndexHelper.getHyracksTaskContext().getFrameSize());
+            appender = new FrameTupleAppender(treeIndexHelper.getTaskContext().getFrameSize());
             appender.reset(writeBuffer, true);
-            indexAccessor = btree.createAccessor();
+            indexAccessor = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
 
             cloneUpdateTb = new ArrayTupleBuilder(btree.getFieldCount());
             updateBuffer.setFieldCount(btree.getFieldCount());
         } catch (Exception e) {
-            treeIndexHelper.deinit();
+            treeIndexHelper.close();
             throw new HyracksDataException(e);
         }
     }
@@ -203,7 +204,7 @@
              */
             functionProxy.functionClose();
         } finally {
-            treeIndexHelper.deinit();
+            treeIndexHelper.close();
         }
     }
 
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorDescriptor.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorDescriptor.java
index 60559e8..7662aa8 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorDescriptor.java
@@ -25,13 +25,13 @@
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.hyracks.storage.common.file.TransientLocalResourceFactoryProvider;
 import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
@@ -61,14 +61,16 @@
     private final int outputArity;
 
     public IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(JobSpecification spec,
-            IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> treeIndexRegistryProvider,
+            IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lcManagerProvider,
             IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
             IBinaryComparatorFactory[] comparatorFactories, boolean isForward, int[] lowKeyFields, int[] highKeyFields,
             boolean lowKeyInclusive, boolean highKeyInclusive, IIndexDataflowHelperFactory opHelperFactory,
             IRecordDescriptorFactory inputRdFactory, int outputArity, IUpdateFunctionFactory functionFactory,
             IRuntimeHookFactory preHookFactory, IRuntimeHookFactory postHookFactory, RecordDescriptor... rDescs) {
-        super(spec, 1, outputArity, rDescs[0], storageManager, treeIndexRegistryProvider, fileSplitProvider,
-                typeTraits, comparatorFactories, opHelperFactory, null, false, NoOpOperationCallbackProvider.INSTANCE);
+        super(spec, 1, outputArity, rDescs[0], storageManager, lcManagerProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, highKeyFields, opHelperFactory, null, false,
+                new TransientLocalResourceFactoryProvider(), NoOpOperationCallbackFactory.INSTANCE,
+                NoOpOperationCallbackFactory.INSTANCE);
         this.isForward = isForward;
         this.lowKeyFields = lowKeyFields;
         this.highKeyFields = highKeyFields;
@@ -88,7 +90,7 @@
     }
 
     public IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(JobSpecification spec,
-            IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> treeIndexRegistryProvider,
+            IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lcManagerProvider,
             IFileSplitProvider fileSplitProvider, ITreeIndexFrameFactory interiorFrameFactory,
             ITreeIndexFrameFactory leafFrameFactory, ITypeTraits[] typeTraits,
             IBinaryComparatorFactory[] comparatorFactories, boolean isForward, int[] lowKeyFields, int[] highKeyFields,
@@ -96,8 +98,10 @@
             boolean isRightOuter, INullWriterFactory[] nullWriterFactories, IRecordDescriptorFactory inputRdFactory,
             int outputArity, IUpdateFunctionFactory functionFactory, IRuntimeHookFactory preHookFactory,
             IRuntimeHookFactory postHookFactory, RecordDescriptor... rDescs) {
-        super(spec, 1, outputArity, rDescs[0], storageManager, treeIndexRegistryProvider, fileSplitProvider,
-                typeTraits, comparatorFactories, opHelperFactory, null, false, NoOpOperationCallbackProvider.INSTANCE);
+        super(spec, 1, outputArity, rDescs[0], storageManager, lcManagerProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, highKeyFields, opHelperFactory, null, false,
+                new TransientLocalResourceFactoryProvider(), NoOpOperationCallbackFactory.INSTANCE,
+                NoOpOperationCallbackFactory.INSTANCE);
         this.isForward = isForward;
         this.lowKeyFields = lowKeyFields;
         this.highKeyFields = highKeyFields;
@@ -120,7 +124,7 @@
     }
 
     public IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(JobSpecification spec,
-            IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> treeIndexRegistryProvider,
+            IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lcManagerProvider,
             IFileSplitProvider fileSplitProvider, ITreeIndexFrameFactory interiorFrameFactory,
             ITreeIndexFrameFactory leafFrameFactory, ITypeTraits[] typeTraits,
             IBinaryComparatorFactory[] comparatorFactories, boolean isForward, int[] lowKeyFields, int[] highKeyFields,
@@ -128,8 +132,10 @@
             boolean isSetUnion, IRecordDescriptorFactory inputRdFactory, int outputArity,
             IUpdateFunctionFactory functionFactory, IRuntimeHookFactory preHookFactory,
             IRuntimeHookFactory postHookFactory, RecordDescriptor... rDescs) {
-        super(spec, 1, outputArity, rDescs[0], storageManager, treeIndexRegistryProvider, fileSplitProvider,
-                typeTraits, comparatorFactories, opHelperFactory, null, false, NoOpOperationCallbackProvider.INSTANCE);
+        super(spec, 1, outputArity, rDescs[0], storageManager, lcManagerProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, highKeyFields, opHelperFactory, null, false,
+                new TransientLocalResourceFactoryProvider(), NoOpOperationCallbackFactory.INSTANCE,
+                NoOpOperationCallbackFactory.INSTANCE);
         this.isForward = isForward;
         this.lowKeyFields = lowKeyFields;
         this.highKeyFields = highKeyFields;
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
index 37029f3..61e4649 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinFunctionUpdateOperatorNodePushable.java
@@ -36,9 +36,10 @@
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
 import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
@@ -104,12 +105,11 @@
          * open the function
          */
         functionProxy.functionOpen();
-        accessor = new FrameTupleAccessor(treeIndexOpHelper.getHyracksTaskContext().getFrameSize(), recDesc);
+        accessor = new FrameTupleAccessor(treeIndexOpHelper.getTaskContext().getFrameSize(), recDesc);
 
         try {
-            treeIndexOpHelper.init(false);
-            btree = (BTree) treeIndexOpHelper.getIndex();
-            btree.open(treeIndexOpHelper.getIndexFileId());
+            treeIndexOpHelper.open();
+            btree = (BTree) treeIndexOpHelper.getIndexInstance();
             cursorFrame = btree.getLeafFrameFactory().createFrame();
             setCursor();
 
@@ -140,15 +140,15 @@
 
             rangePred = new RangePredicate(null, null, lowKeyInclusive, highKeyInclusive, lowKeySearchCmp,
                     highKeySearchCmp);
-            writeBuffer = treeIndexOpHelper.getHyracksTaskContext().allocateFrame();
-            appender = new FrameTupleAppender(treeIndexOpHelper.getHyracksTaskContext().getFrameSize());
+            writeBuffer = treeIndexOpHelper.getTaskContext().allocateFrame();
+            appender = new FrameTupleAppender(treeIndexOpHelper.getTaskContext().getFrameSize());
             appender.reset(writeBuffer, true);
 
-            indexAccessor = btree.createAccessor();
+            indexAccessor = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
             cloneUpdateTb = new ArrayTupleBuilder(btree.getFieldCount());
             updateBuffer.setFieldCount(btree.getFieldCount());
         } catch (Exception e) {
-            treeIndexOpHelper.deinit();
+            treeIndexOpHelper.close();
             throw new HyracksDataException(e);
         }
     }
@@ -220,7 +220,7 @@
              */
             functionProxy.functionClose();
         } finally {
-            treeIndexOpHelper.deinit();
+            treeIndexOpHelper.close();
         }
     }
 
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorDescriptor.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorDescriptor.java
index ed177e3..d237761 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorDescriptor.java
@@ -24,13 +24,13 @@
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.hyracks.storage.common.file.TransientLocalResourceFactoryProvider;
 
 public class IndexNestedLoopJoinOperatorDescriptor extends AbstractTreeIndexOperatorDescriptor {
 
@@ -51,12 +51,13 @@
     private boolean isSetUnion = false;
 
     public IndexNestedLoopJoinOperatorDescriptor(JobSpecification spec, RecordDescriptor recDesc,
-            IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> treeIndexRegistryProvider,
+            IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lcManagerProvider,
             IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
             IBinaryComparatorFactory[] comparatorFactories, boolean isForward, int[] lowKeyFields, int[] highKeyFields,
             boolean lowKeyInclusive, boolean highKeyInclusive, IIndexDataflowHelperFactory opHelperFactory) {
-        super(spec, 1, 1, recDesc, storageManager, treeIndexRegistryProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, opHelperFactory, null, false, NoOpOperationCallbackProvider.INSTANCE);
+        super(spec, 1, 1, recDesc, storageManager, lcManagerProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, null, opHelperFactory, null, false, new TransientLocalResourceFactoryProvider(),
+                NoOpOperationCallbackFactory.INSTANCE, NoOpOperationCallbackFactory.INSTANCE);
         this.isForward = isForward;
         this.lowKeyFields = lowKeyFields;
         this.highKeyFields = highKeyFields;
@@ -65,14 +66,15 @@
     }
 
     public IndexNestedLoopJoinOperatorDescriptor(JobSpecification spec, RecordDescriptor recDesc,
-            IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> treeIndexRegistryProvider,
+            IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lcManagerProvider,
             IFileSplitProvider fileSplitProvider, ITreeIndexFrameFactory interiorFrameFactory,
             ITreeIndexFrameFactory leafFrameFactory, ITypeTraits[] typeTraits,
             IBinaryComparatorFactory[] comparatorFactories, boolean isForward, int[] lowKeyFields, int[] highKeyFields,
             boolean lowKeyInclusive, boolean highKeyInclusive, IIndexDataflowHelperFactory opHelperFactory,
             boolean isRightOuter, INullWriterFactory[] nullWriterFactories) {
-        super(spec, 1, 1, recDesc, storageManager, treeIndexRegistryProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, opHelperFactory, null, false, NoOpOperationCallbackProvider.INSTANCE);
+        super(spec, 1, 1, recDesc, storageManager, lcManagerProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, null, opHelperFactory, null, false, new TransientLocalResourceFactoryProvider(),
+                NoOpOperationCallbackFactory.INSTANCE, NoOpOperationCallbackFactory.INSTANCE);
         this.isForward = isForward;
         this.lowKeyFields = lowKeyFields;
         this.highKeyFields = highKeyFields;
@@ -84,14 +86,15 @@
     }
 
     public IndexNestedLoopJoinOperatorDescriptor(JobSpecification spec, RecordDescriptor recDesc,
-            IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> treeIndexRegistryProvider,
+            IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lcManagerProvider,
             IFileSplitProvider fileSplitProvider, ITreeIndexFrameFactory interiorFrameFactory,
             ITreeIndexFrameFactory leafFrameFactory, ITypeTraits[] typeTraits,
             IBinaryComparatorFactory[] comparatorFactories, boolean isForward, int[] lowKeyFields, int[] highKeyFields,
             boolean lowKeyInclusive, boolean highKeyInclusive, IIndexDataflowHelperFactory opHelperFactory,
             boolean isSetUnion) {
-        super(spec, 1, 1, recDesc, storageManager, treeIndexRegistryProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, opHelperFactory, null, false, NoOpOperationCallbackProvider.INSTANCE);
+        super(spec, 1, 1, recDesc, storageManager, lcManagerProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, null, opHelperFactory, null, false, new TransientLocalResourceFactoryProvider(),
+                NoOpOperationCallbackFactory.INSTANCE, NoOpOperationCallbackFactory.INSTANCE);
         this.isForward = isForward;
         this.lowKeyFields = lowKeyFields;
         this.highKeyFields = highKeyFields;
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorNodePushable.java
index bd076d3..8b9bfc2 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopJoinOperatorNodePushable.java
@@ -34,9 +34,10 @@
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
 
 public class IndexNestedLoopJoinOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
     private TreeIndexDataflowHelper treeIndexOpHelper;
@@ -86,11 +87,11 @@
 
     @Override
     public void open() throws HyracksDataException {
-        accessor = new FrameTupleAccessor(treeIndexOpHelper.getHyracksTaskContext().getFrameSize(), recDesc);
+        accessor = new FrameTupleAccessor(treeIndexOpHelper.getTaskContext().getFrameSize(), recDesc);
 
         try {
-            treeIndexOpHelper.init(false);
-            btree = (BTree) treeIndexOpHelper.getIndex();
+            treeIndexOpHelper.open();
+            btree = (BTree) treeIndexOpHelper.getIndexInstance();
             writer.open();
 
             int lowKeySearchFields = btree.getComparatorFactories().length;
@@ -118,15 +119,15 @@
 
             rangePred = new RangePredicate(null, null, lowKeyInclusive, highKeyInclusive, lowKeySearchCmp,
                     highKeySearchCmp);
-            writeBuffer = treeIndexOpHelper.getHyracksTaskContext().allocateFrame();
+            writeBuffer = treeIndexOpHelper.getTaskContext().allocateFrame();
             tb = new ArrayTupleBuilder(inputRecDesc.getFields().length + btree.getFieldCount());
             dos = tb.getDataOutput();
-            appender = new FrameTupleAppender(treeIndexOpHelper.getHyracksTaskContext().getFrameSize());
+            appender = new FrameTupleAppender(treeIndexOpHelper.getTaskContext().getFrameSize());
             appender.reset(writeBuffer, true);
-            indexAccessor = btree.createAccessor();
+            indexAccessor = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
             setCursor();
         } catch (Exception e) {
-            treeIndexOpHelper.deinit();
+            treeIndexOpHelper.close();
             throw new HyracksDataException(e);
         }
     }
@@ -196,7 +197,7 @@
                 throw new HyracksDataException(e);
             }
         } finally {
-            treeIndexOpHelper.deinit();
+            treeIndexOpHelper.close();
         }
     }
 
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
index f7b3d62..5ca5382 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinFunctionUpdateOperatorNodePushable.java
@@ -38,9 +38,10 @@
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
 import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
@@ -116,11 +117,11 @@
          * function open
          */
         functionProxy.functionOpen();
-        accessor = new FrameTupleAccessor(treeIndexOpHelper.getHyracksTaskContext().getFrameSize(), recDesc);
+        accessor = new FrameTupleAccessor(treeIndexOpHelper.getTaskContext().getFrameSize(), recDesc);
 
         try {
-            treeIndexOpHelper.init(false);
-            btree = (BTree) treeIndexOpHelper.getIndex();
+            treeIndexOpHelper.open();
+            btree = (BTree) treeIndexOpHelper.getIndexInstance();
             cursorFrame = btree.getLeafFrameFactory().createFrame();
             setCursor();
 
@@ -147,7 +148,7 @@
 
             rangePred = new RangePredicate(null, null, true, true, lowKeySearchCmp, highKeySearchCmp);
 
-            writeBuffer = treeIndexOpHelper.getHyracksTaskContext().allocateFrame();
+            writeBuffer = treeIndexOpHelper.getTaskContext().allocateFrame();
 
             nullTupleBuilder = new ArrayTupleBuilder(inputRecDesc.getFields().length);
             dos = nullTupleBuilder.getDataOutput();
@@ -157,10 +158,10 @@
                 nullTupleBuilder.addFieldEndOffset();
             }
 
-            appender = new FrameTupleAppender(treeIndexOpHelper.getHyracksTaskContext().getFrameSize());
+            appender = new FrameTupleAppender(treeIndexOpHelper.getTaskContext().getFrameSize());
             appender.reset(writeBuffer, true);
 
-            indexAccessor = btree.createAccessor();
+            indexAccessor = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
 
             /** set the search cursor */
             rangePred.setLowKey(null, true);
@@ -178,7 +179,7 @@
             cloneUpdateTb = new ArrayTupleBuilder(btree.getFieldCount());
             updateBuffer.setFieldCount(btree.getFieldCount());
         } catch (Exception e) {
-            treeIndexOpHelper.deinit();
+            treeIndexOpHelper.close();
             throw new HyracksDataException(e);
         }
     }
@@ -273,7 +274,7 @@
         } catch (Exception e) {
             throw new HyracksDataException(e);
         } finally {
-            treeIndexOpHelper.deinit();
+            treeIndexOpHelper.close();
         }
     }
 
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinOperatorNodePushable.java
index 9f1e1ad..d7c5d1f 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopRightOuterJoinOperatorNodePushable.java
@@ -38,9 +38,10 @@
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
 
 public class IndexNestedLoopRightOuterJoinOperatorNodePushable extends
         AbstractUnaryInputUnaryOutputOperatorNodePushable {
@@ -97,10 +98,10 @@
 
     @Override
     public void open() throws HyracksDataException {
-        accessor = new FrameTupleAccessor(treeIndexOpHelper.getHyracksTaskContext().getFrameSize(), recDesc);
+        accessor = new FrameTupleAccessor(treeIndexOpHelper.getTaskContext().getFrameSize(), recDesc);
         try {
-            treeIndexOpHelper.init(false);
-            btree = (BTree) treeIndexOpHelper.getIndex();
+            treeIndexOpHelper.open();
+            btree = (BTree) treeIndexOpHelper.getIndexInstance();
             cursorFrame = btree.getLeafFrameFactory().createFrame();
             setCursor();
             writer.open();
@@ -129,13 +130,13 @@
 
             rangePred = new RangePredicate(null, null, true, true, lowKeySearchCmp, highKeySearchCmp);
 
-            writeBuffer = treeIndexOpHelper.getHyracksTaskContext().allocateFrame();
+            writeBuffer = treeIndexOpHelper.getTaskContext().allocateFrame();
             tb = new ArrayTupleBuilder(inputRecDesc.getFields().length + btree.getFieldCount());
             dos = tb.getDataOutput();
-            appender = new FrameTupleAppender(treeIndexOpHelper.getHyracksTaskContext().getFrameSize());
+            appender = new FrameTupleAppender(treeIndexOpHelper.getTaskContext().getFrameSize());
             appender.reset(writeBuffer, true);
 
-            indexAccessor = btree.createAccessor();
+            indexAccessor = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
 
             /** set the search cursor */
             rangePred.setLowKey(null, true);
@@ -151,7 +152,7 @@
             }
 
         } catch (Exception e) {
-            treeIndexOpHelper.deinit();
+            treeIndexOpHelper.close();
             throw new HyracksDataException(e);
         }
     }
@@ -243,7 +244,7 @@
         } catch (Exception e) {
             throw new HyracksDataException(e);
         } finally {
-            treeIndexOpHelper.deinit();
+            treeIndexOpHelper.close();
         }
     }
 
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
index 6af60a8..160324e 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionFunctionUpdateOperatorNodePushable.java
@@ -36,9 +36,10 @@
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
 import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IUpdateFunctionFactory;
@@ -104,11 +105,11 @@
     @Override
     public void open() throws HyracksDataException {
         functionProxy.functionOpen();
-        accessor = new FrameTupleAccessor(treeIndexOpHelper.getHyracksTaskContext().getFrameSize(), recDesc);
+        accessor = new FrameTupleAccessor(treeIndexOpHelper.getTaskContext().getFrameSize(), recDesc);
 
         try {
-            treeIndexOpHelper.init(false);
-            btree = (BTree) treeIndexOpHelper.getIndex();
+            treeIndexOpHelper.open();
+            btree = (BTree) treeIndexOpHelper.getIndexInstance();
             cursorFrame = btree.getLeafFrameFactory().createFrame();
             setCursor();
 
@@ -120,11 +121,11 @@
             }
             lowKeySearchCmp = new MultiComparator(lowKeySearchComparators);
 
-            writeBuffer = treeIndexOpHelper.getHyracksTaskContext().allocateFrame();
-            appender = new FrameTupleAppender(treeIndexOpHelper.getHyracksTaskContext().getFrameSize());
+            writeBuffer = treeIndexOpHelper.getTaskContext().allocateFrame();
+            appender = new FrameTupleAppender(treeIndexOpHelper.getTaskContext().getFrameSize());
             appender.reset(writeBuffer, true);
 
-            indexAccessor = btree.createAccessor();
+            indexAccessor = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
 
             /** set the search cursor */
             rangePred.setLowKey(null, true);
@@ -141,7 +142,7 @@
             cloneUpdateTb = new ArrayTupleBuilder(btree.getFieldCount());
             updateBuffer.setFieldCount(btree.getFieldCount());
         } catch (Exception e) {
-            treeIndexOpHelper.deinit();
+            treeIndexOpHelper.close();
             throw new HyracksDataException(e);
         }
     }
@@ -214,7 +215,7 @@
         } catch (Exception e) {
             throw new HyracksDataException(e);
         } finally {
-            treeIndexOpHelper.deinit();
+            treeIndexOpHelper.close();
         }
     }
 
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionOperatorNodePushable.java
index 615a25b..579935b 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/IndexNestedLoopSetUnionOperatorNodePushable.java
@@ -37,9 +37,10 @@
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
 
 public class IndexNestedLoopSetUnionOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
     private TreeIndexDataflowHelper treeIndexOpHelper;
@@ -90,11 +91,11 @@
 
     @Override
     public void open() throws HyracksDataException {
-        accessor = new FrameTupleAccessor(treeIndexOpHelper.getHyracksTaskContext().getFrameSize(), recDesc);
+        accessor = new FrameTupleAccessor(treeIndexOpHelper.getTaskContext().getFrameSize(), recDesc);
 
         try {
-            treeIndexOpHelper.init(false);
-            btree = (BTree) treeIndexOpHelper.getIndex();
+            treeIndexOpHelper.open();
+            btree = (BTree) treeIndexOpHelper.getIndexInstance();
             cursorFrame = btree.getLeafFrameFactory().createFrame();
             setCursor();
             writer.open();
@@ -107,13 +108,13 @@
             }
             lowKeySearchCmp = new MultiComparator(lowKeySearchComparators);
 
-            writeBuffer = treeIndexOpHelper.getHyracksTaskContext().allocateFrame();
+            writeBuffer = treeIndexOpHelper.getTaskContext().allocateFrame();
             tb = new ArrayTupleBuilder(btree.getFieldCount());
             dos = tb.getDataOutput();
-            appender = new FrameTupleAppender(treeIndexOpHelper.getHyracksTaskContext().getFrameSize());
+            appender = new FrameTupleAppender(treeIndexOpHelper.getTaskContext().getFrameSize());
             appender.reset(writeBuffer, true);
 
-            indexAccessor = btree.createAccessor();
+            indexAccessor = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
 
             /** set the search cursor */
             rangePred.setLowKey(null, true);
@@ -129,7 +130,7 @@
             }
 
         } catch (Exception e) {
-            treeIndexOpHelper.deinit();
+            treeIndexOpHelper.close();
             throw new HyracksDataException(e);
         }
     }
@@ -203,7 +204,7 @@
         } catch (Exception e) {
             throw new HyracksDataException(e);
         } finally {
-            treeIndexOpHelper.deinit();
+            treeIndexOpHelper.close();
         }
     }
 
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorDescriptor.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorDescriptor.java
index 126fcb8..eb5ece6 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorDescriptor.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorDescriptor.java
@@ -22,12 +22,12 @@
 import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.hyracks.storage.common.file.TransientLocalResourceFactoryProvider;
 
 public class TreeIndexBulkReLoadOperatorDescriptor extends AbstractTreeIndexOperatorDescriptor {
 
@@ -35,20 +35,21 @@
 
     private final int[] fieldPermutation;
     private final IStorageManagerInterface storageManager;
-    private final IIndexRegistryProvider<IIndex> treeIndexRegistryProvider;
+    private final IIndexLifecycleManagerProvider lcManagerProvider;
     private final IFileSplitProvider fileSplitProvider;
     private final float fillFactor;
 
     public TreeIndexBulkReLoadOperatorDescriptor(JobSpecification spec, IStorageManagerInterface storageManager,
-            IIndexRegistryProvider<IIndex> treeIndexRegistryProvider, IFileSplitProvider fileSplitProvider,
+            IIndexLifecycleManagerProvider lcManagerProvider, IFileSplitProvider fileSplitProvider,
             ITypeTraits[] typeTraits, IBinaryComparatorFactory[] comparatorFactories, int[] fieldPermutation,
             float fillFactor, IIndexDataflowHelperFactory opHelperFactory) {
-        super(spec, 1, 0, null, storageManager, treeIndexRegistryProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, opHelperFactory, null, false, NoOpOperationCallbackProvider.INSTANCE);
+        super(spec, 1, 0, null, storageManager, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
+                fieldPermutation, opHelperFactory, null, false, new TransientLocalResourceFactoryProvider(),
+                NoOpOperationCallbackFactory.INSTANCE, NoOpOperationCallbackFactory.INSTANCE);
         this.fieldPermutation = fieldPermutation;
 
         this.storageManager = storageManager;
-        this.treeIndexRegistryProvider = treeIndexRegistryProvider;
+        this.lcManagerProvider = lcManagerProvider;
         this.fileSplitProvider = fileSplitProvider;
         this.fillFactor = fillFactor;
     }
@@ -57,6 +58,6 @@
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
         return new TreeIndexBulkReLoadOperatorNodePushable(this, ctx, partition, fieldPermutation, fillFactor,
-                recordDescProvider, storageManager, treeIndexRegistryProvider, fileSplitProvider);
+                recordDescProvider, storageManager, lcManagerProvider, fileSplitProvider);
     }
 }
\ No newline at end of file
diff --git a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorNodePushable.java b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorNodePushable.java
index 883fef4..5e089a5 100644
--- a/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorNodePushable.java
+++ b/pregelix/pregelix-dataflow-std/src/main/java/edu/uci/ics/pregelix/dataflow/std/TreeIndexBulkReLoadOperatorNodePushable.java
@@ -20,111 +20,54 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoadContext;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoader;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexRegistry;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.PermutingFrameTupleReference;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
 import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
-import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
-import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
 
 public class TreeIndexBulkReLoadOperatorNodePushable extends AbstractUnaryInputSinkOperatorNodePushable {
-    private final TreeIndexDataflowHelper treeIndexOpHelper;
-    private FrameTupleAccessor accessor;
-    private IIndexBulkLoadContext bulkLoadCtx;
-
-    private IRecordDescriptorProvider recordDescProvider;
-    private PermutingFrameTupleReference tuple = new PermutingFrameTupleReference();
-
-    private final IStorageManagerInterface storageManager;
-    private final IIndexRegistryProvider<IIndex> treeIndexRegistryProvider;
-    private final IFileSplitProvider fileSplitProvider;
-    private final int partition;
     private final float fillFactor;
-    private IHyracksTaskContext ctx;
+    private final TreeIndexDataflowHelper treeIndexOpHelper;
+    private final IIndexOperatorDescriptor opDesc;
+    private final IRecordDescriptorProvider recordDescProvider;
+    private final PermutingFrameTupleReference tuple = new PermutingFrameTupleReference();
+
     private ITreeIndex index;
+    private FrameTupleAccessor accessor;
+    private IIndexBulkLoader bulkLoader;
 
     public TreeIndexBulkReLoadOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
             int partition, int[] fieldPermutation, float fillFactor, IRecordDescriptorProvider recordDescProvider,
-            IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> treeIndexRegistryProvider,
+            IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lcManagerProvider,
             IFileSplitProvider fileSplitProvider) {
+        this.fillFactor = fillFactor;
         treeIndexOpHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
                 opDesc, ctx, partition);
+        this.opDesc = opDesc;
         this.recordDescProvider = recordDescProvider;
         tuple.setFieldPermutation(fieldPermutation);
-
-        this.storageManager = storageManager;
-        this.treeIndexRegistryProvider = treeIndexRegistryProvider;
-        this.fileSplitProvider = fileSplitProvider;
-        this.partition = partition;
-        this.ctx = ctx;
-        this.fillFactor = fillFactor;
     }
 
     @Override
     public void open() throws HyracksDataException {
-        initDrop();
-        init();
-    }
-
-    private void initDrop() throws HyracksDataException {
-        try {
-            IndexRegistry<IIndex> treeIndexRegistry = treeIndexRegistryProvider.getRegistry(ctx);
-            IBufferCache bufferCache = storageManager.getBufferCache(ctx);
-            IFileMapProvider fileMapProvider = storageManager.getFileMapProvider(ctx);
-
-            FileReference f = fileSplitProvider.getFileSplits()[partition].getLocalFile();
-            int indexFileId = -1;
-            boolean fileIsMapped = false;
-            synchronized (fileMapProvider) {
-                fileIsMapped = fileMapProvider.isMapped(f);
-                if (fileIsMapped)
-                    indexFileId = fileMapProvider.lookupFileId(f);
-            }
-
-            /**
-             * delete the file if it is mapped
-             */
-            if (fileIsMapped) {
-                // Unregister tree instance.
-                synchronized (treeIndexRegistry) {
-                    treeIndexRegistry.unregister(indexFileId);
-                }
-
-                // remove name to id mapping
-                bufferCache.deleteFile(indexFileId, false);
-            }
-        }
-        // TODO: for the time being we don't throw,
-        // with proper exception handling (no hanging job problem) we should
-        // throw
-        catch (Exception e) {
-            throw new HyracksDataException(e);
-        }
-    }
-
-    private void init() throws HyracksDataException {
-        AbstractTreeIndexOperatorDescriptor opDesc = (AbstractTreeIndexOperatorDescriptor) treeIndexOpHelper
-                .getOperatorDescriptor();
         RecordDescriptor recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
-        accessor = new FrameTupleAccessor(treeIndexOpHelper.getHyracksTaskContext().getFrameSize(), recDesc);
+        accessor = new FrameTupleAccessor(treeIndexOpHelper.getTaskContext().getFrameSize(), recDesc);
+        treeIndexOpHelper.create();
+        treeIndexOpHelper.open();
         try {
-            treeIndexOpHelper.init(true);
-            treeIndexOpHelper.getIndex().open(treeIndexOpHelper.getIndexFileId());
-            index = (ITreeIndex) treeIndexOpHelper.getIndex();
-            index.open(treeIndexOpHelper.getIndexFileId());
-            bulkLoadCtx = index.beginBulkLoad(fillFactor);
+            index = (ITreeIndex) treeIndexOpHelper.getIndexInstance();
+            bulkLoader = index.createBulkLoader(fillFactor, false, 0);
         } catch (Exception e) {
             // cleanup in case of failure
-            treeIndexOpHelper.deinit();
+            treeIndexOpHelper.close();
             throw new HyracksDataException(e);
         }
     }
@@ -135,16 +78,22 @@
         int tupleCount = accessor.getTupleCount();
         for (int i = 0; i < tupleCount; i++) {
             tuple.reset(accessor, i);
-            index.bulkLoadAddTuple(tuple, bulkLoadCtx);
+            try {
+                bulkLoader.add(tuple);
+            } catch (IndexException e) {
+                throw new HyracksDataException(e);
+            }
         }
     }
 
     @Override
     public void close() throws HyracksDataException {
         try {
-            index.endBulkLoad(bulkLoadCtx);
+            bulkLoader.end();
+        } catch (IndexException e) {
+            throw new HyracksDataException(e);
         } finally {
-            treeIndexOpHelper.deinit();
+            treeIndexOpHelper.close();
         }
     }
 
diff --git a/pregelix/pregelix-dataflow/pom.xml b/pregelix/pregelix-dataflow/pom.xml
index ffc000b..37abe57 100644
--- a/pregelix/pregelix-dataflow/pom.xml
+++ b/pregelix/pregelix-dataflow/pom.xml
@@ -42,8 +42,9 @@
 				</configuration>
 			</plugin>
 			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-clean-plugin</artifactId>
-				<version>2.5</version>
+				<version>2.4.1</version>
 				<configuration>
 					<filesets>
 						<fileset>
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
index 567e220..8d6ab38 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
@@ -28,8 +28,8 @@
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
 import edu.uci.ics.hyracks.control.nc.io.IOManager;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexRegistry;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexLifecycleManager;
 import edu.uci.ics.hyracks.storage.common.buffercache.BufferCache;
 import edu.uci.ics.hyracks.storage.common.buffercache.ClockPageReplacementStrategy;
 import edu.uci.ics.hyracks.storage.common.buffercache.HeapBufferAllocator;
@@ -38,20 +38,25 @@
 import edu.uci.ics.hyracks.storage.common.buffercache.IPageReplacementStrategy;
 import edu.uci.ics.hyracks.storage.common.file.IFileMapManager;
 import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
-import edu.uci.ics.hyracks.storage.common.smi.TransientFileMapManager;
+import edu.uci.ics.hyracks.storage.common.file.ILocalResourceRepository;
+import edu.uci.ics.hyracks.storage.common.file.ResourceIdFactory;
+import edu.uci.ics.hyracks.storage.common.file.TransientFileMapManager;
+import edu.uci.ics.hyracks.storage.common.file.TransientLocalResourceRepository;
 import edu.uci.ics.pregelix.api.graph.Vertex;
 
 public class RuntimeContext implements IWorkspaceFileFactory {
     private static final Logger LOGGER = Logger.getLogger(RuntimeContext.class.getName());
 
-    private IndexRegistry<IIndex> treeIndexRegistry;
-    private IBufferCache bufferCache;
-    private IFileMapManager fileMapManager;
-    private Map<StateKey, IStateObject> appStateMap = new ConcurrentHashMap<StateKey, IStateObject>();
-    private Map<String, Long> giraphJobIdToSuperStep = new ConcurrentHashMap<String, Long>();
-    private Map<String, Boolean> giraphJobIdToMove = new ConcurrentHashMap<String, Boolean>();
-    private IOManager ioManager;
-    private Map<Long, List<FileReference>> iterationToFiles = new ConcurrentHashMap<Long, List<FileReference>>();
+    private final IIndexLifecycleManager lcManager;
+    private final ILocalResourceRepository localResourceRepository;
+    private final ResourceIdFactory resourceIdFactory;
+    private final IBufferCache bufferCache;
+    private final IFileMapManager fileMapManager;
+    private final Map<StateKey, IStateObject> appStateMap = new ConcurrentHashMap<StateKey, IStateObject>();
+    private final Map<String, Long> giraphJobIdToSuperStep = new ConcurrentHashMap<String, Long>();
+    private final Map<String, Boolean> giraphJobIdToMove = new ConcurrentHashMap<String, Boolean>();
+    private final IOManager ioManager;
+    private final Map<Long, List<FileReference>> iterationToFiles = new ConcurrentHashMap<Long, List<FileReference>>();
 
     public RuntimeContext(INCApplicationContext appCtx) {
         fileMapManager = new TransientFileMapManager();
@@ -64,8 +69,10 @@
         /** let the buffer cache never flush dirty pages */
         bufferCache = new BufferCache(appCtx.getRootContext().getIOManager(), allocator, prs,
                 new PreDelayPageCleanerPolicy(Long.MAX_VALUE), fileMapManager, pageSize, numPages, 1000000);
-        treeIndexRegistry = new IndexRegistry<IIndex>();
         ioManager = (IOManager) appCtx.getRootContext().getIOManager();
+        lcManager = new IndexLifecycleManager();
+        localResourceRepository = new TransientLocalResourceRepository();
+        resourceIdFactory = new ResourceIdFactory(0);
     }
 
     public void close() {
@@ -80,6 +87,18 @@
         System.gc();
     }
 
+    public ILocalResourceRepository getLocalResourceRepository() {
+        return localResourceRepository;
+    }
+
+    public ResourceIdFactory getResourceIdFactory() {
+        return resourceIdFactory;
+    }
+
+    public IIndexLifecycleManager getIndexLifecycleManager() {
+        return lcManager;
+    }
+
     public IBufferCache getBufferCache() {
         return bufferCache;
     }
@@ -88,10 +107,6 @@
         return fileMapManager;
     }
 
-    public IndexRegistry<IIndex> getTreeIndexRegistry() {
-        return treeIndexRegistry;
-    }
-
     public Map<StateKey, IStateObject> getAppStateStore() {
         return appStateMap;
     }
diff --git a/pregelix/pregelix-example/pom.xml b/pregelix/pregelix-example/pom.xml
index beefee2..20c45ec 100644
--- a/pregelix/pregelix-example/pom.xml
+++ b/pregelix/pregelix-example/pom.xml
@@ -24,6 +24,7 @@
 			</plugin>
 			<plugin>
 				<artifactId>maven-assembly-plugin</artifactId>
+                <version>2.2-beta-5</version>
 				<configuration>
 					<descriptorRefs>
 						<descriptorRef>jar-with-dependencies</descriptorRef>
@@ -42,7 +43,7 @@
 			<plugin>
 				<groupId>org.codehaus.mojo</groupId>
 				<artifactId>appassembler-maven-plugin</artifactId>
-				<version>1.3</version>
+                <version>1.3</version>
 				<executions>
 					<execution>
 						<configuration>
@@ -79,7 +80,7 @@
 			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-clean-plugin</artifactId>
-				<version>2.5</version>
+				<version>2.4.1</version>
 				<configuration>
 					<filesets>
 						<fileset>
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/dataload/DataLoadTest.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/dataload/DataLoadTest.java
index 321b5b2..da6d564 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/dataload/DataLoadTest.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/dataload/DataLoadTest.java
@@ -76,8 +76,7 @@
         ClusterConfig.setStorePath(PATH_TO_CLUSTER_STORE);
         ClusterConfig.setClusterPropertiesPath(PATH_TO_CLUSTER_PROPERTIES);
         cleanupStores();
-        PregelixHyracksIntegrationUtil.init("src/test/resources/topology.xml");
-        PregelixHyracksIntegrationUtil.createApp(HYRACKS_APP_NAME);
+        PregelixHyracksIntegrationUtil.init();
         LOGGER.info("Hyracks mini-cluster started");
         startHDFS();
         FileUtils.forceMkdir(new File(EXPECT_RESULT_DIR));
@@ -112,7 +111,6 @@
     }
 
     public void tearDown() throws Exception {
-        PregelixHyracksIntegrationUtil.destroyApp(HYRACKS_APP_NAME);
         PregelixHyracksIntegrationUtil.deinit();
         LOGGER.info("Hyracks mini-cluster shut down");
         cleanupHDFS();
diff --git a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestSuite.java b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestSuite.java
index fa98ebd..4bf83e6 100644
--- a/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestSuite.java
+++ b/pregelix/pregelix-example/src/test/java/edu/uci/ics/pregelix/example/jobrun/RunJobTestSuite.java
@@ -41,176 +41,166 @@
 
 @SuppressWarnings("deprecation")
 public class RunJobTestSuite extends TestSuite {
-	private static final Logger LOGGER = Logger.getLogger(RunJobTestSuite.class
-			.getName());
+    private static final Logger LOGGER = Logger.getLogger(RunJobTestSuite.class.getName());
 
-	private static final String ACTUAL_RESULT_DIR = "actual";
-	private static final String EXPECTED_RESULT_DIR = "src/test/resources/expected";
-	private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
-	private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/stores.properties";
-	private static final String PATH_TO_CLUSTER_PROPERTIES = "src/test/resources/cluster/cluster.properties";
-	private static final String PATH_TO_JOBS = "src/test/resources/jobs/";
-	private static final String PATH_TO_IGNORE = "src/test/resources/ignore.txt";
-	private static final String PATH_TO_ONLY = "src/test/resources/only.txt";
-	private static final String FILE_EXTENSION_OF_RESULTS = "result";
+    private static final String ACTUAL_RESULT_DIR = "actual";
+    private static final String EXPECTED_RESULT_DIR = "src/test/resources/expected";
+    private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
+    private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/stores.properties";
+    private static final String PATH_TO_CLUSTER_PROPERTIES = "src/test/resources/cluster/cluster.properties";
+    private static final String PATH_TO_JOBS = "src/test/resources/jobs/";
+    private static final String PATH_TO_IGNORE = "src/test/resources/ignore.txt";
+    private static final String PATH_TO_ONLY = "src/test/resources/only.txt";
+    private static final String FILE_EXTENSION_OF_RESULTS = "result";
 
-	private static final String DATA_PATH = "data/webmap/webmap_link.txt";
-	private static final String HDFS_PATH = "/webmap/";
+    private static final String DATA_PATH = "data/webmap/webmap_link.txt";
+    private static final String HDFS_PATH = "/webmap/";
 
-	private static final String DATA_PATH2 = "data/webmapcomplex/webmap_link.txt";
-	private static final String HDFS_PATH2 = "/webmapcomplex/";
+    private static final String DATA_PATH2 = "data/webmapcomplex/webmap_link.txt";
+    private static final String HDFS_PATH2 = "/webmapcomplex/";
 
-	private static final String DATA_PATH3 = "data/clique/clique.txt";
-	private static final String HDFS_PATH3 = "/clique/";
+    private static final String DATA_PATH3 = "data/clique/clique.txt";
+    private static final String HDFS_PATH3 = "/clique/";
 
-	private static final String HYRACKS_APP_NAME = "pregelix";
-	private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR
-			+ File.separator + "conf.xml";
-	private MiniDFSCluster dfsCluster;
+    private static final String HYRACKS_APP_NAME = "pregelix";
+    private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
+    private MiniDFSCluster dfsCluster;
 
-	private JobConf conf = new JobConf();
-	private int numberOfNC = 2;
+    private JobConf conf = new JobConf();
+    private int numberOfNC = 2;
 
-	public void setUp() throws Exception {
-		ClusterConfig.setStorePath(PATH_TO_CLUSTER_STORE);
-		ClusterConfig.setClusterPropertiesPath(PATH_TO_CLUSTER_PROPERTIES);
-		cleanupStores();
-		PregelixHyracksIntegrationUtil.init("src/test/resources/topology.xml");
-		PregelixHyracksIntegrationUtil.createApp(HYRACKS_APP_NAME);
-		LOGGER.info("Hyracks mini-cluster started");
-		FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
-		FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
-		startHDFS();
-	}
+    public void setUp() throws Exception {
+        ClusterConfig.setStorePath(PATH_TO_CLUSTER_STORE);
+        ClusterConfig.setClusterPropertiesPath(PATH_TO_CLUSTER_PROPERTIES);
+        cleanupStores();
+        PregelixHyracksIntegrationUtil.init();
+        LOGGER.info("Hyracks mini-cluster started");
+        FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
+        FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
+        startHDFS();
+    }
 
-	private void cleanupStores() throws IOException {
-		FileUtils.forceMkdir(new File("teststore"));
-		FileUtils.forceMkdir(new File("build"));
-		FileUtils.cleanDirectory(new File("teststore"));
-		FileUtils.cleanDirectory(new File("build"));
-	}
+    private void cleanupStores() throws IOException {
+        FileUtils.forceMkdir(new File("teststore"));
+        FileUtils.forceMkdir(new File("build"));
+        FileUtils.cleanDirectory(new File("teststore"));
+        FileUtils.cleanDirectory(new File("build"));
+    }
 
-	private void startHDFS() throws IOException {
-		conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
-		conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
-		conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
-		FileSystem lfs = FileSystem.getLocal(new Configuration());
-		lfs.delete(new Path("build"), true);
-		System.setProperty("hadoop.log.dir", "logs");
-		dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
-		FileSystem dfs = FileSystem.get(conf);
-		Path src = new Path(DATA_PATH);
-		Path dest = new Path(HDFS_PATH);
-		dfs.mkdirs(dest);
-		dfs.copyFromLocalFile(src, dest);
+    private void startHDFS() throws IOException {
+        conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
+        conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
+        conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
+        FileSystem lfs = FileSystem.getLocal(new Configuration());
+        lfs.delete(new Path("build"), true);
+        System.setProperty("hadoop.log.dir", "logs");
+        dfsCluster = new MiniDFSCluster(conf, numberOfNC, true, null);
+        FileSystem dfs = FileSystem.get(conf);
+        Path src = new Path(DATA_PATH);
+        Path dest = new Path(HDFS_PATH);
+        dfs.mkdirs(dest);
+        dfs.copyFromLocalFile(src, dest);
 
-		src = new Path(DATA_PATH2);
-		dest = new Path(HDFS_PATH2);
-		dfs.mkdirs(dest);
-		dfs.copyFromLocalFile(src, dest);
+        src = new Path(DATA_PATH2);
+        dest = new Path(HDFS_PATH2);
+        dfs.mkdirs(dest);
+        dfs.copyFromLocalFile(src, dest);
 
-		src = new Path(DATA_PATH3);
-		dest = new Path(HDFS_PATH3);
-		dfs.mkdirs(dest);
-		dfs.copyFromLocalFile(src, dest);
+        src = new Path(DATA_PATH3);
+        dest = new Path(HDFS_PATH3);
+        dfs.mkdirs(dest);
+        dfs.copyFromLocalFile(src, dest);
 
-		DataOutputStream confOutput = new DataOutputStream(
-				new FileOutputStream(new File(HADOOP_CONF_PATH)));
-		conf.writeXml(confOutput);
-		confOutput.flush();
-		confOutput.close();
-	}
+        DataOutputStream confOutput = new DataOutputStream(new FileOutputStream(new File(HADOOP_CONF_PATH)));
+        conf.writeXml(confOutput);
+        confOutput.flush();
+        confOutput.close();
+    }
 
-	/**
-	 * cleanup hdfs cluster
-	 */
-	private void cleanupHDFS() throws Exception {
-		dfsCluster.shutdown();
-	}
+    /**
+     * cleanup hdfs cluster
+     */
+    private void cleanupHDFS() throws Exception {
+        dfsCluster.shutdown();
+    }
 
-	public void tearDown() throws Exception {
-		PregelixHyracksIntegrationUtil.destroyApp(HYRACKS_APP_NAME);
-		PregelixHyracksIntegrationUtil.deinit();
-		LOGGER.info("Hyracks mini-cluster shut down");
-		cleanupHDFS();
-	}
+    public void tearDown() throws Exception {
+        PregelixHyracksIntegrationUtil.deinit();
+        LOGGER.info("Hyracks mini-cluster shut down");
+        cleanupHDFS();
+    }
 
-	public static Test suite() throws Exception {
-		List<String> ignores = getFileList(PATH_TO_IGNORE);
-		List<String> onlys = getFileList(PATH_TO_ONLY);
-		File testData = new File(PATH_TO_JOBS);
-		File[] queries = testData.listFiles();
-		RunJobTestSuite testSuite = new RunJobTestSuite();
-		testSuite.setUp();
-		boolean onlyEnabled = false;
+    public static Test suite() throws Exception {
+        List<String> ignores = getFileList(PATH_TO_IGNORE);
+        List<String> onlys = getFileList(PATH_TO_ONLY);
+        File testData = new File(PATH_TO_JOBS);
+        File[] queries = testData.listFiles();
+        RunJobTestSuite testSuite = new RunJobTestSuite();
+        testSuite.setUp();
+        boolean onlyEnabled = false;
 
-		if (onlys.size() > 0) {
-			onlyEnabled = true;
-		}
-		for (File qFile : queries) {
-			if (isInList(ignores, qFile.getName()))
-				continue;
+        if (onlys.size() > 0) {
+            onlyEnabled = true;
+        }
+        for (File qFile : queries) {
+            if (isInList(ignores, qFile.getName()))
+                continue;
 
-			if (qFile.isFile()) {
-				if (onlyEnabled && !isInList(onlys, qFile.getName())) {
-					continue;
-				} else {
-					String resultFileName = ACTUAL_RESULT_DIR + File.separator
-							+ jobExtToResExt(qFile.getName());
-					String expectedFileName = EXPECTED_RESULT_DIR
-							+ File.separator + jobExtToResExt(qFile.getName());
-					testSuite.addTest(new RunJobTestCase(HADOOP_CONF_PATH,
-							qFile.getName(),
-							qFile.getAbsolutePath().toString(), resultFileName,
-							expectedFileName));
-				}
-			}
-		}
-		return testSuite;
-	}
+            if (qFile.isFile()) {
+                if (onlyEnabled && !isInList(onlys, qFile.getName())) {
+                    continue;
+                } else {
+                    String resultFileName = ACTUAL_RESULT_DIR + File.separator + jobExtToResExt(qFile.getName());
+                    String expectedFileName = EXPECTED_RESULT_DIR + File.separator + jobExtToResExt(qFile.getName());
+                    testSuite.addTest(new RunJobTestCase(HADOOP_CONF_PATH, qFile.getName(), qFile.getAbsolutePath()
+                            .toString(), resultFileName, expectedFileName));
+                }
+            }
+        }
+        return testSuite;
+    }
 
-	/**
-	 * Runs the tests and collects their result in a TestResult.
-	 */
-	@Override
-	public void run(TestResult result) {
-		try {
-			int testCount = countTestCases();
-			for (int i = 0; i < testCount; i++) {
-				// cleanupStores();
-				Test each = this.testAt(i);
-				if (result.shouldStop())
-					break;
-				runTest(each, result);
-			}
-			tearDown();
-		} catch (Exception e) {
-			throw new IllegalStateException(e);
-		}
-	}
+    /**
+     * Runs the tests and collects their result in a TestResult.
+     */
+    @Override
+    public void run(TestResult result) {
+        try {
+            int testCount = countTestCases();
+            for (int i = 0; i < testCount; i++) {
+                // cleanupStores();
+                Test each = this.testAt(i);
+                if (result.shouldStop())
+                    break;
+                runTest(each, result);
+            }
+            tearDown();
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
 
-	protected static List<String> getFileList(String ignorePath)
-			throws FileNotFoundException, IOException {
-		BufferedReader reader = new BufferedReader(new FileReader(ignorePath));
-		String s = null;
-		List<String> ignores = new ArrayList<String>();
-		while ((s = reader.readLine()) != null) {
-			ignores.add(s);
-		}
-		reader.close();
-		return ignores;
-	}
+    protected static List<String> getFileList(String ignorePath) throws FileNotFoundException, IOException {
+        BufferedReader reader = new BufferedReader(new FileReader(ignorePath));
+        String s = null;
+        List<String> ignores = new ArrayList<String>();
+        while ((s = reader.readLine()) != null) {
+            ignores.add(s);
+        }
+        reader.close();
+        return ignores;
+    }
 
-	private static String jobExtToResExt(String fname) {
-		int dot = fname.lastIndexOf('.');
-		return fname.substring(0, dot + 1) + FILE_EXTENSION_OF_RESULTS;
-	}
+    private static String jobExtToResExt(String fname) {
+        int dot = fname.lastIndexOf('.');
+        return fname.substring(0, dot + 1) + FILE_EXTENSION_OF_RESULTS;
+    }
 
-	private static boolean isInList(List<String> onlys, String name) {
-		for (String only : onlys)
-			if (name.indexOf(only) >= 0)
-				return true;
-		return false;
-	}
+    private static boolean isInList(List<String> onlys, String name) {
+        for (String only : onlys)
+            if (name.indexOf(only) >= 0)
+                return true;
+        return false;
+    }
 
 }
diff --git a/pregelix/pregelix-runtime/pom.xml b/pregelix/pregelix-runtime/pom.xml
index bce7b12..29b6ba7 100644
--- a/pregelix/pregelix-runtime/pom.xml
+++ b/pregelix/pregelix-runtime/pom.xml
@@ -42,8 +42,9 @@
 				</configuration>
 			</plugin>
 			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-clean-plugin</artifactId>
-				<version>2.5</version>
+				<version>2.4.1</version>
 				<configuration>
 					<filesets>
 						<fileset>
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/IndexLifeCycleManagerProvider.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/IndexLifeCycleManagerProvider.java
new file mode 100644
index 0000000..4fce6b3
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/IndexLifeCycleManagerProvider.java
@@ -0,0 +1,22 @@
+package edu.uci.ics.pregelix.runtime.bootstrap;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
+import edu.uci.ics.pregelix.dataflow.context.RuntimeContext;
+
+public class IndexLifeCycleManagerProvider implements IIndexLifecycleManagerProvider {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final IIndexLifecycleManagerProvider INSTANCE = new IndexLifeCycleManagerProvider();
+
+    private IndexLifeCycleManagerProvider() {
+    }
+
+    @Override
+    public IIndexLifecycleManager getLifecycleManager(IHyracksTaskContext ctx) {
+        return RuntimeContext.get(ctx).getIndexLifecycleManager();
+    }
+
+}
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/NCApplicationEntryPoint.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/NCApplicationEntryPoint.java
new file mode 100644
index 0000000..fbebc66
--- /dev/null
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/NCApplicationEntryPoint.java
@@ -0,0 +1,23 @@
+package edu.uci.ics.pregelix.runtime.bootstrap;
+
+import edu.uci.ics.hyracks.api.application.INCApplicationContext;
+import edu.uci.ics.hyracks.api.application.INCApplicationEntryPoint;
+import edu.uci.ics.pregelix.dataflow.context.RuntimeContext;
+
+public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
+    @Override
+    public void start(INCApplicationContext ncAppCtx, String[] args) throws Exception {
+        RuntimeContext rCtx = new RuntimeContext(ncAppCtx);
+        ncAppCtx.setApplicationObject(rCtx);
+    }
+
+    @Override
+    public void notifyStartupComplete() throws Exception {
+
+    }
+
+    @Override
+    public void stop() throws Exception {
+
+    }
+}
\ No newline at end of file
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/NCBootstrapImpl.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/NCBootstrapImpl.java
deleted file mode 100644
index 76c725e..0000000
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/NCBootstrapImpl.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.pregelix.runtime.bootstrap;
-
-import java.util.logging.Logger;
-
-import edu.uci.ics.hyracks.api.application.INCApplicationContext;
-import edu.uci.ics.hyracks.api.application.INCBootstrap;
-import edu.uci.ics.pregelix.dataflow.context.RuntimeContext;
-
-public class NCBootstrapImpl implements INCBootstrap {
-    private static final Logger LOGGER = Logger.getLogger(NCBootstrapImpl.class.getName());
-    private INCApplicationContext appCtx;
-
-    @Override
-    public void start() throws Exception {
-        LOGGER.info("Starting NC Bootstrap");
-        RuntimeContext rCtx = new RuntimeContext(appCtx);
-        appCtx.setApplicationObject(rCtx);
-        LOGGER.info("Initialized RuntimeContext: " + rCtx);
-    }
-
-    @Override
-    public void stop() throws Exception {
-        LOGGER.info("Stopping NC Bootstrap");
-        RuntimeContext rCtx = (RuntimeContext) appCtx.getApplicationObject();
-        rCtx.close();
-    }
-
-    @Override
-    public void setApplicationContext(INCApplicationContext appCtx) {
-        this.appCtx = appCtx;
-    }
-}
\ No newline at end of file
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/StorageManagerInterface.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/StorageManagerInterface.java
index 57bbfbe..0cce59d 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/StorageManagerInterface.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/StorageManagerInterface.java
@@ -18,6 +18,8 @@
 import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
 import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
 import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+import edu.uci.ics.hyracks.storage.common.file.ILocalResourceRepository;
+import edu.uci.ics.hyracks.storage.common.file.ResourceIdFactory;
 import edu.uci.ics.pregelix.dataflow.context.RuntimeContext;
 
 public class StorageManagerInterface implements IStorageManagerInterface {
@@ -37,4 +39,14 @@
     public IFileMapProvider getFileMapProvider(IHyracksTaskContext ctx) {
         return RuntimeContext.get(ctx).getFileMapManager();
     }
+
+    @Override
+    public ILocalResourceRepository getLocalResourceRepository(IHyracksTaskContext ctx) {
+        return RuntimeContext.get(ctx).getLocalResourceRepository();
+    }
+
+    @Override
+    public ResourceIdFactory getResourceIdFactory(IHyracksTaskContext ctx) {
+        return RuntimeContext.get(ctx).getResourceIdFactory();
+    }
 }
\ No newline at end of file
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/TreeIndexRegistryProvider.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/TreeIndexRegistryProvider.java
deleted file mode 100644
index 7d66422..0000000
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/bootstrap/TreeIndexRegistryProvider.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.pregelix.runtime.bootstrap;
-
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexRegistry;
-import edu.uci.ics.pregelix.dataflow.context.RuntimeContext;
-
-public class TreeIndexRegistryProvider implements IIndexRegistryProvider<IIndex> {
-    private static final long serialVersionUID = 1L;
-
-    public static final TreeIndexRegistryProvider INSTANCE = new TreeIndexRegistryProvider();
-
-    private TreeIndexRegistryProvider() {
-    }
-
-    @Override
-    public IndexRegistry<IIndex> getRegistry(IHyracksTaskContext ctx) {
-        return RuntimeContext.get(ctx).getTreeIndexRegistry();
-    }
-}
\ No newline at end of file
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
index f7958d9..0c09757 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/ComputeUpdateFunctionFactory.java
@@ -175,8 +175,12 @@
                 ArrayListWritable msgContentList = (ArrayListWritable) tuple[1];
                 msgContentList.reset(msgIterator);
 
-                if (!msgIterator.hasNext() && vertex.isHalted())
+                if (!msgIterator.hasNext() && vertex.isHalted()) {
                     return;
+                }
+                if (vertex.isHalted()) {
+                    vertex.activate();
+                }
 
                 try {
                     vertex.compute(msgIterator);
diff --git a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
index 0cf64a0..1bf6a2b 100644
--- a/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
+++ b/pregelix/pregelix-runtime/src/main/java/edu/uci/ics/pregelix/runtime/function/StartComputeUpdateFunctionFactory.java
@@ -176,8 +176,12 @@
                 vertex.setOutputAppenders(appenders);
                 vertex.setOutputTupleBuilders(tbs);
 
-                if (!msgIterator.hasNext() && vertex.isHalted())
+                if (!msgIterator.hasNext() && vertex.isHalted()) {
                     return;
+                }
+                if (vertex.isHalted()) {
+                    vertex.activate();
+                }
 
                 try {
                     vertex.compute(msgIterator);