ASTERIXDB-1152: Delete storage data of old instances

Change-Id: Ibb6c6949bdf2ed6c3e491fa66a23491ff34fc830
Reviewed-on: https://asterix-gerrit.ics.uci.edu/469
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
Reviewed-by: Taewoo Kim <wangsaeu@gmail.com>
Reviewed-by: Ian Maxon <imaxon@apache.org>
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
index 15252e9..ce19139 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
@@ -45,6 +45,8 @@
 import org.apache.asterix.common.replication.IReplicationChannel;
 import org.apache.asterix.common.replication.IReplicationManager;
 import org.apache.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
+import org.apache.asterix.common.transactions.IRecoveryManager;
+import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.asterix.feeds.FeedManager;
 import org.apache.asterix.metadata.bootstrap.MetadataPrimaryIndexes;
@@ -140,7 +142,7 @@
                 AsterixClusterProperties.INSTANCE.getCluster());
     }
 
-    public void initialize() throws IOException, ACIDException, AsterixException {
+    public void initialize(boolean initialRun) throws IOException, ACIDException, AsterixException {
         Logger.getLogger("org.apache").setLevel(externalProperties.getLogLevel());
 
         threadExecutor = new AsterixThreadExecutor(ncApplicationContext.getThreadFactory());
@@ -156,15 +158,21 @@
 
         metadataMergePolicyFactory = new PrefixMergePolicyFactory();
 
-        ILocalResourceRepositoryFactory persistentLocalResourceRepositoryFactory = new PersistentLocalResourceRepositoryFactory(
-                ioManager, ncApplicationContext.getNodeId());
-        localResourceRepository = persistentLocalResourceRepositoryFactory.createRepository();
-        initializeResourceIdFactory();
-
         IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider = new AsterixAppRuntimeContextProdiverForRecovery(
                 this);
         txnSubsystem = new TransactionSubsystem(ncApplicationContext.getNodeId(), asterixAppRuntimeContextProvider,
                 txnProperties);
+        ILocalResourceRepositoryFactory persistentLocalResourceRepositoryFactory = new PersistentLocalResourceRepositoryFactory(
+                ioManager, ncApplicationContext.getNodeId());
+        localResourceRepository = persistentLocalResourceRepositoryFactory.createRepository();
+
+        IRecoveryManager recoveryMgr = txnSubsystem.getRecoveryManager();
+        SystemState systemState = recoveryMgr.getSystemState();
+        if (initialRun || systemState == SystemState.NEW_UNIVERSE) {
+            //delete any storage data before the resource factory is initialized
+            ((PersistentLocalResourceRepository) localResourceRepository).deleteStorageData(true);
+        }
+        initializeResourceIdFactory();
 
         datasetLifecycleManager = new DatasetLifecycleManager(storageProperties, localResourceRepository,
                 MetadataPrimaryIndexes.FIRST_AVAILABLE_USER_DATASET_ID, txnSubsystem.getLogManager());
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index a95d747..490f42b 100644
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -33,7 +33,6 @@
 import org.apache.asterix.common.config.AsterixReplicationProperties;
 import org.apache.asterix.common.config.AsterixTransactionProperties;
 import org.apache.asterix.common.config.IAsterixPropertiesProvider;
-import org.apache.asterix.common.context.DatasetLifecycleManager;
 import org.apache.asterix.common.replication.IRemoteRecoveryManager;
 import org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
@@ -104,7 +103,7 @@
             }
             updateOnNodeJoin();
         }
-        runtimeContext.initialize();
+        runtimeContext.initialize(initialRun);
         ncApplicationContext.setApplicationObject(runtimeContext);
 
         //if replication is enabled, check if there is a replica for this node
