[ASTERIXDB-2444][STO] Avoid Using System Clock in Checkpoints
- user model changes: no
- storage format changes: yes
- interface changes: yes
Details:
- Replace the use of system clock timestamps in checkpoints
with a sequencer.
- Update Asterix/Hyracks storage version to reflect the recent
changes in storage.
- This change is expected to break storage backward compatibility.
Change-Id: Idc061e6eaccfb308b29a5a263b77a0a849694d4f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2961
Reviewed-by: Michael Blow <mblow@apache.org>
Integration-Tests: Michael Blow <mblow@apache.org>
Tested-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
index 6009f51..e67246a 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
@@ -297,7 +297,7 @@
// Make sure the valid checkout wouldn't force full recovery
Assert.assertTrue(validCheckpoint.getMinMCTFirstLsn() >= minFirstLSN);
// Add a corrupted (empty) checkpoint file with a timestamp > than current checkpoint
- Path corruptedCheckpointPath = checkpointManager.getCheckpointPath(validCheckpoint.getTimeStamp() + 1);
+ Path corruptedCheckpointPath = checkpointManager.getCheckpointPath(validCheckpoint.getId() + 1);
File corruptedCheckpoint = corruptedCheckpointPath.toFile();
corruptedCheckpoint.createNewFile();
// Make sure the corrupted checkpoint file was created
@@ -305,11 +305,11 @@
// Try to get the latest checkpoint again
Checkpoint cpAfterCorruption = checkpointManager.getLatest();
// Make sure the valid checkpoint was returned
- Assert.assertEquals(validCheckpoint.getTimeStamp(), cpAfterCorruption.getTimeStamp());
+ Assert.assertEquals(validCheckpoint.getId(), cpAfterCorruption.getId());
// Make sure the corrupted checkpoint file was deleted
Assert.assertFalse(corruptedCheckpoint.exists());
// Corrupt the valid checkpoint by replacing its content
- final Path validCheckpointPath = checkpointManager.getCheckpointPath(validCheckpoint.getTimeStamp());
+ final Path validCheckpointPath = checkpointManager.getCheckpointPath(validCheckpoint.getId());
File validCheckpointFile = validCheckpointPath.toFile();
Assert.assertTrue(validCheckpointFile.exists());
// Delete the valid checkpoint file and create it as an empty file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java
index 7f1a1c7..8fe0353 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java
@@ -18,28 +18,29 @@
*/
package org.apache.asterix.common.transactions;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.IJsonSerializable;
import org.apache.hyracks.api.io.IPersistedResourceRegistry;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
public class Checkpoint implements Comparable<Checkpoint>, IJsonSerializable {
private static final long serialVersionUID = 1L;
private final long checkpointLsn;
private final long minMCTFirstLsn;
private final long maxTxnId;
- private final long timeStamp;
private final boolean sharp;
private final int storageVersion;
+ private long id;
- public Checkpoint(long checkpointLsn, long minMCTFirstLsn, long maxTxnId, long timeStamp, boolean sharp,
+ public Checkpoint(long id, long checkpointLsn, long minMCTFirstLsn, long maxTxnId, boolean sharp,
int storageVersion) {
+ this.id = id;
this.checkpointLsn = checkpointLsn;
this.minMCTFirstLsn = minMCTFirstLsn;
this.maxTxnId = maxTxnId;
- this.timeStamp = timeStamp;
this.sharp = sharp;
this.storageVersion = storageVersion;
}
@@ -56,8 +57,8 @@
return maxTxnId;
}
- public long getTimeStamp() {
- return timeStamp;
+ public long getId() {
+ return id;
}
public boolean isSharp() {
@@ -69,68 +70,47 @@
}
@Override
- public int compareTo(Checkpoint checkpoint) {
- long compareTimeStamp = checkpoint.getTimeStamp();
-
- // Descending order
- long diff = compareTimeStamp - this.timeStamp;
- if (diff > 0) {
- return 1;
- } else if (diff == 0) {
- return 0;
- } else {
- return -1;
- }
+ public int compareTo(Checkpoint other) {
+ return Long.compare(this.id, other.id);
}
@Override
- public boolean equals(Object obj) {
- if (this == obj) {
+ public boolean equals(Object o) {
+ if (this == o) {
return true;
}
- if (obj == null) {
+ if (o == null || getClass() != o.getClass()) {
return false;
}
- if (!(obj instanceof Checkpoint)) {
- return false;
- }
- Checkpoint other = (Checkpoint) obj;
- return compareTo(other) == 0;
+ Checkpoint that = (Checkpoint) o;
+ return id == that.id;
}
@Override
public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + (int) (checkpointLsn ^ (checkpointLsn >>> 32));
- result = prime * result + Long.hashCode(maxTxnId);
- result = prime * result + (int) (minMCTFirstLsn ^ (minMCTFirstLsn >>> 32));
- result = prime * result + (sharp ? 1231 : 1237);
- result = prime * result + storageVersion;
- result = prime * result + (int) (timeStamp ^ (timeStamp >>> 32));
- return result;
+ return Long.hashCode(id);
}
@Override
public JsonNode toJson(IPersistedResourceRegistry registry) throws HyracksDataException {
final ObjectNode checkpointJson = registry.getClassIdentifier(getClass(), serialVersionUID);
+ checkpointJson.put("id", id);
checkpointJson.put("checkpointLsn", checkpointLsn);
checkpointJson.put("minMCTFirstLsn", minMCTFirstLsn);
checkpointJson.put("maxTxnId", maxTxnId);
- checkpointJson.put("timeStamp", timeStamp);
- checkpointJson.put("sharp", timeStamp);
+ checkpointJson.put("sharp", sharp);
checkpointJson.put("storageVersion", storageVersion);
return checkpointJson;
}
@SuppressWarnings("squid:S1172") // unused parameter
public static IJsonSerializable fromJson(IPersistedResourceRegistry registry, JsonNode json) {
+ long id = json.get("id").asLong();
long checkpointLsn = json.get("checkpointLsn").asLong();
long minMCTFirstLsn = json.get("minMCTFirstLsn").asLong();
long maxTxnId = json.get("maxTxnId").asLong();
- long timeStamp = json.get("timeStamp").asLong();
boolean sharp = json.get("sharp").asBoolean();
int storageVersion = json.get("storageVersion").asInt();
- return new Checkpoint(checkpointLsn, minMCTFirstLsn, maxTxnId, timeStamp, sharp, storageVersion);
+ return new Checkpoint(id, checkpointLsn, minMCTFirstLsn, maxTxnId, sharp, storageVersion);
}
}
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
index e3cf8b8..36cea55 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
@@ -18,7 +18,6 @@
*/
package org.apache.asterix.common.transactions;
-import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
@@ -26,10 +25,8 @@
/**
* @return The latest checkpoint on disk if any exists. Otherwise null.
- * @throws ACIDException
- * when a checkpoint file cannot be read.
*/
- Checkpoint getLatest() throws ACIDException;
+ Checkpoint getLatest();
/**
* Performs a sharp checkpoint.
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
index 19b006f..644f3c0 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
@@ -41,7 +41,7 @@
/**
* The storage version of AsterixDB related artifacts (e.g. log files, checkpoint files, etc..).
*/
- private static final int LOCAL_STORAGE_VERSION = 4;
+ private static final int LOCAL_STORAGE_VERSION = 5;
/**
* The storage version of AsterixDB stack.
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java
index 0cbd6c6..e221da8 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java
@@ -60,6 +60,7 @@
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static final long SHARP_CHECKPOINT_LSN = -1;
private static final FilenameFilter filter = (File dir, String name) -> name.startsWith(CHECKPOINT_FILENAME_PREFIX);
+ private static final long FIRST_CHECKPOINT_ID = 0;
private final File checkpointDir;
private final int historyToKeep;
private final int lsnThreshold;
@@ -88,25 +89,118 @@
lsnThreshold = checkpointProperties.getLsnThreshold();
pollFrequency = checkpointProperties.getPollFrequency();
// We must keep at least the latest checkpoint
- historyToKeep = checkpointProperties.getHistoryToKeep() == 0 ? 1 : checkpointProperties.getHistoryToKeep();
+ historyToKeep = checkpointProperties.getHistoryToKeep() + 1;
persistedResourceRegistry = txnSubsystem.getApplicationContext().getPersistedResourceRegistry();
}
@Override
- public Checkpoint getLatest() throws ACIDException {
- // Read all checkpointObjects from the existing checkpoint files
+ public Checkpoint getLatest() {
LOGGER.log(Level.INFO, "Getting latest checkpoint");
+ final List<File> checkpointFiles = getCheckpointFiles();
+ if (checkpointFiles.isEmpty()) {
+ return null;
+ }
+ final List<Checkpoint> orderedCheckpoints = getOrderedCheckpoints(checkpointFiles);
+ if (orderedCheckpoints.isEmpty()) {
+ /*
+ * If all checkpoint files are corrupted, we have no option but to try to perform recovery.
+ * We will forge a checkpoint that forces recovery to start from the beginning of the log.
+ * This shouldn't happen unless a hardware corruption happens.
+ */
+ return forgeForceRecoveryCheckpoint();
+ }
+ return orderedCheckpoints.get(orderedCheckpoints.size() - 1);
+ }
+
+ @Override
+ public void start() {
+ checkpointer = new CheckpointThread(this, txnSubsystem.getLogManager(), lsnThreshold, pollFrequency);
+ checkpointer.start();
+ }
+
+ @Override
+ public void stop(boolean dumpState, OutputStream ouputStream) throws IOException {
+ checkpointer.shutdown();
+ checkpointer.interrupt();
+ try {
+ // Wait until checkpoint thread stops
+ checkpointer.join();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ @Override
+ public void dumpState(OutputStream os) throws IOException {
+ // Nothing to dump
+ }
+
+ public Path getCheckpointPath(long checkpointId) {
+ final String checkpointFileName = CHECKPOINT_FILENAME_PREFIX + checkpointId; // <prefix><id>
+ return Paths.get(checkpointDir.getAbsolutePath() + File.separator + checkpointFileName);
+ }
+
+ protected void capture(long minMCTFirstLSN, boolean sharp) throws HyracksDataException {
+ ILogManager logMgr = txnSubsystem.getLogManager();
+ ITransactionManager txnMgr = txnSubsystem.getTransactionManager();
+ final long nextCheckpointId = getNextCheckpointId();
+ final Checkpoint checkpointObject = new Checkpoint(nextCheckpointId, logMgr.getAppendLSN(), minMCTFirstLSN,
+ txnMgr.getMaxTxnId(), sharp, StorageConstants.VERSION);
+ persist(checkpointObject);
+ cleanup();
+ }
+
+ private Checkpoint forgeForceRecoveryCheckpoint() {
+ /*
+ * Forged when no checkpoint file could be read. Arguments follow the Checkpoint constructor order:
+ * (id, checkpointLsn, minMCTFirstLsn, maxTxnId, sharp, storageVersion). Setting the first LSN (low
+ * watermark) to Long.MIN_VALUE makes the recovery manager start from the first available log. The storage
+ * version is the current one; if there is a version mismatch, it will be detected during recovery.
+ */
+ return new Checkpoint(FIRST_CHECKPOINT_ID, Long.MIN_VALUE, Long.MIN_VALUE, Integer.MIN_VALUE, false,
+ StorageConstants.VERSION);
+ }
+
+ private void persist(Checkpoint checkpoint) throws HyracksDataException {
+ // Get checkpoint file path
+ Path path = getCheckpointPath(checkpoint.getId());
+
+ if (LOGGER.isInfoEnabled()) {
+ File file = path.toFile();
+ LOGGER.log(Level.INFO, "Persisting checkpoint file to " + file + " which "
+ + (file.exists() ? "already exists" : "doesn't exist yet"));
+ }
+ // Write checkpoint file to disk
+ try {
+ byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(checkpoint.toJson(persistedResourceRegistry));
+ Files.write(path, bytes);
+ } catch (IOException e) {
+ LOGGER.log(Level.ERROR, "Failed to write checkpoint to disk", e);
+ throw HyracksDataException.create(e);
+ }
+ if (LOGGER.isInfoEnabled()) {
+ File file = path.toFile();
+ LOGGER.log(Level.INFO, "Completed persisting checkpoint file to " + file + " which now "
+ + (file.exists() ? "exists" : " still doesn't exist"));
+ }
+ }
+
+ private List<File> getCheckpointFiles() {
File[] checkpoints = checkpointDir.listFiles(filter);
if (checkpoints == null || checkpoints.length == 0) {
if (LOGGER.isInfoEnabled()) {
LOGGER.log(Level.INFO,
"Listing of files in the checkpoint dir returned " + (checkpoints == null ? "null" : "empty"));
}
- return null;
+ return Collections.emptyList();
}
if (LOGGER.isInfoEnabled()) {
LOGGER.log(Level.INFO, "Listing of files in the checkpoint dir returned " + Arrays.toString(checkpoints));
}
+ return Arrays.asList(checkpoints);
+ }
+
+ private List<Checkpoint> getOrderedCheckpoints(List<File> checkpoints) {
List<Checkpoint> checkpointObjectList = new ArrayList<>();
for (File file : checkpoints) {
try {
@@ -134,106 +228,30 @@
}
}
}
- /**
- * If all checkpoint files are corrupted, we have no option but to try to perform recovery.
- * We will forge a checkpoint that forces recovery to start from the beginning of the log.
- * This shouldn't happen unless a hardware corruption happens.
- */
- if (checkpointObjectList.isEmpty()) {
- LOGGER.error("All checkpoint files are corrupted. Forcing recovery from the beginning of the log");
- checkpointObjectList.add(forgeForceRecoveryCheckpoint());
- }
-
- // Sort checkpointObjects in descending order by timeStamp to find out the most recent one.
Collections.sort(checkpointObjectList);
-
- // Return the most recent one (the first one in sorted list)
- return checkpointObjectList.get(0);
- }
-
- @Override
- public void start() {
- checkpointer = new CheckpointThread(this, txnSubsystem.getLogManager(), lsnThreshold, pollFrequency);
- checkpointer.start();
- }
-
- @Override
- public void stop(boolean dumpState, OutputStream ouputStream) throws IOException {
- checkpointer.shutdown();
- checkpointer.interrupt();
- try {
- // Wait until checkpoint thread stops
- checkpointer.join();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
-
- @Override
- public void dumpState(OutputStream os) throws IOException {
- // Nothing to dump
- }
-
- public Path getCheckpointPath(long checkpointTimestamp) {
- return Paths.get(checkpointDir.getAbsolutePath() + File.separator + CHECKPOINT_FILENAME_PREFIX
- + Long.toString(checkpointTimestamp));
- }
-
- protected void capture(long minMCTFirstLSN, boolean sharp) throws HyracksDataException {
- ILogManager logMgr = txnSubsystem.getLogManager();
- ITransactionManager txnMgr = txnSubsystem.getTransactionManager();
- Checkpoint checkpointObject = new Checkpoint(logMgr.getAppendLSN(), minMCTFirstLSN, txnMgr.getMaxTxnId(),
- System.currentTimeMillis(), sharp, StorageConstants.VERSION);
- persist(checkpointObject);
- cleanup();
- }
-
- protected Checkpoint forgeForceRecoveryCheckpoint() {
- /**
- * By setting the checkpoint first LSN (low watermark) to Long.MIN_VALUE, the recovery manager will start from
- * the first available log.
- * We set the storage version to the current version. If there is a version mismatch, it will be detected
- * during recovery.
- */
- return new Checkpoint(Long.MIN_VALUE, Long.MIN_VALUE, Integer.MIN_VALUE, System.currentTimeMillis(), false,
- StorageConstants.VERSION);
- }
-
- private void persist(Checkpoint checkpoint) throws HyracksDataException {
- // Get checkpoint file path
- Path path = getCheckpointPath(checkpoint.getTimeStamp());
-
- if (LOGGER.isInfoEnabled()) {
- File file = path.toFile();
- LOGGER.log(Level.INFO, "Persisting checkpoint file to " + file + " which "
- + (file.exists() ? "already exists" : "doesn't exist yet"));
- }
- // Write checkpoint file to disk
- try {
- byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(checkpoint.toJson(persistedResourceRegistry));
- Files.write(path, bytes);
- } catch (IOException e) {
- LOGGER.log(Level.ERROR, "Failed to write checkpoint to disk", e);
- throw HyracksDataException.create(e);
- }
- if (LOGGER.isInfoEnabled()) {
- File file = path.toFile();
- LOGGER.log(Level.INFO, "Completed persisting checkpoint file to " + file + " which now "
- + (file.exists() ? "exists" : " still doesn't exist"));
- }
+ return checkpointObjectList;
}
private void cleanup() {
- File[] checkpointFiles = checkpointDir.listFiles(filter);
- // Sort the filenames lexicographically to keep the latest checkpoint history files.
- Arrays.sort(checkpointFiles);
- for (int i = 0; i < checkpointFiles.length - historyToKeep; i++) {
- if (LOGGER.isWarnEnabled()) {
- LOGGER.warn("Deleting checkpoint file at: " + checkpointFiles[i].getAbsolutePath());
- }
- if (!checkpointFiles[i].delete() && LOGGER.isWarnEnabled()) {
- LOGGER.warn("Could not delete checkpoint file at: " + checkpointFiles[i].getAbsolutePath());
+ final List<File> checkpointFiles = getCheckpointFiles();
+ final List<Checkpoint> orderedCheckpoints = getOrderedCheckpoints(checkpointFiles);
+ final int deleteCount = orderedCheckpoints.size() - historyToKeep;
+ for (int i = 0; i < deleteCount; i++) {
+ final Checkpoint checkpoint = orderedCheckpoints.get(i);
+ final Path checkpointPath = getCheckpointPath(checkpoint.getId());
+ LOGGER.warn("Deleting checkpoint file at: {}", checkpointPath);
+ if (!checkpointPath.toFile().delete()) {
+ LOGGER.warn("Could not delete checkpoint file at: {}", checkpointPath);
}
}
}
+
+ /**
+ * @return the latest checkpoint's id plus one, or FIRST_CHECKPOINT_ID when getLatest() returns null
+ */
+ private long getNextCheckpointId() {
+ final Checkpoint mostRecent = getLatest();
+ return mostRecent == null ? FIRST_CHECKPOINT_ID : mostRecent.getId() + 1;
+ }
+
}
\ No newline at end of file
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
index 6efd0e5..ce523db 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
@@ -38,7 +38,7 @@
public class CheckpointManager extends AbstractCheckpointManager {
private static final Logger LOGGER = LogManager.getLogger();
- private static final long NO_SECURED_LSN = -1l;
+ private static final long NO_SECURED_LSN = -1L;
private final Map<TxnId, Long> securedLSNs;
public CheckpointManager(ITransactionSubsystem txnSubsystem, CheckpointProperties checkpointProperties) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITreeIndexFrame.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITreeIndexFrame.java
index 7ac49c6..dc59612 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITreeIndexFrame.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITreeIndexFrame.java
@@ -36,7 +36,7 @@
* Storage version #. Change this if you alter any tree frame formats to stop
* possible corruption from old versions reading new formats.
*/
- public static final int VERSION = 6;
+ public static final int VERSION = 7;
public static final int TUPLE_COUNT_OFFSET = 0;
public static final int FREE_SPACE_OFFSET = TUPLE_COUNT_OFFSET + 4;
public static final int LEVEL_OFFSET = FREE_SPACE_OFFSET + 4;