[ASTERIXDB-2444][STO] Avoid Using System Clock in Checkpoints

- user model changes: no
- storage format changes: yes
- interface changes: yes

Details:
- Replace the usage of system clock timestamps in checkpoints
  by a sequencer.
- Update Asterix/Hyracks storage version to reflect the recent
  changes in storage.
- This change is expected to break storage backward compatibility.

Change-Id: Idc061e6eaccfb308b29a5a263b77a0a849694d4f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2961
Reviewed-by: Michael Blow <mblow@apache.org>
Integration-Tests: Michael Blow <mblow@apache.org>
Tested-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
index 6009f51..e67246a 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
@@ -297,7 +297,7 @@
                 // Make sure the valid checkout wouldn't force full recovery
                 Assert.assertTrue(validCheckpoint.getMinMCTFirstLsn() >= minFirstLSN);
                 // Add a corrupted (empty) checkpoint file with a timestamp > than current checkpoint
-                Path corruptedCheckpointPath = checkpointManager.getCheckpointPath(validCheckpoint.getTimeStamp() + 1);
+                Path corruptedCheckpointPath = checkpointManager.getCheckpointPath(validCheckpoint.getId() + 1);
                 File corruptedCheckpoint = corruptedCheckpointPath.toFile();
                 corruptedCheckpoint.createNewFile();
                 // Make sure the corrupted checkpoint file was created
@@ -305,11 +305,11 @@
                 // Try to get the latest checkpoint again
                 Checkpoint cpAfterCorruption = checkpointManager.getLatest();
                 // Make sure the valid checkpoint was returned
-                Assert.assertEquals(validCheckpoint.getTimeStamp(), cpAfterCorruption.getTimeStamp());
+                Assert.assertEquals(validCheckpoint.getId(), cpAfterCorruption.getId());
                 // Make sure the corrupted checkpoint file was deleted
                 Assert.assertFalse(corruptedCheckpoint.exists());
                 // Corrupt the valid checkpoint by replacing its content
-                final Path validCheckpointPath = checkpointManager.getCheckpointPath(validCheckpoint.getTimeStamp());
+                final Path validCheckpointPath = checkpointManager.getCheckpointPath(validCheckpoint.getId());
                 File validCheckpointFile = validCheckpointPath.toFile();
                 Assert.assertTrue(validCheckpointFile.exists());
                 // Delete the valid checkpoint file and create it as an empty file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java
