Merged fullstack_lsm_staging upto r3336

git-svn-id: https://hyracks.googlecode.com/svn/trunk@3339 123451ca-8445-de46-9d55-352943316053
diff --git a/fullstack/pregelix/pregelix-core/pom.xml b/fullstack/pregelix/pregelix-core/pom.xml
index 576758b..7b247a8 100644
--- a/fullstack/pregelix/pregelix-core/pom.xml
+++ b/fullstack/pregelix/pregelix-core/pom.xml
@@ -18,8 +18,9 @@
 	<build>
 		<plugins>
 			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-jar-plugin</artifactId>
-				<version>2.4</version>
+				<version>2.3.2</version>
 				<executions>
 					<execution>
 						<id>balancer</id>
@@ -72,7 +73,7 @@
 			<plugin>
 				<groupId>org.codehaus.mojo</groupId>
 				<artifactId>appassembler-maven-plugin</artifactId>
-				<version>1.3</version>
+                <version>1.3</version>
 				<executions>
 					<execution>
 						<configuration>
@@ -164,7 +165,7 @@
 			</plugin>
 			<plugin>
 				<artifactId>maven-clean-plugin</artifactId>
-				<version>2.5</version>
+                <version>2.4.1</version>
 				<configuration>
 					<filesets>
 						<fileset>
diff --git a/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java b/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
index 3a4c41b..72256f9 100644
--- a/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
+++ b/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/driver/Driver.java
@@ -147,22 +147,11 @@
             start = System.currentTimeMillis();
             runHDFSWRite(jobGen);
             runCleanup(jobGen);
-            destroyApplication(applicationName);
             end = System.currentTimeMillis();
             time = end - start;
             LOG.info("result writing finished " + time + "ms");
             LOG.info("job finished");
         } catch (Exception e) {
-            try {
-                /**
-                 * destroy application if there is any exception
-                 */
-                if (hcc != null) {
-                    destroyApplication(applicationName);
-                }
-            } catch (Exception e2) {
-                throw new HyracksException(e2);
-            }
             throw new HyracksException(e);
         }
     }
@@ -220,8 +209,8 @@
 
     private void execute(JobSpecification job) throws Exception {
         job.setUseConnectorPolicyForScheduling(false);
-        JobId jobId = hcc.startJob(applicationName, job,
-                profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
+        JobId jobId = hcc
+                .startJob(job, profiling ? EnumSet.of(JobFlag.PROFILE_RUNTIME) : EnumSet.noneOf(JobFlag.class));
         hcc.waitForCompletion(jobId);
     }
 
@@ -236,15 +225,11 @@
         LOG.info("jar packing finished " + (end - start) + "ms");
 
         start = System.currentTimeMillis();
-        hcc.createApplication(applicationName, appZip);
+        // TODO: Fix this step to use Yarn
+        //hcc.createApplication(applicationName, appZip);
         end = System.currentTimeMillis();
         LOG.info("jar deployment finished " + (end - start) + "ms");
     }
