[NO ISSUE][STO] Close datasets of flushed indexes after recovery

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:

- After performing redo of a flush log on any index, close its dataset
  to ensure any cached state that might have been changed during recovery
  is cleared (e.g. the component id generator).
- Fix LSMFlushRecoveryTest total number of records to be inserted.
- Update LSMFlushRecoveryTest to check for duplicate component ids.

Change-Id: I29072f475cc7b4d7d6efde415be0329fc568443e
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/11423
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 65cb36a..0359cf1 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -173,15 +173,14 @@
         deleteRecoveryTemporaryFiles();
 
         //get active partitions on this node
-        replayPartitionsLogs(partitions, logMgr.getLogReader(true), lowWaterMarkLSN);
+        replayPartitionsLogs(partitions, logMgr.getLogReader(true), lowWaterMarkLSN, true);
     }
 
-    @Override
-    public synchronized void replayPartitionsLogs(Set<Integer> partitions, ILogReader logReader, long lowWaterMarkLSN)
-            throws IOException, ACIDException {
+    public synchronized void replayPartitionsLogs(Set<Integer> partitions, ILogReader logReader, long lowWaterMarkLSN,
+            boolean closeOnFlushRedo) throws IOException, ACIDException {
         try {
             Set<Long> winnerJobSet = startRecoverysAnalysisPhase(partitions, logReader, lowWaterMarkLSN);
-            startRecoveryRedoPhase(partitions, logReader, lowWaterMarkLSN, winnerJobSet);
+            startRecoveryRedoPhase(partitions, logReader, lowWaterMarkLSN, winnerJobSet, closeOnFlushRedo);
         } finally {
             logReader.close();
             deleteRecoveryTemporaryFiles();
@@ -277,7 +276,7 @@
     }
 
     private synchronized void startRecoveryRedoPhase(Set<Integer> partitions, ILogReader logReader,
-            long lowWaterMarkLSN, Set<Long> winnerTxnSet) throws IOException, ACIDException {
+            long lowWaterMarkLSN, Set<Long> winnerTxnSet, boolean closeOnFlushRedo) throws IOException, ACIDException {
         int redoCount = 0;
         long txnId = 0;
 
@@ -299,6 +298,7 @@
         TxnEntityId tempKeyTxnEntityId = new TxnEntityId(-1, -1, -1, null, -1, false);
 
         ILogRecord logRecord = null;
+        Set<Integer> flushRedoDatasets = new HashSet<>();
         try {
             logReader.setPosition(lowWaterMarkLSN);
             logRecord = logReader.next();
@@ -409,6 +409,7 @@
                                                 && !index.isCurrentMutableComponentEmpty()) {
                                             // schedule flush
                                             redoFlush(index, logRecord);
+                                            flushRedoDatasets.add(datasetId);
                                             redoCount++;
                                         } else {
                                             // TODO: update checkpoint file?
@@ -441,6 +442,11 @@
             for (long r : resourceIdList) {
                 datasetLifecycleManager.close(resourcesMap.get(r).getPath());
             }
+            if (closeOnFlushRedo) {
+                // close datasets of indexes to ensure any cached state that might've been changed by recovery is cleared
+                // e.g. when redoing a flush, the component id generator needs to be reinitialized
+                datasetLifecycleManager.closeDatasets(flushRedoDatasets);
+            }
         }
     }
 
@@ -525,7 +531,7 @@
             if (minLSN < readableSmallestLSN) {
                 minLSN = readableSmallestLSN;
             }
-            replayPartitionsLogs(partitions, logMgr.getLogReader(true), minLSN);
+            replayPartitionsLogs(partitions, logMgr.getLogReader(true), minLSN, false);
             if (flush) {
                 appCtx.getDatasetLifecycleManager().flushAllDatasets();
             }
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java
index 9c6e95e..2aad416 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java
@@ -23,7 +23,9 @@
 import java.rmi.RemoteException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.Semaphore;
 
 import org.apache.asterix.app.bootstrap.TestNodeController;
@@ -62,6 +64,7 @@
 import org.apache.hyracks.storage.am.lsm.btree.impl.ITestOpCallback;
 import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
 import org.apache.hyracks.storage.am.lsm.common.api.IIoOperationFailedCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
@@ -143,8 +146,7 @@
         checkComponentIds();
         // insert more records
         createInsertOps();
-        insertRecords(PARTITION_0, StorageTestUtils.RECORDS_PER_COMPONENT, StorageTestUtils.RECORDS_PER_COMPONENT,
-                true);
+        insertRecords(PARTITION_0, StorageTestUtils.TOTAL_NUM_OF_RECORDS, StorageTestUtils.RECORDS_PER_COMPONENT, true);
 
         dsInfo.waitForIO();
         checkComponentIds();
@@ -487,8 +489,14 @@
         List<ILSMDiskComponent> secondaryDiskComponents = secondaryIndexes[partitionIndex].getDiskComponents();
 
         Assert.assertEquals(primaryDiskComponents.size(), secondaryDiskComponents.size());
+        Set<ILSMComponentId> uniqueIds = new HashSet<>();
         for (int i = 0; i < primaryDiskComponents.size(); i++) {
             Assert.assertEquals(primaryDiskComponents.get(i).getId(), secondaryDiskComponents.get(i).getId());
+            ILSMComponentId id = primaryDiskComponents.get(i).getId();
+            boolean added = uniqueIds.add(id);
+            if (!added) {
+                throw new IllegalStateException("found duplicate component ids: " + id);
+            }
         }
     }
 
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index 7b737a0..b03af55 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.common.api;
 
 import java.util.List;
+import java.util.Set;
 import java.util.function.Predicate;
 
 import org.apache.asterix.common.context.DatasetInfo;
@@ -124,6 +125,14 @@
     List<IVirtualBufferCache> getVirtualBufferCaches(int datasetId, int ioDeviceNum);
 
     /**
+     * Attempts to close the datasets in {@code datasetsToClose}
+     *
+     * @param datasetsToClose
+     * @throws HyracksDataException
+     */
+    void closeDatasets(Set<Integer> datasetsToClose) throws HyracksDataException;
+
+    /**
      * Flushes then closes all open datasets
      */
     void closeAllDatasets() throws HyracksDataException;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index b26220d..b2f4034 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -25,6 +25,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Predicate;
 
@@ -452,6 +453,16 @@
     }
 
     @Override
+    public synchronized void closeDatasets(Set<Integer> datasetsToClose) throws HyracksDataException {
+        ArrayList<DatasetResource> openDatasets = new ArrayList<>(datasets.values());
+        for (DatasetResource dsr : openDatasets) {
+            if (dsr.isOpen() && datasetsToClose.contains(dsr.getDatasetID())) {
+                closeDataset(dsr);
+            }
+        }
+    }
+
+    @Override
     public synchronized void closeAllDatasets() throws HyracksDataException {
         ArrayList<DatasetResource> openDatasets = new ArrayList<>(datasets.values());
         for (DatasetResource dsr : openDatasets) {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
index bfe7963..8a5f34e 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
@@ -85,17 +85,6 @@
     long getLocalMinFirstLSN() throws HyracksDataException;
 
     /**
-     * Replay the logs that belong to the passed {@code partitions} starting from the {@code lowWaterMarkLSN}
-     *
-     * @param partitions
-     * @param lowWaterMarkLSN
-     * @throws IOException
-     * @throws ACIDException
-     */
-    void replayPartitionsLogs(Set<Integer> partitions, ILogReader logReader, long lowWaterMarkLSN)
-            throws IOException, ACIDException;
-
-    /**
      * Creates a temporary file to be used during recovery
      *
      * @param txnId