ASTERIXDB-1053: Allow lazy loading for persistent local resources

- Allow indexes to be accessed by name.
- Allow lazy loading for persistent local resources.
- Caching for local resources.

Change-Id: I48b9260a3280750145f6ddb3783673a299055910
Reviewed-on: https://asterix-gerrit.ics.uci.edu/344
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
index 13d7e21..35afd8b 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
@@ -96,7 +96,6 @@
     private AsterixTransactionProperties txnProperties;
     private AsterixFeedProperties feedProperties;
 
-
     private AsterixThreadExecutor threadExecutor;
     private DatasetLifecycleManager indexLifecycleManager;
     private IFileMapManager fileMapManager;
@@ -140,7 +139,7 @@
         metadataMergePolicyFactory = new PrefixMergePolicyFactory();
 
         ILocalResourceRepositoryFactory persistentLocalResourceRepositoryFactory = new PersistentLocalResourceRepositoryFactory(
-                ioManager);
+                ioManager, ncApplicationContext.getNodeId());
         localResourceRepository = (PersistentLocalResourceRepository) persistentLocalResourceRepositoryFactory
                 .createRepository();
         resourceIdFactory = (new ResourceIdFactoryProvider(localResourceRepository)).createResourceIdFactory();
@@ -149,10 +148,10 @@
                 this);
         txnSubsystem = new TransactionSubsystem(ncApplicationContext.getNodeId(), asterixAppRuntimeContextProvider,
                 txnProperties);
-        
+
         indexLifecycleManager = new DatasetLifecycleManager(storageProperties, localResourceRepository,
-                MetadataPrimaryIndexes.FIRST_AVAILABLE_USER_DATASET_ID,(LogManager)txnSubsystem.getLogManager());
-        
+                MetadataPrimaryIndexes.FIRST_AVAILABLE_USER_DATASET_ID, (LogManager) txnSubsystem.getLogManager());
+
         isShuttingdown = false;
 
         feedManager = new FeedManager(ncApplicationContext.getNodeId(), feedProperties,
@@ -243,7 +242,7 @@
     public AsterixExternalProperties getExternalProperties() {
         return externalProperties;
     }
-    
+
     @Override
     public AsterixFeedProperties getFeedProperties() {
         return feedProperties;
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index 1a3953d..1ad34da 100644
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -84,7 +84,7 @@
         MetadataManager.INSTANCE = new MetadataManager(proxy, metadataProperties);
 
         AsterixAppContextInfo.getInstance().getCCApplicationContext()
-        .addJobLifecycleListener(FeedLifecycleListener.INSTANCE);
+                .addJobLifecycleListener(FeedLifecycleListener.INSTANCE);
 
         AsterixExternalProperties externalProperties = AsterixAppContextInfo.getInstance().getExternalProperties();
         setupWebServer(externalProperties);
@@ -92,17 +92,18 @@
 
         setupJSONAPIServer(externalProperties);
         jsonAPIServer.start();
-        ExternalLibraryBootstrap.setUpExternaLibraries(false);
 
         setupFeedServer(externalProperties);
         feedServer.start();
-        centralFeedManager = CentralFeedManager.getInstance(); 
-        centralFeedManager.start();
 
         waitUntilServerStart(webServer);
         waitUntilServerStart(jsonAPIServer);
         waitUntilServerStart(feedServer);
 
+        ExternalLibraryBootstrap.setUpExternaLibraries(false);
+        centralFeedManager = CentralFeedManager.getInstance();
+        centralFeedManager.start();
+
         AsterixGlobalRecoveryManager.INSTANCE = new AsterixGlobalRecoveryManager(
                 (HyracksConnection) getNewHyracksClientConnection());
         ClusterManager.INSTANCE.registerSubscriber(AsterixGlobalRecoveryManager.INSTANCE);
@@ -182,7 +183,7 @@
 
         feedServer.setHandler(context);
         context.addServlet(new ServletHolder(new FeedServlet()), "/");
-   
+
         // add paths here
     }
 }
\ No newline at end of file
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index bb21465..63f862c 100644
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -112,12 +112,6 @@
                 LOGGER.info("System is in a state: " + systemState);
             }
 
-            if (systemState != SystemState.NEW_UNIVERSE) {
-                PersistentLocalResourceRepository localResourceRepository = (PersistentLocalResourceRepository) runtimeContext
-                        .getLocalResourceRepository();
-                localResourceRepository.initialize(nodeId, null, false, runtimeContext.getResourceIdFactory());
-            }
-            
             if (systemState == SystemState.CORRUPTED) {
                 recoveryMgr.startRecovery(true);
             }
@@ -165,7 +159,7 @@
 
             PersistentLocalResourceRepository localResourceRepository = (PersistentLocalResourceRepository) runtimeContext
                     .getLocalResourceRepository();
-            localResourceRepository.initialize(nodeId, metadataProperties.getStores().get(nodeId)[0], true, null);
+            localResourceRepository.initialize(nodeId, metadataProperties.getStores().get(nodeId)[0]);
         }
 
         IAsterixStateProxy proxy = null;
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 9df4970..741e106 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -77,8 +77,15 @@
     }
 
     @Override
