[ASTERIXDB-1969][STO] Ignore corrupted checkpoints

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Ignore and delete corrupted checkpoint files.
- In case all checkpoint files are corrupted, force full recovery.
- Add test to check the new behavior of CheckpointManager.
- Remove unused recovery manager method.

Change-Id: Ied8a188501b63a0d339e6391cac684e3378f4c37
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1871
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 275b055..10af9ff 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -158,27 +158,6 @@
         return state;
     }
 
-    //This method is used only when replication is disabled.
-    @Override
-    public void startRecovery(boolean synchronous) throws IOException, ACIDException {
-        state = SystemState.RECOVERING;
-        LOGGER.log(Level.INFO, "starting recovery ...");
-
-        long readableSmallestLSN = logMgr.getReadableSmallestLSN();
-        Checkpoint checkpointObject = checkpointManager.getLatest();
-        long lowWaterMarkLSN = checkpointObject.getMinMCTFirstLsn();
-        if (lowWaterMarkLSN < readableSmallestLSN) {
-            lowWaterMarkLSN = readableSmallestLSN;
-        }
-
-        //delete any recovery files from previous failed recovery attempts
-        deleteRecoveryTemporaryFiles();
-
-        //get active partitions on this node
-        Set<Integer> activePartitions = localResourceRepository.getNodeOrignalPartitions();
-        replayPartitionsLogs(activePartitions, logMgr.getLogReader(true), lowWaterMarkLSN);
-    }
-
     @Override
     public void startLocalRecovery(Set<Integer> partitions) throws IOException, ACIDException {
         state = SystemState.RECOVERING;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
index d2bf3d3..370205b 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.test.logging;
 
 import java.io.File;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -32,10 +33,12 @@
 import org.apache.asterix.common.configuration.AsterixConfiguration;
 import org.apache.asterix.common.configuration.Property;
 import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
+import org.apache.asterix.common.transactions.Checkpoint;
 import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ICheckpointManager;
 import org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.asterix.external.util.DataflowUtils;
 import org.apache.asterix.file.StorageComponentProvider;
 import org.apache.asterix.metadata.entities.Dataset;
@@ -47,6 +50,7 @@
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.test.common.TestHelper;
 import org.apache.asterix.transaction.management.service.logging.LogManager;
+import org.apache.asterix.transaction.management.service.recovery.AbstractCheckpointManager;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -208,4 +212,50 @@
             Assert.fail(e.getMessage());
         }
     }
