[NO ISSUE][STO] Ensure resource file operations are synchronized

- user model changes: no
- storage format changes: no
- interface changes: no

Details:

- To avoid an operation reading a partially written resource file,
  ensure all operations on resource files are synchronized.
- Limit the search for a partition's resources to that partition's root directory.

Change-Id: I95f8565780675798393d7d43c0051c04e2b0a98c
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/13164
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 4f5a9f8..6736642 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -291,7 +291,7 @@
         final IIndexCheckpointManagerProvider indexCheckpointManagerProvider =
                 ((INcApplicationContext) (serviceCtx.getApplicationContext())).getIndexCheckpointManagerProvider();
 
-        Map<Long, LocalResource> resourcesMap = localResourceRepository.loadAndGetAllResources();
+        Map<Long, LocalResource> resourcesMap = localResourceRepository.getResources(r -> true, partitions);
         final Map<Long, Long> resourceId2MaxLSNMap = new HashMap<>();
         TxnEntityId tempKeyTxnEntityId = new TxnEntityId(-1, -1, -1, null, -1, false);
 
@@ -503,7 +503,8 @@
             final List<DatasetResourceReference> partitionResources = localResourceRepository.getResources(resource -> {
                 DatasetLocalResource dsResource = (DatasetLocalResource) resource.getResource();
                 return dsResource.getPartition() == partition;
-            }).values().stream().map(DatasetResourceReference::of).collect(Collectors.toList());
+            }, Collections.singleton(partition)).values().stream().map(DatasetResourceReference::of)
+                    .collect(Collectors.toList());
             for (DatasetResourceReference indexRef : partitionResources) {
                 try {
                     final IIndexCheckpointManager idxCheckpointMgr = idxCheckpointMgrProvider.get(indexRef);
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java
index 57463cb..004b640 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.replication.logging;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -91,7 +92,8 @@
             return dls.getDatasetId() == datasetId && dls.getPartition() == resourcePartition
                     && !masterPartitions.contains(dls.getPartition());
         };
-        final Map<Long, LocalResource> resources = localResourceRep.getResources(replicaIndexesPredicate);
+        final Map<Long, LocalResource> resources =
+                localResourceRep.getResources(replicaIndexesPredicate, Collections.singleton(resourcePartition));
         final List<DatasetResourceReference> replicaIndexesRef =
                 resources.values().stream().map(DatasetResourceReference::of).collect(Collectors.toList());
         for (DatasetResourceReference replicaIndexRef : replicaIndexesRef) {
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
index 3ee3094..7f26b96 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
@@ -67,7 +67,7 @@
     @Override
     public void perform(INcApplicationContext appCtx, IReplicationWorker worker) {
         try {
-            LOGGER.info("attempting to replicate {}", this);
+            LOGGER.debug("attempting to receive file {} from master", this);
             final IIOManager ioManager = appCtx.getIoManager();
             // resolve path
             final FileReference localPath = ioManager.resolve(file);
@@ -76,7 +76,6 @@
             final Path maskPath = Paths.get(resourceDir.toString(),
                     StorageConstants.MASK_FILE_PREFIX + localPath.getFile().getName());
             Files.createFile(maskPath);
-
             // receive actual file
             final Path filePath = Paths.get(resourceDir.toString(), localPath.getFile().getName());
             Files.createFile(filePath);
@@ -91,7 +90,7 @@
             }
             //delete mask
             Files.delete(maskPath);
-            LOGGER.info(() -> "Replicated file: " + localPath);
+            LOGGER.info("received file {} from master", localPath);
             ReplicationProtocol.sendAck(worker.getChannel(), worker.getReusableBuffer());
         } catch (IOException e) {
             throw new ReplicationException(e);
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 4d15385..ee5b16e 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -35,6 +35,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -138,7 +139,7 @@
     private boolean isReplicationEnabled = false;
     private Set<String> filesToBeReplicated;
     private IReplicationManager replicationManager;
-    private final Path[] storageRoots;
+    private final List<Path> storageRoots;
     private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
     private final IPersistedResourceRegistry persistedResourceRegistry;
 
@@ -148,11 +149,11 @@
         this.ioManager = ioManager;
         this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
         this.persistedResourceRegistry = persistedResourceRegistry;
-        storageRoots = new Path[ioManager.getIODevices().size()];
+        storageRoots = new ArrayList<>();
         final List<IODeviceHandle> ioDevices = ioManager.getIODevices();
         for (int i = 0; i < ioDevices.size(); i++) {
-            storageRoots[i] =
-                    Paths.get(ioDevices.get(i).getMount().getAbsolutePath(), StorageConstants.STORAGE_ROOT_DIR_NAME);
+            storageRoots.add(
+                    Paths.get(ioDevices.get(i).getMount().getAbsolutePath(), StorageConstants.STORAGE_ROOT_DIR_NAME));
         }
         createStorageRoots();
         resourceCache = CacheBuilder.newBuilder().maximumSize(MAX_CACHED_RESOURCES).build();
@@ -262,10 +263,13 @@
         return ioManager.resolve(fileName);
     }
 
-    public synchronized Map<Long, LocalResource> getResources(Predicate<LocalResource> filter)
+    public synchronized Map<Long, LocalResource> getResources(Predicate<LocalResource> filter, List<Path> roots)
             throws HyracksDataException {
         Map<Long, LocalResource> resourcesMap = new HashMap<>();
-        for (Path root : storageRoots) {
+        for (Path root : roots) {
+            if (!Files.exists(root) || !Files.isDirectory(root)) {
+                continue;
+            }
             final Collection<File> files = FileUtils.listFiles(root.toFile(), METADATA_FILES_FILTER, ALL_DIR_FILTER);
             try {
                 for (File file : files) {
@@ -281,6 +285,20 @@
         return resourcesMap;
     }
 
+    public synchronized Map<Long, LocalResource> getResources(Predicate<LocalResource> filter)
+            throws HyracksDataException {
+        return getResources(filter, storageRoots);
+    }
+
+    public synchronized Map<Long, LocalResource> getResources(Predicate<LocalResource> filter, Set<Integer> partitions)
+            throws HyracksDataException {
+        List<Path> partitionsRoots = new ArrayList<>();
+        for (Integer partition : partitions) {
+            partitionsRoots.add(getPartitionRoot(partition));
+        }
+        return getResources(filter, partitionsRoots);
+    }
+
     public synchronized void deleteInvalidIndexes(Predicate<LocalResource> filter) throws HyracksDataException {
         for (Path root : storageRoots) {
             final Collection<File> files = FileUtils.listFiles(root.toFile(), METADATA_FILES_FILTER, ALL_DIR_FILTER);
@@ -304,7 +322,7 @@
     }
 
     @Override
-    public long maxId() throws HyracksDataException {
+    public synchronized long maxId() throws HyracksDataException {
         final Map<Long, LocalResource> allResources = loadAndGetAllResources();
         final Optional<Long> max = allResources.keySet().stream().max(Long::compare);
         return max.isPresent() ? max.get() : 0;
@@ -330,7 +348,7 @@
         }
     }
 
-    public void setReplicationManager(IReplicationManager replicationManager) {
+    public synchronized void setReplicationManager(IReplicationManager replicationManager) {
         this.replicationManager = replicationManager;
         isReplicationEnabled = replicationManager.isReplicationEnabled();
 
@@ -357,7 +375,7 @@
      *
      * @throws IOException
      */
-    public void deleteStorageData() throws IOException {
+    public synchronized void deleteStorageData() throws IOException {
         for (Path root : storageRoots) {
             final File rootFile = root.toFile();
             if (rootFile.exists()) {
@@ -367,13 +385,13 @@
         createStorageRoots();
     }
 
-    public Set<Integer> getAllPartitions() throws HyracksDataException {
+    public synchronized Set<Integer> getAllPartitions() throws HyracksDataException {
         return loadAndGetAllResources().values().stream().map(LocalResource::getResource)
                 .map(DatasetLocalResource.class::cast).map(DatasetLocalResource::getPartition)
                 .collect(Collectors.toSet());
     }
 
-    public Optional<DatasetResourceReference> getLocalResourceReference(String absoluteFilePath)
+    public synchronized Optional<DatasetResourceReference> getLocalResourceReference(String absoluteFilePath)
             throws HyracksDataException {
         final String localResourcePath = StoragePathUtil.getIndexFileRelativePath(absoluteFilePath);
         final LocalResource lr = get(localResourcePath);
@@ -388,11 +406,12 @@
      * @return The set of indexes files
      * @throws HyracksDataException
      */
-    public Set<File> getPartitionIndexes(int partition) throws HyracksDataException {
+    public synchronized Set<File> getPartitionIndexes(int partition) throws HyracksDataException {
+        Path partitionRoot = getPartitionRoot(partition);
         final Map<Long, LocalResource> partitionResourcesMap = getResources(resource -> {
             DatasetLocalResource dsResource = (DatasetLocalResource) resource.getResource();
             return dsResource.getPartition() == partition;
-        });
+        }, Collections.singletonList(partitionRoot));
         Set<File> indexes = new HashSet<>();
         for (LocalResource localResource : partitionResourcesMap.values()) {
             indexes.add(ioManager.resolve(localResource.getPath()).getFile());
@@ -400,14 +419,11 @@
         return indexes;
     }
 
-    public Map<Long, LocalResource> getPartitionResources(int partition) throws HyracksDataException {
-        return getResources(resource -> {
-            DatasetLocalResource dsResource = (DatasetLocalResource) resource.getResource();
-            return dsResource.getPartition() == partition;
-        });
+    public synchronized Map<Long, LocalResource> getPartitionResources(int partition) throws HyracksDataException {
+        return getResources(r -> true, Collections.singleton(partition));
     }
 
-    public Map<String, Long> getPartitionReplicatedResources(int partition, IReplicationStrategy strategy)
+    public synchronized Map<String, Long> getPartitionReplicatedResources(int partition, IReplicationStrategy strategy)
             throws HyracksDataException {
         final Map<String, Long> partitionReplicatedResources = new HashMap<>();
         final Map<Long, LocalResource> partitionResources = getPartitionResources(partition);
@@ -421,7 +437,7 @@
         return partitionReplicatedResources;
     }
 
-    public List<String> getPartitionReplicatedFiles(int partition, IReplicationStrategy strategy)
+    public synchronized List<String> getPartitionReplicatedFiles(int partition, IReplicationStrategy strategy)
             throws HyracksDataException {
         final List<String> partitionReplicatedFiles = new ArrayList<>();
         final Set<File> replicatedIndexes = new HashSet<>();
@@ -438,7 +454,7 @@
         return partitionReplicatedFiles;
     }
 
-    public long getReplicatedIndexesMaxComponentId(int partition, IReplicationStrategy strategy)
+    public synchronized long getReplicatedIndexesMaxComponentId(int partition, IReplicationStrategy strategy)
             throws HyracksDataException {
         long maxComponentId = LSMComponentId.MIN_VALID_COMPONENT_ID;
         final Map<Long, LocalResource> partitionResources = getPartitionResources(partition);
@@ -474,7 +490,7 @@
         }
     }
 
-    public void cleanup(int partition) throws HyracksDataException {
+    public synchronized void cleanup(int partition) throws HyracksDataException {
         final Set<File> partitionIndexes = getPartitionIndexes(partition);
         try {
             for (File index : partitionIndexes) {
@@ -501,7 +517,7 @@
         return resourcesStats;
     }
 
-    public void deleteCorruptedResources() throws HyracksDataException {
+    public synchronized void deleteCorruptedResources() throws HyracksDataException {
         for (Path root : storageRoots) {
             final Collection<File> metadataMaskFiles =
                     FileUtils.listFiles(root.toFile(), METADATA_MASK_FILES_FILTER, ALL_DIR_FILTER);
@@ -601,12 +617,13 @@
         return null;
     }
 
-    public long getDatasetSize(DatasetCopyIdentifier datasetIdentifier) throws HyracksDataException {
+    public long getDatasetSize(DatasetCopyIdentifier datasetIdentifier, Set<Integer> nodePartitions)
+            throws HyracksDataException {
         long totalSize = 0;
         final Map<Long, LocalResource> dataverse = getResources(lr -> {
             final ResourceReference resourceReference = ResourceReference.ofIndex(lr.getPath());
             return datasetIdentifier.isMatch(resourceReference);
-        });
+        }, nodePartitions);
         final List<DatasetResourceReference> allResources =
                 dataverse.values().stream().map(DatasetResourceReference::of).collect(Collectors.toList());
         for (DatasetResourceReference res : allResources) {
@@ -644,11 +661,11 @@
         return COMPONENT_FILES_FILTER.accept(indexDir, fileName);
     }
 
-    public Path[] getStorageRoots() {
+    public List<Path> getStorageRoots() {
         return storageRoots;
     }
 
-    public void keepPartitions(Set<Integer> keepPartitions) {
+    public synchronized void keepPartitions(Set<Integer> keepPartitions) {
         List<File> onDiskPartitions = getOnDiskPartitions();
         for (File onDiskPartition : onDiskPartitions) {
             int partitionNum = StoragePathUtil.getPartitionNumFromRelativePath(onDiskPartition.getAbsolutePath());
@@ -660,7 +677,7 @@
         }
     }
 
-    public List<File> getOnDiskPartitions() {
+    public synchronized List<File> getOnDiskPartitions() {
         List<File> onDiskPartitions = new ArrayList<>();
         for (Path root : storageRoots) {
             File[] partitions = root.toFile().listFiles(
@@ -671,4 +688,11 @@
         }
         return onDiskPartitions;
     }
+
+    public Path getPartitionRoot(int partition) throws HyracksDataException {
+        Path path =
+                Paths.get(StorageConstants.STORAGE_ROOT_DIR_NAME, StorageConstants.PARTITION_DIR_PREFIX + partition);
+        FileReference resolve = ioManager.resolve(path.toString());
+        return resolve.getFile().toPath();
+    }
 }