[NO ISSUE][STO] Limit flushes to impacted partitions

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:

- When requesting a flush, limit the indexes to be flushed
  to the impacted partitions.
- Invalidate cached resources on replica promotion.
- Invalidate cached resources on resource file deletion.

Change-Id: I4c1408627c8e11240c3575c4b8f190d746588867
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/15683
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 6736642..2ca3fbc 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -535,7 +535,7 @@
             }
             replayPartitionsLogs(partitions, logMgr.getLogReader(true), minLSN, false);
             if (flush) {
-                appCtx.getDatasetLifecycleManager().flushAllDatasets();
+                appCtx.getDatasetLifecycleManager().flushAllDatasets(partitions::contains);
             }
             cleanUp(partitions);
         } catch (IOException | ACIDException e) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
index a159e09..1372016 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
@@ -130,6 +130,7 @@
         final PersistentLocalResourceRepository localResourceRepository =
                 (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
         localResourceRepository.cleanup(partition);
+        localResourceRepository.clearResourcesCache();
         final IRecoveryManager recoveryManager = appCtx.getTransactionSubsystem().getRecoveryManager();
         recoveryManager.replayReplicaPartitionLogs(Stream.of(partition).collect(Collectors.toSet()), true);
         partitions.put(partition, new Object());
@@ -169,8 +170,7 @@
 
     public void closePartitionResources(int partition) throws HyracksDataException {
         final IDatasetLifecycleManager datasetLifecycleManager = appCtx.getDatasetLifecycleManager();
-        //TODO(mhubail) we can flush only datasets of the requested partition
-        datasetLifecycleManager.flushAllDatasets();
+        datasetLifecycleManager.flushAllDatasets(p -> p == partition);
         final PersistentLocalResourceRepository resourceRepository =
                 (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
         final Map<Long, LocalResource> partitionResources = resourceRepository.getPartitionResources(partition);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index 44b83d4..1c2a047 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -20,6 +20,7 @@
 
 import java.util.List;
 import java.util.Set;
+import java.util.function.IntPredicate;
 import java.util.function.Predicate;
 
 import org.apache.asterix.common.context.DatasetInfo;
@@ -61,6 +62,14 @@
     void flushAllDatasets() throws HyracksDataException;
 
     /**
+     * Flushes all open datasets synchronously for partitions {@code partitions}
+     *
+     * @param partitions
+     * @throws HyracksDataException
+     */
+    void flushAllDatasets(IntPredicate partitions) throws HyracksDataException;
+
+    /**
      * Schedules asynchronous flush on indexes matching the predicate {@code indexPredicate}
      *
      * @param indexPredicate
@@ -143,19 +152,13 @@
     List<IndexInfo> getOpenIndexesInfo();
 
     /**
-     * Flushes and closes all user datasets (non-metadata datasets)
-     *
-     * @throws HyracksDataException
-     */
-    void closeUserDatasets() throws HyracksDataException;
-
-    /**
      * Flushes all opened datasets that are matching {@code replicationStrategy}.
      *
      * @param replicationStrategy
+     * @param partitions
      * @throws HyracksDataException
      */
-    void flushDataset(IReplicationStrategy replicationStrategy) throws HyracksDataException;
+    void flushDataset(IReplicationStrategy replicationStrategy, IntPredicate partitions) throws HyracksDataException;
 
     /**
      * Waits for all ongoing IO operations on all open datasets that are matching {@code replicationStrategy}.
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index c431dca..117b4fc 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -27,6 +27,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.IntPredicate;
 import java.util.function.Predicate;
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
@@ -362,9 +363,14 @@
 
     @Override
     public synchronized void flushAllDatasets() throws HyracksDataException {
+        flushAllDatasets(partition -> true);
+    }
+
+    @Override
+    public synchronized void flushAllDatasets(IntPredicate partitions) throws HyracksDataException {
         for (DatasetResource dsr : datasets.values()) {
             if (dsr.getDatasetInfo().isOpen()) {
-                flushDatasetOpenIndexes(dsr, false);
+                flushDatasetOpenIndexes(dsr, partitions, false);
             }
         }
     }
@@ -373,7 +379,7 @@
     public synchronized void flushDataset(int datasetId, boolean asyncFlush) throws HyracksDataException {
         DatasetResource dsr = datasets.get(datasetId);
         if (dsr != null) {
-            flushDatasetOpenIndexes(dsr, asyncFlush);
+            flushDatasetOpenIndexes(dsr, p -> true, asyncFlush);
         }
     }
 
@@ -407,7 +413,8 @@
      * This method can only be called asynchronously safely if we're sure no modify operation
      * will take place until the flush is scheduled
      */
-    private void flushDatasetOpenIndexes(DatasetResource dsr, boolean asyncFlush) throws HyracksDataException {
+    private void flushDatasetOpenIndexes(DatasetResource dsr, IntPredicate partitions, boolean asyncFlush)
+            throws HyracksDataException {
         DatasetInfo dsInfo = dsr.getDatasetInfo();
         if (!dsInfo.isOpen()) {
             throw new IllegalStateException("flushDatasetOpenIndexes is called on a dataset that is closed");
@@ -419,6 +426,9 @@
         // ensure all in-flight flushes gets scheduled
         logManager.log(waitLog);
         for (PrimaryIndexOperationTracker primaryOpTracker : dsr.getOpTrackers()) {
+            if (!partitions.test(primaryOpTracker.getPartition())) {
+                continue;
+            }
             // flush each partition one by one
             int numActiveOperations = primaryOpTracker.getNumActiveOperations();
             if (numActiveOperations > 0) {
@@ -433,6 +443,9 @@
         if (!asyncFlush) {
             List<FlushOperation> flushes = new ArrayList<>();
             for (PrimaryIndexOperationTracker primaryOpTracker : dsr.getOpTrackers()) {
+                if (!partitions.test(primaryOpTracker.getPartition())) {
+                    continue;
+                }
                 flushes.addAll(primaryOpTracker.getScheduledFlushes());
             }
             LSMIndexUtil.waitFor(flushes);
@@ -443,7 +456,7 @@
         // First wait for any ongoing IO operations
         DatasetInfo dsInfo = dsr.getDatasetInfo();
         try {
-            flushDatasetOpenIndexes(dsr, false);
+            flushDatasetOpenIndexes(dsr, p -> true, false);
         } catch (Exception e) {
             throw HyracksDataException.create(e);
         }
@@ -480,16 +493,6 @@
     }
 
     @Override
-    public synchronized void closeUserDatasets() throws HyracksDataException {
-        ArrayList<DatasetResource> openDatasets = new ArrayList<>(datasets.values());
-        for (DatasetResource dsr : openDatasets) {
-            if (!dsr.isMetadataDataset()) {
-                closeDataset(dsr);
-            }
-        }
-    }
-
-    @Override
     public synchronized void stop(boolean dumpState, OutputStream outputStream) throws IOException {
         if (stopped) {
             return;
@@ -539,10 +542,11 @@
     }
 
     @Override
-    public void flushDataset(IReplicationStrategy replicationStrategy) throws HyracksDataException {
+    public void flushDataset(IReplicationStrategy replicationStrategy, IntPredicate partitions)
+            throws HyracksDataException {
         for (DatasetResource dsr : datasets.values()) {
             if (dsr.isOpen() && replicationStrategy.isMatch(dsr.getDatasetID())) {
-                flushDatasetOpenIndexes(dsr, false);
+                flushDatasetOpenIndexes(dsr, partitions, false);
             }
         }
     }
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java
index 5eef84e..1e93228 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java
@@ -27,8 +27,10 @@
 
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.exceptions.ReplicationException;
+import org.apache.asterix.common.storage.ResourceReference;
 import org.apache.asterix.replication.api.IReplicaTask;
 import org.apache.asterix.replication.api.IReplicationWorker;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.logging.log4j.LogManager;
@@ -53,6 +55,11 @@
             final File localFile = ioManager.resolve(file).getFile();
             if (localFile.exists()) {
                 Files.delete(localFile.toPath());
+                ResourceReference replicaRes = ResourceReference.of(localFile.getAbsolutePath());
+                if (replicaRes.isMetadataResource()) {
+                    ((PersistentLocalResourceRepository) appCtx.getLocalResourceRepository())
+                            .invalidateResource(replicaRes.getRelativePath().toString());
+                }
                 LOGGER.info(() -> "Deleted file: " + localFile.getAbsolutePath());
             } else {
                 LOGGER.warn(() -> "Requested to delete a non-existing file: " + localFile.getAbsolutePath());
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
index a8eee4f..0d0ef19 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
@@ -65,7 +65,8 @@
         final ReplicaFilesSynchronizer fileSync = new ReplicaFilesSynchronizer(appCtx, replica, deltaRecovery);
         // flush replicated dataset to generate disk component for any remaining in-memory components
         final IReplicationStrategy replStrategy = appCtx.getReplicationManager().getReplicationStrategy();
-        appCtx.getDatasetLifecycleManager().flushDataset(replStrategy);
+        appCtx.getDatasetLifecycleManager().flushDataset(replStrategy,
+                p -> p == replica.getIdentifier().getPartition());
         waitForReplicatedDatasetsIO();
         fileSync.sync();
     }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index cc396ad..27e753f 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -300,6 +300,10 @@
         resourceCache.invalidate(relativePath);
     }
 
+    public void clearResourcesCache() {
+        resourceCache.invalidateAll();
+    }
+
     private static String getFileName(String path) {
         return path.endsWith(File.separator) ? (path + StorageConstants.METADATA_FILE_NAME)
                 : (path + File.separator + StorageConstants.METADATA_FILE_NAME);