-
-    public void destroyApplication(String appName) throws Exception {
-        hcc.destroyApplication(appName);
-    }
-
 }
 
 class FileFilter implements FilenameFilter {
diff --git a/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index 0b1be61..77fd1a7 100644
--- a/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -52,13 +52,13 @@
 import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDropOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.hyracks.storage.common.file.TransientLocalResourceFactoryProvider;
 import edu.uci.ics.pregelix.api.graph.GlobalAggregator;
 import edu.uci.ics.pregelix.api.graph.MessageCombiner;
 import edu.uci.ics.pregelix.api.graph.Vertex;
@@ -80,8 +80,8 @@
 import edu.uci.ics.pregelix.dataflow.base.IConfigurationFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IRecordDescriptorFactory;
 import edu.uci.ics.pregelix.dataflow.std.base.IRuntimeHookFactory;
+import edu.uci.ics.pregelix.runtime.bootstrap.IndexLifeCycleManagerProvider;
 import edu.uci.ics.pregelix.runtime.bootstrap.StorageManagerInterface;
-import edu.uci.ics.pregelix.runtime.bootstrap.TreeIndexRegistryProvider;
 import edu.uci.ics.pregelix.runtime.touchpoint.RuntimeHookFactory;
 import edu.uci.ics.pregelix.runtime.touchpoint.VertexIdPartitionComputerFactory;
 
@@ -93,7 +93,7 @@
     protected static final String PRIMARY_INDEX = "primary";
     protected final Configuration conf;
     protected final PregelixJob giraphJob;
-    protected IIndexRegistryProvider<IIndex> treeRegistryProvider = TreeIndexRegistryProvider.INSTANCE;
+    protected IIndexLifecycleManagerProvider lcManagerProvider = IndexLifeCycleManagerProvider.INSTANCE;
     protected IStorageManagerInterface storageManagerInterface = StorageManagerInterface.INSTANCE;
     protected String jobId = new UUID(System.currentTimeMillis(), System.nanoTime()).toString();
     protected int frameSize = ClusterConfig.getFrameSize();
@@ -169,8 +169,9 @@
 
         IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, PRIMARY_INDEX);
         TreeIndexCreateOperatorDescriptor btreeCreate = new TreeIndexCreateOperatorDescriptor(spec,
-                storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
-                new BTreeDataflowHelperFactory(), NoOpOperationCallbackProvider.INSTANCE);
+                storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories, null,
+                new BTreeDataflowHelperFactory(), new TransientLocalResourceFactoryProvider(),
+                NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, btreeCreate);
         return spec;
     }
@@ -229,9 +230,9 @@
         typeTraits[0] = new TypeTraits(false);
         typeTraits[1] = new TypeTraits(false);
         TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
-                storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
-                fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory(),
-                NoOpOperationCallbackProvider.INSTANCE);
+                storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories, null,
+                fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, false, 0, new BTreeDataflowHelperFactory(),
+                NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
 
         /**
@@ -356,8 +357,8 @@
         typeTraits[0] = new TypeTraits(false);
         typeTraits[1] = new TypeTraits(false);
         BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(spec, recordDescriptor,
-                storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
-                null, null, true, true, new BTreeDataflowHelperFactory(), false, NoOpOperationCallbackProvider.INSTANCE);
+                storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories, null,
+                null, null, true, true, new BTreeDataflowHelperFactory(), false, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, scanner);
 
         /**
@@ -424,9 +425,10 @@
         ITypeTraits[] typeTraits = new ITypeTraits[2];
         typeTraits[0] = new TypeTraits(false);
         typeTraits[1] = new TypeTraits(false);
+
         BTreeSearchOperatorDescriptor scanner = new BTreeSearchOperatorDescriptor(spec, recordDescriptor,
-                storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits, comparatorFactories,
-                null, null, true, true, new BTreeDataflowHelperFactory(), false, NoOpOperationCallbackProvider.INSTANCE);
+                storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories, null,
+                null, null, true, true, new BTreeDataflowHelperFactory(), false, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, scanner);
 
         /**
@@ -455,8 +457,8 @@
         JobSpecification spec = new JobSpecification();
 
         IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(jobId, indexName);
-        TreeIndexDropOperatorDescriptor drop = new TreeIndexDropOperatorDescriptor(spec, storageManagerInterface,
-                treeRegistryProvider, fileSplitProvider);
+        IndexDropOperatorDescriptor drop = new IndexDropOperatorDescriptor(spec, storageManagerInterface,
+                lcManagerProvider, fileSplitProvider, new BTreeDataflowHelperFactory());
 
         ClusterConfig.setLocationConstraint(spec, drop);
         spec.addRoot(drop);
diff --git a/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java b/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
index 9de4c04..fe2fcac 100644
--- a/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
+++ b/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenInnerJoin.java
@@ -40,8 +40,8 @@
 import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
-import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
 import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
 import edu.uci.ics.pregelix.api.graph.MsgList;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -135,7 +135,7 @@
         RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
 
         BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
-                recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+                recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
                 new BTreeDataflowHelperFactory(), inputRdFactory, 6,
                 new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
@@ -166,8 +166,8 @@
         indexCmpFactories[0] = JobGenUtil.getIBinaryComparatorFactory(iteration + 1,
                 WritableComparator.get(vertexIdClass).getClass());
         TreeIndexBulkReLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkReLoadOperatorDescriptor(spec,
-                storageManagerInterface, treeRegistryProvider, secondaryFileSplitProvider, typeTraits,
-                indexCmpFactories, fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory());
+                storageManagerInterface, lcManagerProvider, secondaryFileSplitProvider, typeTraits, indexCmpFactories,
+                fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory());
         ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
 
         /**
@@ -222,18 +222,18 @@
          * add the insert operator to insert vertexes
          */
         TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