@@ -115,7 +114,6 @@
 
         if (initialRun) {
             LOGGER.info("System is being initialized. (first run)");
-            systemState = SystemState.NEW_UNIVERSE;
         } else {
             // #. recover if the system is corrupted by checking system state.
             IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager();
@@ -194,7 +192,7 @@
         AsterixMetadataProperties metadataProperties = ((IAsterixPropertiesProvider) runtimeContext)
                 .getMetadataProperties();
 
-        if (systemState == SystemState.NEW_UNIVERSE) {
+        if (initialRun || systemState == SystemState.NEW_UNIVERSE) {
             if (LOGGER.isLoggable(Level.INFO)) {
                 LOGGER.info("System state: " + SystemState.NEW_UNIVERSE);
                 LOGGER.info("Node ID: " + nodeId);
@@ -204,7 +202,7 @@
 
             PersistentLocalResourceRepository localResourceRepository = (PersistentLocalResourceRepository) runtimeContext
                     .getLocalResourceRepository();
-            localResourceRepository.initialize(nodeId, metadataProperties.getStores().get(nodeId)[0]);
+            localResourceRepository.initializeNewUniverse(metadataProperties.getStores().get(nodeId)[0]);
         }
 
         IAsterixStateProxy proxy = null;
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java b/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
index 63851bf..94f5b2f 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
@@ -69,7 +69,7 @@
 
     public ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID);
 
-    public void initialize() throws IOException, ACIDException, AsterixException;
+    public void initialize(boolean initialRun) throws IOException, ACIDException, AsterixException;
 
     public void setShuttingdown(boolean b);
 
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java
index f9481a0..c796f37 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java
@@ -18,7 +18,6 @@
  */
 package org.apache.asterix.common.replication;
 
-import java.io.IOException;
 import java.util.Set;
 
 public interface IReplicaResourcesManager {
@@ -29,6 +28,4 @@
 
     public long getMinRemoteLSN(Set<String> remoteNodes);
 
-    public void deleteAsterixStorageData() throws IOException;
-
 }
\ No newline at end of file
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java b/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
index f424bc3..a82b535 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
@@ -30,7 +30,6 @@
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.config.AsterixReplicationProperties;
-import org.apache.asterix.common.context.DatasetLifecycleManager;
 import org.apache.asterix.common.replication.IRemoteRecoveryManager;
 import org.apache.asterix.common.replication.IReplicationManager;
 import org.apache.asterix.common.transactions.ILogManager;
@@ -59,7 +58,8 @@
         //The whole remote recovery process should be atomic.
         //Any error happens, we should start the recovery from the start until the recovery is complete or an illegal state is reached (cannot recovery).
         int maxRecoveryAttempts = 10;
-
+        PersistentLocalResourceRepository resourceRepository = (PersistentLocalResourceRepository) runtimeContext
+                .getLocalResourceRepository();
         while (true) {
             //start recovery recovery steps
             try {
@@ -84,7 +84,7 @@
                 datasetLifeCycleManager.closeAllDatasets();
 
                 //3. remove any existing storage data
-                runtimeContext.getReplicaResourcesManager().deleteAsterixStorageData();
+                resourceRepository.deleteStorageData(true);
 
                 //4. select remote replicas to recover from per lost replica data
                 Map<String, Set<String>> selectedRemoteReplicas = constructRemoteRecoveryPlan();
@@ -106,9 +106,9 @@
 
                     //2. Initialize local resources based on the newly received files (if we are recovering the primary replica on this node)
                     if (replicasDataToRecover.contains(logManager.getNodeId())) {
-                        ((PersistentLocalResourceRepository) runtimeContext.getLocalResourceRepository()).initialize(
-                                logManager.getNodeId(),
-                                runtimeContext.getReplicaResourcesManager().getLocalStorageFolder());
+                        ((PersistentLocalResourceRepository) runtimeContext.getLocalResourceRepository())
+                                .initializeNewUniverse(
+                                        runtimeContext.getReplicaResourcesManager().getLocalStorageFolder());
                         //initialize resource id factor to correct max resource id
                         runtimeContext.initializeResourceIdFactory();
                     }
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
index 5e321f1..3e3043c 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
@@ -399,17 +399,4 @@
             }
         }
     };
-
-    @Override
-    public void deleteAsterixStorageData() throws IOException {
-        for (int i = 0; i < mountPoints.length; i++) {
-            File mountingPoint = new File(mountPoints[i]);
-
-            File[] storageFolders = mountingPoint.listFiles();
-
-            for (File storageFolder : storageFolders) {
-                AsterixFilesUtil.deleteFolder(storageFolder.getAbsolutePath());
-            }
-        }
-    }
 }
\ No newline at end of file
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 6fa60ca..8ae3eb1 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -25,6 +25,9 @@
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -34,6 +37,7 @@
 
 import org.apache.asterix.common.replication.AsterixReplicationJob;
 import org.apache.asterix.common.replication.IReplicationManager;
+import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IODeviceHandle;
 import org.apache.hyracks.api.replication.IReplicationJob.ReplicationExecutionType;
@@ -49,9 +53,9 @@
 
     private static final Logger LOGGER = Logger.getLogger(PersistentLocalResourceRepository.class.getName());
     private final String[] mountPoints;
