[ASTERIXDB-1969][STO] Ignore corrupted checkpoints
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Ignore and delete corrupted checkpoint files.
- In case all checkpoint files are corrupted, force full recovery.
- Add test to check the new behavior of CheckpointManager.
- Remove unused recovery manager method.
Change-Id: Ied8a188501b63a0d339e6391cac684e3378f4c37
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1871
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 275b055..10af9ff 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -158,27 +158,6 @@
return state;
}
- //This method is used only when replication is disabled.
- @Override
- public void startRecovery(boolean synchronous) throws IOException, ACIDException {
- state = SystemState.RECOVERING;
- LOGGER.log(Level.INFO, "starting recovery ...");
-
- long readableSmallestLSN = logMgr.getReadableSmallestLSN();
- Checkpoint checkpointObject = checkpointManager.getLatest();
- long lowWaterMarkLSN = checkpointObject.getMinMCTFirstLsn();
- if (lowWaterMarkLSN < readableSmallestLSN) {
- lowWaterMarkLSN = readableSmallestLSN;
- }
-
- //delete any recovery files from previous failed recovery attempts
- deleteRecoveryTemporaryFiles();
-
- //get active partitions on this node
- Set<Integer> activePartitions = localResourceRepository.getNodeOrignalPartitions();
- replayPartitionsLogs(activePartitions, logMgr.getLogReader(true), lowWaterMarkLSN);
- }
-
@Override
public void startLocalRecovery(Set<Integer> partitions) throws IOException, ACIDException {
state = SystemState.RECOVERING;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
index d2bf3d3..370205b 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
@@ -19,6 +19,7 @@
package org.apache.asterix.test.logging;
import java.io.File;
+import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -32,10 +33,12 @@
import org.apache.asterix.common.configuration.AsterixConfiguration;
import org.apache.asterix.common.configuration.Property;
import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
+import org.apache.asterix.common.transactions.Checkpoint;
import org.apache.asterix.common.transactions.DatasetId;
import org.apache.asterix.common.transactions.ICheckpointManager;
import org.apache.asterix.common.transactions.IRecoveryManager;
import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.common.transactions.ITransactionSubsystem;
import org.apache.asterix.external.util.DataflowUtils;
import org.apache.asterix.file.StorageComponentProvider;
import org.apache.asterix.metadata.entities.Dataset;
@@ -47,6 +50,7 @@
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.test.common.TestHelper;
import org.apache.asterix.transaction.management.service.logging.LogManager;
+import org.apache.asterix.transaction.management.service.recovery.AbstractCheckpointManager;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -208,4 +212,50 @@
Assert.fail(e.getMessage());
}
}
+
+ @Test
+ public void testCorruptedCheckpointFiles() {
+ try {
+ TestNodeController nc = new TestNodeController(new File(TEST_CONFIG_FILE_PATH).getAbsolutePath(), false);
+ nc.init();
+ final ITransactionSubsystem txnSubsystem = nc.getTransactionSubsystem();
+ final AbstractCheckpointManager checkpointManager =
+ (AbstractCheckpointManager) txnSubsystem.getCheckpointManager();
+ // Make a checkpoint with the current minFirstLSN
+ final long minFirstLSN = txnSubsystem.getRecoveryManager().getMinFirstLSN();
+ checkpointManager.tryCheckpoint(minFirstLSN);
+ // Get the just created checkpoint
+ final Checkpoint validCheckpoint = checkpointManager.getLatest();
+ // Make sure the valid checkpoint wouldn't force full recovery
+ Assert.assertTrue(validCheckpoint.getMinMCTFirstLsn() >= minFirstLSN);
+ // Add a corrupted (empty) checkpoint file with a timestamp greater than the current checkpoint's
+ Path corruptedCheckpointPath = checkpointManager.getCheckpointPath(validCheckpoint.getTimeStamp() + 1);
+ File corruptedCheckpoint = corruptedCheckpointPath.toFile();
+ corruptedCheckpoint.createNewFile();
+ // Make sure the corrupted checkpoint file was created
+ Assert.assertTrue(corruptedCheckpoint.exists());
+ // Try to get the latest checkpoint again
+ Checkpoint cpAfterCorruption = checkpointManager.getLatest();
+ // Make sure the valid checkpoint was returned
+ Assert.assertEquals(validCheckpoint.getTimeStamp(), cpAfterCorruption.getTimeStamp());
+ // Make sure the corrupted checkpoint file was deleted
+ Assert.assertFalse(corruptedCheckpoint.exists());
+ // Corrupt the valid checkpoint by replacing its content
+ final Path validCheckpointPath = checkpointManager.getCheckpointPath(validCheckpoint.getTimeStamp());
+ File validCheckpointFile = validCheckpointPath.toFile();
+ Assert.assertTrue(validCheckpointFile.exists());
+ // Delete the valid checkpoint file and create it as an empty file
+ validCheckpointFile.delete();
+ validCheckpointFile.createNewFile();
+ // Make sure the returned checkpoint (the forged checkpoint) will enforce full recovery
+ Checkpoint forgedCheckpoint = checkpointManager.getLatest();
+ Assert.assertTrue(forgedCheckpoint.getMinMCTFirstLsn() < minFirstLSN);
+ // Make sure the forged checkpoint recovery will start from the first available log
+ final long readableSmallestLSN = txnSubsystem.getLogManager().getReadableSmallestLSN();
+ Assert.assertTrue(forgedCheckpoint.getMinMCTFirstLsn() <= readableSmallestLSN);
+ } catch (Throwable e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
}
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
index 84e1019..7965aa5 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
@@ -33,7 +33,7 @@
*/
public interface IRecoveryManager {
- public enum SystemState {
+ enum SystemState {
BOOTSTRAPPING, // The first time the NC is bootstrapped.
PERMANENT_DATA_LOSS, // No checkpoint files found on NC and it is not BOOTSTRAPPING (data loss).
RECOVERING, // Recovery process is on-going.
@@ -41,7 +41,10 @@
CORRUPTED // Some txn logs need to be replayed (need to perform recover).
}
- public class ResourceType {
+ class ResourceType {
+ private ResourceType() {
+ }
+
public static final byte LSM_BTREE = 0;
public static final byte LSM_RTREE = 1;
public static final byte LSM_INVERTED_INDEX = 2;
@@ -61,38 +64,25 @@
SystemState getSystemState() throws ACIDException;
/**
- * Initiates a crash recovery.
- *
- * @param synchronous
- * indicates if the recovery is to be done in a synchronous
- * manner. In asynchronous mode, the recovery will happen as part
- * of a separate thread.
- * @return SystemState the state of the system (@see SystemState) post
- * recovery.
- * @throws ACIDException
- */
- public void startRecovery(boolean synchronous) throws IOException, ACIDException;
-
- /**
* Rolls back a transaction.
*
* @param txnContext
* the transaction context associated with the transaction
* @throws ACIDException
*/
- public void rollbackTransaction(ITransactionContext txnContext) throws ACIDException;
+ void rollbackTransaction(ITransactionContext txnContext) throws ACIDException;
/**
* @return min first LSN of the open indexes (including remote indexes if replication is enabled)
* @throws HyracksDataException
*/
- public long getMinFirstLSN() throws HyracksDataException;
+ long getMinFirstLSN() throws HyracksDataException;
/**
* @return min first LSN of the open indexes
* @throws HyracksDataException
*/
- public long getLocalMinFirstLSN() throws HyracksDataException;
+ long getLocalMinFirstLSN() throws HyracksDataException;
/**
* Replay the logs that belong to the passed {@code partitions} starting from the {@code lowWaterMarkLSN}
@@ -102,7 +92,7 @@
* @throws IOException
* @throws ACIDException
*/
- public void replayPartitionsLogs(Set<Integer> partitions, ILogReader logReader, long lowWaterMarkLSN)
+ void replayPartitionsLogs(Set<Integer> partitions, ILogReader logReader, long lowWaterMarkLSN)
throws IOException, ACIDException;
/**
@@ -114,12 +104,12 @@
* @throws IOException
* if the file for the specified {@code jobId} with the {@code fileName} already exists
*/
- public File createJobRecoveryFile(int jobId, String fileName) throws IOException;
+ File createJobRecoveryFile(int jobId, String fileName) throws IOException;
/**
* Deletes all temporary recovery files
*/
- public void deleteRecoveryTemporaryFiles();
+ void deleteRecoveryTemporaryFiles();
/**
* Performs the local recovery process on {@code partitions}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java
index 8d3e0a7..24d316b 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java
@@ -88,13 +88,26 @@
List<Checkpoint> checkpointObjectList = new ArrayList<>();
for (File file : checkpoints) {
try {
- LOGGER.log(Level.WARNING, "Reading snapshot file: " + file.getAbsolutePath());
+ LOGGER.log(Level.WARNING, "Reading checkpoint file: " + file.getAbsolutePath());
String jsonString = new String(Files.readAllBytes(Paths.get(file.getAbsolutePath())));
checkpointObjectList.add(Checkpoint.fromJson(jsonString));
} catch (IOException e) {
- throw new ACIDException("Failed to read a checkpoint file", e);
+ // ignore corrupted checkpoint file
+ LOGGER.log(Level.WARNING, "Failed to read checkpoint file: " + file.getAbsolutePath(), e);
+ file.delete();
+ LOGGER.log(Level.INFO, "Deleted corrupted checkpoint file: " + file.getAbsolutePath());
}
}
+ /**
+ * If all checkpoint files are corrupted, we have no option but to try to perform recovery.
+ * We will forge a checkpoint that forces recovery to start from the beginning of the log.
+ * This shouldn't happen unless hardware corruption occurs.
+ */
+ if (checkpointObjectList.isEmpty()) {
+ LOGGER.severe("All checkpoint files are corrupted. Forcing recovery from the beginning of the log");
+ checkpointObjectList.add(forgeForceRecoveryCheckpoint());
+ }
+
// Sort checkpointObjects in descending order by timeStamp to find out the most recent one.
Collections.sort(checkpointObjectList);
@@ -125,6 +138,11 @@
// Nothing to dump
}
+ public Path getCheckpointPath(long checkpointTimestamp) {
+ return Paths.get(checkpointDir.getAbsolutePath() + File.separator + CHECKPOINT_FILENAME_PREFIX + Long
+ .toString(checkpointTimestamp));
+ }
+
protected void capture(long minMCTFirstLSN, boolean sharp) throws HyracksDataException {
ILogManager logMgr = txnSubsystem.getLogManager();
ITransactionManager txnMgr = txnSubsystem.getTransactionManager();
@@ -134,14 +152,24 @@
cleanup();
}
+ protected Checkpoint forgeForceRecoveryCheckpoint() {
+ /**
+ * By setting the checkpoint first LSN (low watermark) to Long.MIN_VALUE, the recovery manager will start from
+ * the first available log.
+ * We set the storage version to the current version. If there is a version mismatch, it will be detected
+ * during recovery.
+ */
+ return new Checkpoint(Long.MIN_VALUE, Long.MIN_VALUE, Integer.MIN_VALUE, System.currentTimeMillis(), false,
+ StorageConstants.VERSION);
+ }
+
private void persist(Checkpoint checkpoint) throws HyracksDataException {
- // Construct checkpoint file name
- String fileName = checkpointDir.getAbsolutePath() + File.separator + CHECKPOINT_FILENAME_PREFIX
- + Long.toString(checkpoint.getTimeStamp());
+ // Get checkpoint file path
+ Path path = getCheckpointPath(checkpoint.getTimeStamp());
// Write checkpoint file to disk
- Path path = Paths.get(fileName);
try (BufferedWriter writer = Files.newBufferedWriter(path)) {
writer.write(checkpoint.asJson());
+ writer.flush();
} catch (IOException e) {
throw new HyracksDataException("Failed to write checkpoint to disk", e);
}