[ASTERIXDB-3306][STO] Ensure no ongoing I/Os before cleaning a partition

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
When cleaning a partition, we could end up deleting files that are being
produced by an ongoing operations – namely an asynchronous merge.

Change-Id: Ia9a84eedbcb33c29f03155a8605bb82af372f7f4
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17935
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index bf42541..4d32967 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -229,6 +229,7 @@
         datasetLifecycleManager =
                 new DatasetLifecycleManager(storageProperties, localResourceRepository, txnSubsystem.getLogManager(),
                         virtualBufferCache, indexCheckpointManagerProvider, ioManager.getIODevices().size());
+        localResourceRepository.setDatasetLifecycleManager(datasetLifecycleManager);
         final String nodeId = getServiceContext().getNodeId();
         final Set<Integer> nodePartitions = metadataProperties.getNodePartitions(nodeId);
         replicaManager = new ReplicaManager(this, nodePartitions);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllDatasetsReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllDatasetsReplicationStrategy.java
index c969777..84f0a39 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllDatasetsReplicationStrategy.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/AllDatasetsReplicationStrategy.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.common.replication;
 
 public class AllDatasetsReplicationStrategy implements IReplicationStrategy {
+    public static final IReplicationStrategy INSTANCE = new AllDatasetsReplicationStrategy();
 
     @Override
     public boolean isMatch(int datasetId) {
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index f9bf175..a80545d 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -46,8 +46,10 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.dataflow.DatasetLocalResource;
 import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.replication.AllDatasetsReplicationStrategy;
 import org.apache.asterix.common.replication.IReplicationManager;
 import org.apache.asterix.common.replication.IReplicationStrategy;
 import org.apache.asterix.common.replication.ReplicationJob;
@@ -110,6 +112,7 @@
     private final List<Path> storageRoots;
     private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
     private final IPersistedResourceRegistry persistedResourceRegistry;
+    private IDatasetLifecycleManager datasetLifecycleManager;
 
     public PersistentLocalResourceRepository(IIOManager ioManager,
             IIndexCheckpointManagerProvider indexCheckpointManagerProvider,
@@ -346,6 +349,10 @@
         }
     }
 
+    public void setDatasetLifecycleManager(IDatasetLifecycleManager datasetLifecycleManager) {
+        this.datasetLifecycleManager = datasetLifecycleManager;
+    }
+
     private void createReplicationJob(ReplicationOperation operation, FileReference fileRef)
             throws HyracksDataException {
         filesToBeReplicated.clear();
@@ -480,6 +487,7 @@
     }
 
     public synchronized void cleanup(int partition) throws HyracksDataException {
+        datasetLifecycleManager.waitForIO(AllDatasetsReplicationStrategy.INSTANCE, partition);
         final Set<File> partitionIndexes = getPartitionIndexes(partition);
         try {
             for (File index : partitionIndexes) {