-    private static final String ROOT_METADATA_DIRECTORY = "asterix_root_metadata";
-    private static final String ROOT_METADATA_FILE_NAME_PREFIX = ".asterix_root_metadata";
-    private static final long ROOT_LOCAL_RESOURCE_ID = -4321;
+    private static final String STORAGE_METADATA_DIRECTORY = "asterix_root_metadata";
+    private static final String STORAGE_METADATA_FILE_NAME_PREFIX = ".asterix_root_metadata";
+    private static final long STORAGE_LOCAL_RESOURCE_ID = -4321;
     public static final String METADATA_FILE_NAME = ".metadata";
     private final Cache<String, LocalResource> resourceCache;
     private final String nodeId;
@@ -59,8 +63,9 @@
     private IReplicationManager replicationManager;
     private boolean isReplicationEnabled = false;
     private Set<String> filesToBeReplicated;
-    
-    public PersistentLocalResourceRepository(List<IODeviceHandle> devices, String nodeId) throws HyracksDataException {
+
+    public PersistentLocalResourceRepository(List<IODeviceHandle> devices, String nodeId)
+            throws HyracksDataException {
         mountPoints = new String[devices.size()];
         this.nodeId = nodeId;
         for (int i = 0; i < mountPoints.length; i++) {
@@ -75,57 +80,54 @@
                 mountPoints[i] = new String(mountPoint);
             }
         }
-
         resourceCache = CacheBuilder.newBuilder().maximumSize(MAX_CACHED_RESOURCES).build();
     }
 
-    private String prepareRootMetaDataFileName(String mountPoint, String nodeId, int ioDeviceId) {
-        return mountPoint + ROOT_METADATA_DIRECTORY + File.separator + nodeId + "_" + "iodevice" + ioDeviceId;
+    private static String getStorageMetadataDirPath(String mountPoint, String nodeId, int ioDeviceId) {
+        return mountPoint + STORAGE_METADATA_DIRECTORY + File.separator + nodeId + "_" + "iodevice" + ioDeviceId;
     }
 
-    public void initialize(String nodeId, String rootDir) throws HyracksDataException {
+    private static File getStorageMetadataBaseDir(File storageMetadataFile) {
+        //STORAGE_METADATA_DIRECTORY / Node Id / STORAGE_METADATA_FILE_NAME_PREFIX
+        return storageMetadataFile.getParentFile().getParentFile();
+    }
+
+    public void initializeNewUniverse(String storageRootDirName) throws HyracksDataException {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Initializing local resource repository ... ");
         }
 
-        //if the rootMetadataFile doesn't exist, create it.
+        //create storage metadata file (This file is used to locate the root storage directory after instance restarts).
+        //TODO with the existing cluster configuration file being static and distributed on all NCs, we can find out the storage root
+        //directory without looking at this file. This file could potentially store more information, otherwise no need to keep it. 
         for (int i = 0; i < mountPoints.length; i++) {
-            String rootMetadataFileName = prepareRootMetaDataFileName(mountPoints[i], nodeId, i) + File.separator
-                    + ROOT_METADATA_FILE_NAME_PREFIX;
-            File rootMetadataFile = new File(rootMetadataFileName);
-
-            File rootMetadataDir = new File(prepareRootMetaDataFileName(mountPoints[i], nodeId, i));
-            if (!rootMetadataDir.exists()) {
-                boolean success = rootMetadataDir.mkdirs();
-                if (!success) {
-                    throw new IllegalStateException(
-                            "Unable to create root metadata directory of PersistentLocalResourceRepository in "
-                                    + rootMetadataDir.getAbsolutePath());
-                }
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("created the root-metadata-file's directory: " + rootMetadataDir.getAbsolutePath());
-                }
+            File storageMetadataFile = getStorageMetadataFile(mountPoints[i], nodeId, i);
+            File storageMetadataDir = storageMetadataFile.getParentFile();
+            //make dirs for the storage metadata file
+            boolean success = storageMetadataDir.mkdirs();
+            if (!success) {
+                throw new IllegalStateException(
+                        "Unable to create storage metadata directory of PersistentLocalResourceRepository in "
+                                + storageMetadataDir.getAbsolutePath() + " or directory already exists");
             }
 
-            rootMetadataFile.delete();
-            String mountedRootDir;
-            if (rootDir.startsWith(System.getProperty("file.separator"))) {
-                mountedRootDir = new String(mountPoints[i]
-                        + rootDir.substring(System.getProperty("file.separator").length()));
+            LOGGER.log(Level.INFO,
+                    "created the root-metadata-file's directory: " + storageMetadataDir.getAbsolutePath());
+
+            String storageRootDirPath;
+            if (storageRootDirName.startsWith(System.getProperty("file.separator"))) {
+                storageRootDirPath = new String(
+                        mountPoints[i] + storageRootDirName.substring(System.getProperty("file.separator").length()));
             } else {
-                mountedRootDir = new String(mountPoints[i] + rootDir);
+                storageRootDirPath = new String(mountPoints[i] + storageRootDirName);
             }
-            LocalResource rootLocalResource = new LocalResource(ROOT_LOCAL_RESOURCE_ID, rootMetadataFileName, 0, 0,
-                    mountedRootDir);
-            insert(rootLocalResource);
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("created the root-metadata-file: " + rootMetadataFileName);
-            }
-        }
 
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Completed the initialization of the local resource repository");
+            LocalResource rootLocalResource = new LocalResource(STORAGE_LOCAL_RESOURCE_ID,
+                    storageMetadataFile.getAbsolutePath(), 0, 0, storageRootDirPath);
+            insert(rootLocalResource);
+            LOGGER.log(Level.INFO, "created the root-metadata-file: " + storageMetadataFile.getAbsolutePath());
         }
+        LOGGER.log(Level.INFO, "Completed the initialization of the local resource repository");
     }
 
     @Override