-                spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
-                NoOpOperationCallbackProvider.INSTANCE);
+                spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
+                null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, insertOp);
 
         /**
          * add the delete operator to delete vertexes
          */
         TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
-                spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
-                NoOpOperationCallbackProvider.INSTANCE);
+                spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, null, fieldPermutation, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
+                null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
@@ -343,7 +343,7 @@
         ITreeIndexFrameFactory leafFrameFactory = new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(
                 typeTraits));
         IndexNestedLoopJoinOperatorDescriptor setUnion = new IndexNestedLoopJoinOperatorDescriptor(spec, rdFinal,
-                storageManagerInterface, treeRegistryProvider, secondaryFileSplitProviderRead, interiorFrameFactory,
+                storageManagerInterface, lcManagerProvider, secondaryFileSplitProviderRead, interiorFrameFactory,
                 leafFrameFactory, typeTraits, comparatorFactories, true, keyFields, keyFields, true, true,
                 new BTreeDataflowHelperFactory(), true);
         ClusterConfig.setLocationConstraint(spec, setUnion);
@@ -361,8 +361,8 @@
                 vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
 
         IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
-                spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields, keyFields, true, true,
+                spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
+                JobGenUtil.getForwardScan(iteration), keyFields, keyFields, true, true,
                 new BTreeDataflowHelperFactory(), inputRdFactory, 6, new ComputeUpdateFunctionFactory(confFactory),
                 preHookFactory, null, rdUnnestedMessage, rdDummy, rdPartialAggregate, rdInsert, rdDelete, rdFinal);
         ClusterConfig.setLocationConstraint(spec, join);
@@ -377,7 +377,7 @@
         String writeFile = iteration % 2 == 0 ? SECONDARY_INDEX_EVEN : SECONDARY_INDEX_ODD;
         IFileSplitProvider secondaryFileSplitProviderWrite = ClusterConfig.getFileSplitProvider(jobId, writeFile);
         TreeIndexBulkReLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkReLoadOperatorDescriptor(spec,
-                storageManagerInterface, treeRegistryProvider, secondaryFileSplitProviderWrite, typeTraits,
+                storageManagerInterface, lcManagerProvider, secondaryFileSplitProviderWrite, typeTraits,
                 indexCmpFactories, fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory());
         ClusterConfig.setLocationConstraint(spec, btreeBulkLoad);
 
@@ -446,18 +446,18 @@
          * add the insert operator to insert vertexes
          */
         TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
-                spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
-                NoOpOperationCallbackProvider.INSTANCE);
+                spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
+                null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, insertOp);
 
         /**
          * add the delete operator to delete vertexes
          */
         TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
-                spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
-                NoOpOperationCallbackProvider.INSTANCE);
+                spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, null, fieldPermutation, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
+                null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
diff --git a/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java b/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
index 91c15b2..f1eceb7 100644
--- a/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
+++ b/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoin.java
@@ -40,8 +40,8 @@
 import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
-import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
 import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
 import edu.uci.ics.pregelix.api.graph.MsgList;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -129,7 +129,7 @@
         RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
 
         BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
