[NO ISSUE][STO] Allow concurrent modification of persisted resources
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- To allow an index's resources to be created concurrently on different
partitions, replace the method-level synchronization in
PersistentLocalResourceRepository with a read/write lock.
- The lock is used in an intentionally inverted fashion (illustrated in
the sketch below): any operation that might modify the persisted files
acquires the read lock, so independent modifications can proceed
concurrently.
- Any operation that reads the persisted files acquires the write lock,
so it waits for, and excludes, all on-going modifications.
Change-Id: Id435bfc113a0b8e3e2a1f75712f0ded74ae0ee6f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17824
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
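For reference, a minimal sketch of the inverted read/write-lock idiom this
change adopts (the class and method names below are illustrative only and
are not part of the patch):

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class InvertedLockSketch {
        // fair mode, matching resourcesAccessLock in the patch
        private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);

        // "Write access" (modifying persisted files) shares the READ lock,
        // so modifications on different partitions can run concurrently.
        public void modifyPersistedFile() {
            lock.readLock().lock();
            try {
                // ... create or delete a resource file ...
            } finally {
                lock.readLock().unlock();
            }
        }

        // "Read access" takes the exclusive WRITE lock, so it waits for
        // all in-flight modifications and sees a consistent set of files.
        public void readPersistedFiles() {
            lock.writeLock().lock();
            try {
                // ... scan the persisted resource files ...
            } finally {
                lock.writeLock().unlock();
            }
        }
    }

Since ReentrantReadWriteLock's write lock is reentrant, the nested "read
access" calls in the patch (e.g. get() invoking readLocalResource()) do not
self-deadlock.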
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 1eef182..33cbf2d 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -39,6 +39,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -83,7 +84,6 @@
private static final Logger LOGGER = LogManager.getLogger();
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
private static final String METADATA_FILE_MASK_NAME =
StorageConstants.MASK_FILE_PREFIX + StorageConstants.METADATA_FILE_NAME;
private static final FilenameFilter LSM_INDEX_FILES_FILTER =
@@ -94,7 +94,6 @@
(dir, name) -> name.equals(StorageConstants.METADATA_FILE_NAME);
private static final FilenameFilter METADATA_MASK_FILES_FILTER =
(dir, name) -> name.equals(METADATA_FILE_MASK_NAME);
-
private static final int MAX_CACHED_RESOURCES = 1000;
// Finals
@@ -102,11 +101,11 @@
private final Cache<String, LocalResource> resourceCache;
// Mutables
private boolean isReplicationEnabled = false;
- private Set<String> filesToBeReplicated;
private IReplicationManager replicationManager;
private final List<FileReference> storageRoots;
private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
private final IPersistedResourceRegistry persistedResourceRegistry;
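+ // Intentionally inverted: see beforeWriteAccess()/beforeReadAccess() below.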
+ private final ReentrantReadWriteLock resourcesAccessLock = new ReentrantReadWriteLock(true);
public PersistentLocalResourceRepository(IIOManager ioManager,
IIndexCheckpointManagerProvider indexCheckpointManagerProvider,
@@ -135,23 +134,29 @@
}
@Override
- public synchronized LocalResource get(String relativePath) throws HyracksDataException {
- LocalResource resource = resourceCache.getIfPresent(relativePath);
- if (resource == null) {
- FileReference resourceFile = getLocalResourceFileByName(ioManager, relativePath);
- resource = readLocalResource(resourceFile);
- if (resource != null) {
- resourceCache.put(relativePath, resource);
+ public LocalResource get(String relativePath) throws HyracksDataException {
+ beforeReadAccess();
+ try {
+ LocalResource resource = resourceCache.getIfPresent(relativePath);
+ if (resource == null) {
+ FileReference resourceFile = getLocalResourceFileByName(ioManager, relativePath);
+ resource = readLocalResource(resourceFile);
+ if (resource != null) {
+ resourceCache.put(relativePath, resource);
+ }
}
+ return resource;
+ } finally {
+ afterReadAccess();
}
- return resource;
}
@SuppressWarnings("squid:S1181")
@Override
public void insert(LocalResource resource) throws HyracksDataException {
FileReference resourceFile;
- synchronized (this) {
+ beforeWriteAccess();
+ try {
String relativePath = getFileName(resource.getPath());
resourceFile = ioManager.resolve(relativePath);
if (resourceFile.getFile().exists()) {
@@ -178,6 +183,8 @@
ExitUtil.halt(ExitUtil.EC_ERROR_CREATING_RESOURCES);
}
resourceCache.put(resource.getPath(), resource);
+ } finally {
+ afterWriteAccess();
}
- // do not do the replication operation on the synchronized to avoid blocking other threads
- // on network operations
+ // perform the replication operation outside the lock to avoid blocking
+ // other threads on network operations
@@ -216,7 +223,8 @@
LOGGER.error("failed to delete resource file {} from replicas", resourceFile);
}
}
- synchronized (this) {
+ beforeWriteAccess();
+ try {
try {
if (resourceExists) {
ioManager.delete(resourceFile);
@@ -229,6 +237,8 @@
} finally {
invalidateResource(relativePath);
}
+ } finally {
+ afterWriteAccess();
}
}
@@ -238,72 +248,101 @@
return ioManager.resolve(fileName);
}
- public synchronized Map<Long, LocalResource> getResources(Predicate<LocalResource> filter,
- List<FileReference> roots) throws HyracksDataException {
- Map<Long, LocalResource> resourcesMap = new HashMap<>();
- for (FileReference root : roots) {
- final Collection<FileReference> files = ioManager.list(root, METADATA_FILES_FILTER);
- try {
- for (FileReference file : files) {
- final LocalResource localResource = readLocalResource(file);
- if (localResource != null && filter.test(localResource)) {
- LocalResource duplicate = resourcesMap.putIfAbsent(localResource.getId(), localResource);
- if (duplicate != null) {
- LOGGER.warn("found duplicate resource ids {} and {}", localResource, duplicate);
+ public Map<Long, LocalResource> getResources(Predicate<LocalResource> filter, List<FileReference> roots)
+ throws HyracksDataException {
+ beforeReadAccess();
+ try {
+ Map<Long, LocalResource> resourcesMap = new HashMap<>();
+ for (FileReference root : roots) {
+ final Collection<FileReference> files = ioManager.list(root, METADATA_FILES_FILTER);
+ try {
+ for (FileReference file : files) {
+ final LocalResource localResource = readLocalResource(file);
+ if (localResource != null && filter.test(localResource)) {
+ LocalResource duplicate = resourcesMap.putIfAbsent(localResource.getId(), localResource);
+ if (duplicate != null) {
+ LOGGER.warn("found duplicate resource ids {} and {}", localResource, duplicate);
+ }
}
}
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
}
- } catch (IOException e) {
- throw HyracksDataException.create(e);
}
+ return resourcesMap;
+ } finally {
+ afterReadAccess();
}
- return resourcesMap;
}
- public synchronized Map<Long, LocalResource> getResources(Predicate<LocalResource> filter)
- throws HyracksDataException {
- return getResources(filter, storageRoots);
- }
-
- public synchronized Map<Long, LocalResource> getResources(Predicate<LocalResource> filter, Set<Integer> partitions)
- throws HyracksDataException {
- List<FileReference> partitionsRoots = new ArrayList<>();
- for (Integer partition : partitions) {
- partitionsRoots.add(getPartitionRoot(partition));
+ public Map<Long, LocalResource> getResources(Predicate<LocalResource> filter) throws HyracksDataException {
+ beforeReadAccess();
+ try {
+ return getResources(filter, storageRoots);
+ } finally {
+ afterReadAccess();
}
- return getResources(filter, partitionsRoots);
}
- public synchronized void deleteInvalidIndexes(Predicate<LocalResource> filter) throws HyracksDataException {
- IIOBulkOperation bulkDelete = ioManager.createDeleteBulkOperation();
- for (FileReference root : storageRoots) {
- final Collection<FileReference> files = ioManager.list(root, METADATA_FILES_FILTER);
- try {
- for (FileReference file : files) {
- final LocalResource localResource = readLocalResource(file);
- if (localResource != null && filter.test(localResource)) {
- FileReference parent = file.getParent();
- LOGGER.warn("deleting invalid metadata index {}", parent);
- bulkDelete.add(parent);
+ public Map<Long, LocalResource> getResources(Predicate<LocalResource> filter, Set<Integer> partitions)
+ throws HyracksDataException {
+ beforeReadAccess();
+ try {
+ List<FileReference> partitionsRoots = new ArrayList<>();
+ for (Integer partition : partitions) {
+ partitionsRoots.add(getPartitionRoot(partition));
+ }
+ return getResources(filter, partitionsRoots);
+ } finally {
+ afterReadAccess();
+ }
+ }
+
+ public void deleteInvalidIndexes(Predicate<LocalResource> filter) throws HyracksDataException {
+ beforeReadAccess();
+ try {
+ IIOBulkOperation bulkDelete = ioManager.createDeleteBulkOperation();
+ for (FileReference root : storageRoots) {
+ final Collection<FileReference> files = ioManager.list(root, METADATA_FILES_FILTER);
+ try {
+ for (FileReference file : files) {
+ final LocalResource localResource = readLocalResource(file);
+ if (localResource != null && filter.test(localResource)) {
+ FileReference parent = file.getParent();
+ LOGGER.warn("deleting invalid metadata index {}", parent);
+ bulkDelete.add(parent);
+ }
}
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
}
- } catch (IOException e) {
- throw HyracksDataException.create(e);
}
+ ioManager.performBulkOperation(bulkDelete);
+ resourceCache.invalidateAll();
+ } finally {
+ afterReadAccess();
}
- ioManager.performBulkOperation(bulkDelete);
- resourceCache.invalidateAll();
}
public Map<Long, LocalResource> loadAndGetAllResources() throws HyracksDataException {
- return getResources(p -> true);
+ beforeReadAccess();
+ try {
+ return getResources(p -> true);
+ } finally {
+ afterReadAccess();
+ }
}
@Override
- public synchronized long maxId() throws HyracksDataException {
- final Map<Long, LocalResource> allResources = loadAndGetAllResources();
- final Optional<Long> max = allResources.keySet().stream().max(Long::compare);
- return max.isPresent() ? max.get() : 0;
+ public long maxId() throws HyracksDataException {
+ beforeReadAccess();
+ try {
+ final Map<Long, LocalResource> allResources = loadAndGetAllResources();
+ final Optional<Long> max = allResources.keySet().stream().max(Long::compare);
+ return max.isPresent() ? max.get() : 0;
+ } finally {
+ afterReadAccess();
+ }
}
public void invalidateResource(String relativePath) {
@@ -320,39 +359,42 @@
}
private LocalResource readLocalResource(FileReference fileRef) throws HyracksDataException {
- byte[] bytes = ioManager.readAllBytes(fileRef);
- if (bytes == null) {
- return null;
- }
-
+ beforeReadAccess();
try {
- final JsonNode jsonNode = OBJECT_MAPPER.readValue(bytes, JsonNode.class);
- LocalResource resource = (LocalResource) persistedResourceRegistry.deserialize(jsonNode);
- if (resource.getVersion() == ITreeIndexFrame.Constants.VERSION) {
- return resource;
- } else {
- throw new AsterixException("Storage version mismatch.");
+ byte[] bytes = ioManager.readAllBytes(fileRef);
+ if (bytes == null) {
+ return null;
}
- } catch (Exception e) {
- throw HyracksDataException.create(e);
+ try {
+ final JsonNode jsonNode = OBJECT_MAPPER.readValue(bytes, JsonNode.class);
+ LocalResource resource = (LocalResource) persistedResourceRegistry.deserialize(jsonNode);
+ if (resource.getVersion() == ITreeIndexFrame.Constants.VERSION) {
+ return resource;
+ } else {
+ throw new AsterixException("Storage version mismatch.");
+ }
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ }
+ } finally {
+ afterReadAccess();
}
}
- public synchronized void setReplicationManager(IReplicationManager replicationManager) {
- this.replicationManager = replicationManager;
- isReplicationEnabled = replicationManager.isReplicationEnabled();
-
- if (isReplicationEnabled) {
- filesToBeReplicated = new HashSet<>();
+ public void setReplicationManager(IReplicationManager replicationManager) {
+ beforeWriteAccess();
+ try {
+ this.replicationManager = replicationManager;
+ isReplicationEnabled = replicationManager.isReplicationEnabled();
+ } finally {
+ afterWriteAccess();
}
}
private void createReplicationJob(ReplicationOperation operation, FileReference fileRef)
throws HyracksDataException {
- filesToBeReplicated.clear();
- filesToBeReplicated.add(fileRef.getAbsolutePath());
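+ // pass an immutable set inline; the shared mutable filesToBeReplicated
+ // field is no longer needed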
ReplicationJob job = new ReplicationJob(ReplicationJobType.METADATA, operation, ReplicationExecutionType.SYNC,
- filesToBeReplicated);
+ Set.of(fileRef.getAbsolutePath()));
try {
replicationManager.submitJob(job);
} catch (IOException e) {
@@ -363,26 +405,41 @@
/**
* Deletes physical files of all data verses.
*/
- public synchronized void deleteStorageData() throws HyracksDataException {
- IIOBulkOperation bulkDelete = ioManager.createDeleteBulkOperation();
- for (FileReference root : storageRoots) {
- bulkDelete.add(root);
+ public void deleteStorageData() throws HyracksDataException {
+ beforeWriteAccess();
+ try {
+ IIOBulkOperation bulkDelete = ioManager.createDeleteBulkOperation();
+ for (FileReference root : storageRoots) {
+ bulkDelete.add(root);
+ }
+ ioManager.performBulkOperation(bulkDelete);
+ createStorageRoots();
+ } finally {
+ afterWriteAccess();
}
- ioManager.performBulkOperation(bulkDelete);
- createStorageRoots();
}
- public synchronized Set<Integer> getAllPartitions() throws HyracksDataException {
- return loadAndGetAllResources().values().stream().map(LocalResource::getResource)
- .map(DatasetLocalResource.class::cast).map(DatasetLocalResource::getPartition)
- .collect(Collectors.toSet());
+ public Set<Integer> getAllPartitions() throws HyracksDataException {
+ beforeReadAccess();
+ try {
+ return loadAndGetAllResources().values().stream().map(LocalResource::getResource)
+ .map(DatasetLocalResource.class::cast).map(DatasetLocalResource::getPartition)
+ .collect(Collectors.toSet());
+ } finally {
+ afterReadAccess();
+ }
}
- public synchronized Optional<DatasetResourceReference> getLocalResourceReference(String absoluteFilePath)
+ public Optional<DatasetResourceReference> getLocalResourceReference(String absoluteFilePath)
throws HyracksDataException {
- final String localResourcePath = StoragePathUtil.getIndexFileRelativePath(absoluteFilePath);
- final LocalResource lr = get(localResourcePath);
- return lr != null ? Optional.of(DatasetResourceReference.of(lr)) : Optional.empty();
+ beforeReadAccess();
+ try {
+ final String localResourcePath = StoragePathUtil.getIndexFileRelativePath(absoluteFilePath);
+ final LocalResource lr = get(localResourcePath);
+ return lr != null ? Optional.of(DatasetResourceReference.of(lr)) : Optional.empty();
+ } finally {
+ afterReadAccess();
+ }
}
/**
@@ -393,67 +450,92 @@
* @return The set of indexes files
* @throws HyracksDataException
*/
- public synchronized Set<FileReference> getPartitionIndexes(int partition) throws HyracksDataException {
- FileReference partitionRoot = getPartitionRoot(partition);
- final Map<Long, LocalResource> partitionResourcesMap = getResources(resource -> {
- DatasetLocalResource dsResource = (DatasetLocalResource) resource.getResource();
- return dsResource.getPartition() == partition;
- }, Collections.singletonList(partitionRoot));
- Set<FileReference> indexes = new HashSet<>();
- for (LocalResource localResource : partitionResourcesMap.values()) {
- indexes.add(ioManager.resolve(localResource.getPath()));
- }
- return indexes;
- }
-
- public synchronized Map<Long, LocalResource> getPartitionResources(int partition) throws HyracksDataException {
- return getResources(r -> true, Collections.singleton(partition));
- }
-
- public synchronized Map<String, Long> getPartitionReplicatedResources(int partition, IReplicationStrategy strategy)
- throws HyracksDataException {
- final Map<String, Long> partitionReplicatedResources = new HashMap<>();
- final Map<Long, LocalResource> partitionResources = getPartitionResources(partition);
- for (LocalResource lr : partitionResources.values()) {
- DatasetLocalResource datasetLocalResource = (DatasetLocalResource) lr.getResource();
- if (strategy.isMatch(datasetLocalResource.getDatasetId())) {
- DatasetResourceReference drr = DatasetResourceReference.of(lr);
- partitionReplicatedResources.put(drr.getFileRelativePath().toString(), lr.getId());
+ public Set<FileReference> getPartitionIndexes(int partition) throws HyracksDataException {
+ beforeReadAccess();
+ try {
+ FileReference partitionRoot = getPartitionRoot(partition);
+ final Map<Long, LocalResource> partitionResourcesMap = getResources(resource -> {
+ DatasetLocalResource dsResource = (DatasetLocalResource) resource.getResource();
+ return dsResource.getPartition() == partition;
+ }, Collections.singletonList(partitionRoot));
+ Set<FileReference> indexes = new HashSet<>();
+ for (LocalResource localResource : partitionResourcesMap.values()) {
+ indexes.add(ioManager.resolve(localResource.getPath()));
}
+ return indexes;
+ } finally {
+ afterReadAccess();
}
- return partitionReplicatedResources;
}
- public synchronized List<String> getPartitionReplicatedFiles(int partition, IReplicationStrategy strategy)
- throws HyracksDataException {
- final List<String> partitionReplicatedFiles = new ArrayList<>();
- final Set<FileReference> replicatedIndexes = new HashSet<>();
- final Map<Long, LocalResource> partitionResources = getPartitionResources(partition);
- for (LocalResource lr : partitionResources.values()) {
- DatasetLocalResource datasetLocalResource = (DatasetLocalResource) lr.getResource();
- if (strategy.isMatch(datasetLocalResource.getDatasetId())) {
- replicatedIndexes.add(ioManager.resolve(lr.getPath()));
- }
+ public Map<Long, LocalResource> getPartitionResources(int partition) throws HyracksDataException {
+ beforeReadAccess();
+ try {
+ return getResources(r -> true, Collections.singleton(partition));
+ } finally {
+ afterReadAccess();
}
- for (FileReference indexDir : replicatedIndexes) {
- partitionReplicatedFiles.addAll(getIndexFiles(indexDir));
- }
- return partitionReplicatedFiles;
}
- public synchronized long getReplicatedIndexesMaxComponentId(int partition, IReplicationStrategy strategy)
+ public Map<String, Long> getPartitionReplicatedResources(int partition, IReplicationStrategy strategy)
throws HyracksDataException {
- long maxComponentId = LSMComponentId.MIN_VALID_COMPONENT_ID;
- final Map<Long, LocalResource> partitionResources = getPartitionResources(partition);
- for (LocalResource lr : partitionResources.values()) {
- DatasetLocalResource datasetLocalResource = (DatasetLocalResource) lr.getResource();
- if (strategy.isMatch(datasetLocalResource.getDatasetId())) {
- final IIndexCheckpointManager indexCheckpointManager =
- indexCheckpointManagerProvider.get(DatasetResourceReference.of(lr));
- maxComponentId = Math.max(maxComponentId, indexCheckpointManager.getLatest().getLastComponentId());
+ beforeReadAccess();
+ try {
+ final Map<String, Long> partitionReplicatedResources = new HashMap<>();
+ final Map<Long, LocalResource> partitionResources = getPartitionResources(partition);
+ for (LocalResource lr : partitionResources.values()) {
+ DatasetLocalResource datasetLocalResource = (DatasetLocalResource) lr.getResource();
+ if (strategy.isMatch(datasetLocalResource.getDatasetId())) {
+ DatasetResourceReference drr = DatasetResourceReference.of(lr);
+ partitionReplicatedResources.put(drr.getFileRelativePath().toString(), lr.getId());
+ }
}
+ return partitionReplicatedResources;
+ } finally {
+ afterReadAccess();
}
- return maxComponentId;
+ }
+
+ public List<String> getPartitionReplicatedFiles(int partition, IReplicationStrategy strategy)
+ throws HyracksDataException {
+ beforeReadAccess();
+ try {
+ final List<String> partitionReplicatedFiles = new ArrayList<>();
+ final Set<FileReference> replicatedIndexes = new HashSet<>();
+ final Map<Long, LocalResource> partitionResources = getPartitionResources(partition);
+ for (LocalResource lr : partitionResources.values()) {
+ DatasetLocalResource datasetLocalResource = (DatasetLocalResource) lr.getResource();
+ if (strategy.isMatch(datasetLocalResource.getDatasetId())) {
+ replicatedIndexes.add(ioManager.resolve(lr.getPath()));
+ }
+ }
+ for (FileReference indexDir : replicatedIndexes) {
+ partitionReplicatedFiles.addAll(getIndexFiles(indexDir));
+ }
+ return partitionReplicatedFiles;
+ } finally {
+ afterReadAccess();
+ }
+ }
+
+ public long getReplicatedIndexesMaxComponentId(int partition, IReplicationStrategy strategy)
+ throws HyracksDataException {
+ beforeReadAccess();
+ try {
+ long maxComponentId = LSMComponentId.MIN_VALID_COMPONENT_ID;
+ final Map<Long, LocalResource> partitionResources = getPartitionResources(partition);
+ for (LocalResource lr : partitionResources.values()) {
+ DatasetLocalResource datasetLocalResource = (DatasetLocalResource) lr.getResource();
+ if (strategy.isMatch(datasetLocalResource.getDatasetId())) {
+ final IIndexCheckpointManager indexCheckpointManager =
+ indexCheckpointManagerProvider.get(DatasetResourceReference.of(lr));
+ maxComponentId = Math.max(maxComponentId, indexCheckpointManager.getLatest().getLastComponentId());
+ }
+ }
+ return maxComponentId;
+ } finally {
+ afterReadAccess();
+ }
}
private List<String> getIndexFiles(FileReference indexDir) throws HyracksDataException {
@@ -469,44 +551,59 @@
}
}
- public synchronized void cleanup(int partition) throws HyracksDataException {
- final Set<FileReference> partitionIndexes = getPartitionIndexes(partition);
+ public void cleanup(int partition) throws HyracksDataException {
+ beforeReadAccess();
try {
- for (FileReference index : partitionIndexes) {
- deleteIndexMaskedFiles(index);
- if (isValidIndex(index)) {
- deleteIndexInvalidComponents(index);
+ final Set<FileReference> partitionIndexes = getPartitionIndexes(partition);
+ try {
+ for (FileReference index : partitionIndexes) {
+ deleteIndexMaskedFiles(index);
+ if (isValidIndex(index)) {
+ deleteIndexInvalidComponents(index);
+ }
}
+ } catch (IOException | ParseException e) {
+ throw HyracksDataException.create(e);
}
- } catch (IOException | ParseException e) {
- throw HyracksDataException.create(e);
+ } finally {
+ afterReadAccess();
}
}
public List<ResourceStorageStats> getStorageStats() throws HyracksDataException {
- final List<DatasetResourceReference> allResources = loadAndGetAllResources().values().stream()
- .map(DatasetResourceReference::of).collect(Collectors.toList());
- final List<ResourceStorageStats> resourcesStats = new ArrayList<>();
- for (DatasetResourceReference res : allResources) {
- final ResourceStorageStats resourceStats = getResourceStats(res);
- if (resourceStats != null) {
- resourcesStats.add(resourceStats);
+ beforeReadAccess();
+ try {
+ final List<DatasetResourceReference> allResources = loadAndGetAllResources().values().stream()
+ .map(DatasetResourceReference::of).collect(Collectors.toList());
+ final List<ResourceStorageStats> resourcesStats = new ArrayList<>();
+ for (DatasetResourceReference res : allResources) {
+ final ResourceStorageStats resourceStats = getResourceStats(res);
+ if (resourceStats != null) {
+ resourcesStats.add(resourceStats);
+ }
}
+ return resourcesStats;
+ } finally {
+ afterReadAccess();
}
- return resourcesStats;
}
- public synchronized void deleteCorruptedResources() throws HyracksDataException {
- IIOBulkOperation bulkDelete = ioManager.createDeleteBulkOperation();
- for (FileReference root : storageRoots) {
- final Collection<FileReference> metadataMaskFiles = ioManager.list(root, METADATA_MASK_FILES_FILTER);
- for (FileReference metadataMaskFile : metadataMaskFiles) {
- final FileReference resourceFile = metadataMaskFile.getParent().getChild(METADATA_FILE_NAME);
- bulkDelete.add(resourceFile);
- bulkDelete.add(metadataMaskFile);
+ public void deleteCorruptedResources() throws HyracksDataException {
+ beforeWriteAccess();
+ try {
+ IIOBulkOperation bulkDelete = ioManager.createDeleteBulkOperation();
+ for (FileReference root : storageRoots) {
+ final Collection<FileReference> metadataMaskFiles = ioManager.list(root, METADATA_MASK_FILES_FILTER);
+ for (FileReference metadataMaskFile : metadataMaskFiles) {
+ final FileReference resourceFile = metadataMaskFile.getParent().getChild(METADATA_FILE_NAME);
+ bulkDelete.add(resourceFile);
+ bulkDelete.add(metadataMaskFile);
+ }
}
+ ioManager.performBulkOperation(bulkDelete);
+ } finally {
+ afterWriteAccess();
}
- ioManager.performBulkOperation(bulkDelete);
}
private void deleteIndexMaskedFiles(FileReference index) throws IOException {
@@ -601,20 +698,25 @@
public long getDatasetSize(DatasetCopyIdentifier datasetIdentifier, Set<Integer> nodePartitions)
throws HyracksDataException {
- long totalSize = 0;
- final Map<Long, LocalResource> dataverse = getResources(lr -> {
- final ResourceReference resourceReference = ResourceReference.ofIndex(lr.getPath());
- return datasetIdentifier.isMatch(resourceReference);
- }, nodePartitions);
- final List<DatasetResourceReference> allResources =
- dataverse.values().stream().map(DatasetResourceReference::of).collect(Collectors.toList());
- for (DatasetResourceReference res : allResources) {
- final ResourceStorageStats resourceStats = getResourceStats(res);
- if (resourceStats != null) {
- totalSize += resourceStats.getTotalSize();
+ beforeReadAccess();
+ try {
+ long totalSize = 0;
+ final Map<Long, LocalResource> dataverse = getResources(lr -> {
+ final ResourceReference resourceReference = ResourceReference.ofIndex(lr.getPath());
+ return datasetIdentifier.isMatch(resourceReference);
+ }, nodePartitions);
+ final List<DatasetResourceReference> allResources =
+ dataverse.values().stream().map(DatasetResourceReference::of).collect(Collectors.toList());
+ for (DatasetResourceReference res : allResources) {
+ final ResourceStorageStats resourceStats = getResourceStats(res);
+ if (resourceStats != null) {
+ totalSize += resourceStats.getTotalSize();
+ }
}
+ return totalSize;
+ } finally {
+ afterReadAccess();
}
- return totalSize;
}
private void createResourceFileMask(FileReference resourceFile) throws HyracksDataException {
@@ -644,13 +746,18 @@
return COMPONENT_FILES_FILTER.accept(indexDir.getFile(), fileName);
}
- public synchronized List<FileReference> getOnDiskPartitions() {
- List<FileReference> onDiskPartitions = new ArrayList<>();
- for (FileReference root : storageRoots) {
- onDiskPartitions.addAll(IoUtil.getMatchingChildren(root, (dir, name) -> dir != null && dir.isDirectory()
- && name.startsWith(StorageConstants.PARTITION_DIR_PREFIX)));
+ public List<FileReference> getOnDiskPartitions() {
+ beforeReadAccess();
+ try {
+ List<FileReference> onDiskPartitions = new ArrayList<>();
+ for (FileReference root : storageRoots) {
+ onDiskPartitions.addAll(IoUtil.getMatchingChildren(root, (dir, name) -> dir != null && dir.isDirectory()
+ && name.startsWith(StorageConstants.PARTITION_DIR_PREFIX)));
+ }
+ return onDiskPartitions;
+ } finally {
+ afterReadAccess();
}
- return onDiskPartitions;
}
public FileReference getPartitionRoot(int partition) throws HyracksDataException {
@@ -660,16 +767,37 @@
}
public void deletePartition(int partitionId) throws HyracksDataException {
- Collection<FileReference> onDiskPartitions = getOnDiskPartitions();
- IIOBulkOperation bulkDelete = ioManager.createDeleteBulkOperation();
- for (FileReference onDiskPartition : onDiskPartitions) {
- int partitionNum = StoragePathUtil.getPartitionNumFromRelativePath(onDiskPartition.getAbsolutePath());
- if (partitionNum == partitionId) {
- LOGGER.warn("deleting partition {}", partitionNum);
- bulkDelete.add(onDiskPartition);
- break;
+ beforeReadAccess();
+ try {
+ Collection<FileReference> onDiskPartitions = getOnDiskPartitions();
+ IIOBulkOperation bulkDelete = ioManager.createDeleteBulkOperation();
+ for (FileReference onDiskPartition : onDiskPartitions) {
+ int partitionNum = StoragePathUtil.getPartitionNumFromRelativePath(onDiskPartition.getAbsolutePath());
+ if (partitionNum == partitionId) {
+ LOGGER.warn("deleting partition {}", partitionNum);
+ bulkDelete.add(onDiskPartition);
+ break;
+ }
}
+ ioManager.performBulkOperation(bulkDelete);
+ } finally {
+ afterReadAccess();
}
- ioManager.performBulkOperation(bulkDelete);
+ }
+
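+ // The lock is used in an intentionally inverted fashion: "write access"
+ // (modifying persisted files) shares the READ lock so that concurrent
+ // modifications on different partitions may proceed, while "read access"
+ // takes the exclusive WRITE lock to wait out any on-going modifications.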
+ private void beforeWriteAccess() {
+ resourcesAccessLock.readLock().lock();
+ }
+
+ private void afterWriteAccess() {
+ resourcesAccessLock.readLock().unlock();
+ }
+
+ private void beforeReadAccess() {
+ resourcesAccessLock.writeLock().lock();
+ }
+
+ private void afterReadAccess() {
+ resourcesAccessLock.writeLock().unlock();
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilder.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilder.java
index 1c0d7b4..8a67c71 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilder.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilder.java
@@ -68,49 +68,47 @@
@Override
public void build() throws HyracksDataException {
IResourceLifecycleManager<IIndex> lcManager = storageManager.getLifecycleManager(ctx);
- synchronized (lcManager) {
- // The previous resource Id needs to be removed since calling IIndex.create() may possibly destroy any
- // physical artifact that the LocalResourceRepository is managing (e.g. a file containing the resource Id).
- // Once the index has been created, a new resource Id can be generated.
- ILocalResourceRepository localResourceRepository = storageManager.getLocalResourceRepository(ctx);
- LocalResource lr = localResourceRepository.get(resourceRelPath);
- long resourceId = lr == null ? -1 : lr.getId();
- if (resourceId != -1) {
- localResourceRepository.delete(resourceRelPath);
- }
- resourceId = resourceIdFactory.createId();
- IResource resource = localResourceFactory.createResource(resourceRef);
- lr = new LocalResource(resourceId, ITreeIndexFrame.Constants.VERSION, durable, resource);
- IIndex index = lcManager.get(resourceRelPath);
- if (index != null) {
- //how is this right?????????? <needs to be fixed>
- //The reason for this is to handle many cases such as:
- //1. Crash while delete index is running (we don't do global cleanup on restart)
- //2. Node leaves and then join with old data
- LOGGER.log(Level.WARN, "Removing existing index on index create for the index: " + resourceRelPath);
- lcManager.unregister(resourceRelPath);
- index.destroy();
- } else {
- final FileReference resolvedResourceRef = ctx.getIoManager().resolve(resourceRelPath);
- if (resolvedResourceRef.getFile().exists()) {
- // Index is not registered but the index file exists
- // This is another big problem that we need to disallow soon
- // We can only disallow this if we have a global cleanup after crash
- // on reboot
- LOGGER.warn(
- "Deleting {} on index create. The index is not registered but the file exists in the filesystem",
- resolvedResourceRef);
- ctx.getIoManager().delete(resolvedResourceRef);
- }
- index = resource.createInstance(ctx);
- }
- index.create();
- try {
- localResourceRepository.insert(lr);
- } catch (IOException e) {
- throw HyracksDataException.create(e);
- }
- lcManager.register(resourceRelPath, index);
+ // The previous resource Id needs to be removed since calling IIndex.create() may possibly destroy any
+ // physical artifact that the LocalResourceRepository is managing (e.g. a file containing the resource Id).
+ // Once the index has been created, a new resource Id can be generated.
+ ILocalResourceRepository localResourceRepository = storageManager.getLocalResourceRepository(ctx);
+ LocalResource lr = localResourceRepository.get(resourceRelPath);
+ long resourceId = lr == null ? -1 : lr.getId();
+ if (resourceId != -1) {
+ localResourceRepository.delete(resourceRelPath);
}
+ resourceId = resourceIdFactory.createId();
+ IResource resource = localResourceFactory.createResource(resourceRef);
+ lr = new LocalResource(resourceId, ITreeIndexFrame.Constants.VERSION, durable, resource);
+ IIndex index = lcManager.get(resourceRelPath);
+ if (index != null) {
+ //how is this right?????????? <needs to be fixed>
+ //The reason for this is to handle many cases such as:
+ //1. Crash while delete index is running (we don't do global cleanup on restart)
+ //2. Node leaves and then join with old data
+ LOGGER.log(Level.WARN, "Removing existing index on index create for the index: " + resourceRelPath);
+ lcManager.unregister(resourceRelPath);
+ index.destroy();
+ } else {
+ final FileReference resolvedResourceRef = ctx.getIoManager().resolve(resourceRelPath);
+ if (resolvedResourceRef.getFile().exists()) {
+ // Index is not registered but the index file exists
+ // This is another big problem that we need to disallow soon
+ // We can only disallow this if we have a global cleanup after crash
+ // on reboot
+ LOGGER.warn(
+ "Deleting {} on index create. The index is not registered but the file exists in the filesystem",
+ resolvedResourceRef);
+ ctx.getIoManager().delete(resolvedResourceRef);
+ }
+ index = resource.createInstance(ctx);
+ }
+ index.create();
+ try {
+ localResourceRepository.insert(lr);
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ lcManager.register(resourceRelPath, index);
}
}