Merge commit '8c99391' from stabilization-f69489
Change-Id: I139c265ba998c32ec049c8a8bbd7a5a213895d2a
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
index 2a3e9b8..4a87629 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
@@ -306,8 +306,8 @@
Checkpoint cpAfterCorruption = checkpointManager.getLatest();
// Make sure the valid checkpoint was returned
Assert.assertEquals(validCheckpoint.getId(), cpAfterCorruption.getId());
- // Make sure the corrupted checkpoint file was deleted
- Assert.assertFalse(corruptedCheckpoint.exists());
+ // Make sure the corrupted checkpoint file was not deleted
+ Assert.assertTrue(corruptedCheckpoint.exists());
// Corrupt the valid checkpoint by replacing its content
final Path validCheckpointPath = checkpointManager.getCheckpointPath(validCheckpoint.getId());
File validCheckpointFile = validCheckpointPath.toFile();
@@ -321,6 +321,13 @@
// Make sure the forged checkpoint recovery will start from the first available log
final long readableSmallestLSN = txnSubsystem.getLogManager().getReadableSmallestLSN();
Assert.assertTrue(forgedCheckpoint.getMinMCTFirstLsn() <= readableSmallestLSN);
+ // another call should still give us the forged checkpoint and the corrupted one should still be there
+ forgedCheckpoint = checkpointManager.getLatest();
+ Assert.assertTrue(forgedCheckpoint.getMinMCTFirstLsn() < minFirstLSN);
+ Assert.assertTrue(corruptedCheckpoint.exists());
+ // do a successful checkpoint and ensure now the corrupted file was deleted
+ checkpointManager.doSharpCheckpoint();
+ Assert.assertFalse(corruptedCheckpoint.exists());
} finally {
nc.deInit();
}
diff --git a/asterixdb/asterix-app/src/test/resources/cc-small-txn-log-partition.conf b/asterixdb/asterix-app/src/test/resources/cc-small-txn-log-partition.conf
index 8995e19..cd9af8b 100644
--- a/asterixdb/asterix-app/src/test/resources/cc-small-txn-log-partition.conf
+++ b/asterixdb/asterix-app/src/test/resources/cc-small-txn-log-partition.conf
@@ -55,4 +55,5 @@
txn.log.partitionsize=2MB
txn.log.buffer.pagesize=128KB
txn.log.checkpoint.pollfrequency=2147483647
+txn.log.checkpoint.history=0
storage.max.active.writable.datasets=50
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
index 2bdb6c1..28b0c0e 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
@@ -47,7 +47,7 @@
"txn\.lock\.timeout\.waitthreshold" : 60000,
"txn\.log\.buffer\.numpages" : 8,
"txn\.log\.buffer\.pagesize" : 4194304,
- "txn\.log\.checkpoint\.history" : 0,
+ "txn\.log\.checkpoint\.history" : 2,
"txn\.log\.checkpoint\.lsnthreshold" : 67108864,
"txn\.log\.checkpoint\.pollfrequency" : 120,
"txn\.log\.partitionsize" : 268435456
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
index 599d6b4..0b9194f6 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
@@ -47,7 +47,7 @@
"txn\.lock\.timeout\.waitthreshold" : 60000,
"txn\.log\.buffer\.numpages" : 8,
"txn\.log\.buffer\.pagesize" : 4194304,
- "txn\.log\.checkpoint\.history" : 0,
+ "txn\.log\.checkpoint\.history" : 2,
"txn\.log\.checkpoint\.lsnthreshold" : 67108864,
"txn\.log\.checkpoint\.pollfrequency" : 120,
"txn\.log\.partitionsize" : 268435456
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
index 56d9dd9..9f1cbb0 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
@@ -47,7 +47,7 @@
"txn\.lock\.timeout\.waitthreshold" : 60000,
"txn\.log\.buffer\.numpages" : 8,
"txn\.log\.buffer\.pagesize" : 4194304,
- "txn\.log\.checkpoint\.history" : 0,
+ "txn\.log\.checkpoint\.history" : 2,
"txn\.log\.checkpoint\.lsnthreshold" : 67108864,
"txn\.log\.checkpoint\.pollfrequency" : 120,
"txn\.log\.partitionsize" : 268435456
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/TransactionProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/TransactionProperties.java
index fb6ca6b..0477147 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/TransactionProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/TransactionProperties.java
@@ -59,7 +59,7 @@
120,
"The frequency (in seconds) the checkpoint thread should check to see if a checkpoint should be "
+ "written"),
- TXN_LOG_CHECKPOINT_HISTORY(UNSIGNED_INTEGER, 0, "The number of checkpoints to keep in the transaction log"),
+ TXN_LOG_CHECKPOINT_HISTORY(UNSIGNED_INTEGER, 2, "The number of checkpoints to keep in the transaction log"),
TXN_LOCK_ESCALATIONTHRESHOLD(
UNSIGNED_INTEGER,
1000,
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java
index a9199a4..81002be 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java
@@ -26,6 +26,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -100,7 +101,7 @@
if (checkpointFiles.isEmpty()) {
return null;
}
- final List<Checkpoint> orderedCheckpoints = getOrderedCheckpoints(checkpointFiles);
+ final List<Checkpoint> orderedCheckpoints = getOrderedValidCheckpoints(checkpointFiles, false);
if (orderedCheckpoints.isEmpty()) {
/*
* If all checkpoint files are corrupted, we have no option but to try to perform recovery.
@@ -136,8 +137,7 @@
}
public Path getCheckpointPath(long checkpointId) {
- return Paths.get(checkpointDir.getAbsolutePath() + File.separator + CHECKPOINT_FILENAME_PREFIX
- + Long.toString(checkpointId));
+ return Paths.get(checkpointDir.getAbsolutePath() + File.separator + CHECKPOINT_FILENAME_PREFIX + checkpointId);
}
protected void capture(long minMCTFirstLSN, boolean sharp) throws HyracksDataException {
@@ -173,7 +173,8 @@
// Write checkpoint file to disk
try {
byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(checkpoint.toJson(persistedResourceRegistry));
- Files.write(path, bytes);
+ Files.write(path, bytes, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
+ readCheckpoint(path);
} catch (IOException e) {
LOGGER.log(Level.ERROR, "Failed to write checkpoint to disk", e);
throw HyracksDataException.create(e);
@@ -200,16 +201,14 @@
return Arrays.asList(checkpoints);
}
- private List<Checkpoint> getOrderedCheckpoints(List<File> checkpoints) {
+ private List<Checkpoint> getOrderedValidCheckpoints(List<File> checkpoints, boolean deleteCorrupted) {
List<Checkpoint> checkpointObjectList = new ArrayList<>();
for (File file : checkpoints) {
try {
if (LOGGER.isWarnEnabled()) {
LOGGER.log(Level.WARN, "Reading checkpoint file: " + file.getAbsolutePath());
}
- final JsonNode jsonNode =
- OBJECT_MAPPER.readValue(Files.readAllBytes(Paths.get(file.getAbsolutePath())), JsonNode.class);
- Checkpoint cp = (Checkpoint) persistedResourceRegistry.deserialize(jsonNode);
+ Checkpoint cp = readCheckpoint(Paths.get(file.getAbsolutePath()));
checkpointObjectList.add(cp);
} catch (ClosedByInterruptException e) {
Thread.currentThread().interrupt();
@@ -222,9 +221,8 @@
if (LOGGER.isWarnEnabled()) {
LOGGER.log(Level.WARN, "Failed to read checkpoint file: " + file.getAbsolutePath(), e);
}
- file.delete();
- if (LOGGER.isWarnEnabled()) {
- LOGGER.log(Level.WARN, "Deleted corrupted checkpoint file: " + file.getAbsolutePath());
+ if (deleteCorrupted && file.delete()) {
+ LOGGER.warn("Deleted corrupted checkpoint file: {}", file::getAbsolutePath);
}
}
}
@@ -234,7 +232,7 @@
private void cleanup() {
final List<File> checkpointFiles = getCheckpointFiles();
- final List<Checkpoint> orderedCheckpoints = getOrderedCheckpoints(checkpointFiles);
+ final List<Checkpoint> orderedCheckpoints = getOrderedValidCheckpoints(checkpointFiles, true);
final int deleteCount = orderedCheckpoints.size() - historyToKeep;
for (int i = 0; i < deleteCount; i++) {
final Checkpoint checkpoint = orderedCheckpoints.get(i);
@@ -247,11 +245,20 @@
}
private long getNextCheckpointId() {
- final Checkpoint latest = getLatest();
- if (latest == null) {
+ final List<File> checkpointFiles = getCheckpointFiles();
+ if (checkpointFiles.isEmpty()) {
return FIRST_CHECKPOINT_ID;
}
- return latest.getId() + 1;
+ long maxOnDiskId = -1;
+ for (File checkpointFile : checkpointFiles) {
+ long fileId = Long.parseLong(checkpointFile.getName().substring(CHECKPOINT_FILENAME_PREFIX.length()));
+ maxOnDiskId = Math.max(maxOnDiskId, fileId);
+ }
+ return maxOnDiskId + 1;
}
+ private Checkpoint readCheckpoint(Path checkpointPath) throws IOException {
+ final JsonNode jsonNode = OBJECT_MAPPER.readValue(Files.readAllBytes(checkpointPath), JsonNode.class);
+ return (Checkpoint) persistedResourceRegistry.deserialize(jsonNode);
+ }
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointThread.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointThread.java
index 446eec5..cbc5e35 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointThread.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointThread.java
@@ -85,13 +85,12 @@
if (currentCheckpointAttemptMinLSN >= targetCheckpointLSN) {
lastCheckpointLSN = currentCheckpointAttemptMinLSN;
}
-
}
} catch (InterruptedException e) {
LOGGER.info("Checkpoint thread interrupted", e);
Thread.currentThread().interrupt();
} catch (Exception e) {
- LOGGER.error("Error during checkpoint", e);
+ LOGGER.error("checkpoint attempt failed", e);
}
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
index 0a26097..3f88630 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
@@ -128,7 +128,6 @@
}
void reportRemoteError(int ecode) {
- ri.flush();
ri.getFullBufferAcceptor().error(ecode);
remoteClose.set(true);
}