-                recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+                recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
                 new BTreeDataflowHelperFactory(), inputRdFactory, 5,
                 new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
@@ -205,18 +205,18 @@
          */
         int[] fieldPermutation = new int[] { 0, 1 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
-                spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
-                NoOpOperationCallbackProvider.INSTANCE);
+                spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
+                null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, insertOp);
 
         /**
          * add the delete operator to delete vertexes
          */
         TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
-                spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
-                NoOpOperationCallbackProvider.INSTANCE);
+                spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, null, fieldPermutation, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
+                null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
@@ -335,7 +335,7 @@
                 vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
 
         IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
-                spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, interiorFrameFactory,
+                spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, interiorFrameFactory,
                 leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
                 keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
                 new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
@@ -408,18 +408,18 @@
          */
         int[] fieldPermutation = new int[] { 0, 1 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
-                spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
-                NoOpOperationCallbackProvider.INSTANCE);
+                spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
+                null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, insertOp);
 
         /**
          * add the delete operator to delete vertexes
          */
         TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
-                spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
-                NoOpOperationCallbackProvider.INSTANCE);
+                spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, null, fieldPermutation, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
+                null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
diff --git a/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java b/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
index ee1fd0f..314c393 100644
--- a/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
+++ b/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSingleSort.java
@@ -39,8 +39,8 @@
 import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
-import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
 import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
 import edu.uci.ics.pregelix.api.graph.MsgList;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -131,7 +131,7 @@
         RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
 
         BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
-                recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+                recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
                 new BTreeDataflowHelperFactory(), inputRdFactory, 5,
                 new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
@@ -197,18 +197,18 @@
          */
         int[] fieldPermutation = new int[] { 0, 1 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
-                spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
-                NoOpOperationCallbackProvider.INSTANCE);
+                spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
+                null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, insertOp);
 
         /**
          * add the delete operator to delete vertexes
          */
         TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
-                spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
-                NoOpOperationCallbackProvider.INSTANCE);
+                spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, null, fieldPermutation, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
+                null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
@@ -323,7 +323,7 @@
                 vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
 
         IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
-                spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, interiorFrameFactory,
+                spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, interiorFrameFactory,
                 leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
                 keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
                 new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
@@ -384,18 +384,18 @@
 
         int[] fieldPermutation = new int[] { 0, 1 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
-                spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
-                NoOpOperationCallbackProvider.INSTANCE);
+                spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
+                null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, insertOp);
 
         /**
          * add the delete operator to delete vertexes
          */
         TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
-                spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
-                NoOpOperationCallbackProvider.INSTANCE);
+                spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, null, fieldPermutation, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
+                null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
diff --git a/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java b/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
index 628e9ce..0c3db38 100644
--- a/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
+++ b/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGenOuterJoinSort.java
@@ -39,8 +39,8 @@
 import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexInsertUpdateDeleteOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
-import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
 import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
 import edu.uci.ics.pregelix.api.graph.MsgList;
 import edu.uci.ics.pregelix.api.job.PregelixJob;
@@ -128,7 +128,7 @@
         RecordDescriptor rdDelete = DataflowUtils.getRecordDescriptorFromWritableClasses(vertexIdClass.getName());
 
         BTreeSearchFunctionUpdateOperatorDescriptor scanner = new BTreeSearchFunctionUpdateOperatorDescriptor(spec,
-                recordDescriptor, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
+                recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, JobGenUtil.getForwardScan(iteration), null, null, true, true,
                 new BTreeDataflowHelperFactory(), inputRdFactory, 5,
                 new StartComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
@@ -211,18 +211,18 @@
          */
         int[] fieldPermutation = new int[] { 0, 1 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
-                spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
-                NoOpOperationCallbackProvider.INSTANCE);
+                spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
+                null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, insertOp);
 
         /**
          * add the delete operator to delete vertexes
          */
         TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
