[ASTERIXDB-3306][STO] Ensure no ongoing I/Os before cleaning a partition
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
When cleaning a partition, we could end up deleting files that are being
produced by an ongoing operation, namely an asynchronous merge.
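
For illustration only (the class and method names below are hypothetical and
not part of AsterixDB), this sketch shows the general synchronization pattern
the fix relies on: a writer such as an asynchronous merge registers its
in-flight I/O before producing files, and cleanup blocks until the partition
is quiescent before deleting anything.

    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;

    // Hypothetical helper: tracks in-flight I/O for a single partition so that
    // cleanup can wait for ongoing operations (e.g., an async merge) to finish.
    final class PartitionIoTracker {
        private final ReentrantLock lock = new ReentrantLock();
        private final Condition idle = lock.newCondition();
        private int inFlight;

        // Called by a writer (e.g., a merge) before it starts producing files.
        void ioStarted() {
            lock.lock();
            try {
                inFlight++;
            } finally {
                lock.unlock();
            }
        }

        // Called by the writer once its files are fully written.
        void ioFinished() {
            lock.lock();
            try {
                if (--inFlight == 0) {
                    idle.signalAll();
                }
            } finally {
                lock.unlock();
            }
        }

        // Called by cleanup: blocks until no I/O is in flight, so it is safe
        // to enumerate and delete the partition's files afterwards.
        void awaitQuiescence() throws InterruptedException {
            lock.lock();
            try {
                while (inFlight > 0) {
                    idle.await();
                }
            } finally {
                lock.unlock();
            }
        }
    }

In the actual change, this role is played by the dataset lifecycle manager's
waitForIO call, invoked at the start of cleanup as shown in the diff below.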
Change-Id: Ia9a84eedbcb33c29f03155a8605bb82af372f7f4
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17935
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index bf42541..4d32967 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -229,6 +229,7 @@
datasetLifecycleManager =
new DatasetLifecycleManager(storageProperties, localResourceRepository, txnSubsystem.getLogManager(),
virtualBufferCache, indexCheckpointManagerProvider, ioManager.getIODevices().size());
+ localResourceRepository.setDatasetLifecycleManager(datasetLifecycleManager);
final String nodeId = getServiceContext().getNodeId();
final Set<Integer> nodePartitions = metadataProperties.getNodePartitions(nodeId);
replicaManager = new ReplicaManager(this, nodePartitions);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllDatasetsReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllDatasetsReplicationStrategy.java
index c969777..84f0a39 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllDatasetsReplicationStrategy.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllDatasetsReplicationStrategy.java
@@ -19,6 +19,7 @@
package org.apache.asterix.common.replication;
public class AllDatasetsReplicationStrategy implements IReplicationStrategy {
+ public static final IReplicationStrategy INSTANCE = new AllDatasetsReplicationStrategy();
@Override
public boolean isMatch(int datasetId) {
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index f9bf175..a80545d 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -46,8 +46,10 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.dataflow.DatasetLocalResource;
import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.replication.AllDatasetsReplicationStrategy;
import org.apache.asterix.common.replication.IReplicationManager;
import org.apache.asterix.common.replication.IReplicationStrategy;
import org.apache.asterix.common.replication.ReplicationJob;
@@ -110,6 +112,7 @@
private final List<Path> storageRoots;
private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
private final IPersistedResourceRegistry persistedResourceRegistry;
+ private IDatasetLifecycleManager datasetLifecycleManager;
public PersistentLocalResourceRepository(IIOManager ioManager,
IIndexCheckpointManagerProvider indexCheckpointManagerProvider,
@@ -346,6 +349,10 @@
}
}
+ public void setDatasetLifecycleManager(IDatasetLifecycleManager datasetLifecycleManager) {
+ this.datasetLifecycleManager = datasetLifecycleManager;
+ }
+
private void createReplicationJob(ReplicationOperation operation, FileReference fileRef)
throws HyracksDataException {
filesToBeReplicated.clear();
@@ -480,6 +487,7 @@
}
public synchronized void cleanup(int partition) throws HyracksDataException {
+ datasetLifecycleManager.waitForIO(AllDatasetsReplicationStrategy.INSTANCE, partition);
final Set<File> partitionIndexes = getPartitionIndexes(partition);
try {
for (File index : partitionIndexes) {