[NO ISSUE][STO] Close datasets of flushed indexes after recovery
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- After redoing a flush log record on an index, close that index's dataset
to ensure any cached state that might have been changed during recovery
is cleared (e.g. the component id generator).
- Fix LSMFlushRecoveryTest total number of records to be inserted.
- Update LSMFlushRecoveryTest to check for duplicate component ids.
Change-Id: I29072f475cc7b4d7d6efde415be0329fc568443e
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/11423
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 65cb36a..0359cf1 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -173,15 +173,14 @@
deleteRecoveryTemporaryFiles();
//get active partitions on this node
- replayPartitionsLogs(partitions, logMgr.getLogReader(true), lowWaterMarkLSN);
+ replayPartitionsLogs(partitions, logMgr.getLogReader(true), lowWaterMarkLSN, true);
}
- @Override
- public synchronized void replayPartitionsLogs(Set<Integer> partitions, ILogReader logReader, long lowWaterMarkLSN)
- throws IOException, ACIDException {
+ public synchronized void replayPartitionsLogs(Set<Integer> partitions, ILogReader logReader, long lowWaterMarkLSN,
+ boolean closeOnFlushRedo) throws IOException, ACIDException {
try {
Set<Long> winnerJobSet = startRecoverysAnalysisPhase(partitions, logReader, lowWaterMarkLSN);
- startRecoveryRedoPhase(partitions, logReader, lowWaterMarkLSN, winnerJobSet);
+ startRecoveryRedoPhase(partitions, logReader, lowWaterMarkLSN, winnerJobSet, closeOnFlushRedo);
} finally {
logReader.close();
deleteRecoveryTemporaryFiles();
@@ -277,7 +276,7 @@
}
private synchronized void startRecoveryRedoPhase(Set<Integer> partitions, ILogReader logReader,
- long lowWaterMarkLSN, Set<Long> winnerTxnSet) throws IOException, ACIDException {
+ long lowWaterMarkLSN, Set<Long> winnerTxnSet, boolean closeOnFlushRedo) throws IOException, ACIDException {
int redoCount = 0;
long txnId = 0;
@@ -299,6 +298,7 @@
TxnEntityId tempKeyTxnEntityId = new TxnEntityId(-1, -1, -1, null, -1, false);
ILogRecord logRecord = null;
+ Set<Integer> flushRedoDatasets = new HashSet<>();
try {
logReader.setPosition(lowWaterMarkLSN);
logRecord = logReader.next();
@@ -409,6 +409,7 @@
&& !index.isCurrentMutableComponentEmpty()) {
// schedule flush
redoFlush(index, logRecord);
+ flushRedoDatasets.add(datasetId);
redoCount++;
} else {
// TODO: update checkpoint file?
@@ -441,6 +442,11 @@
for (long r : resourceIdList) {
datasetLifecycleManager.close(resourcesMap.get(r).getPath());
}
+ if (closeOnFlushRedo) {
+ // close datasets that had a flush redone to ensure any cached state changed by recovery is cleared
+ // e.g. when redoing a flush, the component id generator needs to be reinitialized
+ datasetLifecycleManager.closeDatasets(flushRedoDatasets);
+ }
}
}
@@ -525,7 +531,7 @@
if (minLSN < readableSmallestLSN) {
minLSN = readableSmallestLSN;
}
- replayPartitionsLogs(partitions, logMgr.getLogReader(true), minLSN);
+ replayPartitionsLogs(partitions, logMgr.getLogReader(true), minLSN, false);
if (flush) {
appCtx.getDatasetLifecycleManager().flushAllDatasets();
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java
index 9c6e95e..2aad416 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java
@@ -23,7 +23,9 @@
import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.Semaphore;
import org.apache.asterix.app.bootstrap.TestNodeController;
@@ -62,6 +64,7 @@
import org.apache.hyracks.storage.am.lsm.btree.impl.ITestOpCallback;
import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
import org.apache.hyracks.storage.am.lsm.common.api.IIoOperationFailedCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
@@ -143,8 +146,7 @@
checkComponentIds();
// insert more records
createInsertOps();
- insertRecords(PARTITION_0, StorageTestUtils.RECORDS_PER_COMPONENT, StorageTestUtils.RECORDS_PER_COMPONENT,
- true);
+ insertRecords(PARTITION_0, StorageTestUtils.TOTAL_NUM_OF_RECORDS, StorageTestUtils.RECORDS_PER_COMPONENT, true);
dsInfo.waitForIO();
checkComponentIds();
@@ -487,8 +489,14 @@
List<ILSMDiskComponent> secondaryDiskComponents = secondaryIndexes[partitionIndex].getDiskComponents();
Assert.assertEquals(primaryDiskComponents.size(), secondaryDiskComponents.size());
+ Set<ILSMComponentId> uniqueIds = new HashSet<>();
for (int i = 0; i < primaryDiskComponents.size(); i++) {
Assert.assertEquals(primaryDiskComponents.get(i).getId(), secondaryDiskComponents.get(i).getId());
+ ILSMComponentId id = primaryDiskComponents.get(i).getId();
+ boolean added = uniqueIds.add(id);
+ if (!added) {
+ throw new IllegalStateException("found duplicate component ids: " + id);
+ }
}
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index 7b737a0..b03af55 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -19,6 +19,7 @@
package org.apache.asterix.common.api;
import java.util.List;
+import java.util.Set;
import java.util.function.Predicate;
import org.apache.asterix.common.context.DatasetInfo;
@@ -124,6 +125,14 @@
List<IVirtualBufferCache> getVirtualBufferCaches(int datasetId, int ioDeviceNum);
/**
+ * Attempts to close the datasets whose ids are in {@code datasetsToClose}
+ *
+ * @param datasetsToClose the ids of the datasets to close
+ * @throws HyracksDataException if closing one of the datasets fails
+ */
+ void closeDatasets(Set<Integer> datasetsToClose) throws HyracksDataException;
+
+ /**
* Flushes then closes all open datasets
*/
void closeAllDatasets() throws HyracksDataException;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index b26220d..b2f4034 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -25,6 +25,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
@@ -452,6 +453,16 @@
}
@Override
+ public synchronized void closeDatasets(Set<Integer> datasetsToClose) throws HyracksDataException {
+ ArrayList<DatasetResource> openDatasets = new ArrayList<>(datasets.values()); // iterate over a copy
+ for (DatasetResource dsr : openDatasets) {
+ if (dsr.isOpen() && datasetsToClose.contains(dsr.getDatasetID())) { // open and requested only
+ closeDataset(dsr);
+ }
+ }
+ }
+
+ @Override
public synchronized void closeAllDatasets() throws HyracksDataException {
ArrayList<DatasetResource> openDatasets = new ArrayList<>(datasets.values());
for (DatasetResource dsr : openDatasets) {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
index bfe7963..8a5f34e 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
@@ -85,17 +85,6 @@
long getLocalMinFirstLSN() throws HyracksDataException;
/**
- * Replay the logs that belong to the passed {@code partitions} starting from the {@code lowWaterMarkLSN}
- *
- * @param partitions
- * @param lowWaterMarkLSN
- * @throws IOException
- * @throws ACIDException
- */
- void replayPartitionsLogs(Set<Integer> partitions, ILogReader logReader, long lowWaterMarkLSN)
- throws IOException, ACIDException;
-
- /**
* Creates a temporary file to be used during recovery
*
* @param txnId