-                spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
-                NoOpOperationCallbackProvider.INSTANCE);
+                spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, null, fieldPermutation, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
+                null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
@@ -337,7 +337,7 @@
                 vertexIdClass.getName(), MsgList.class.getName(), vertexIdClass.getName(), vertexClass.getName());
 
         IndexNestedLoopJoinFunctionUpdateOperatorDescriptor join = new IndexNestedLoopJoinFunctionUpdateOperatorDescriptor(
-                spec, storageManagerInterface, treeRegistryProvider, fileSplitProvider, interiorFrameFactory,
+                spec, storageManagerInterface, lcManagerProvider, fileSplitProvider, interiorFrameFactory,
                 leafFrameFactory, typeTraits, comparatorFactories, JobGenUtil.getForwardScan(iteration), keyFields,
                 keyFields, true, true, new BTreeDataflowHelperFactory(), true, nullWriterFactories, inputRdFactory, 5,
                 new ComputeUpdateFunctionFactory(confFactory), preHookFactory, null, rdUnnestedMessage, rdDummy,
@@ -417,18 +417,18 @@
          */
         int[] fieldPermutation = new int[] { 0, 1 };
         TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
-                spec, rdInsert, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, fieldPermutation, IndexOp.INSERT, new BTreeDataflowHelperFactory(), null,
-                NoOpOperationCallbackProvider.INSTANCE);
+                spec, rdInsert, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, null, fieldPermutation, IndexOperation.INSERT, new BTreeDataflowHelperFactory(),
+                null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, insertOp);
 
         /**
          * add the delete operator to delete vertexes
          */
         TreeIndexInsertUpdateDeleteOperatorDescriptor deleteOp = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
-                spec, rdDelete, storageManagerInterface, treeRegistryProvider, fileSplitProvider, typeTraits,
-                comparatorFactories, fieldPermutation, IndexOp.DELETE, new BTreeDataflowHelperFactory(), null,
-                NoOpOperationCallbackProvider.INSTANCE);
+                spec, rdDelete, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, null, fieldPermutation, IndexOperation.DELETE, new BTreeDataflowHelperFactory(),
+                null, NoOpOperationCallbackFactory.INSTANCE);
         ClusterConfig.setLocationConstraint(spec, deleteOp);
 
         /** construct empty sink operator */
diff --git a/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java b/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
index cd2a864..d099645 100644
--- a/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
+++ b/fullstack/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/util/PregelixHyracksIntegrationUtil.java
@@ -14,7 +14,6 @@
  */
 package edu.uci.ics.pregelix.core.util;
 
-import java.io.File;
 import java.util.EnumSet;
 
 import edu.uci.ics.hyracks.api.client.HyracksConnection;
