[ASTERIXDB-2177][STO] Use Fixed Storage Root Dir Name

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Eliminate the need to read the storage root dir name
  from cluster properties by using a fixed name (storage) instead.
- Eliminate the need to maintain the root_metadata file previously used
  to locate the storage root after instance restarts (see the sketch
  below).
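
A minimal, standalone sketch of the idea (illustrative names only; it is
not the patched StoragePathUtil/PersistentLocalResourceRepository): with
a fixed root name, partition paths are derived purely from constants and
the root can simply be (re)created under each io device on startup, so
no marker file has to be written or read back to rediscover it.

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;

    public class FixedStorageRootSketch {
        static final String STORAGE_ROOT_DIR_NAME = "storage";
        static final String PARTITION_DIR_PREFIX = "partition_";

        // Relative path of a partition, e.g. "storage/partition_3";
        // no cluster-properties lookup is involved.
        static String preparePartitionPath(int partitionId) {
            return Paths.get(STORAGE_ROOT_DIR_NAME,
                    PARTITION_DIR_PREFIX + partitionId).toString();
        }

        // With a fixed name, (re)creating the root is enough; there is
        // no root_metadata file to maintain.
        static void createStorageRoot(Path ioDeviceMount) throws IOException {
            Files.createDirectories(ioDeviceMount.resolve(STORAGE_ROOT_DIR_NAME));
        }

        public static void main(String[] args) throws IOException {
            // prints storage/partition_0 ("storage\partition_0" on Windows)
            System.out.println(preparePartitionPath(0));
            // hypothetical mount point used only for this example
            createStorageRoot(Paths.get("/tmp/iodevice0"));
        }
    }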

Change-Id: I4e9772e9da10cff33f11353610788ba541a35571
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2182
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index e77d535..a3def26 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -197,7 +197,7 @@
                 LOGGER.log(Level.WARNING,
                         "Deleting the storage dir. initialRun = " + initialRun + ", systemState = " + systemState);
             }
-            localResourceRepository.deleteStorageData(true);
+            localResourceRepository.deleteStorageData();
         }
 
         datasetMemoryManager = new DatasetMemoryManager(storageProperties);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index 47e5ac9..8b417a9 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -41,12 +41,12 @@
 import org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
 import org.apache.asterix.common.utils.PrintUtil;
+import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.event.schema.cluster.Cluster;
 import org.apache.asterix.event.schema.cluster.Node;
 import org.apache.asterix.messaging.MessagingChannelInterfaceFactory;
 import org.apache.asterix.messaging.NCMessageBroker;
-import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.asterix.utils.CompatibilityUtil;
 import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.application.IServiceContext;
@@ -120,22 +120,13 @@
         if (latestCheckpoint != null) {
             CompatibilityUtil.ensureCompatibility(controllerService, latestCheckpoint.getStorageVersion());
         }
-        IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager();
-        final SystemState stateOnStartup = recoveryMgr.getSystemState();
-        if (stateOnStartup == SystemState.PERMANENT_DATA_LOSS) {
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("System state: " + SystemState.PERMANENT_DATA_LOSS);
-                LOGGER.info("Node ID: " + nodeId);
-                LOGGER.info("Stores: " + PrintUtil.toString(metadataProperties.getStores()));
-                LOGGER.info("Root Metadata Store: " + metadataProperties.getStores().get(nodeId)[0]);
-            }
-            PersistentLocalResourceRepository localResourceRepository =
-                    (PersistentLocalResourceRepository) runtimeContext.getLocalResourceRepository();
-            localResourceRepository.initializeNewUniverse(ClusterProperties.INSTANCE.getStorageDirectoryName());
+        if (LOGGER.isLoggable(Level.INFO)) {
+            IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager();
+            LOGGER.info("System state: " + recoveryMgr.getSystemState());
+            LOGGER.info("Node ID: " + nodeId);
+            LOGGER.info("Stores: " + PrintUtil.toString(metadataProperties.getStores()));
         }
-
         webManager = new WebManager();
-
         performLocalCleanUp();
     }
 
@@ -256,7 +247,7 @@
             for (Node node : nodes) {
                 String ncId = asterixInstanceName + "_" + node.getId();
                 if (ncId.equalsIgnoreCase(nodeId)) {
-                    String storeDir = ClusterProperties.INSTANCE.getStorageDirectoryName();
+                    String storeDir = StorageConstants.STORAGE_ROOT_DIR_NAME;
                     String nodeIoDevices = node.getIodevices() == null ? cluster.getIodevices() : node.getIodevices();
                     String[] ioDevicePaths = nodeIoDevices.trim().split(",");
                     for (int i = 0; i < ioDevicePaths.length; i++) {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
index 78bed6a..9d73407 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
@@ -34,6 +34,7 @@
 import org.apache.asterix.app.external.ExternalUDFLibrarian;
 import org.apache.asterix.common.config.ClusterProperties;
 import org.apache.asterix.common.library.ILibraryManager;
+import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.asterix.test.common.TestExecutor;
 import org.apache.asterix.testframework.context.TestCaseContext;
 import org.apache.commons.lang.SystemUtils;
@@ -152,7 +153,7 @@
             File[] dataDirs = ioDevice.getMount().listFiles();
             for (File dataDir : dataDirs) {
                 String dirName = dataDir.getName();
-                if (!dirName.equals(ClusterProperties.DEFAULT_STORAGE_DIR_NAME)) {
+                if (!dirName.equals(StorageConstants.STORAGE_ROOT_DIR_NAME)) {
                     // Skips non-storage directories.
                     continue;
                 }
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/MigrateStorageResourcesTaskTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/MigrateStorageResourcesTaskTest.java
index 5b53041..47a561a 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/MigrateStorageResourcesTaskTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/MigrateStorageResourcesTaskTest.java
@@ -59,8 +59,8 @@
     public void storageStructureMigration() throws Exception {
         Function<IndexPathElements, String> legacyIndexPathProvider = (pathElements) ->
                 (pathElements.getRebalanceCount().equals("0") ? "" : pathElements.getRebalanceCount() + File.separator)
-                        + pathElements.getDatasetName() + StoragePathUtil.DATASET_INDEX_NAME_SEPARATOR + pathElements
-                        .getIndexName();
+                        + pathElements.getDatasetName() + StorageConstants.LEGACY_DATASET_INDEX_NAME_SEPARATOR
+                        + pathElements.getIndexName();
         StoragePathUtil.setIndexPathProvider(legacyIndexPathProvider);
         integrationUtil.init(true);
         // create dataset and insert data using legacy structure
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ClusterProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ClusterProperties.java
index 0abb92f..cc3291d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ClusterProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ClusterProperties.java
@@ -38,7 +38,6 @@
 
     public static final ClusterProperties INSTANCE = new ClusterProperties();
     public static final String CLUSTER_CONFIGURATION_FILE = "cluster.xml";
-    public static final String DEFAULT_STORAGE_DIR_NAME = "storage";
     private String nodeNamePrefix = StringUtils.EMPTY;
     private Cluster cluster;
 
@@ -61,14 +60,6 @@
         return cluster;
     }
 
-    public String getStorageDirectoryName() {
-        if (cluster != null) {
-            return cluster.getStore();
-        }
-        // virtual cluster without cluster config file
-        return DEFAULT_STORAGE_DIR_NAME;
-    }
-
     public Node getNodeById(String nodeId) {
         Optional<Node> matchingNode = cluster.getNode().stream().filter(node -> node.getId().equals(nodeId)).findAny();
         return matchingNode.isPresent() ? matchingNode.get() : null;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
index 0d65067..bd057fa 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
@@ -22,7 +22,7 @@
 import java.nio.file.Path;
 import java.nio.file.Paths;
 
-import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.asterix.common.utils.StorageConstants;
 
 public class ResourceReference {
 
@@ -96,7 +96,7 @@
         int offset = tokens.length;
         ref.name = tokens[--offset];
         // split combined dataset/index name
-        final String[] indexTokens = tokens[--offset].split(StoragePathUtil.DATASET_INDEX_NAME_SEPARATOR);
+        final String[] indexTokens = tokens[--offset].split(StorageConstants.LEGACY_DATASET_INDEX_NAME_SEPARATOR);
         if (indexTokens.length != 2) {
             throw new IllegalStateException("Unrecognized legacy path structure: " + path);
         }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
index 48769d4..6262f71 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
@@ -25,8 +25,10 @@
  */
 public class StorageConstants {
 
-    public static final String METADATA_ROOT = "root_metadata";
+    public static final String STORAGE_ROOT_DIR_NAME = "storage";
+    public static final String PARTITION_DIR_PREFIX = "partition_";
     public static final String METADATA_FILE_NAME = ".metadata";
+    public static final String LEGACY_DATASET_INDEX_NAME_SEPARATOR = "_idx_";
 
     /**
      * The storage version of AsterixDB related artifacts (e.g. log files, checkpoint files, etc..).
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
index 5110d74..b93ccb5 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@ -37,9 +37,8 @@
 import org.apache.log4j.Logger;
 
 public class StoragePathUtil {
+
     private static final Logger LOGGER = Logger.getLogger(StoragePathUtil.class.getName());
-    public static final String PARTITION_DIR_PREFIX = "partition_";
-    public static final String DATASET_INDEX_NAME_SEPARATOR = "_idx_";
     private static Function<IndexPathElements, String> indexPathProvider;
 
     private StoragePathUtil() {
@@ -60,8 +59,9 @@
         return new MappedFileSplit(partition.getActiveNodeId(), relativePath, partition.getIODeviceNum());
     }
 
-    public static String prepareStoragePartitionPath(String storageDirName, int partitonId) {
-        return storageDirName + File.separator + StoragePathUtil.PARTITION_DIR_PREFIX + partitonId;
+    public static String prepareStoragePartitionPath(int partitonId) {
+        return Paths.get(StorageConstants.STORAGE_ROOT_DIR_NAME, StorageConstants.PARTITION_DIR_PREFIX + partitonId)
+                .toString();
     }
 
     public static String prepareDataverseIndexName(String dataverseName, String datasetName, String idxName,
@@ -80,12 +80,10 @@
         return datasetName + File.separator + rebalanceCount + File.separator + idxName;
     }
 
-    public static int getPartitionNumFromName(String name) {
-        return Integer.parseInt(name.substring(PARTITION_DIR_PREFIX.length()));
-    }
-
     public static int getPartitionNumFromRelativePath(String relativePath) {
-        int startIdx = relativePath.indexOf(PARTITION_DIR_PREFIX) + PARTITION_DIR_PREFIX.length();
+        int startIdx =
+                relativePath.indexOf(StorageConstants.PARTITION_DIR_PREFIX) + StorageConstants.PARTITION_DIR_PREFIX
+                        .length();
         String partition = relativePath.substring(startIdx, relativePath.indexOf(File.separatorChar, startIdx));
         return Integer.parseInt(partition);
     }
diff --git a/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java b/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java
index 71cef80..c0f8f5f 100644
--- a/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java
+++ b/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java
@@ -27,7 +27,6 @@
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.asterix.event.driver.EventDriver;
 import org.apache.asterix.event.error.VerificationUtil;
 import org.apache.asterix.event.model.AsterixInstance;
@@ -142,10 +141,9 @@
         for (Node node : cluster.getNode()) {
             Nodeid nodeid = new Nodeid(new Value(null, node.getId()));
             iodevices = node.getIodevices() == null ? instance.getCluster().getIodevices() : node.getIodevices();
-            pargs = workingDir + " " + instance.getName() + " " + iodevices + " " + store + " "
-                    + StorageConstants.METADATA_ROOT + " " + AsterixEventServiceUtil.TXN_LOG_DIR + " "
-                    + backupId + " " + hdfsBackupDir + " " + "hdfs" + " " + node.getId() + " " + hdfsUrl + " "
-                    + hadoopVersion;
+            pargs = workingDir + " " + instance.getName() + " " + iodevices + " " + store + " " + " "
+                    + AsterixEventServiceUtil.TXN_LOG_DIR + " " + backupId + " " + hdfsBackupDir + " " + "hdfs" + " "
+                    + node.getId() + " " + hdfsUrl + " " + hadoopVersion;
             Event event = new Event("backup", nodeid, pargs);
             patternList.add(new Pattern(null, 1, null, event));
         }
@@ -167,9 +165,8 @@
             Nodeid nodeid = new Nodeid(new Value(null, node.getId()));
             iodevices = node.getIodevices() == null ? instance.getCluster().getIodevices() : node.getIodevices();
             txnLogDir = node.getTxnLogDir() == null ? instance.getCluster().getTxnLogDir() : node.getTxnLogDir();
-            pargs = workingDir + " " + instance.getName() + " " + iodevices + " " + store + " "
-                    + StorageConstants.METADATA_ROOT + " " + txnLogDir + " " + backupId + " " + backupDir
-                    + " " + "local" + " " + node.getId();
+            pargs = workingDir + " " + instance.getName() + " " + iodevices + " " + store + " " + " " + txnLogDir + " "
+                    + backupId + " " + backupDir + " " + "local" + " " + node.getId();
             Event event = new Event("backup", nodeid, pargs);
             patternList.add(new Pattern(null, 1, null, event));
         }
@@ -190,10 +187,9 @@
         for (Node node : cluster.getNode()) {
             Nodeid nodeid = new Nodeid(new Value(null, node.getId()));
             String iodevices = node.getIodevices() == null ? cluster.getIodevices() : node.getIodevices();
-            pargs = workingDir + " " + instance.getName() + " " + iodevices + " " + clusterStore + " "
-                    + StorageConstants.METADATA_ROOT + " " + AsterixEventServiceUtil.TXN_LOG_DIR + " "
-                    + backupId + " " + " " + hdfsBackupDir + " " + "hdfs" + " " + node.getId() + " " + hdfsUrl + " "
-                    + hadoopVersion;
+            pargs = workingDir + " " + instance.getName() + " " + iodevices + " " + clusterStore + " " + " "
+                    + AsterixEventServiceUtil.TXN_LOG_DIR + " " + backupId + " " + " " + hdfsBackupDir + " " + "hdfs"
+                    + " " + node.getId() + " " + hdfsUrl + " " + hadoopVersion;
             Event event = new Event("restore", nodeid, pargs);
             patternList.add(new Pattern(null, 1, null, event));
         }
@@ -211,9 +207,9 @@
         for (Node node : cluster.getNode()) {
             Nodeid nodeid = new Nodeid(new Value(null, node.getId()));
             String iodevices = node.getIodevices() == null ? cluster.getIodevices() : node.getIodevices();
-            pargs = workingDir + " " + instance.getName() + " " + iodevices + " " + clusterStore + " "
-                    + StorageConstants.METADATA_ROOT + " " + AsterixEventServiceUtil.TXN_LOG_DIR + " "
-                    + backupId + " " + backupDir + " " + "local" + " " + node.getId();
+            pargs = workingDir + " " + instance.getName() + " " + iodevices + " " + clusterStore + " " + " "
+                    + AsterixEventServiceUtil.TXN_LOG_DIR + " " + backupId + " " + backupDir + " " + "local" + " "
+                    + node.getId();
             Event event = new Event("restore", nodeid, pargs);
             patternList.add(new Pattern(null, 1, null, event));
         }
@@ -285,7 +281,6 @@
             }
         }
         patternList.addAll(createRemoveAsterixLogDirPattern(instance).getPattern());
-        patternList.addAll(createRemoveAsterixRootMetadata(instance).getPattern());
         patternList.addAll(createRemoveAsterixTxnLogs(instance).getPattern());
         return new Patterns(patternList);
     }
@@ -438,24 +433,6 @@
         return new Patterns(patternList);
     }
 
-    private Patterns createRemoveAsterixRootMetadata(AsterixInstance instance) throws Exception {
-        List<Pattern> patternList = new ArrayList<>();
-        Cluster cluster = instance.getCluster();
-        Nodeid nodeid;
-        String pargs;
-        Event event;
-        for (Node node : cluster.getNode()) {
-            String iodevices = node.getIodevices() == null ? cluster.getIodevices() : node.getIodevices();
-            String primaryIODevice = iodevices.split(",")[0].trim();
-            pargs = primaryIODevice + File.separator + StorageConstants.METADATA_ROOT;
-            nodeid = new Nodeid(new Value(null, node.getId()));
-            event = new Event("file_delete", nodeid, pargs);
-            patternList.add(new Pattern(null, 1, null, event));
-        }
-
-        return new Patterns(patternList);
-    }
-
     private Patterns createRemoveAsterixLogDirPattern(AsterixInstance instance) throws Exception {
         List<Pattern> patternList = new ArrayList<>();
         Cluster cluster = instance.getCluster();
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
index dad0d51..dc8a8aa 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
@@ -25,7 +25,6 @@
 import java.util.Map;
 
 import org.apache.asterix.common.cluster.ClusterPartition;
-import org.apache.asterix.common.config.ClusterProperties;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.utils.StoragePathUtil;
@@ -78,9 +77,7 @@
     public static FileSplit splitsForAdapter(String dataverseName, String feedName, String nodeName,
             ClusterPartition partition) {
         File relPathFile = new File(prepareDataverseFeedName(dataverseName, feedName));
-        String storageDirName = ClusterProperties.INSTANCE.getStorageDirectoryName();
-        String storagePartitionPath =
-                StoragePathUtil.prepareStoragePartitionPath(storageDirName, partition.getPartitionId());
+        String storagePartitionPath = StoragePathUtil.prepareStoragePartitionPath(partition.getPartitionId());
         // Note: feed adapter instances in a single node share the feed logger
         // format: 'storage dir name'/partition_#/dataverse/feed/node
         File f = new File(storagePartitionPath + File.separator + relPathFile + File.separator + nodeName);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index 4c98904..0303392 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -28,13 +28,12 @@
 
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.cluster.ClusterPartition;
-import org.apache.asterix.common.config.ClusterProperties;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.config.MetadataProperties;
 import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider;
-import org.apache.asterix.common.context.DatasetLSMComponentIdGeneratorFactory;
 import org.apache.asterix.common.context.CorrelatedPrefixMergePolicyFactory;
+import org.apache.asterix.common.context.DatasetLSMComponentIdGeneratorFactory;
 import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.exceptions.MetadataException;
@@ -313,8 +312,7 @@
         }
         ClusterPartition metadataPartition = appContext.getMetadataProperties().getMetadataPartition();
         int metadataDeviceId = metadataPartition.getIODeviceNum();
-        String metadataPartitionPath = StoragePathUtil.prepareStoragePartitionPath(
-                ClusterProperties.INSTANCE.getStorageDirectoryName(), metadataPartition.getPartitionId());
+        String metadataPartitionPath = StoragePathUtil.prepareStoragePartitionPath(metadataPartition.getPartitionId());
         String resourceName = metadataPartitionPath + File.separator + index.getFileNameRelativePath();
         FileReference file = ioManager.getFileReference(metadataDeviceId, resourceName);
         index.setFile(file);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
index e5f5c19..7ac6183 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
@@ -24,7 +24,6 @@
 
 import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.cluster.IClusterStateManager;
-import org.apache.asterix.common.config.ClusterProperties;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
@@ -42,15 +41,12 @@
     }
 
     private static FileSplit[] getDataverseSplits(IClusterStateManager clusterStateManager, String dataverseName) {
-        File relPathFile = new File(dataverseName);
         List<FileSplit> splits = new ArrayList<>();
         // get all partitions
         ClusterPartition[] clusterPartition = clusterStateManager.getClusterPartitons();
-        String storageDirName = ClusterProperties.INSTANCE.getStorageDirectoryName();
         for (int j = 0; j < clusterPartition.length; j++) {
-            File f = new File(
-                    StoragePathUtil.prepareStoragePartitionPath(storageDirName, clusterPartition[j].getPartitionId())
-                            + File.separator + relPathFile);
+            File f = new File(StoragePathUtil.prepareStoragePartitionPath(clusterPartition[j].getPartitionId()),
+                    dataverseName);
             splits.add(StoragePathUtil.getFileSplitForClusterPartition(clusterPartition[j], f.getPath()));
         }
         return splits.toArray(new FileSplit[] {});
@@ -68,9 +64,8 @@
 
     public static FileSplit[] getIndexSplits(IClusterStateManager clusterStateManager, Dataset dataset,
             String indexName, List<String> nodes) {
-        File relPathFile = new File(StoragePathUtil.prepareDataverseIndexName(dataset.getDataverseName(),
-                dataset.getDatasetName(), indexName, dataset.getRebalanceCount()));
-        String storageDirName = ClusterProperties.INSTANCE.getStorageDirectoryName();
+        final String relPath = StoragePathUtil.prepareDataverseIndexName(dataset.getDataverseName(),
+                dataset.getDatasetName(), indexName, dataset.getRebalanceCount());
         List<FileSplit> splits = new ArrayList<>();
         for (String nd : nodes) {
             int numPartitions = clusterStateManager.getNodePartitionsCount(nd);
@@ -81,9 +76,8 @@
             }
 
             for (int k = 0; k < numPartitions; k++) {
-                // format: 'storage dir name'/partition_#/dataverse/dataset_idx_index
-                File f = new File(StoragePathUtil.prepareStoragePartitionPath(storageDirName,
-                        nodePartitions[k].getPartitionId()) + File.separator + relPathFile);
+                File f = new File(StoragePathUtil.prepareStoragePartitionPath(nodePartitions[k].getPartitionId()),
+                        relPath);
                 splits.add(StoragePathUtil.getFileSplitForClusterPartition(nodePartitions[k], f.getPath()));
             }
         }
@@ -95,10 +89,4 @@
         FileSplit[] splits = getDataverseSplits(clusterStateManager, dataverse);
         return StoragePathUtil.splitProviderAndPartitionConstraints(splits);
     }
-
-    public static String getIndexPath(String partitionPath, int partition, String dataverse, String fullIndexName) {
-        String storageDirName = ClusterProperties.INSTANCE.getStorageDirectoryName();
-        return partitionPath + StoragePathUtil.prepareStoragePartitionPath(storageDirName, partition) + File.separator
-                + StoragePathUtil.prepareDataverseIndexName(dataverse, fullIndexName);
-    }
 }
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
index df17987..f3eea32 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
@@ -39,6 +39,7 @@
 import org.apache.asterix.common.replication.IReplicationManager;
 import org.apache.asterix.common.transactions.ILogManager;
 import org.apache.asterix.common.transactions.IRecoveryManager;
+import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.asterix.replication.storage.ReplicaResourcesManager;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -191,8 +192,7 @@
                 datasetLifeCycleManager.closeAllDatasets();
 
                 //3. remove any existing storage data and initialize storage metadata
-                resourceRepository.deleteStorageData(true);
-                resourceRepository.initializeNewUniverse(ClusterProperties.INSTANCE.getStorageDirectoryName());
+                resourceRepository.deleteStorageData();
 
                 //4. select remote replicas to recover from per lost replica data
                 failbackRecoveryReplicas = constructRemoteRecoveryPlan();
@@ -295,8 +295,7 @@
                 datasetLifeCycleManager.closeAllDatasets();
 
                 //2. remove any existing storage data and initialize storage metadata
-                resourceRepository.deleteStorageData(true);
-                resourceRepository.initializeNewUniverse(ClusterProperties.INSTANCE.getStorageDirectoryName());
+                resourceRepository.deleteStorageData();
 
                 /*** Start Recovery Per Lost Replica ***/
                 for (Entry<String, Set<Integer>> remoteReplica : recoveryPlan.entrySet()) {
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 04cbef9..e87a39b 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -26,9 +26,9 @@
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
-import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -36,11 +36,8 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.SortedMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Predicate;
-import java.util.logging.Level;
-import java.util.logging.Logger;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -48,7 +45,6 @@
 import org.apache.asterix.common.config.MetadataProperties;
 import org.apache.asterix.common.dataflow.DatasetLocalResource;
 import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.replication.IReplicationManager;
 import org.apache.asterix.common.replication.ReplicationJob;
 import org.apache.asterix.common.storage.DatasetResourceReference;
@@ -75,18 +71,12 @@
 
     public static final Predicate<Path> INDEX_COMPONENTS = path -> !path.endsWith(StorageConstants.METADATA_FILE_NAME);
     // Private constants
-    private static final Logger LOGGER = Logger.getLogger(PersistentLocalResourceRepository.class.getName());
-    private static final String STORAGE_METADATA_DIRECTORY = StorageConstants.METADATA_ROOT;
-    private static final String STORAGE_METADATA_FILE_NAME_PREFIX = "." + StorageConstants.METADATA_ROOT;
     private static final int MAX_CACHED_RESOURCES = 1000;
-    public static final int RESOURCES_TREE_DEPTH_FROM_STORAGE_ROOT = 6;
+    private static final int RESOURCES_TREE_DEPTH_FROM_STORAGE_ROOT = 6;
 
     // Finals
     private final IIOManager ioManager;
-    private final String[] mountPoints;
-    private final String nodeId;
     private final Cache<String, LocalResource> resourceCache;
-    private final SortedMap<Integer, ClusterPartition> clusterPartitions;
     private final Set<Integer> nodeOriginalPartitions;
     private final Set<Integer> nodeActivePartitions;
     // Mutables
@@ -94,27 +84,19 @@
     private Set<String> filesToBeReplicated;
     private IReplicationManager replicationManager;
     private Set<Integer> nodeInactivePartitions;
+    private final Path[] storageRoots;
 
-    public PersistentLocalResourceRepository(IIOManager ioManager, List<IODeviceHandle> devices, String nodeId,
-            MetadataProperties metadataProperties) throws HyracksDataException {
+    public PersistentLocalResourceRepository(IIOManager ioManager, String nodeId,
+            MetadataProperties metadataProperties) {
         this.ioManager = ioManager;
-        mountPoints = new String[devices.size()];
-        this.nodeId = nodeId;
-        this.clusterPartitions = metadataProperties.getClusterPartitions();
-        for (int i = 0; i < mountPoints.length; i++) {
-            String mountPoint = devices.get(i).getMount().getPath();
-            File mountPointDir = new File(mountPoint);
-            if (!mountPointDir.exists()) {
-                throw new HyracksDataException(mountPointDir.getAbsolutePath() + " doesn't exist.");
-            }
-            if (!mountPoint.endsWith(File.separator)) {
-                mountPoints[i] = mountPoint + File.separator;
-            } else {
-                mountPoints[i] = mountPoint;
-            }
+        storageRoots = new Path[ioManager.getIODevices().size()];
+        final List<IODeviceHandle> ioDevices = ioManager.getIODevices();
+        for (int i = 0; i < ioDevices.size(); i++) {
+            storageRoots[i] =
+                    Paths.get(ioDevices.get(i).getMount().getAbsolutePath(), StorageConstants.STORAGE_ROOT_DIR_NAME);
         }
+        createStorageRoots();
         resourceCache = CacheBuilder.newBuilder().maximumSize(MAX_CACHED_RESOURCES).build();
-
         ClusterPartition[] nodePartitions = metadataProperties.getNodePartitions().get(nodeId);
         //initially the node active partitions are the same as the original partitions
         nodeOriginalPartitions = new HashSet<>(nodePartitions.length);
@@ -136,47 +118,6 @@
         return aString.toString();
     }
 
-    public void initializeNewUniverse(String storageRoot) throws HyracksDataException {
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("Initializing local resource repository ... ");
-        }
-        /*
-         * create storage metadata file
-         * (This file is used to locate the root storage directory after instance restarts).
-         * TODO with the existing cluster configuration file being static and distributed on all NCs
-         * we can find out the storage root directory without looking at this file.
-         * This file could potentially store more information, otherwise no need to keep it.
-         */
-        String storageRootDirName = storageRoot;
-        while (storageRootDirName.startsWith(File.separator)) {
-            storageRootDirName = storageRootDirName.substring(File.separator.length());
-        }
-        for (int i = 0; i < mountPoints.length; i++) {
-            FileReference storageMetadataFile = getStorageMetadataFile(ioManager, nodeId, i);
-            File storageMetadataDir = storageMetadataFile.getFile().getParentFile();
-            if (storageMetadataDir.exists()) {
-                throw HyracksDataException.create(ErrorCode.ROOT_LOCAL_RESOURCE_EXISTS, getClass().getSimpleName(),
-                        storageMetadataDir.getAbsolutePath());
-            }
-            //make dirs for the storage metadata file
-            boolean success = storageMetadataDir.mkdirs();
-            if (!success) {
-                throw HyracksDataException
-                        .create(ErrorCode.ROOT_LOCAL_RESOURCE_COULD_NOT_BE_CREATED, getClass().getSimpleName(),
-                                storageMetadataDir.getAbsolutePath());
-            }
-            LOGGER.log(Level.INFO,
-                    "created the root-metadata-file's directory: " + storageMetadataDir.getAbsolutePath());
-            try (FileOutputStream fos = new FileOutputStream(storageMetadataFile.getFile())) {
-                fos.write(storageRootDirName.getBytes(StandardCharsets.UTF_8));
-            } catch (IOException e) {
-                throw HyracksDataException.create(e);
-            }
-            LOGGER.log(Level.INFO, "created the root-metadata-file: " + storageMetadataFile.getAbsolutePath());
-        }
-        LOGGER.log(Level.INFO, "Completed the initialization of the local resource repository");
-    }
-
     @Override
     public LocalResource get(String relativePath) throws HyracksDataException {
         LocalResource resource = resourceCache.getIfPresent(relativePath);
@@ -208,7 +149,7 @@
             oosToFos.writeObject(resource);
             oosToFos.flush();
         } catch (IOException e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
 
         resourceCache.put(resource.getPath(), resource);
@@ -231,8 +172,7 @@
             } finally {
                 // Regardless of successfully deleted or not, the operation should be replicated.
                 //if replication enabled, delete resource from remote replicas
-                if (isReplicationEnabled && !resourceFile.getFile().getName()
-                        .startsWith(STORAGE_METADATA_FILE_NAME_PREFIX)) {
+                if (isReplicationEnabled) {
                     createReplicationJob(ReplicationOperation.DELETE, resourceFile);
                 }
             }
@@ -247,16 +187,11 @@
         String fileName = resourcePath + File.separator + StorageConstants.METADATA_FILE_NAME;
         return ioManager.resolve(fileName);
     }
+
     public Map<Long, LocalResource> getResources(Predicate<LocalResource> filter) throws HyracksDataException {
         Map<Long, LocalResource> resourcesMap = new HashMap<>();
-        for (int i = 0; i < mountPoints.length; i++) {
-            File storageRootDir = getStorageRootDirectoryIfExists(ioManager, nodeId, i);
-            if (storageRootDir == null) {
-                LOGGER.log(Level.INFO, "Getting storage root dir returned null. Returning");
-                continue;
-            }
-            LOGGER.log(Level.INFO, "Getting storage root dir returned " + storageRootDir.getAbsolutePath());
-            try (Stream<Path> stream = Files.find(storageRootDir.toPath(), RESOURCES_TREE_DEPTH_FROM_STORAGE_ROOT,
+        for (Path root : storageRoots) {
+            try (Stream<Path> stream = Files.find(root, RESOURCES_TREE_DEPTH_FROM_STORAGE_ROOT,
                     (path, attr) -> path.getFileName().toString().equals(StorageConstants.METADATA_FILE_NAME))) {
                 final List<File> resourceMetadataFiles = stream.map(Path::toFile).collect(Collectors.toList());
                 for (File file : resourceMetadataFiles) {
@@ -270,7 +205,6 @@
             }
         }
         return resourcesMap;
-
     }
 
     public Map<Long, LocalResource> loadAndGetAllResources() throws HyracksDataException {
@@ -300,7 +234,7 @@
                 throw new AsterixException("Storage version mismatch.");
             }
         } catch (Exception e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 
@@ -323,93 +257,23 @@
         try {
             replicationManager.submitJob(job);
         } catch (IOException e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 
-    public String[] getStorageMountingPoints() {
-        return mountPoints;
-    }
-
     /**
      * Deletes physical files of all data verses.
      *
-     * @param deleteStorageMetadata
      * @throws IOException
      */
-    public void deleteStorageData(boolean deleteStorageMetadata) throws IOException {
-        for (int i = 0; i < mountPoints.length; i++) {
-            File storageDir = getStorageRootDirectoryIfExists(ioManager, nodeId, i);
-            if (storageDir != null && storageDir.isDirectory()) {
-                FileUtils.deleteDirectory(storageDir);
-            }
-            if (deleteStorageMetadata) {
-                //delete the metadata root directory
-                FileReference storageMetadataFile = getStorageMetadataFile(ioManager, nodeId, i);
-                File storageMetadataDir = storageMetadataFile.getFile().getParentFile().getParentFile();
-                if (storageMetadataDir.exists() && storageMetadataDir.isDirectory()) {
-                    FileUtils.deleteDirectory(storageMetadataDir);
-                }
+    public void deleteStorageData() throws IOException {
+        for (Path root : storageRoots) {
+            final File rootFile = root.toFile();
+            if (rootFile.exists()) {
+                FileUtils.deleteDirectory(rootFile);
             }
         }
-    }
-
-    /**
-     * @param mountPoint
-     * @param nodeId
-     * @param ioDeviceId
-     * @return A file reference to the storage metadata file.
-     */
-    private static FileReference getStorageMetadataFile(IIOManager ioManager, String nodeId, int ioDeviceId) {
-        String storageMetadataFileName =
-                STORAGE_METADATA_DIRECTORY + File.separator + nodeId + "_" + "iodevice" + ioDeviceId + File.separator
-                        + STORAGE_METADATA_FILE_NAME_PREFIX;
-        return new FileReference(ioManager.getIODevices().get(ioDeviceId), storageMetadataFileName);
-    }
-
-    /**
-     * @param mountPoint
-     * @param nodeId
-     * @param ioDeviceId
-     * @return A file reference to the storage root directory if exists, otherwise null.
-     * @throws HyracksDataException
-     */
-    public static File getStorageRootDirectoryIfExists(IIOManager ioManager, String nodeId, int ioDeviceId)
-            throws HyracksDataException {
-        try {
-            FileReference storageMetadataFile = getStorageMetadataFile(ioManager, nodeId, ioDeviceId);
-            LOGGER.log(Level.INFO, "Storage metadata file is " + storageMetadataFile.getAbsolutePath());
-            if (storageMetadataFile.getFile().exists()) {
-                String storageRootDirPath =
-                        new String(Files.readAllBytes(storageMetadataFile.getFile().toPath()), StandardCharsets.UTF_8);
-                LOGGER.log(Level.INFO, "Storage metadata file found and root dir is " + storageRootDirPath);
-                FileReference storageRootFileRef =
-                        new FileReference(ioManager.getIODevices().get(ioDeviceId), storageRootDirPath);
-                if (storageRootFileRef.getFile().exists()) {
-                    return storageRootFileRef.getFile();
-                } else {
-                    LOGGER.log(Level.INFO, "Storage root doesn't exist");
-                }
-            } else {
-                LOGGER.log(Level.INFO, "Storage metadata file doesn't exist");
-            }
-            return null;
-        } catch (IOException ioe) {
-            throw HyracksDataException.create(ioe);
-        }
-    }
-
-    /**
-     * @param partition
-     * @return The partition local path on this NC.
-     */
-    public String getPartitionPath(int partition) {
-        //currently each partition is replicated on the same IO device number on all NCs.
-        return mountPoints[getIODeviceNum(partition)];
-    }
-
-    public int getIODeviceNum(int partition) {
-        return clusterPartitions.get(partition).getIODeviceNum();
+        createStorageRoots();
     }
 
     public Set<Integer> getActivePartitions() {
@@ -439,7 +303,7 @@
 
     /**
      * Gets a set of files for the indexes in partition {@code partition}. Each file points
-     * the to where the index's files are stored.
+     * to where the index's files are stored.
      *
      * @param partition
      * @return The set of indexes files
@@ -469,4 +333,14 @@
         final ResourceReference ref = ResourceReference.of(indexFile.toString());
         return ioManager.resolve(ref.getRelativePath().toString());
     }
+
+    private void createStorageRoots() {
+        for (Path root : storageRoots) {
+            try {
+                Files.createDirectories(root);
+            } catch (IOException e) {
+                throw new IllegalStateException("Failed to create storage root directory at " + root, e);
+            }
+        }
+    }
 }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java
index 93a7d83..43024b6 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java
@@ -38,6 +38,6 @@
 
     @Override
     public ILocalResourceRepository createRepository() throws HyracksDataException {
-        return new PersistentLocalResourceRepository(ioManager, ioManager.getIODevices(), nodeId, metadataProperties);
+        return new PersistentLocalResourceRepository(ioManager, nodeId, metadataProperties);
     }
 }