[NO ISSUE][STO] Ensure resource file operations are synchronized
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- To avoid an operation reading a partially written resource file,
ensure all operations that read or modify resource files are synchronized.
- Limit the search for a partition's resources to that partition's root directory.
Change-Id: I95f8565780675798393d7d43c0051c04e2b0a98c
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/13164
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 4f5a9f8..6736642 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -291,7 +291,7 @@
final IIndexCheckpointManagerProvider indexCheckpointManagerProvider =
((INcApplicationContext) (serviceCtx.getApplicationContext())).getIndexCheckpointManagerProvider();
- Map<Long, LocalResource> resourcesMap = localResourceRepository.loadAndGetAllResources();
+ Map<Long, LocalResource> resourcesMap = localResourceRepository.getResources(r -> true, partitions);
final Map<Long, Long> resourceId2MaxLSNMap = new HashMap<>();
TxnEntityId tempKeyTxnEntityId = new TxnEntityId(-1, -1, -1, null, -1, false);
@@ -503,7 +503,8 @@
final List<DatasetResourceReference> partitionResources = localResourceRepository.getResources(resource -> {
DatasetLocalResource dsResource = (DatasetLocalResource) resource.getResource();
return dsResource.getPartition() == partition;
- }).values().stream().map(DatasetResourceReference::of).collect(Collectors.toList());
+ }, Collections.singleton(partition)).values().stream().map(DatasetResourceReference::of)
+ .collect(Collectors.toList());
for (DatasetResourceReference indexRef : partitionResources) {
try {
final IIndexCheckpointManager idxCheckpointMgr = idxCheckpointMgrProvider.get(indexRef);
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java
index 57463cb..004b640 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java
@@ -19,6 +19,7 @@
package org.apache.asterix.replication.logging;
import java.io.IOException;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -91,7 +92,8 @@
return dls.getDatasetId() == datasetId && dls.getPartition() == resourcePartition
&& !masterPartitions.contains(dls.getPartition());
};
- final Map<Long, LocalResource> resources = localResourceRep.getResources(replicaIndexesPredicate);
+ final Map<Long, LocalResource> resources =
+ localResourceRep.getResources(replicaIndexesPredicate, Collections.singleton(resourcePartition));
final List<DatasetResourceReference> replicaIndexesRef =
resources.values().stream().map(DatasetResourceReference::of).collect(Collectors.toList());
for (DatasetResourceReference replicaIndexRef : replicaIndexesRef) {
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
index 3ee3094..7f26b96 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
@@ -67,7 +67,7 @@
@Override
public void perform(INcApplicationContext appCtx, IReplicationWorker worker) {
try {
- LOGGER.info("attempting to replicate {}", this);
+ LOGGER.debug("attempting to receive file {} from master", this);
final IIOManager ioManager = appCtx.getIoManager();
// resolve path
final FileReference localPath = ioManager.resolve(file);
@@ -76,7 +76,6 @@
final Path maskPath = Paths.get(resourceDir.toString(),
StorageConstants.MASK_FILE_PREFIX + localPath.getFile().getName());
Files.createFile(maskPath);
-
// receive actual file
final Path filePath = Paths.get(resourceDir.toString(), localPath.getFile().getName());
Files.createFile(filePath);
@@ -91,7 +90,7 @@
}
//delete mask
Files.delete(maskPath);
- LOGGER.info(() -> "Replicated file: " + localPath);
+ LOGGER.info("received file {} from master", localPath);
ReplicationProtocol.sendAck(worker.getChannel(), worker.getReusableBuffer());
} catch (IOException e) {
throw new ReplicationException(e);
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 4d15385..ee5b16e 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -35,6 +35,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -138,7 +139,7 @@
private boolean isReplicationEnabled = false;
private Set<String> filesToBeReplicated;
private IReplicationManager replicationManager;
- private final Path[] storageRoots;
+ private final List<Path> storageRoots;
private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
private final IPersistedResourceRegistry persistedResourceRegistry;
@@ -148,11 +149,11 @@
this.ioManager = ioManager;
this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
this.persistedResourceRegistry = persistedResourceRegistry;
- storageRoots = new Path[ioManager.getIODevices().size()];
+ storageRoots = new ArrayList<>();
final List<IODeviceHandle> ioDevices = ioManager.getIODevices();
for (int i = 0; i < ioDevices.size(); i++) {
- storageRoots[i] =
- Paths.get(ioDevices.get(i).getMount().getAbsolutePath(), StorageConstants.STORAGE_ROOT_DIR_NAME);
+ storageRoots.add(
+ Paths.get(ioDevices.get(i).getMount().getAbsolutePath(), StorageConstants.STORAGE_ROOT_DIR_NAME));
}
createStorageRoots();
resourceCache = CacheBuilder.newBuilder().maximumSize(MAX_CACHED_RESOURCES).build();
@@ -262,10 +263,13 @@
return ioManager.resolve(fileName);
}
- public synchronized Map<Long, LocalResource> getResources(Predicate<LocalResource> filter)
+ public synchronized Map<Long, LocalResource> getResources(Predicate<LocalResource> filter, List<Path> roots)
throws HyracksDataException {
Map<Long, LocalResource> resourcesMap = new HashMap<>();
- for (Path root : storageRoots) {
+ for (Path root : roots) {
+ if (!Files.exists(root) || !Files.isDirectory(root)) {
+ continue;
+ }
final Collection<File> files = FileUtils.listFiles(root.toFile(), METADATA_FILES_FILTER, ALL_DIR_FILTER);
try {
for (File file : files) {
@@ -281,6 +285,20 @@
return resourcesMap;
}
+ public synchronized Map<Long, LocalResource> getResources(Predicate<LocalResource> filter)
+ throws HyracksDataException {
+ return getResources(filter, storageRoots);
+ }
+
+ public synchronized Map<Long, LocalResource> getResources(Predicate<LocalResource> filter, Set<Integer> partitions)
+ throws HyracksDataException {
+ List<Path> partitionsRoots = new ArrayList<>();
+ for (Integer partition : partitions) {
+ partitionsRoots.add(getPartitionRoot(partition));
+ }
+ return getResources(filter, partitionsRoots);
+ }
+
public synchronized void deleteInvalidIndexes(Predicate<LocalResource> filter) throws HyracksDataException {
for (Path root : storageRoots) {
final Collection<File> files = FileUtils.listFiles(root.toFile(), METADATA_FILES_FILTER, ALL_DIR_FILTER);
@@ -304,7 +322,7 @@
}
@Override
- public long maxId() throws HyracksDataException {
+ public synchronized long maxId() throws HyracksDataException {
final Map<Long, LocalResource> allResources = loadAndGetAllResources();
final Optional<Long> max = allResources.keySet().stream().max(Long::compare);
return max.isPresent() ? max.get() : 0;
@@ -330,7 +348,7 @@
}
}
- public void setReplicationManager(IReplicationManager replicationManager) {
+ public synchronized void setReplicationManager(IReplicationManager replicationManager) {
this.replicationManager = replicationManager;
isReplicationEnabled = replicationManager.isReplicationEnabled();
@@ -357,7 +375,7 @@
*
* @throws IOException
*/
- public void deleteStorageData() throws IOException {
+ public synchronized void deleteStorageData() throws IOException {
for (Path root : storageRoots) {
final File rootFile = root.toFile();
if (rootFile.exists()) {
@@ -367,13 +385,13 @@
createStorageRoots();
}
- public Set<Integer> getAllPartitions() throws HyracksDataException {
+ public synchronized Set<Integer> getAllPartitions() throws HyracksDataException {
return loadAndGetAllResources().values().stream().map(LocalResource::getResource)
.map(DatasetLocalResource.class::cast).map(DatasetLocalResource::getPartition)
.collect(Collectors.toSet());
}
- public Optional<DatasetResourceReference> getLocalResourceReference(String absoluteFilePath)
+ public synchronized Optional<DatasetResourceReference> getLocalResourceReference(String absoluteFilePath)
throws HyracksDataException {
final String localResourcePath = StoragePathUtil.getIndexFileRelativePath(absoluteFilePath);
final LocalResource lr = get(localResourcePath);
@@ -388,11 +406,12 @@
* @return The set of indexes files
* @throws HyracksDataException
*/
- public Set<File> getPartitionIndexes(int partition) throws HyracksDataException {
+ public synchronized Set<File> getPartitionIndexes(int partition) throws HyracksDataException {
+ Path partitionRoot = getPartitionRoot(partition);
final Map<Long, LocalResource> partitionResourcesMap = getResources(resource -> {
DatasetLocalResource dsResource = (DatasetLocalResource) resource.getResource();
return dsResource.getPartition() == partition;
- });
+ }, Collections.singletonList(partitionRoot));
Set<File> indexes = new HashSet<>();
for (LocalResource localResource : partitionResourcesMap.values()) {
indexes.add(ioManager.resolve(localResource.getPath()).getFile());
@@ -400,14 +419,11 @@
return indexes;
}
- public Map<Long, LocalResource> getPartitionResources(int partition) throws HyracksDataException {
- return getResources(resource -> {
- DatasetLocalResource dsResource = (DatasetLocalResource) resource.getResource();
- return dsResource.getPartition() == partition;
- });
+ public synchronized Map<Long, LocalResource> getPartitionResources(int partition) throws HyracksDataException {
+ return getResources(r -> true, Collections.singleton(partition));
}
- public Map<String, Long> getPartitionReplicatedResources(int partition, IReplicationStrategy strategy)
+ public synchronized Map<String, Long> getPartitionReplicatedResources(int partition, IReplicationStrategy strategy)
throws HyracksDataException {
final Map<String, Long> partitionReplicatedResources = new HashMap<>();
final Map<Long, LocalResource> partitionResources = getPartitionResources(partition);
@@ -421,7 +437,7 @@
return partitionReplicatedResources;
}
- public List<String> getPartitionReplicatedFiles(int partition, IReplicationStrategy strategy)
+ public synchronized List<String> getPartitionReplicatedFiles(int partition, IReplicationStrategy strategy)
throws HyracksDataException {
final List<String> partitionReplicatedFiles = new ArrayList<>();
final Set<File> replicatedIndexes = new HashSet<>();
@@ -438,7 +454,7 @@
return partitionReplicatedFiles;
}
- public long getReplicatedIndexesMaxComponentId(int partition, IReplicationStrategy strategy)
+ public synchronized long getReplicatedIndexesMaxComponentId(int partition, IReplicationStrategy strategy)
throws HyracksDataException {
long maxComponentId = LSMComponentId.MIN_VALID_COMPONENT_ID;
final Map<Long, LocalResource> partitionResources = getPartitionResources(partition);
@@ -474,7 +490,7 @@
}
}
- public void cleanup(int partition) throws HyracksDataException {
+ public synchronized void cleanup(int partition) throws HyracksDataException {
final Set<File> partitionIndexes = getPartitionIndexes(partition);
try {
for (File index : partitionIndexes) {
@@ -501,7 +517,7 @@
return resourcesStats;
}
- public void deleteCorruptedResources() throws HyracksDataException {
+ public synchronized void deleteCorruptedResources() throws HyracksDataException {
for (Path root : storageRoots) {
final Collection<File> metadataMaskFiles =
FileUtils.listFiles(root.toFile(), METADATA_MASK_FILES_FILTER, ALL_DIR_FILTER);
@@ -601,12 +617,13 @@
return null;
}
- public long getDatasetSize(DatasetCopyIdentifier datasetIdentifier) throws HyracksDataException {
+ public long getDatasetSize(DatasetCopyIdentifier datasetIdentifier, Set<Integer> nodePartitions)
+ throws HyracksDataException {
long totalSize = 0;
final Map<Long, LocalResource> dataverse = getResources(lr -> {
final ResourceReference resourceReference = ResourceReference.ofIndex(lr.getPath());
return datasetIdentifier.isMatch(resourceReference);
- });
+ }, nodePartitions);
final List<DatasetResourceReference> allResources =
dataverse.values().stream().map(DatasetResourceReference::of).collect(Collectors.toList());
for (DatasetResourceReference res : allResources) {
@@ -644,11 +661,11 @@
return COMPONENT_FILES_FILTER.accept(indexDir, fileName);
}
- public Path[] getStorageRoots() {
+ public List<Path> getStorageRoots() {
return storageRoots;
}
- public void keepPartitions(Set<Integer> keepPartitions) {
+ public synchronized void keepPartitions(Set<Integer> keepPartitions) {
List<File> onDiskPartitions = getOnDiskPartitions();
for (File onDiskPartition : onDiskPartitions) {
int partitionNum = StoragePathUtil.getPartitionNumFromRelativePath(onDiskPartition.getAbsolutePath());
@@ -660,7 +677,7 @@
}
}
- public List<File> getOnDiskPartitions() {
+ public synchronized List<File> getOnDiskPartitions() {
List<File> onDiskPartitions = new ArrayList<>();
for (Path root : storageRoots) {
File[] partitions = root.toFile().listFiles(
@@ -671,4 +688,11 @@
}
return onDiskPartitions;
}
+
+ public Path getPartitionRoot(int partition) throws HyracksDataException {
+ Path path =
+ Paths.get(StorageConstants.STORAGE_ROOT_DIR_NAME, StorageConstants.PARTITION_DIR_PREFIX + partition);
+ FileReference resolve = ioManager.resolve(path.toString());
+ return resolve.getFile().toPath();
+ }
}