-    public synchronized IIndex getIndex(long resourceID) throws HyracksDataException {
-        DatasetInfo dsInfo = datasetInfos.get(getDIDfromRID(resourceID));
+    public synchronized IIndex getIndex(String resourceName) throws HyracksDataException {
+        int datasetID = getDIDfromResourceName(resourceName);
+        long resourceID = getResourceIDfromResourceName(resourceName);
+        return getIndex(datasetID, resourceID);
+    }
+
+    @Override
+    public synchronized IIndex getIndex(int datasetID, long resourceID) throws HyracksDataException {
+        DatasetInfo dsInfo = datasetInfos.get(datasetID);
         if (dsInfo == null) {
             return null;
         }
@@ -90,8 +97,9 @@
     }
 
     @Override
-    public synchronized void register(long resourceID, IIndex index) throws HyracksDataException {
-        int did = getDIDfromRID(resourceID);
+    public synchronized void register(String resourceName, IIndex index) throws HyracksDataException {
+        int did = getDIDfromResourceName(resourceName);
+        long resourceID = getResourceIDfromResourceName(resourceName);
         DatasetInfo dsInfo = datasetInfos.get(did);
         if (dsInfo == null) {
             dsInfo = getDatasetInfo(did);
@@ -106,17 +114,27 @@
         dsInfo.indexes.put(resourceID, new IndexInfo((ILSMIndex) index));
     }
 
-    private int getDIDfromRID(long resourceID) throws HyracksDataException {
-        LocalResource lr = resourceRepository.getResourceById(resourceID);
+    private int getDIDfromResourceName(String resourceName) throws HyracksDataException {
+        LocalResource lr = resourceRepository.getResourceByName(resourceName);
         if (lr == null) {
             return -1;
         }
         return ((ILocalResourceMetadata) lr.getResourceObject()).getDatasetID();
     }
 
+    private long getResourceIDfromResourceName(String resourceName) throws HyracksDataException {
+        LocalResource lr = resourceRepository.getResourceByName(resourceName);
+        if (lr == null) {
+            return -1;
+        }
+        return lr.getResourceId();
+    }
+
     @Override
-    public synchronized void unregister(long resourceID) throws HyracksDataException {
-        int did = getDIDfromRID(resourceID);
+    public synchronized void unregister(String resourceName) throws HyracksDataException {
+        int did = getDIDfromResourceName(resourceName);
+        long resourceID = getResourceIDfromResourceName(resourceName);
+
         DatasetInfo dsInfo = datasetInfos.get(did);
         IndexInfo iInfo = dsInfo.indexes.get(resourceID);
 
@@ -155,20 +173,15 @@
 
         dsInfo.indexes.remove(resourceID);
         if (dsInfo.referenceCount == 0 && dsInfo.isOpen && dsInfo.indexes.isEmpty() && !dsInfo.isExternal) {
-            List<IVirtualBufferCache> vbcs = getVirtualBufferCaches(did);
-            assert vbcs != null;
-            for (IVirtualBufferCache vbc : vbcs) {
-                used -= (vbc.getNumPages() * vbc.getPageSize());
-            }
-            datasetInfos.remove(did);
-            datasetVirtualBufferCaches.remove(did);
-            datasetOpTrackers.remove(did);
+            removeDatasetFromCache(dsInfo.datasetID);
         }
     }
 
     @Override
-    public synchronized void open(long resourceID) throws HyracksDataException {
-        int did = getDIDfromRID(resourceID);
+    public synchronized void open(String resourceName) throws HyracksDataException {
+        int did = getDIDfromResourceName(resourceName);
+        long resourceID = getResourceIDfromResourceName(resourceName);
+
         DatasetInfo dsInfo = datasetInfos.get(did);
         if (dsInfo == null || !dsInfo.isRegistered) {
             throw new HyracksDataException("Failed to open index with resource ID " + resourceID
@@ -260,8 +273,10 @@
     }
 
     @Override
-    public synchronized void close(long resourceID) throws HyracksDataException {
-        int did = getDIDfromRID(resourceID);
+    public synchronized void close(String resourceName) throws HyracksDataException {
+        int did = getDIDfromResourceName(resourceName);
+        long resourceID = getResourceIDfromResourceName(resourceName);
+
         DatasetInfo dsInfo = datasetInfos.get(did);
         if (dsInfo == null) {
             throw new HyracksDataException("No index found with resourceID " + resourceID);
@@ -306,6 +321,17 @@
         }
     }
 
+    private void removeDatasetFromCache(int datasetID) {
+        List<IVirtualBufferCache> vbcs = getVirtualBufferCaches(datasetID);
+        assert vbcs != null;
+        for (IVirtualBufferCache vbc : vbcs) {
+            used -= (vbc.getNumPages() * vbc.getPageSize());
+        }
+        datasetInfos.remove(datasetID);
+        datasetVirtualBufferCaches.remove(datasetID);
+        datasetOpTrackers.remove(datasetID);
+    }
+
     public ILSMOperationTracker getOperationTracker(int datasetID) {
         synchronized (datasetOpTrackers) {
             ILSMOperationTracker opTracker = datasetOpTrackers.get(datasetID);
@@ -505,7 +531,6 @@
         }
 
         if (asyncFlush) {
-
             for (IndexInfo iInfo : dsInfo.indexes.values()) {
                 ILSMIndexAccessor accessor = iInfo.index.createAccessor(NoOpOperationCallback.INSTANCE,
                         NoOpOperationCallback.INSTANCE);
@@ -552,10 +577,7 @@
         }
         dsInfo.isOpen = false;
 
-        List<IVirtualBufferCache> vbcs = getVirtualBufferCaches(dsInfo.datasetID);
-        for (IVirtualBufferCache vbc : vbcs) {
-            used -= vbc.getNumPages() * vbc.getPageSize();
-        }
+        removeDatasetFromCache(dsInfo.datasetID);
     }
 
     @Override
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
index 41f86b4..a5dcf18 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
@@ -58,7 +58,7 @@
         AbstractLSMIndex lsmIndex = (AbstractLSMIndex) indexHelper.getIndexInstance();
         try {
             modCallback = opDesc.getModificationOpCallbackFactory().createModificationOperationCallback(
-                    indexHelper.getResourceID(), lsmIndex, ctx);
+                    indexHelper.getResourceName(), indexHelper.getResourceID(), lsmIndex, ctx);
             indexAccessor = lsmIndex.createAccessor(modCallback, NoOpOperationCallback.INSTANCE);
             ITupleFilterFactory tupleFilterFactory = opDesc.getTupleFilterFactory();
             if (tupleFilterFactory != null) {
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index 2b1d678..d600f57 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -284,9 +284,10 @@
     private void insertTupleIntoIndex(JobId jobId, IMetadataIndex metadataIndex, ITupleReference tuple)
             throws Exception {
         long resourceID = metadataIndex.getResourceID();
-        ILSMIndex lsmIndex = (ILSMIndex) indexLifecycleManager.getIndex(resourceID);
+        String resourceName = metadataIndex.getFile().toString();
+        ILSMIndex lsmIndex = (ILSMIndex) indexLifecycleManager.getIndex(resourceName);
         try {
-            indexLifecycleManager.open(resourceID);
+            indexLifecycleManager.open(resourceName);
 
             // prepare a Callback for logging
             IModificationOperationCallback modCallback = createIndexModificationCallback(jobId, resourceID,
@@ -307,7 +308,7 @@
         } catch (Exception e) {
             throw e;
         } finally {
-            indexLifecycleManager.close(resourceID);
+            indexLifecycleManager.close(resourceName);
         }
     }
 
@@ -633,9 +634,10 @@
     private void deleteTupleFromIndex(JobId jobId, IMetadataIndex metadataIndex, ITupleReference tuple)
             throws Exception {
         long resourceID = metadataIndex.getResourceID();
-        ILSMIndex lsmIndex = (ILSMIndex) indexLifecycleManager.getIndex(resourceID);
+        String resourceName = metadataIndex.getFile().toString();
+        ILSMIndex lsmIndex = (ILSMIndex) indexLifecycleManager.getIndex(resourceName);
         try {
-            indexLifecycleManager.open(resourceID);
+            indexLifecycleManager.open(resourceName);
             // prepare a Callback for logging
             IModificationOperationCallback modCallback = createIndexModificationCallback(jobId, resourceID,
                     metadataIndex, lsmIndex, IndexOperation.DELETE);
@@ -653,7 +655,7 @@
         } catch (Exception e) {
             throw e;
         } finally {
-            indexLifecycleManager.close(resourceID);
+            indexLifecycleManager.close(resourceName);
         }
     }
 
@@ -966,9 +968,9 @@
         StringBuilder sb = new StringBuilder();
         try {
             IMetadataIndex index = MetadataPrimaryIndexes.DATAVERSE_DATASET;
-            long resourceID = index.getResourceID();
-            IIndex indexInstance = indexLifecycleManager.getIndex(resourceID);
-            indexLifecycleManager.open(resourceID);
+            String resourceName = index.getFile().toString();
+            IIndex indexInstance = indexLifecycleManager.getIndex(resourceName);
+            indexLifecycleManager.open(resourceName);
             IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE,
                     NoOpOperationCallback.INSTANCE);
             ITreeIndexCursor rangeCursor = (ITreeIndexCursor) indexAccessor.createSearchCursor(false);
@@ -986,12 +988,11 @@
             } finally {
                 rangeCursor.close();
             }
-            indexLifecycleManager.close(resourceID);
+            indexLifecycleManager.close(resourceName);
 
             index = MetadataPrimaryIndexes.DATASET_DATASET;
-            resourceID = index.getResourceID();
-            indexInstance = indexLifecycleManager.getIndex(resourceID);
-            indexLifecycleManager.open(resourceID);
+            indexInstance = indexLifecycleManager.getIndex(resourceName);
+            indexLifecycleManager.open(resourceName);
             indexAccessor = indexInstance
                     .createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
             rangeCursor = (ITreeIndexCursor) indexAccessor.createSearchCursor(false);
@@ -1009,12 +1010,11 @@
             } finally {
                 rangeCursor.close();
             }
-            indexLifecycleManager.close(resourceID);
+            indexLifecycleManager.close(resourceName);
 
             index = MetadataPrimaryIndexes.INDEX_DATASET;
-            resourceID = index.getResourceID();
-            indexInstance = indexLifecycleManager.getIndex(resourceID);
-            indexLifecycleManager.open(resourceID);
+            indexInstance = indexLifecycleManager.getIndex(resourceName);
+            indexLifecycleManager.open(resourceName);
             indexAccessor = indexInstance
                     .createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
             rangeCursor = (ITreeIndexCursor) indexAccessor.createSearchCursor(false);
@@ -1033,7 +1033,7 @@
             } finally {
                 rangeCursor.close();
             }
-            indexLifecycleManager.close(resourceID);
+            indexLifecycleManager.close(resourceName);
         } catch (Exception e) {
             e.printStackTrace();
         }
@@ -1043,9 +1043,9 @@
     private <ResultType> void searchIndex(JobId jobId, IMetadataIndex index, ITupleReference searchKey,
             IValueExtractor<ResultType> valueExtractor, List<ResultType> results) throws Exception {
         IBinaryComparatorFactory[] comparatorFactories = index.getKeyBinaryComparatorFactory();
-        long resourceID = index.getResourceID();
-        IIndex indexInstance = indexLifecycleManager.getIndex(resourceID);
-        indexLifecycleManager.open(resourceID);
+        String resourceName = index.getFile().toString();
+        IIndex indexInstance = indexLifecycleManager.getIndex(resourceName);
+        indexLifecycleManager.open(resourceName);
         IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE,
                 NoOpOperationCallback.INSTANCE);
         ITreeIndexCursor rangeCursor = (ITreeIndexCursor) indexAccessor.createSearchCursor(false);
@@ -1074,16 +1074,16 @@
         } finally {
             rangeCursor.close();
         }
-        indexLifecycleManager.close(resourceID);
+        indexLifecycleManager.close(resourceName);
     }
 
     @Override
     public void initializeDatasetIdFactory(JobId jobId) throws MetadataException, RemoteException {
         int mostRecentDatasetId = MetadataPrimaryIndexes.FIRST_AVAILABLE_USER_DATASET_ID;
-        long resourceID = MetadataPrimaryIndexes.DATASET_DATASET.getResourceID();
         try {
-            IIndex indexInstance = indexLifecycleManager.getIndex(resourceID);
-            indexLifecycleManager.open(resourceID);
+            String resourceName = MetadataPrimaryIndexes.DATASET_DATASET.getFile().toString();
+            IIndex indexInstance = indexLifecycleManager.getIndex(resourceName);
+            indexLifecycleManager.open(resourceName);
             try {
                 IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE,
                         NoOpOperationCallback.INSTANCE);
@@ -1110,7 +1110,7 @@
                     rangeCursor.close();
                 }
             } finally {
-                indexLifecycleManager.close(resourceID);
+                indexLifecycleManager.close(resourceName);
             }
 
         } catch (Exception e) {
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index 83d6c8f..c94133a 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -420,11 +420,11 @@
                     localResourceMetadata, LocalResource.LSMBTreeResource);
             ILocalResourceFactory localResourceFactory = localResourceFactoryProvider.getLocalResourceFactory();
             localResourceRepository.insert(localResourceFactory.createLocalResource(resourceID, path, 0));
-            indexLifecycleManager.register(resourceID, lsmBtree);
+            indexLifecycleManager.register(path, lsmBtree);
         } else {
             final LocalResource resource = localResourceRepository.getResourceByName(path);
             resourceID = resource.getResourceId();
-            lsmBtree = (LSMBTree) indexLifecycleManager.getIndex(resourceID);
+            lsmBtree = (LSMBTree) indexLifecycleManager.getIndex(resource.getResourceName());
             if (lsmBtree == null) {
                 lsmBtree = LSMBTreeUtils.createLSMTree(
                         virtualBufferCaches,
@@ -439,7 +439,7 @@
                                 GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES, indexLifecycleManager), opTracker,
                         runtimeContext.getLSMIOScheduler(), LSMBTreeIOOperationCallbackFactory.INSTANCE
                                 .createIOOperationCallback(), index.isPrimaryIndex(), null, null, null, null, true);
-                indexLifecycleManager.register(resourceID, lsmBtree);
+                indexLifecycleManager.register(path, lsmBtree);
             }
         }
 
diff --git a/asterix-transactions/pom.xml b/asterix-transactions/pom.xml
index 447c762..b9a26a0 100644
--- a/asterix-transactions/pom.xml
+++ b/asterix-transactions/pom.xml
@@ -109,13 +109,18 @@
 			<groupId>org.apache.hyracks</groupId>
 			<artifactId>hyracks-storage-am-lsm-invertedindex</artifactId>
 		</dependency>
-                <dependency>
-                        <groupId>org.apache.asterix</groupId>
-                        <artifactId>asterix-common</artifactId>
-                        <version>0.8.7-SNAPSHOT</version>
-                        <type>jar</type>
-                        <scope>compile</scope>
-                </dependency>
+        <dependency>
+            <groupId>org.apache.asterix</groupId>
+            <artifactId>asterix-common</artifactId>
+            <version>0.8.7-SNAPSHOT</version>
+            <type>jar</type>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+			<version>18.0</version>
+		</dependency>
 	</dependencies>
 
 </project>
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
index 21b90ed..4c2f25d 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
@@ -50,13 +50,13 @@
     }
 
     @Override
-    public IModificationOperationCallback createModificationOperationCallback(long resourceId, Object resource,
-            IHyracksTaskContext ctx) throws HyracksDataException {
+    public IModificationOperationCallback createModificationOperationCallback(String resourceName, long resourceId,
+            Object resource, IHyracksTaskContext ctx) throws HyracksDataException {
 
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
         IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
                 .getIndexLifecycleManager();
-        ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceId);
+        ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceName);
         if (index == null) {
             throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
         }
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
index df29491..9c897f2 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
@@ -47,13 +47,12 @@
     }
 
     @Override
-    public IModificationOperationCallback createModificationOperationCallback(long resourceId, Object resource,
-            IHyracksTaskContext ctx) throws HyracksDataException {
-
+    public IModificationOperationCallback createModificationOperationCallback(String resourceName, long resourceId,
+            Object resource, IHyracksTaskContext ctx) throws HyracksDataException {
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
         IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
                 .getIndexLifecycleManager();
-        ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceId);
+        ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceName);
         if (index == null) {
             throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
         }
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
index 5f9fd0e..4de0749 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
@@ -34,25 +34,26 @@
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 
-public class TempDatasetPrimaryIndexModificationOperationCallbackFactory extends AbstractOperationCallbackFactory implements
-        IModificationOperationCallbackFactory {
+public class TempDatasetPrimaryIndexModificationOperationCallbackFactory extends AbstractOperationCallbackFactory
+        implements IModificationOperationCallbackFactory {
 
     private static final long serialVersionUID = 1L;
     private final IndexOperation indexOp;
 
-    public TempDatasetPrimaryIndexModificationOperationCallbackFactory(JobId jobId, int datasetId, int[] primaryKeyFields,
-            ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation indexOp, byte resourceType) {
+    public TempDatasetPrimaryIndexModificationOperationCallbackFactory(JobId jobId, int datasetId,
+            int[] primaryKeyFields, ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation indexOp,
+            byte resourceType) {
         super(jobId, datasetId, primaryKeyFields, txnSubsystemProvider, resourceType);
         this.indexOp = indexOp;
     }
 
     @Override
-    public IModificationOperationCallback createModificationOperationCallback(long resourceId, Object resource,
-            IHyracksTaskContext ctx) throws HyracksDataException {
+    public IModificationOperationCallback createModificationOperationCallback(String resourceName, long resourceId,
+            Object resource, IHyracksTaskContext ctx) throws HyracksDataException {
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
         IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
                 .getIndexLifecycleManager();
-        ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceId);
+        ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceName);
         if (index == null) {
             throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
         }
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
index 8a51c20..c2f56a0 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
@@ -34,25 +34,26 @@
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 
-public class TempDatasetSecondaryIndexModificationOperationCallbackFactory extends AbstractOperationCallbackFactory implements
-        IModificationOperationCallbackFactory {
+public class TempDatasetSecondaryIndexModificationOperationCallbackFactory extends AbstractOperationCallbackFactory
+        implements IModificationOperationCallbackFactory {
 
     private static final long serialVersionUID = 1L;
     private final IndexOperation indexOp;
 
-    public TempDatasetSecondaryIndexModificationOperationCallbackFactory(JobId jobId, int datasetId, int[] primaryKeyFields,
-            ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation indexOp, byte resourceType) {
+    public TempDatasetSecondaryIndexModificationOperationCallbackFactory(JobId jobId, int datasetId,
+            int[] primaryKeyFields, ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation indexOp,
+            byte resourceType) {
         super(jobId, datasetId, primaryKeyFields, txnSubsystemProvider, resourceType);
         this.indexOp = indexOp;
     }
 
     @Override
-    public IModificationOperationCallback createModificationOperationCallback(long resourceId, Object resource,
-            IHyracksTaskContext ctx) throws HyracksDataException {
+    public IModificationOperationCallback createModificationOperationCallback(String resourceName, long resourceId,
+            Object resource, IHyracksTaskContext ctx) throws HyracksDataException {
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
         IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
                 .getIndexLifecycleManager();
-        ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceId);
+        ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceName);
         if (index == null) {
             throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
         }
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index abc98c7..1eda9cc 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -25,10 +25,8 @@
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -36,7 +34,9 @@
 import org.apache.hyracks.api.io.IODeviceHandle;
 import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
 import org.apache.hyracks.storage.common.file.LocalResource;
-import org.apache.hyracks.storage.common.file.ResourceIdFactory;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 
 public class PersistentLocalResourceRepository implements ILocalResourceRepository {
 
@@ -46,14 +46,14 @@
     private static final String ROOT_METADATA_FILE_NAME_PREFIX = ".asterix_root_metadata";
     private static final long ROOT_LOCAL_RESOURCE_ID = -4321;
     private static final String METADATA_FILE_NAME = ".metadata";
-    private Map<String, LocalResource> name2ResourceMap = new HashMap<String, LocalResource>();
-    private Map<Long, LocalResource> id2ResourceMap = new HashMap<Long, LocalResource>();
-    private final int numIODevices;
+    private final Cache<String, LocalResource> resourceCache;
+    private final String nodeId;
+    private static final int MAX_CACHED_RESOURCES = 1000;
 
-    public PersistentLocalResourceRepository(List<IODeviceHandle> devices) throws HyracksDataException {
-        numIODevices = devices.size();
-        this.mountPoints = new String[numIODevices];
-        for (int i = 0; i < numIODevices; i++) {
+    public PersistentLocalResourceRepository(List<IODeviceHandle> devices, String nodeId) throws HyracksDataException {
+        mountPoints = new String[devices.size()];
+        this.nodeId = nodeId;
+        for (int i = 0; i < mountPoints.length; i++) {
             String mountPoint = devices.get(i).getPath().getPath();
             File mountPointDir = new File(mountPoint);
             if (!mountPointDir.exists()) {
@@ -65,167 +65,89 @@
                 mountPoints[i] = new String(mountPoint);
             }
         }
+
+        resourceCache = CacheBuilder.newBuilder().maximumSize(MAX_CACHED_RESOURCES).build();
     }
 
     private String prepareRootMetaDataFileName(String mountPoint, String nodeId, int ioDeviceId) {
         return mountPoint + ROOT_METADATA_DIRECTORY + File.separator + nodeId + "_" + "iodevice" + ioDeviceId;
     }
 
-    public void initialize(String nodeId, String rootDir, boolean isNewUniverse, ResourceIdFactory resourceIdFactory)
-            throws HyracksDataException {
+    public void initialize(String nodeId, String rootDir) throws HyracksDataException {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Initializing local resource repository ... ");
         }
 
-        if (isNewUniverse) {
-            //#. if the rootMetadataFile doesn't exist, create it and return.
-            for (int i = 0; i < numIODevices; i++) {
-                String rootMetadataFileName = prepareRootMetaDataFileName(mountPoints[i], nodeId, i) + File.separator
-                        + ROOT_METADATA_FILE_NAME_PREFIX;
-                File rootMetadataFile = new File(rootMetadataFileName);
-
-                File rootMetadataDir = new File(prepareRootMetaDataFileName(mountPoints[i], nodeId, i));
-                if (!rootMetadataDir.exists()) {
-                    boolean success = rootMetadataDir.mkdirs();
-                    if (!success) {
-                        throw new IllegalStateException(
-                                "Unable to create root metadata directory of PersistentLocalResourceRepository in "
-                                        + rootMetadataDir.getAbsolutePath());
-                    }
-                    if (LOGGER.isLoggable(Level.INFO)) {
-                        LOGGER.info("created the root-metadata-file's directory: " + rootMetadataDir.getAbsolutePath());
-                    }
-                }
-
-                rootMetadataFile.delete();
-                String mountedRootDir;
-                if (rootDir.startsWith(System.getProperty("file.separator"))) {
-                    mountedRootDir = new String(mountPoints[i]
-                            + rootDir.substring(System.getProperty("file.separator").length()));
-                } else {
-                    mountedRootDir = new String(mountPoints[i] + rootDir);
-                }
-                LocalResource rootLocalResource = new LocalResource(ROOT_LOCAL_RESOURCE_ID, rootMetadataFileName, 0, 0,
-                        mountedRootDir);
-                insert(rootLocalResource);
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("created the root-metadata-file: " + rootMetadataFileName);
-                }
-
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Completed the initialization of the local resource repository");
-                }
-            }
-            return;
-        }
-
-        FilenameFilter filter = new FilenameFilter() {
-            public boolean accept(File dir, String name) {
-                if (name.equalsIgnoreCase(METADATA_FILE_NAME)) {
-                    return true;
-                } else {
-                    return false;
-                }
-            }
-        };
-
-        long maxResourceId = 0;
-        for (int i = 0; i < numIODevices; i++) {
+        //if the rootMetadataFile doesn't exist, create it.
+        for (int i = 0; i < mountPoints.length; i++) {
             String rootMetadataFileName = prepareRootMetaDataFileName(mountPoints[i], nodeId, i) + File.separator
                     + ROOT_METADATA_FILE_NAME_PREFIX;
             File rootMetadataFile = new File(rootMetadataFileName);
-            //#. if the rootMetadataFile exists, read it and set this.rootDir.
-            LocalResource rootLocalResource = readLocalResource(rootMetadataFile);
-            String mountedRootDir = (String) rootLocalResource.getResourceObject();
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("The root directory of the local resource repository is " + mountedRootDir);
-            }
 
-            //#. load all local resources. 
-            File rootDirFile = new File(mountedRootDir);
-            if (!rootDirFile.exists()) {
-                //rootDir may not exist if this node is not the metadata node and doesn't have any user data.
+            File rootMetadataDir = new File(prepareRootMetaDataFileName(mountPoints[i], nodeId, i));
+            if (!rootMetadataDir.exists()) {
+                boolean success = rootMetadataDir.mkdirs();
+                if (!success) {
+                    throw new IllegalStateException(
+                            "Unable to create root metadata directory of PersistentLocalResourceRepository in "
+                                    + rootMetadataDir.getAbsolutePath());
+                }
                 if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("The root directory of the local resource repository doesn't exist: there is no local resource.");
-                    LOGGER.info("Completed the initialization of the local resource repository");
+                    LOGGER.info("created the root-metadata-file's directory: " + rootMetadataDir.getAbsolutePath());
                 }
-                continue;
             }
 
-            File[] dataverseFileList = rootDirFile.listFiles();
-            if (dataverseFileList == null) {
-                throw new HyracksDataException("Metadata dataverse doesn't exist.");
+            rootMetadataFile.delete();
+            String mountedRootDir;
+            if (rootDir.startsWith(System.getProperty("file.separator"))) {
+                mountedRootDir = new String(mountPoints[i]
+                        + rootDir.substring(System.getProperty("file.separator").length()));
+            } else {
+                mountedRootDir = new String(mountPoints[i] + rootDir);
             }
-            for (File dataverseFile : dataverseFileList) {
-                if (dataverseFile.isDirectory()) {
-                    File[] indexFileList = dataverseFile.listFiles();
-                    if (indexFileList != null) {
-                        for (File indexFile : indexFileList) {
-                            if (indexFile.isDirectory()) {
-                                File[] ioDevicesList = indexFile.listFiles();
-                                if (ioDevicesList != null) {
-                                    for (File ioDeviceFile : ioDevicesList) {
-                                        if (ioDeviceFile.isDirectory()) {
-                                            File[] metadataFiles = ioDeviceFile.listFiles(filter);
-                                            if (metadataFiles != null) {
-                                                for (File metadataFile : metadataFiles) {
-                                                    LocalResource localResource = readLocalResource(metadataFile);
-                                                    id2ResourceMap.put(localResource.getResourceId(), localResource);
-                                                    name2ResourceMap
-                                                            .put(localResource.getResourceName(), localResource);
-                                                    maxResourceId = Math.max(localResource.getResourceId(),
-                                                            maxResourceId);
-                                                    if (LOGGER.isLoggable(Level.INFO)) {
-                                                        LOGGER.info("loaded local resource - [id: "
-                                                                + localResource.getResourceId() + ", name: "
-                                                                + localResource.getResourceName() + "]");
-                                                    }
-                                                }
-                                            }
-                                        }
-                                    }
-                                }
-                            }
-                        }
-                    }
-                }
+            LocalResource rootLocalResource = new LocalResource(ROOT_LOCAL_RESOURCE_ID, rootMetadataFileName, 0, 0,
+                    mountedRootDir);
+            insert(rootLocalResource);
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("created the root-metadata-file: " + rootMetadataFileName);
             }
         }
-        resourceIdFactory.initId(maxResourceId + 1);
+
         if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("The resource id factory is intialized with the value: " + (maxResourceId + 1));
             LOGGER.info("Completed the initialization of the local resource repository");
         }
     }
 
     @Override
-    public LocalResource getResourceById(long id) throws HyracksDataException {
-        return id2ResourceMap.get(id);
-    }
-
-    @Override
     public LocalResource getResourceByName(String name) throws HyracksDataException {
-        return name2ResourceMap.get(name);
+        LocalResource resource = resourceCache.getIfPresent(name);
+        if (resource == null) {
+            File resourceFile = getLocalResourceFileByName(name);
+            if (resourceFile.exists()) {
+                resource = readLocalResource(resourceFile);
+                resourceCache.put(name, resource);
+            }
+        }
+        return resource;
     }
 
     @Override
     public synchronized void insert(LocalResource resource) throws HyracksDataException {
-        long id = resource.getResourceId();
+        File resourceFile = new File(getFileName(resource.getResourceName(), resource.getResourceId()));
 
-        if (id2ResourceMap.containsKey(id)) {
+        if (resourceFile.exists()) {
             throw new HyracksDataException("Duplicate resource");
         }
 
         if (resource.getResourceId() != ROOT_LOCAL_RESOURCE_ID) {
-            id2ResourceMap.put(id, resource);
-            name2ResourceMap.put(resource.getResourceName(), resource);
+            resourceCache.put(resource.getResourceName(), resource);
         }
 
         FileOutputStream fos = null;
         ObjectOutputStream oosToFos = null;
 
         try {
-            fos = new FileOutputStream(getFileName(resource.getResourceName(), resource.getResourceId()));
+            fos = new FileOutputStream(resourceFile);
             oosToFos = new ObjectOutputStream(fos);
             oosToFos.writeObject(resource);
             oosToFos.flush();
@@ -250,40 +172,133 @@
     }
 
     @Override
-    public synchronized void deleteResourceById(long id) throws HyracksDataException {
-        LocalResource resource = id2ResourceMap.get(id);
-        if (resource == null) {
-            throw new HyracksDataException("Resource doesn't exist");
-        }
-        id2ResourceMap.remove(id);
-        name2ResourceMap.remove(resource.getResourceName());
-        File file = new File(getFileName(resource.getResourceName(), resource.getResourceId()));
-        file.delete();
-    }
-
-    @Override
     public synchronized void deleteResourceByName(String name) throws HyracksDataException {
-        LocalResource resource = name2ResourceMap.get(name);
-        if (resource == null) {
+        File resourceFile = getLocalResourceFileByName(name);
+        if (resourceFile.exists()) {
+            resourceFile.delete();
+            resourceCache.invalidate(name);
+        } else {
             throw new HyracksDataException("Resource doesn't exist");
         }
-        id2ResourceMap.remove(resource.getResourceId());
-        name2ResourceMap.remove(name);
-        File file = new File(getFileName(resource.getResourceName(), resource.getResourceId()));
-        file.delete();
+    }
+
+    private static File getLocalResourceFileByName(String resourceName) {
+        return new File(resourceName + File.separator + METADATA_FILE_NAME);
+    }
+
+    public HashMap<Long, LocalResource> loadAndGetAllResources() throws HyracksDataException {
+        //TODO During recovery, the memory usage currently is proportional to the number of resources available.
+        //This could be fixed by traversing all resources on disk until the required resource is found.
+        HashMap<Long, LocalResource> resourcesMap = new HashMap<Long, LocalResource>();
+
+        for (int i = 0; i < mountPoints.length; i++) {
+            String rootMetadataFileName = prepareRootMetaDataFileName(mountPoints[i], nodeId, i) + File.separator
+                    + ROOT_METADATA_FILE_NAME_PREFIX;
+            File rootMetadataFile = new File(rootMetadataFileName);
+            if (!rootMetadataFile.exists()) {
+                continue;
+            }
+            //if the rootMetadataFile exists, read it and set it as mounting point root
+            LocalResource rootLocalResource = readLocalResource(rootMetadataFile);
+            String mountedRootDir = (String) rootLocalResource.getResourceObject();
+
+            File rootDirFile = new File(mountedRootDir);
+            if (!rootDirFile.exists()) {
+                //rootDir may not exist if this node is not the metadata node and doesn't have any user data.
+                continue;
+            }
+
+            //load all local resources.
+            File[] dataverseFileList = rootDirFile.listFiles();
+            if (dataverseFileList != null) {
+                for (File dataverseFile : dataverseFileList) {
+                    if (dataverseFile.isDirectory()) {
+                        File[] indexFileList = dataverseFile.listFiles();
+                        if (indexFileList != null) {
+                            for (File indexFile : indexFileList) {
+                                if (indexFile.isDirectory()) {
+                                    File[] ioDevicesList = indexFile.listFiles();
+                                    if (ioDevicesList != null) {
+                                        for (File ioDeviceFile : ioDevicesList) {
+                                            if (ioDeviceFile.isDirectory()) {
+                                                File[] metadataFiles = ioDeviceFile.listFiles(METADATA_FILES_FILTER);
+                                                if (metadataFiles != null) {
+                                                    for (File metadataFile : metadataFiles) {
+                                                        LocalResource localResource = readLocalResource(metadataFile);
+                                                        resourcesMap.put(localResource.getResourceId(), localResource);
+                                                    }
+                                                }
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
+        return resourcesMap;
     }
 
     @Override
-    public List<LocalResource> getAllResources() throws HyracksDataException {
-        List<LocalResource> resources = new ArrayList<LocalResource>();
-        for (LocalResource resource : id2ResourceMap.values()) {
-            resources.add(resource);
+    public long getMaxResourceID() throws HyracksDataException {
+        long maxResourceId = 0;
+
+        for (int i = 0; i < mountPoints.length; i++) {
+            String rootMetadataFileName = prepareRootMetaDataFileName(mountPoints[i], nodeId, i) + File.separator
+                    + ROOT_METADATA_FILE_NAME_PREFIX;
+            File rootMetadataFile = new File(rootMetadataFileName);
+            if (!rootMetadataFile.exists()) {
+                continue;
+            }
+
+            //if the rootMetadataFile exists, read it and set it as mounting point root
+            LocalResource rootLocalResource = readLocalResource(rootMetadataFile);
+            String mountedRootDir = (String) rootLocalResource.getResourceObject();
+
+            File rootDirFile = new File(mountedRootDir);
+            if (!rootDirFile.exists()) {
+                continue;
+            }
+
+            //traverse all local resources.
+            File[] dataverseFileList = rootDirFile.listFiles();
+            if (dataverseFileList != null) {
+                for (File dataverseFile : dataverseFileList) {
+                    if (dataverseFile.isDirectory()) {
+                        File[] indexFileList = dataverseFile.listFiles();
+                        if (indexFileList != null) {
+                            for (File indexFile : indexFileList) {
+                                if (indexFile.isDirectory()) {
+                                    File[] ioDevicesList = indexFile.listFiles();
+                                    if (ioDevicesList != null) {
+                                        for (File ioDeviceFile : ioDevicesList) {
+                                            if (ioDeviceFile.isDirectory()) {
+                                                File[] metadataFiles = ioDeviceFile.listFiles(METADATA_FILES_FILTER);
+                                                if (metadataFiles != null) {
+                                                    for (File metadataFile : metadataFiles) {
+                                                        LocalResource localResource = readLocalResource(metadataFile);
+                                                        maxResourceId = Math.max(maxResourceId,
+                                                                localResource.getResourceId());
+                                                    }
+                                                }
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+            }
         }
-        return resources;
+
+        return maxResourceId;
     }
 
     private String getFileName(String baseDir, long resourceId) {
-
         if (resourceId == ROOT_LOCAL_RESOURCE_ID) {
             return baseDir;
         } else {
@@ -323,4 +338,14 @@
             }
         }
     }
+
+    private static final FilenameFilter METADATA_FILES_FILTER = new FilenameFilter() {
+        public boolean accept(File dir, String name) {
+            if (name.equalsIgnoreCase(METADATA_FILE_NAME)) {
+                return true;
+            } else {
+                return false;
+            }
+        }
+    };
 }
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java
index 0638cae..b6bb7dc 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java
@@ -24,14 +24,16 @@
 import org.apache.hyracks.storage.common.file.ILocalResourceRepositoryFactory;
 
 public class PersistentLocalResourceRepositoryFactory implements ILocalResourceRepositoryFactory {
-    private IIOManager ioManager;
+    private final IIOManager ioManager;
+    private final String nodeId;
 
-    public PersistentLocalResourceRepositoryFactory(IIOManager ioManager) {
+    public PersistentLocalResourceRepositoryFactory(IIOManager ioManager, String nodeId) {
         this.ioManager = ioManager;
+        this.nodeId = nodeId;
     }
 
     @Override
     public ILocalResourceRepository createRepository() throws HyracksDataException {
-        return new PersistentLocalResourceRepository(ioManager.getIODevices());
+        return new PersistentLocalResourceRepository(ioManager.getIODevices(), nodeId);
     }
 }
\ No newline at end of file
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
index 730116a..f6880db 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java
@@ -39,9 +39,9 @@
 
 class TestRuntimeContextProvider implements IAsterixAppRuntimeContextProvider {
 
-    AsterixThreadExecutor ate = new AsterixThreadExecutor(Executors.defaultThreadFactory());    
+    AsterixThreadExecutor ate = new AsterixThreadExecutor(Executors.defaultThreadFactory());
     IIndexLifecycleManager ilm = new IndexLifecycleManager();
-    
+
     @Override
     public AsterixThreadExecutor getThreadExecutor() {
         return ate;
@@ -108,36 +108,39 @@
     }
 
     static class IndexLifecycleManager implements IIndexLifecycleManager {
-
-        @Override
-        public IIndex getIndex(long resourceID) throws HyracksDataException {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public void register(long resourceID, IIndex index) throws HyracksDataException {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public void unregister(long resourceID) throws HyracksDataException {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public void open(long resourceID) throws HyracksDataException {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public void close(long resourceID) throws HyracksDataException {
-            throw new UnsupportedOperationException();
-        }
-
         @Override
         public List<IIndex> getOpenIndexes() {
             throw new UnsupportedOperationException();
         }
-        
+
+        @Override
+        public void register(String resourceName, IIndex index) throws HyracksDataException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void open(String resourceName) throws HyracksDataException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void close(String resourceName) throws HyracksDataException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public IIndex getIndex(String resourceName) throws HyracksDataException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void unregister(String resourceName) throws HyracksDataException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public IIndex getIndex(int datasetID, long resourceID) throws HyracksDataException {
+            throw new UnsupportedOperationException();
+        }
     }
 }
\ No newline at end of file
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
index f59bc84..82ad32d 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -51,6 +51,7 @@
 import org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.LogType;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.asterix.transaction.management.service.logging.LogManager;
 import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants;
 import org.apache.asterix.transaction.management.service.transaction.TransactionManager;
@@ -82,7 +83,6 @@
     private final LogManager logMgr;
     private final int checkpointHistory;
     private final long SHARP_CHECKPOINT_LSN = -1;
-
     /**
      * A file at a known location that contains the LSN of the last log record
      * traversed doing a successful checkpoint.
@@ -249,6 +249,8 @@
         IIndexLifecycleManager indexLifecycleManager = appRuntimeContext.getIndexLifecycleManager();
         ILocalResourceRepository localResourceRepository = appRuntimeContext.getLocalResourceRepository();
 
+        Map<Long, LocalResource> resourcesMap = ((PersistentLocalResourceRepository) localResourceRepository)
+                .loadAndGetAllResources();
         //#. set log reader to the lowWaterMarkLsn again.
         logReader.initializeScan(lowWaterMarkLSN);
         logRecord = logReader.next();
@@ -273,39 +275,38 @@
                     }
                     if (foundWinner) {
                         resourceId = logRecord.getResourceId();
-                        localResource = localResourceRepository.getResourceById(resourceId);
+                        localResource = resourcesMap.get(resourceId);
+
+                        /*******************************************************************
+                         * [Notice]
+                         * -> Issue
+                         * Delete index may cause a problem during redo.
+                         * The index operation to be redone couldn't be redone because the corresponding index
+                         * may not exist in NC due to the possible index drop DDL operation.
+                         * -> Approach
+                         * Avoid the problem during redo.
+                         * More specifically, the problem will be detected when the localResource of
+                         * the corresponding index is retrieved, which will end up with 'null'.
+                         * If null is returned, then just go and process the next
+                         * log record.
+                         *******************************************************************/
+                        if (localResource == null) {
+                            logRecord = logReader.next();
+                            continue;
+                        }
+                        /*******************************************************************/
 
                         //get index instance from IndexLifeCycleManager
                         //if index is not registered into IndexLifeCycleManager,
                         //create the index using LocalMetadata stored in LocalResourceRepository
-                        index = (ILSMIndex) indexLifecycleManager.getIndex(resourceId);
+                        index = (ILSMIndex) indexLifecycleManager.getIndex(localResource.getResourceName());
                         if (index == null) {
-
-                            /*******************************************************************
-                             * [Notice]
-                             * -> Issue
-                             * Delete index may cause a problem during redo.
-                             * The index operation to be redone couldn't be redone because the corresponding index
-                             * may not exist in NC due to the possible index drop DDL operation.
-                             * -> Approach
-                             * Avoid the problem during redo.
-                             * More specifically, the problem will be detected when the localResource of
-                             * the corresponding index is retrieved, which will end up with 'null' return from
-                             * localResourceRepository. If null is returned, then just go and process the next
-                             * log record.
-                             *******************************************************************/
-                            if (localResource == null) {
-                                logRecord = logReader.next();
-                                continue;
-                            }
-                            /*******************************************************************/
-
                             //#. create index instance and register to indexLifeCycleManager
                             localResourceMetadata = (ILocalResourceMetadata) localResource.getResourceObject();
                             index = localResourceMetadata.createIndexInstance(appRuntimeContext,
                                     localResource.getResourceName(), localResource.getPartition());
-                            indexLifecycleManager.register(resourceId, index);
-                            indexLifecycleManager.open(resourceId);
+                            indexLifecycleManager.register(localResource.getResourceName(), index);
+                            indexLifecycleManager.open(localResource.getResourceName());
 
                             //#. get maxDiskLastLSN
                             ILSMIndex lsmIndex = (ILSMIndex) index;
@@ -342,7 +343,7 @@
         //close all indexes
         Set<Long> resourceIdList = resourceId2MaxLSNMap.keySet();
         for (long r : resourceIdList) {
-            indexLifecycleManager.close(r);
+            indexLifecycleManager.close(resourcesMap.get(r).getResourceName());
         }
 
         logReader.close();
@@ -356,11 +357,12 @@
     }
 
     @Override
-    public synchronized long checkpoint(boolean isSharpCheckpoint, long nonSharpCheckpointTargetLSN) throws ACIDException, HyracksDataException {
+    public synchronized long checkpoint(boolean isSharpCheckpoint, long nonSharpCheckpointTargetLSN)
+            throws ACIDException, HyracksDataException {
 
         long minMCTFirstLSN;
         boolean nonSharpCheckpointSucceeded = false;
-        
+
         if (isSharpCheckpoint && LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Starting sharp checkpoint ... ");
         }
@@ -372,7 +374,8 @@
         //   right after the new checkpoint file is written.
         File[] prevCheckpointFiles = getPreviousCheckpointFiles();
 
-        DatasetLifecycleManager datasetLifecycleManager = (DatasetLifecycleManager)txnSubsystem.getAsterixAppRuntimeContextProvider().getIndexLifecycleManager();
+        DatasetLifecycleManager datasetLifecycleManager = (DatasetLifecycleManager) txnSubsystem
+                .getAsterixAppRuntimeContextProvider().getIndexLifecycleManager();
         //#. flush all in-memory components if it is the sharp checkpoint
         if (isSharpCheckpoint) {
 
@@ -382,11 +385,10 @@
         } else {
 
             minMCTFirstLSN = getMinFirstLSN();
-            
-            if(minMCTFirstLSN >= nonSharpCheckpointTargetLSN){
+
+            if (minMCTFirstLSN >= nonSharpCheckpointTargetLSN) {
                 nonSharpCheckpointSucceeded = true;
-            }
-            else{
+            } else {
                 //flush datasets with indexes behind target checkpoint LSN
                 datasetLifecycleManager.scheduleAsyncFlushForLaggingDatasets(nonSharpCheckpointTargetLSN);
             }
@@ -439,10 +441,10 @@
             }
         }
 
-        if(nonSharpCheckpointSucceeded){
+        if (nonSharpCheckpointSucceeded) {
             logMgr.deleteOldLogFiles(minMCTFirstLSN);
         }
-        
+
         if (isSharpCheckpoint && LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Completed sharp checkpoint.");
         }
@@ -451,9 +453,9 @@
         return minMCTFirstLSN;
     }
 
-    public long getMinFirstLSN() throws HyracksDataException
-    {
-        IIndexLifecycleManager indexLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider().getIndexLifecycleManager();
+    public long getMinFirstLSN() throws HyracksDataException {
+        IIndexLifecycleManager indexLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
+                .getIndexLifecycleManager();
         List<IIndex> openIndexList = indexLifecycleManager.getOpenIndexes();
         long firstLSN;
         //the min first lsn can only be the current append or smaller
@@ -463,8 +465,9 @@
 
             for (IIndex index : openIndexList) {
 
-                AbstractLSMIOOperationCallback ioCallback =  (AbstractLSMIOOperationCallback)((ILSMIndex) index).getIOOperationCallback();
-                if(!((AbstractLSMIndex)index).isCurrentMutableComponentEmpty() || ioCallback.hasPendingFlush()){
+                AbstractLSMIOOperationCallback ioCallback = (AbstractLSMIOOperationCallback) ((ILSMIndex) index)
+                        .getIOOperationCallback();
+                if (!((AbstractLSMIndex) index).isCurrentMutableComponentEmpty() || ioCallback.hasPendingFlush()) {
                     firstLSN = ioCallback.getFirstLSN();
                     minFirstLSN = Math.min(minFirstLSN, firstLSN);
                 }
@@ -594,6 +597,7 @@
         ILogReader logReader = logMgr.getLogReader(false);
         logReader.initializeScan(firstLSN);
         ILogRecord logRecord = null;
+
         while (currentLSN < lastLSN) {
             logRecord = logReader.next();
             if (logRecord == null) {
@@ -710,7 +714,7 @@
     private void undo(ILogRecord logRecord) {
         try {
             ILSMIndex index = (ILSMIndex) txnSubsystem.getAsterixAppRuntimeContextProvider().getIndexLifecycleManager()
-                    .getIndex(logRecord.getResourceId());
+                    .getIndex(logRecord.getDatasetId(), logRecord.getResourceId());
             ILSMIndexAccessor indexAccessor = index.createAccessor(NoOpOperationCallback.INSTANCE,
                     NoOpOperationCallback.INSTANCE);
             if (logRecord.getNewOp() == IndexOperation.INSERT.ordinal()) {
@@ -728,7 +732,7 @@
     private void redo(ILogRecord logRecord) {
         try {
             ILSMIndex index = (ILSMIndex) txnSubsystem.getAsterixAppRuntimeContextProvider().getIndexLifecycleManager()
-                    .getIndex(logRecord.getResourceId());
+                    .getIndex(logRecord.getDatasetId(), logRecord.getResourceId());
             ILSMIndexAccessor indexAccessor = index.createAccessor(NoOpOperationCallback.INSTANCE,
                     NoOpOperationCallback.INSTANCE);
             if (logRecord.getNewOp() == IndexOperation.INSERT.ordinal()) {