[NO ISSUE][REP] Do not sync on resource repository during replication
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- To avoid blocking other threads while waiting for network operations,
do not synchronize on resource repository during resource file
replication/deletion.
Change-Id: I11e0f4b3f7a4db3eb55a2e00e9564e193950d84f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/15883
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 27e753f..bb3cde5 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -153,33 +153,38 @@
@SuppressWarnings("squid:S1181")
@Override
- public synchronized void insert(LocalResource resource) throws HyracksDataException {
- String relativePath = getFileName(resource.getPath());
- FileReference resourceFile = ioManager.resolve(relativePath);
- if (resourceFile.getFile().exists()) {
- throw new HyracksDataException("Duplicate resource: " + resourceFile.getAbsolutePath());
- }
+ public void insert(LocalResource resource) throws HyracksDataException {
+ FileReference resourceFile;
+ synchronized (this) {
+ String relativePath = getFileName(resource.getPath());
+ resourceFile = ioManager.resolve(relativePath);
+ if (resourceFile.getFile().exists()) {
+ throw new HyracksDataException("Duplicate resource: " + resourceFile.getAbsolutePath());
+ }
- final File parent = resourceFile.getFile().getParentFile();
- if (!parent.exists() && !parent.mkdirs()) {
- throw HyracksDataException.create(CANNOT_CREATE_FILE, parent.getAbsolutePath());
+ final File parent = resourceFile.getFile().getParentFile();
+ if (!parent.exists() && !parent.mkdirs()) {
+ throw HyracksDataException.create(CANNOT_CREATE_FILE, parent.getAbsolutePath());
+ }
+ // The next block should be all or nothing
+ try {
+ createResourceFileMask(resourceFile);
+ byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(resource.toJson(persistedResourceRegistry));
+ FileUtil.writeAndForce(Paths.get(resourceFile.getAbsolutePath()), bytes);
+ indexCheckpointManagerProvider.get(DatasetResourceReference.of(resource)).init(
+ UNINITIALIZED_COMPONENT_SEQ, 0, LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID.getMaxId(), null);
+ deleteResourceFileMask(resourceFile);
+ } catch (Exception e) {
+ cleanup(resourceFile);
+ throw HyracksDataException.create(e);
+ } catch (Throwable th) {
+ LOGGER.error("Error creating resource {}", resourceFile, th);
+ ExitUtil.halt(ExitUtil.EC_ERROR_CREATING_RESOURCES);
+ }
+ resourceCache.put(resource.getPath(), resource);
}
- // The next block should be all or nothing
- try {
- createResourceFileMask(resourceFile);
- byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(resource.toJson(persistedResourceRegistry));
- FileUtil.writeAndForce(Paths.get(resourceFile.getAbsolutePath()), bytes);
- indexCheckpointManagerProvider.get(DatasetResourceReference.of(resource)).init(UNINITIALIZED_COMPONENT_SEQ,
- 0, LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID.getMaxId(), null);
- deleteResourceFileMask(resourceFile);
- } catch (Exception e) {
- cleanup(resourceFile);
- throw HyracksDataException.create(e);
- } catch (Throwable th) {
- LOGGER.error("Error creating resource {}", resourceFile, th);
- ExitUtil.halt(ExitUtil.EC_ERROR_CREATING_RESOURCES);
- }
- resourceCache.put(resource.getPath(), resource);
+ // do not do the replication operation on the synchronized to avoid blocking other threads
+ // on network operations
//if replication enabled, send resource metadata info to remote nodes
if (isReplicationEnabled) {
try {
@@ -203,25 +208,30 @@
}
@Override
- public synchronized void delete(String relativePath) throws HyracksDataException {
+ public void delete(String relativePath) throws HyracksDataException {
FileReference resourceFile = getLocalResourceFileByName(ioManager, relativePath);
- try {
- if (resourceFile.getFile().exists()) {
- try {
- createReplicationJob(ReplicationOperation.DELETE, resourceFile);
- } catch (Exception e) {
- LOGGER.error("failed to delete resource file {} from replicas", resourceFile);
- }
- final LocalResource localResource = readLocalResource(resourceFile.getFile());
- IoUtil.delete(resourceFile);
- // delete all checkpoints
- indexCheckpointManagerProvider.get(DatasetResourceReference.of(localResource)).delete();
- } else {
- throw HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.RESOURCE_DOES_NOT_EXIST,
- relativePath);
+ boolean resourceExists = resourceFile.getFile().exists();
+ if (resourceExists) {
+ try {
+ createReplicationJob(ReplicationOperation.DELETE, resourceFile);
+ } catch (Exception e) {
+ LOGGER.error("failed to delete resource file {} from replicas", resourceFile);
}
- } finally {
- invalidateResource(relativePath);
+ }
+ synchronized (this) {
+ try {
+ if (resourceExists) {
+ final LocalResource localResource = readLocalResource(resourceFile.getFile());
+ IoUtil.delete(resourceFile);
+ // delete all checkpoints
+ indexCheckpointManagerProvider.get(DatasetResourceReference.of(localResource)).delete();
+ } else {
+ throw HyracksDataException
+ .create(org.apache.hyracks.api.exceptions.ErrorCode.RESOURCE_DOES_NOT_EXIST, relativePath);
+ }
+ } finally {
+ invalidateResource(relativePath);
+ }
}
}