+
+    @Test
+    public void testCorruptedCheckpointFiles() {
+        try {
+            TestNodeController nc = new TestNodeController(new File(TEST_CONFIG_FILE_PATH).getAbsolutePath(), false);
+            nc.init();
+            final ITransactionSubsystem txnSubsystem = nc.getTransactionSubsystem();
+            final AbstractCheckpointManager checkpointManager =
+                    (AbstractCheckpointManager) txnSubsystem.getCheckpointManager();
+            // Make a checkpoint with the current minFirstLSN
+            final long minFirstLSN = txnSubsystem.getRecoveryManager().getMinFirstLSN();
+            checkpointManager.tryCheckpoint(minFirstLSN);
+            // Get the just created checkpoint
+            final Checkpoint validCheckpoint = checkpointManager.getLatest();
+            // Make sure the valid checkout wouldn't force full recovery
+            Assert.assertTrue(validCheckpoint.getMinMCTFirstLsn() >= minFirstLSN);
+            // Add a corrupted (empty) checkpoint file with a timestamp > than current checkpoint
+            Path corruptedCheckpointPath = checkpointManager.getCheckpointPath(validCheckpoint.getTimeStamp() + 1);
+            File corruptedCheckpoint = corruptedCheckpointPath.toFile();
+            corruptedCheckpoint.createNewFile();
+            // Make sure the corrupted checkpoint file was created
+            Assert.assertTrue(corruptedCheckpoint.exists());
+            // Try to get the latest checkpoint again
+            Checkpoint cpAfterCorruption = checkpointManager.getLatest();
+            // Make sure the valid checkpoint was returned
+            Assert.assertEquals(validCheckpoint.getTimeStamp(), cpAfterCorruption.getTimeStamp());
+            // Make sure the corrupted checkpoint file was deleted
+            Assert.assertFalse(corruptedCheckpoint.exists());
+            // Corrupt the valid checkpoint by replacing its content
+            final Path validCheckpointPath = checkpointManager.getCheckpointPath(validCheckpoint.getTimeStamp());
+            File validCheckpointFile = validCheckpointPath.toFile();
+            Assert.assertTrue(validCheckpointFile.exists());
+            // Delete the valid checkpoint file and create it as an empty file
+            validCheckpointFile.delete();
+            validCheckpointFile.createNewFile();
+            // Make sure the returned checkpoint (the forged checkpoint) will enforce full recovery
+            Checkpoint forgedCheckpoint = checkpointManager.getLatest();
+            Assert.assertTrue(forgedCheckpoint.getMinMCTFirstLsn() < minFirstLSN);
+            // Make sure the forged checkpoint recovery will start from the first available log
+            final long readableSmallestLSN = txnSubsystem.getLogManager().getReadableSmallestLSN();
+            Assert.assertTrue(forgedCheckpoint.getMinMCTFirstLsn() <= readableSmallestLSN);
+        } catch (Throwable e) {
+            e.printStackTrace();
+            Assert.fail(e.getMessage());
+        }
+    }
 }
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
index 84e1019..7965aa5 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
@@ -33,7 +33,7 @@
  */
 public interface IRecoveryManager {
 
-    public enum SystemState {
+    enum SystemState {
         BOOTSTRAPPING, // The first time the NC is bootstrapped.
         PERMANENT_DATA_LOSS, // No checkpoint files found on NC and it is not BOOTSTRAPPING (data loss).
         RECOVERING, // Recovery process is on-going.
@@ -41,7 +41,10 @@
         CORRUPTED // Some txn logs need to be replayed (need to perform recover).
     }
 
-    public class ResourceType {
+    class ResourceType {
+        private ResourceType() {
+        }
+
         public static final byte LSM_BTREE = 0;
         public static final byte LSM_RTREE = 1;
         public static final byte LSM_INVERTED_INDEX = 2;
@@ -61,38 +64,25 @@
     SystemState getSystemState() throws ACIDException;
 
     /**
-     * Initiates a crash recovery.
-     *
-     * @param synchronous
-     *            indicates if the recovery is to be done in a synchronous
-     *            manner. In asynchronous mode, the recovery will happen as part
-     *            of a separate thread.
-     * @return SystemState the state of the system (@see SystemState) post
-     *         recovery.
-     * @throws ACIDException
-     */
-    public void startRecovery(boolean synchronous) throws IOException, ACIDException;
-
-    /**
      * Rolls back a transaction.
      *
      * @param txnContext
      *            the transaction context associated with the transaction
      * @throws ACIDException
      */
-    public void rollbackTransaction(ITransactionContext txnContext) throws ACIDException;
+    void rollbackTransaction(ITransactionContext txnContext) throws ACIDException;
 
     /**
      * @return min first LSN of the open indexes (including remote indexes if replication is enabled)
      * @throws HyracksDataException
      */
-    public long getMinFirstLSN() throws HyracksDataException;
+    long getMinFirstLSN() throws HyracksDataException;
 
     /**
      * @return min first LSN of the open indexes
      * @throws HyracksDataException
      */
-    public long getLocalMinFirstLSN() throws HyracksDataException;
+    long getLocalMinFirstLSN() throws HyracksDataException;
 
     /**
      * Replay the logs that belong to the passed {@code partitions} starting from the {@code lowWaterMarkLSN}
@@ -102,7 +92,7 @@
      * @throws IOException
      * @throws ACIDException
      */
-    public void replayPartitionsLogs(Set<Integer> partitions, ILogReader logReader, long lowWaterMarkLSN)
+    void replayPartitionsLogs(Set<Integer> partitions, ILogReader logReader, long lowWaterMarkLSN)
             throws IOException, ACIDException;
 
     /**
@@ -114,12 +104,12 @@
      * @throws IOException
      *             if the file for the specified {@code jobId} with the {@code fileName} already exists
      */
-    public File createJobRecoveryFile(int jobId, String fileName) throws IOException;
+    File createJobRecoveryFile(int jobId, String fileName) throws IOException;
 
     /**
      * Deletes all temporary recovery files
      */
-    public void deleteRecoveryTemporaryFiles();
+    void deleteRecoveryTemporaryFiles();
 
     /**
      * Performs the local recovery process on {@code partitions}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java
index 8d3e0a7..24d316b 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java
@@ -88,13 +88,26 @@
         List<Checkpoint> checkpointObjectList = new ArrayList<>();
         for (File file : checkpoints) {
             try {
-                LOGGER.log(Level.WARNING, "Reading snapshot file: " + file.getAbsolutePath());
+                LOGGER.log(Level.WARNING, "Reading checkpoint file: " + file.getAbsolutePath());
                 String jsonString = new String(Files.readAllBytes(Paths.get(file.getAbsolutePath())));
                 checkpointObjectList.add(Checkpoint.fromJson(jsonString));
             } catch (IOException e) {
-                throw new ACIDException("Failed to read a checkpoint file", e);
+                // ignore corrupted checkpoint file
+                LOGGER.log(Level.WARNING, "Failed to read checkpoint file: " + file.getAbsolutePath(), e);
+                file.delete();
+                LOGGER.log(Level.INFO, "Deleted corrupted checkpoint file: " + file.getAbsolutePath());
             }
         }
+        /**
+         * If all checkpoint files are corrupted, we have no option but to try to perform recovery.
+         * We will forge a checkpoint that forces recovery to start from the beginning of the log.
+         * This shouldn't happen unless a hardware corruption happens.
+         */
+        if (checkpointObjectList.isEmpty()) {
+            LOGGER.severe("All checkpoint files are corrupted. Forcing recovery from the beginning of the log");
+            checkpointObjectList.add(forgeForceRecoveryCheckpoint());
+        }
+
         // Sort checkpointObjects in descending order by timeStamp to find out the most recent one.
         Collections.sort(checkpointObjectList);
 
@@ -125,6 +138,11 @@
         // Nothing to dump
     }
 
+    public Path getCheckpointPath(long checkpointTimestamp) {
+        return Paths.get(checkpointDir.getAbsolutePath() + File.separator + CHECKPOINT_FILENAME_PREFIX + Long
+                .toString(checkpointTimestamp));
+    }
+
     protected void capture(long minMCTFirstLSN, boolean sharp) throws HyracksDataException {
         ILogManager logMgr = txnSubsystem.getLogManager();
         ITransactionManager txnMgr = txnSubsystem.getTransactionManager();
@@ -134,14 +152,24 @@
         cleanup();
     }
 
+    protected Checkpoint forgeForceRecoveryCheckpoint() {
+        /**
+         * By setting the checkpoint first LSN (low watermark) to Long.MIN_VALUE, the recovery manager will start from
+         * the first available log.
+         * We set the storage version to the current version. If there is a version mismatch, it will be detected
+         * during recovery.
+         */
+        return new Checkpoint(Long.MIN_VALUE, Long.MIN_VALUE, Integer.MIN_VALUE, System.currentTimeMillis(), false,
+                StorageConstants.VERSION);
+    }
+
     private void persist(Checkpoint checkpoint) throws HyracksDataException {
-        // Construct checkpoint file name
-        String fileName = checkpointDir.getAbsolutePath() + File.separator + CHECKPOINT_FILENAME_PREFIX
-                + Long.toString(checkpoint.getTimeStamp());
+        // Get checkpoint file path
+        Path path = getCheckpointPath(checkpoint.getTimeStamp());
         // Write checkpoint file to disk
-        Path path = Paths.get(fileName);
         try (BufferedWriter writer = Files.newBufferedWriter(path)) {
             writer.write(checkpoint.asJson());
+            writer.flush();
         } catch (IOException e) {
             throw new HyracksDataException("Failed to write checkpoint to disk", e);
         }