@@ -27,6 +26,7 @@
 import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
 import edu.uci.ics.hyracks.control.nc.NodeControllerService;
 import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.runtime.bootstrap.NCApplicationEntryPoint;
 
 public class PregelixHyracksIntegrationUtil {
 
@@ -45,7 +45,7 @@
     private static NodeControllerService nc2;
     private static IHyracksClientConnection hcc;
 
-    public static void init(String topologyFilePath) throws Exception {
+    public static void init() throws Exception {
         CCConfig ccConfig = new CCConfig();
         ccConfig.clientNetIpAddress = CC_HOST;
         ccConfig.clusterNetIpAddress = CC_HOST;
@@ -54,7 +54,6 @@
         ccConfig.defaultMaxJobAttempts = 0;
         ccConfig.jobHistorySize = 0;
         ccConfig.profileDumpPeriod = -1;
-        ccConfig.clusterTopologyDefinition = new File(topologyFilePath);
 
         // cluster controller
         cc = new ClusterControllerService(ccConfig);
@@ -68,6 +67,7 @@
         ncConfig1.dataIPAddress = "127.0.0.1";
         ncConfig1.datasetIPAddress = "127.0.0.1";
         ncConfig1.nodeId = NC1_ID;
+        ncConfig1.appNCMainClass = NCApplicationEntryPoint.class.getName();
         nc1 = new NodeControllerService(ncConfig1);
         nc1.start();
 
@@ -78,6 +78,7 @@
         ncConfig2.dataIPAddress = "127.0.0.1";
         ncConfig2.datasetIPAddress = "127.0.0.1";
         ncConfig2.nodeId = NC2_ID;
+        ncConfig2.appNCMainClass = NCApplicationEntryPoint.class.getName();
         nc2 = new NodeControllerService(ncConfig2);
         nc2.start();
 
@@ -86,14 +87,6 @@
         ClusterConfig.loadClusterConfig(CC_HOST, TEST_HYRACKS_CC_CLIENT_PORT);
     }
 
-    public static void destroyApp(String hyracksAppName) throws Exception {
-        hcc.destroyApplication(hyracksAppName);
-    }
-
-    public static void createApp(String hyracksAppName) throws Exception {
-        hcc.createApplication(hyracksAppName, null);
-    }
-
     public static void deinit() throws Exception {
         nc2.stop();
         nc1.stop();
@@ -102,7 +95,7 @@
 
     public static void runJob(JobSpecification spec, String appName) throws Exception {
         spec.setFrameSize(FRAME_SIZE);
-        JobId jobId = hcc.startJob(appName, spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
+        JobId jobId = hcc.startJob(spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
         hcc.waitForCompletion(jobId);
     }
 
diff --git a/fullstack/pregelix/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java b/fullstack/pregelix/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
index f7cadf6..3c00cad 100644
--- a/fullstack/pregelix/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
+++ b/fullstack/pregelix/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
@@ -50,14 +50,14 @@
 import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
 import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
 import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.hyracks.storage.common.file.TransientLocalResourceFactoryProvider;
 import edu.uci.ics.pregelix.core.data.TypeTraits;
 import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
 import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
@@ -65,8 +65,8 @@
 import edu.uci.ics.pregelix.dataflow.VertexWriteOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinOperatorDescriptor;
 import edu.uci.ics.pregelix.dataflow.std.ProjectOperatorDescriptor;
+import edu.uci.ics.pregelix.runtime.bootstrap.IndexLifeCycleManagerProvider;
 import edu.uci.ics.pregelix.runtime.bootstrap.StorageManagerInterface;
-import edu.uci.ics.pregelix.runtime.bootstrap.TreeIndexRegistryProvider;
 
 public class JoinTest {
     private final static String ACTUAL_RESULT_DIR = "actual";
@@ -82,7 +82,7 @@
     private static final String PATH_TO_CLUSTER_PROPERTIES = "src/test/resources/cluster/cluster.properties";
 
     private static final float DEFAULT_BTREE_FILL_FACTOR = 1.00f;
-    private IIndexRegistryProvider<IIndex> treeRegistry = TreeIndexRegistryProvider.INSTANCE;
+    private IIndexLifecycleManagerProvider lcManagerProvider = IndexLifeCycleManagerProvider.INSTANCE;
     private IStorageManagerInterface storageManagerInterface = StorageManagerInterface.INSTANCE;
 
     private IBinaryHashFunctionFactory stringHashFactory = new PointableBinaryHashFunctionFactory(
@@ -102,8 +102,7 @@
         ClusterConfig.setStorePath(PATH_TO_CLUSTER_STORE);
         ClusterConfig.setClusterPropertiesPath(PATH_TO_CLUSTER_PROPERTIES);
         cleanupStores();
-        PregelixHyracksIntegrationUtil.init("src/test/resources/topology.xml");
-        PregelixHyracksIntegrationUtil.createApp(HYRACKS_APP_NAME);
+        PregelixHyracksIntegrationUtil.init();
 
         FileUtils.forceMkdir(new File(EXPECT_RESULT_DIR));
         FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
@@ -121,7 +120,6 @@
         runIndexRightOuterJoin();
         TestUtils.compareWithResult(new File(EXPECTED_RESULT_FILE), new File(ACTUAL_RESULT_FILE));
 
-        PregelixHyracksIntegrationUtil.destroyApp(HYRACKS_APP_NAME);
         PregelixHyracksIntegrationUtil.deinit();
     }
 
@@ -195,8 +193,8 @@
         FileSplit[] results = new FileSplit[1];
         results[0] = resultFile;
         IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
-        VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
-                null);
+        VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider,
+                null, null);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { NC1_ID });
         PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
 
@@ -234,8 +232,9 @@
         for (int i = 0; i < typeTraits.length; i++)
             typeTraits[i] = new TypeTraits(false);
         TreeIndexCreateOperatorDescriptor writer = new TreeIndexCreateOperatorDescriptor(spec, storageManagerInterface,
-                treeRegistry, fileSplitProvider, typeTraits, comparatorFactories, new BTreeDataflowHelperFactory(),
-                NoOpOperationCallbackProvider.INSTANCE);
+                lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories, null,
+                new BTreeDataflowHelperFactory(), new TransientLocalResourceFactoryProvider(),
+                NoOpOperationCallbackFactory.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, NC1_ID, NC2_ID);
         spec.addRoot(writer);
         runTest(spec);
@@ -278,9 +277,9 @@
         for (int i = 0; i < typeTraits.length; i++)
             typeTraits[i] = new TypeTraits(false);
         TreeIndexBulkLoadOperatorDescriptor writer = new TreeIndexBulkLoadOperatorDescriptor(spec,
-                storageManagerInterface, treeRegistry, fileSplitProvider, typeTraits, comparatorFactories,
-                fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory(),
-                NoOpOperationCallbackProvider.INSTANCE);
+                storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories, null,
+                fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, false, 0, new BTreeDataflowHelperFactory(),
+                NoOpOperationCallbackFactory.INSTANCE);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, NC1_ID, NC2_ID);
 
         spec.connect(new OneToOneConnectorDescriptor(spec), custScanner, 0, sorter, 0);