index 7f1a1c7..8fe0353 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java
@@ -18,28 +18,29 @@
  */
 package org.apache.asterix.common.transactions;
 
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IJsonSerializable;
 import org.apache.hyracks.api.io.IPersistedResourceRegistry;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
 public class Checkpoint implements Comparable<Checkpoint>, IJsonSerializable {
 
     private static final long serialVersionUID = 1L;
     private final long checkpointLsn;
     private final long minMCTFirstLsn;
     private final long maxTxnId;
-    private final long timeStamp;
     private final boolean sharp;
     private final int storageVersion;
+    private long id;
 
-    public Checkpoint(long checkpointLsn, long minMCTFirstLsn, long maxTxnId, long timeStamp, boolean sharp,
+    public Checkpoint(long id, long checkpointLsn, long minMCTFirstLsn, long maxTxnId, boolean sharp,
             int storageVersion) {
+        this.id = id;
         this.checkpointLsn = checkpointLsn;
         this.minMCTFirstLsn = minMCTFirstLsn;
         this.maxTxnId = maxTxnId;
-        this.timeStamp = timeStamp;
         this.sharp = sharp;
         this.storageVersion = storageVersion;
     }
@@ -56,8 +57,8 @@
         return maxTxnId;
     }
 
-    public long getTimeStamp() {
-        return timeStamp;
+    public long getId() {
+        return id;
     }
 
     public boolean isSharp() {
@@ -69,68 +70,47 @@
     }
 
     @Override
-    public int compareTo(Checkpoint checkpoint) {
-        long compareTimeStamp = checkpoint.getTimeStamp();
-
-        // Descending order
-        long diff = compareTimeStamp - this.timeStamp;
-        if (diff > 0) {
-            return 1;
-        } else if (diff == 0) {
-            return 0;
-        } else {
-            return -1;
-        }
+    public int compareTo(Checkpoint other) {
+        return Long.compare(this.id, other.id);
     }
 
     @Override
-    public boolean equals(Object obj) {
-        if (this == obj) {
+    public boolean equals(Object o) {
+        if (this == o) {
             return true;
         }
-        if (obj == null) {
+        if (o == null || getClass() != o.getClass()) {
             return false;
         }
-        if (!(obj instanceof Checkpoint)) {
-            return false;
-        }
-        Checkpoint other = (Checkpoint) obj;
-        return compareTo(other) == 0;
+        Checkpoint that = (Checkpoint) o;
+        return id == that.id;
     }
 
     @Override
     public int hashCode() {
-        final int prime = 31;
-        int result = 1;
-        result = prime * result + (int) (checkpointLsn ^ (checkpointLsn >>> 32));
-        result = prime * result + Long.hashCode(maxTxnId);
-        result = prime * result + (int) (minMCTFirstLsn ^ (minMCTFirstLsn >>> 32));
-        result = prime * result + (sharp ? 1231 : 1237);
-        result = prime * result + storageVersion;
-        result = prime * result + (int) (timeStamp ^ (timeStamp >>> 32));
-        return result;
+        return Long.hashCode(id);
     }
 
     @Override
     public JsonNode toJson(IPersistedResourceRegistry registry) throws HyracksDataException {
         final ObjectNode checkpointJson = registry.getClassIdentifier(getClass(), serialVersionUID);
+        checkpointJson.put("id", id);
         checkpointJson.put("checkpointLsn", checkpointLsn);
         checkpointJson.put("minMCTFirstLsn", minMCTFirstLsn);
         checkpointJson.put("maxTxnId", maxTxnId);
-        checkpointJson.put("timeStamp", timeStamp);
-        checkpointJson.put("sharp", timeStamp);
+        checkpointJson.put("sharp", sharp);
         checkpointJson.put("storageVersion", storageVersion);
         return checkpointJson;
     }
 
     @SuppressWarnings("squid:S1172") // unused parameter
     public static IJsonSerializable fromJson(IPersistedResourceRegistry registry, JsonNode json) {
+        long id = json.get("id").asLong();
         long checkpointLsn = json.get("checkpointLsn").asLong();
         long minMCTFirstLsn = json.get("minMCTFirstLsn").asLong();
         long maxTxnId = json.get("maxTxnId").asLong();
-        long timeStamp = json.get("timeStamp").asLong();
         boolean sharp = json.get("sharp").asBoolean();
         int storageVersion = json.get("storageVersion").asInt();
-        return new Checkpoint(checkpointLsn, minMCTFirstLsn, maxTxnId, timeStamp, sharp, storageVersion);
+        return new Checkpoint(id, checkpointLsn, minMCTFirstLsn, maxTxnId, sharp, storageVersion);
     }
 }
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
index e3cf8b8..36cea55 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
@@ -18,7 +18,6 @@
  */
 package org.apache.asterix.common.transactions;
 
-import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
 
@@ -26,10 +25,8 @@
 
     /**
      * @return The latest checkpoint on disk if any exists. Otherwise null.
-     * @throws ACIDException
-     *             when a checkpoint file cannot be read.
      */
-    Checkpoint getLatest() throws ACIDException;
+    Checkpoint getLatest();
 
     /**
      * Performs a sharp checkpoint.
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
index 19b006f..644f3c0 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
@@ -41,7 +41,7 @@
     /**
      * The storage version of AsterixDB related artifacts (e.g. log files, checkpoint files, etc..).
      */
-    private static final int LOCAL_STORAGE_VERSION = 4;
+    private static final int LOCAL_STORAGE_VERSION = 5;
 
     /**
      * The storage version of AsterixDB stack.
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java
index 0cbd6c6..e221da8 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java
@@ -60,6 +60,7 @@
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
     public static final long SHARP_CHECKPOINT_LSN = -1;
     private static final FilenameFilter filter = (File dir, String name) -> name.startsWith(CHECKPOINT_FILENAME_PREFIX);
+    private static final long FIRST_CHECKPOINT_ID = 0;
     private final File checkpointDir;
     private final int historyToKeep;
     private final int lsnThreshold;
@@ -88,25 +89,118 @@
         lsnThreshold = checkpointProperties.getLsnThreshold();
         pollFrequency = checkpointProperties.getPollFrequency();
         // We must keep at least the latest checkpoint
-        historyToKeep = checkpointProperties.getHistoryToKeep() == 0 ? 1 : checkpointProperties.getHistoryToKeep();
+        historyToKeep = checkpointProperties.getHistoryToKeep() + 1;
         persistedResourceRegistry = txnSubsystem.getApplicationContext().getPersistedResourceRegistry();
     }
 
     @Override
-    public Checkpoint getLatest() throws ACIDException {
-        // Read all checkpointObjects from the existing checkpoint files
+    public Checkpoint getLatest() {
         LOGGER.log(Level.INFO, "Getting latest checkpoint");
+        final List<File> checkpointFiles = getCheckpointFiles();
+        if (checkpointFiles.isEmpty()) {
+            return null;
+        }
+        final List<Checkpoint> orderedCheckpoints = getOrderedCheckpoints(checkpointFiles);
+        if (orderedCheckpoints.isEmpty()) {
+            /*
+             * If all checkpoint files are corrupted, we have no option but to try to perform recovery.
+             * We will forge a checkpoint that forces recovery to start from the beginning of the log.
+             * This shouldn't happen unless a hardware corruption happens.
+             */
+            return forgeForceRecoveryCheckpoint();
+        }
+        return orderedCheckpoints.get(orderedCheckpoints.size() - 1);
+    }
+
+    @Override
+    public void start() {
+        checkpointer = new CheckpointThread(this, txnSubsystem.getLogManager(), lsnThreshold, pollFrequency);
+        checkpointer.start();
+    }
+
+    @Override
+    public void stop(boolean dumpState, OutputStream ouputStream) throws IOException {
+        checkpointer.shutdown();
+        checkpointer.interrupt();
+        try {
+            // Wait until checkpoint thread stops
+            checkpointer.join();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    @Override
+    public void dumpState(OutputStream os) throws IOException {
+        // Nothing to dump
+    }
+
+    public Path getCheckpointPath(long checkpointId) {
+        return Paths.get(checkpointDir.getAbsolutePath() + File.separator + CHECKPOINT_FILENAME_PREFIX
+                + Long.toString(checkpointId));
+    }
+
+    protected void capture(long minMCTFirstLSN, boolean sharp) throws HyracksDataException {
+        ILogManager logMgr = txnSubsystem.getLogManager();
+        ITransactionManager txnMgr = txnSubsystem.getTransactionManager();
+        final long nextCheckpointId = getNextCheckpointId();
+        final Checkpoint checkpointObject = new Checkpoint(nextCheckpointId, logMgr.getAppendLSN(), minMCTFirstLSN,
+                txnMgr.getMaxTxnId(), sharp, StorageConstants.VERSION);
+        persist(checkpointObject);
+        cleanup();
+    }
+
+    private Checkpoint forgeForceRecoveryCheckpoint() {
+        /*
+         * By setting the checkpoint first LSN (low watermark) to Long.MIN_VALUE, the recovery manager will start from
+         * the first available log.
+         * We set the storage version to the current version. If there is a version mismatch, it will be detected
+         * during recovery.
+         */
+        return new Checkpoint(Long.MIN_VALUE, Long.MIN_VALUE, Integer.MIN_VALUE, FIRST_CHECKPOINT_ID, false,
+                StorageConstants.VERSION);
+    }
+
+    private void persist(Checkpoint checkpoint) throws HyracksDataException {
+        // Get checkpoint file path
+        Path path = getCheckpointPath(checkpoint.getId());
+
+        if (LOGGER.isInfoEnabled()) {
+            File file = path.toFile();
+            LOGGER.log(Level.INFO, "Persisting checkpoint file to " + file + " which "
+                    + (file.exists() ? "already exists" : "doesn't exist yet"));
+        }
+        // Write checkpoint file to disk
+        try {
+            byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(checkpoint.toJson(persistedResourceRegistry));
+            Files.write(path, bytes);
+        } catch (IOException e) {
+            LOGGER.log(Level.ERROR, "Failed to write checkpoint to disk", e);
+            throw HyracksDataException.create(e);
+        }
+        if (LOGGER.isInfoEnabled()) {
+            File file = path.toFile();
+            LOGGER.log(Level.INFO, "Completed persisting checkpoint file to " + file + " which now "
+                    + (file.exists() ? "exists" : " still doesn't exist"));
+        }
+    }
+
+    private List<File> getCheckpointFiles() {
         File[] checkpoints = checkpointDir.listFiles(filter);
         if (checkpoints == null || checkpoints.length == 0) {
             if (LOGGER.isInfoEnabled()) {
                 LOGGER.log(Level.INFO,
                         "Listing of files in the checkpoint dir returned " + (checkpoints == null ? "null" : "empty"));
             }
-            return null;
+            return Collections.emptyList();
         }
         if (LOGGER.isInfoEnabled()) {
             LOGGER.log(Level.INFO, "Listing of files in the checkpoint dir returned " + Arrays.toString(checkpoints));
         }
+        return Arrays.asList(checkpoints);
+    }
+
+    private List<Checkpoint> getOrderedCheckpoints(List<File> checkpoints) {
         List<Checkpoint> checkpointObjectList = new ArrayList<>();
         for (File file : checkpoints) {
             try {
@@ -134,106 +228,30 @@
                 }
             }
         }
-        /**
-         * If all checkpoint files are corrupted, we have no option but to try to perform recovery.
-         * We will forge a checkpoint that forces recovery to start from the beginning of the log.
-         * This shouldn't happen unless a hardware corruption happens.
-         */
-        if (checkpointObjectList.isEmpty()) {
-            LOGGER.error("All checkpoint files are corrupted. Forcing recovery from the beginning of the log");
-            checkpointObjectList.add(forgeForceRecoveryCheckpoint());
-        }
-
-        // Sort checkpointObjects in descending order by timeStamp to find out the most recent one.
         Collections.sort(checkpointObjectList);
-
-        // Return the most recent one (the first one in sorted list)
-        return checkpointObjectList.get(0);
-    }
-
-    @Override
-    public void start() {
-        checkpointer = new CheckpointThread(this, txnSubsystem.getLogManager(), lsnThreshold, pollFrequency);
-        checkpointer.start();
-    }
-
-    @Override
-    public void stop(boolean dumpState, OutputStream ouputStream) throws IOException {
-        checkpointer.shutdown();
-        checkpointer.interrupt();
-        try {
-            // Wait until checkpoint thread stops
-            checkpointer.join();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-        }
-    }
-
-    @Override
-    public void dumpState(OutputStream os) throws IOException {
-        // Nothing to dump
-    }
-
-    public Path getCheckpointPath(long checkpointTimestamp) {
-        return Paths.get(checkpointDir.getAbsolutePath() + File.separator + CHECKPOINT_FILENAME_PREFIX
-                + Long.toString(checkpointTimestamp));
-    }
-
-    protected void capture(long minMCTFirstLSN, boolean sharp) throws HyracksDataException {
-        ILogManager logMgr = txnSubsystem.getLogManager();
-        ITransactionManager txnMgr = txnSubsystem.getTransactionManager();
-        Checkpoint checkpointObject = new Checkpoint(logMgr.getAppendLSN(), minMCTFirstLSN, txnMgr.getMaxTxnId(),
-                System.currentTimeMillis(), sharp, StorageConstants.VERSION);
-        persist(checkpointObject);
-        cleanup();
-    }
-
-    protected Checkpoint forgeForceRecoveryCheckpoint() {
-        /**
-         * By setting the checkpoint first LSN (low watermark) to Long.MIN_VALUE, the recovery manager will start from
-         * the first available log.
-         * We set the storage version to the current version. If there is a version mismatch, it will be detected
-         * during recovery.
-         */
-        return new Checkpoint(Long.MIN_VALUE, Long.MIN_VALUE, Integer.MIN_VALUE, System.currentTimeMillis(), false,
-                StorageConstants.VERSION);
-    }
-
-    private void persist(Checkpoint checkpoint) throws HyracksDataException {
-        // Get checkpoint file path
-        Path path = getCheckpointPath(checkpoint.getTimeStamp());
-
-        if (LOGGER.isInfoEnabled()) {
-            File file = path.toFile();
-            LOGGER.log(Level.INFO, "Persisting checkpoint file to " + file + " which "
-                    + (file.exists() ? "already exists" : "doesn't exist yet"));
-        }
-        // Write checkpoint file to disk
-        try {
-            byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(checkpoint.toJson(persistedResourceRegistry));
-            Files.write(path, bytes);
-        } catch (IOException e) {
-            LOGGER.log(Level.ERROR, "Failed to write checkpoint to disk", e);
-            throw HyracksDataException.create(e);
-        }
-        if (LOGGER.isInfoEnabled()) {
-            File file = path.toFile();
-            LOGGER.log(Level.INFO, "Completed persisting checkpoint file to " + file + " which now "
-                    + (file.exists() ? "exists" : " still doesn't exist"));
-        }
+        return checkpointObjectList;
     }
 
     private void cleanup() {
-        File[] checkpointFiles = checkpointDir.listFiles(filter);
-        // Sort the filenames lexicographically to keep the latest checkpoint history files.
-        Arrays.sort(checkpointFiles);
-        for (int i = 0; i < checkpointFiles.length - historyToKeep; i++) {
-            if (LOGGER.isWarnEnabled()) {
-                LOGGER.warn("Deleting checkpoint file at: " + checkpointFiles[i].getAbsolutePath());
-            }
-            if (!checkpointFiles[i].delete() && LOGGER.isWarnEnabled()) {
-                LOGGER.warn("Could not delete checkpoint file at: " + checkpointFiles[i].getAbsolutePath());
+        final List<File> checkpointFiles = getCheckpointFiles();
+        final List<Checkpoint> orderedCheckpoints = getOrderedCheckpoints(checkpointFiles);
+        final int deleteCount = orderedCheckpoints.size() - historyToKeep;
+        for (int i = 0; i < deleteCount; i++) {
+            final Checkpoint checkpoint = orderedCheckpoints.get(i);
+            final Path checkpointPath = getCheckpointPath(checkpoint.getId());
+            LOGGER.warn("Deleting checkpoint file at: {}", checkpointPath);
+            if (!checkpointPath.toFile().delete()) {
+                LOGGER.warn("Could not delete checkpoint file at: {}", checkpointPath);
             }
         }
     }
+
+    private long getNextCheckpointId() {
+        final Checkpoint latest = getLatest();
+        if (latest == null) {
+            return FIRST_CHECKPOINT_ID;
+        }
+        return latest.getId() + 1;
+    }
+
 }
\ No newline at end of file
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
index 6efd0e5..ce523db 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
@@ -38,7 +38,7 @@
 public class CheckpointManager extends AbstractCheckpointManager {
 
     private static final Logger LOGGER = LogManager.getLogger();
-    private static final long NO_SECURED_LSN = -1l;
+    private static final long NO_SECURED_LSN = -1L;
     private final Map<TxnId, Long> securedLSNs;
 
     public CheckpointManager(ITransactionSubsystem txnSubsystem, CheckpointProperties checkpointProperties) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITreeIndexFrame.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITreeIndexFrame.java
index 7ac49c6..dc59612 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITreeIndexFrame.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITreeIndexFrame.java
@@ -36,7 +36,7 @@
          * Storage version #. Change this if you alter any tree frame formats to stop
          * possible corruption from old versions reading new formats.
          */
-        public static final int VERSION = 6;
+        public static final int VERSION = 7;
         public static final int TUPLE_COUNT_OFFSET = 0;
         public static final int FREE_SPACE_OFFSET = TUPLE_COUNT_OFFSET + 4;
         public static final int LEVEL_OFFSET = FREE_SPACE_OFFSET + 4;