@@ -144,12 +146,11 @@
     @Override
     public synchronized void insert(LocalResource resource) throws HyracksDataException {
         File resourceFile = new File(getFileName(resource.getResourceName(), resource.getResourceId()));
-
         if (resourceFile.exists()) {
-            throw new HyracksDataException("Duplicate resource");
+            throw new HyracksDataException("Duplicate resource: " + resourceFile.getAbsolutePath());
         }
 
-        if (resource.getResourceId() != ROOT_LOCAL_RESOURCE_ID) {
+        if (resource.getResourceId() != STORAGE_LOCAL_RESOURCE_ID) {
             resourceCache.put(resource.getResourceName(), resource);
         }
 
@@ -180,7 +181,7 @@
             }
 
             //if replication enabled, send resource metadata info to remote nodes
-            if (isReplicationEnabled && resource.getResourceId() != ROOT_LOCAL_RESOURCE_ID) {
+            if (isReplicationEnabled && resource.getResourceId() != STORAGE_LOCAL_RESOURCE_ID) {
                 String filePath = getFileName(resource.getResourceName(), resource.getResourceId());
                 createReplicationJob(ReplicationOperation.REPLICATE, filePath);
             }
@@ -193,9 +194,9 @@
         if (resourceFile.exists()) {
             resourceFile.delete();
             resourceCache.invalidate(name);
-            
+
             //if replication enabled, delete resource from remote replicas
-            if (isReplicationEnabled && !resourceFile.getName().startsWith(ROOT_METADATA_FILE_NAME_PREFIX)) {
+            if (isReplicationEnabled && !resourceFile.getName().startsWith(STORAGE_METADATA_FILE_NAME_PREFIX)) {
                 createReplicationJob(ReplicationOperation.DELETE, resourceFile.getAbsolutePath());
             }
         } else {
@@ -213,24 +214,13 @@
         HashMap<Long, LocalResource> resourcesMap = new HashMap<Long, LocalResource>();
 
         for (int i = 0; i < mountPoints.length; i++) {
-            String rootMetadataFileName = prepareRootMetaDataFileName(mountPoints[i], nodeId, i) + File.separator
-                    + ROOT_METADATA_FILE_NAME_PREFIX;
-            File rootMetadataFile = new File(rootMetadataFileName);
-            if (!rootMetadataFile.exists()) {
-                continue;
-            }
-            //if the rootMetadataFile exists, read it and set it as mounting point root
-            LocalResource rootLocalResource = readLocalResource(rootMetadataFile);
-            String mountedRootDir = (String) rootLocalResource.getResourceObject();
-
-            File rootDirFile = new File(mountedRootDir);
-            if (!rootDirFile.exists()) {
-                //rootDir may not exist if this node is not the metadata node and doesn't have any user data.
+            File storageRootDir = getStorageRootDirectoryIfExists(mountPoints[i], nodeId, i);
+            if (storageRootDir == null) {
                 continue;
             }
 
             //load all local resources.
-            File[] dataverseFileList = rootDirFile.listFiles();
+            File[] dataverseFileList = storageRootDir.listFiles();
             if (dataverseFileList != null) {
                 for (File dataverseFile : dataverseFileList) {
                     if (dataverseFile.isDirectory()) {
@@ -268,24 +258,13 @@
         long maxResourceId = 0;
 
         for (int i = 0; i < mountPoints.length; i++) {
-            String rootMetadataFileName = prepareRootMetaDataFileName(mountPoints[i], nodeId, i) + File.separator
-                    + ROOT_METADATA_FILE_NAME_PREFIX;
-            File rootMetadataFile = new File(rootMetadataFileName);
-            if (!rootMetadataFile.exists()) {
-                continue;
-            }
-
-            //if the rootMetadataFile exists, read it and set it as mounting point root
-            LocalResource rootLocalResource = readLocalResource(rootMetadataFile);
-            String mountedRootDir = (String) rootLocalResource.getResourceObject();
-
-            File rootDirFile = new File(mountedRootDir);
-            if (!rootDirFile.exists()) {
+            File storageRootDir = getStorageRootDirectoryIfExists(mountPoints[i], nodeId, i);
+            if (storageRootDir == null) {
                 continue;
             }
 
             //traverse all local resources.
-            File[] dataverseFileList = rootDirFile.listFiles();
+            File[] dataverseFileList = storageRootDir.listFiles();
             if (dataverseFileList != null) {
                 for (File dataverseFile : dataverseFileList) {
                     if (dataverseFile.isDirectory()) {
@@ -319,8 +298,8 @@
         return maxResourceId;
     }
 
-    private String getFileName(String baseDir, long resourceId) {
-        if (resourceId == ROOT_LOCAL_RESOURCE_ID) {
+    private static String getFileName(String baseDir, long resourceId) {
+        if (resourceId == STORAGE_LOCAL_RESOURCE_ID) {
             return baseDir;
         } else {
             if (!baseDir.endsWith(System.getProperty("file.separator"))) {
@@ -369,6 +348,7 @@
             }
         }
     };
+
     public void setReplicationManager(IReplicationManager replicationManager) {
         this.replicationManager = replicationManager;
         isReplicationEnabled = replicationManager.isReplicationEnabled();
@@ -394,4 +374,63 @@
         return mountPoints;
     }
 
-}
+    /**
+     * Deletes physical files of all data verses.
+     * @param deleteStorageMetadata
+     * @throws IOException
+     */
+    public void deleteStorageData(boolean deleteStorageMetadata) throws IOException {
+        for (int i = 0; i < mountPoints.length; i++) {
+            File storageDir = getStorageRootDirectoryIfExists(mountPoints[i], nodeId, i);
+            if (storageDir != null) {
+                if (storageDir.isDirectory()) {
+                    FileUtils.deleteDirectory(storageDir);
+                }
+            }
+
+            if (deleteStorageMetadata) {
+                //delete the metadata root directory
+                File storageMetadataFile = getStorageMetadataFile(mountPoints[i], nodeId, i);
+                File storageMetadataDir = getStorageMetadataBaseDir(storageMetadataFile);
+                if (storageMetadataDir.exists() && storageMetadataDir.isDirectory()) {
+                    FileUtils.deleteDirectory(storageMetadataDir);
+                }
+            }
+        }
+    }
+
+    /**
+     * @param mountPoint
+     * @param nodeId
+     * @param ioDeviceId
+     * @return A file reference to the storage metadata file.
+     */
+    private static File getStorageMetadataFile(String mountPoint, String nodeId, int ioDeviceId) {
+        String storageMetadataFileName = getStorageMetadataDirPath(mountPoint, nodeId, ioDeviceId) + File.separator
+                + STORAGE_METADATA_FILE_NAME_PREFIX;
+        File storageMetadataFile = new File(storageMetadataFileName);
+        return storageMetadataFile;
+    }
+
+    /**
+     * @param mountPoint
+     * @param nodeId
+     * @param ioDeviceId
+     * @return A file reference to the storage root directory if exists, otherwise null.
+     * @throws HyracksDataException
+     */
+    public static File getStorageRootDirectoryIfExists(String mountPoint, String nodeId, int ioDeviceId)
+            throws HyracksDataException {
+        File storageRootDir = null;
+        File storageMetadataFile = getStorageMetadataFile(mountPoint, nodeId, ioDeviceId);
+        if (storageMetadataFile.exists()) {
+            LocalResource rootLocalResource = readLocalResource(storageMetadataFile);
+            String storageRootDirPath = (String) rootLocalResource.getResourceObject();
+            Path path = Paths.get(storageRootDirPath);
+            if (Files.exists(path)) {
+                storageRootDir = new File(storageRootDirPath);
+            }
+        }
+        return storageRootDir;
+    }
+}
\ No newline at end of file