@@ -353,8 +352,8 @@
         for (int i = 0; i < typeTraits.length; i++)
             typeTraits[i] = new TypeTraits(false);
         IndexNestedLoopJoinOperatorDescriptor join = new IndexNestedLoopJoinOperatorDescriptor(spec, custOrderJoinDesc,
-                storageManagerInterface, treeRegistry, fileSplitProvider, typeTraits, keyComparatorFactories, true,
-                keyFields, keyFields, true, true, new BTreeDataflowHelperFactory());
+                storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, keyComparatorFactories,
+                true, keyFields, keyFields, true, true, new BTreeDataflowHelperFactory());
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
 
         /** results (already in sorted order) */
@@ -362,8 +361,8 @@
         FileSplit[] results = new FileSplit[1];
         results[0] = resultFile;
         IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
-        VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
-                null);
+        VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider,
+                null, null);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { NC1_ID });
         PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
 
@@ -459,8 +458,8 @@
         FileSplit[] results = new FileSplit[1];
         results[0] = resultFile;
         IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
-        VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
-                null);
+        VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider,
+                null, null);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { NC1_ID });
         PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
 
@@ -556,7 +555,7 @@
         ITreeIndexFrameFactory leafFrameFactory = new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(
                 typeTraits));
         IndexNestedLoopJoinOperatorDescriptor join = new IndexNestedLoopJoinOperatorDescriptor(spec, custOrderJoinDesc,
-                storageManagerInterface, treeRegistry, fileSplitProvider, interiorFrameFactory, leafFrameFactory,
+                storageManagerInterface, lcManagerProvider, fileSplitProvider, interiorFrameFactory, leafFrameFactory,
                 typeTraits, keyComparatorFactories, true, keyFields, keyFields, true, true,
                 new BTreeDataflowHelperFactory(), true, nullWriterFactories);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
@@ -566,8 +565,8 @@
         FileSplit[] results = new FileSplit[1];
         results[0] = resultFile;
         IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
-        VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
-                null);
+        VertexWriteOperatorDescriptor writer = new VertexWriteOperatorDescriptor(spec, null, resultFileSplitProvider,
+                null, null);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { NC1_ID });
         PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);