Merge commit '8c99391' from stabilization-f69489

Change-Id: I139c265ba998c32ec049c8a8bbd7a5a213895d2a
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
index 2a3e9b8..4a87629 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
@@ -306,8 +306,8 @@
                 Checkpoint cpAfterCorruption = checkpointManager.getLatest();
                 // Make sure the valid checkpoint was returned
                 Assert.assertEquals(validCheckpoint.getId(), cpAfterCorruption.getId());
-                // Make sure the corrupted checkpoint file was deleted
-                Assert.assertFalse(corruptedCheckpoint.exists());
+                // Make sure the corrupted checkpoint file was not deleted
+                Assert.assertTrue(corruptedCheckpoint.exists());
                 // Corrupt the valid checkpoint by replacing its content
                 final Path validCheckpointPath = checkpointManager.getCheckpointPath(validCheckpoint.getId());
                 File validCheckpointFile = validCheckpointPath.toFile();
@@ -321,6 +321,13 @@
                 // Make sure the forged checkpoint recovery will start from the first available log
                 final long readableSmallestLSN = txnSubsystem.getLogManager().getReadableSmallestLSN();
                 Assert.assertTrue(forgedCheckpoint.getMinMCTFirstLsn() <= readableSmallestLSN);
+                // a second call should still return the forged checkpoint, and the corrupted checkpoint file should still exist
+                forgedCheckpoint = checkpointManager.getLatest();
+                Assert.assertTrue(forgedCheckpoint.getMinMCTFirstLsn() < minFirstLSN);
+                Assert.assertTrue(corruptedCheckpoint.exists());
+                // do a successful checkpoint and ensure the corrupted file is now deleted
+                checkpointManager.doSharpCheckpoint();
+                Assert.assertFalse(corruptedCheckpoint.exists());
             } finally {
                 nc.deInit();
             }
diff --git a/asterixdb/asterix-app/src/test/resources/cc-small-txn-log-partition.conf b/asterixdb/asterix-app/src/test/resources/cc-small-txn-log-partition.conf
index 8995e19..cd9af8b 100644
--- a/asterixdb/asterix-app/src/test/resources/cc-small-txn-log-partition.conf
+++ b/asterixdb/asterix-app/src/test/resources/cc-small-txn-log-partition.conf
@@ -55,4 +55,5 @@
 txn.log.partitionsize=2MB
 txn.log.buffer.pagesize=128KB
 txn.log.checkpoint.pollfrequency=2147483647
+txn.log.checkpoint.history=0
 storage.max.active.writable.datasets=50
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
index 2bdb6c1..28b0c0e 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
@@ -47,7 +47,7 @@
     "txn\.lock\.timeout\.waitthreshold" : 60000,
     "txn\.log\.buffer\.numpages" : 8,
     "txn\.log\.buffer\.pagesize" : 4194304,
-    "txn\.log\.checkpoint\.history" : 0,
+    "txn\.log\.checkpoint\.history" : 2,
     "txn\.log\.checkpoint\.lsnthreshold" : 67108864,
     "txn\.log\.checkpoint\.pollfrequency" : 120,
     "txn\.log\.partitionsize" : 268435456
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
index 599d6b4..0b9194f6 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
@@ -47,7 +47,7 @@
     "txn\.lock\.timeout\.waitthreshold" : 60000,
     "txn\.log\.buffer\.numpages" : 8,
     "txn\.log\.buffer\.pagesize" : 4194304,
-    "txn\.log\.checkpoint\.history" : 0,
+    "txn\.log\.checkpoint\.history" : 2,
     "txn\.log\.checkpoint\.lsnthreshold" : 67108864,
     "txn\.log\.checkpoint\.pollfrequency" : 120,
     "txn\.log\.partitionsize" : 268435456
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
index 56d9dd9..9f1cbb0 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
@@ -47,7 +47,7 @@
     "txn\.lock\.timeout\.waitthreshold" : 60000,
     "txn\.log\.buffer\.numpages" : 8,
     "txn\.log\.buffer\.pagesize" : 4194304,
-    "txn\.log\.checkpoint\.history" : 0,
+    "txn\.log\.checkpoint\.history" : 2,
     "txn\.log\.checkpoint\.lsnthreshold" : 67108864,
     "txn\.log\.checkpoint\.pollfrequency" : 120,
     "txn\.log\.partitionsize" : 268435456
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/TransactionProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/TransactionProperties.java
index fb6ca6b..0477147 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/TransactionProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/TransactionProperties.java
@@ -59,7 +59,7 @@
                 120,
                 "The frequency (in seconds) the checkpoint thread should check to see if a checkpoint should be "
                         + "written"),
-        TXN_LOG_CHECKPOINT_HISTORY(UNSIGNED_INTEGER, 0, "The number of checkpoints to keep in the transaction log"),
+        TXN_LOG_CHECKPOINT_HISTORY(UNSIGNED_INTEGER, 2, "The number of checkpoints to keep in the transaction log"),
         TXN_LOCK_ESCALATIONTHRESHOLD(
                 UNSIGNED_INTEGER,
                 1000,
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java
index a9199a4..81002be 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java
@@ -26,6 +26,7 @@
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -100,7 +101,7 @@
         if (checkpointFiles.isEmpty()) {
             return null;
         }
-        final List<Checkpoint> orderedCheckpoints = getOrderedCheckpoints(checkpointFiles);
+        final List<Checkpoint> orderedCheckpoints = getOrderedValidCheckpoints(checkpointFiles, false);
         if (orderedCheckpoints.isEmpty()) {
             /*
              * If all checkpoint files are corrupted, we have no option but to try to perform recovery.
@@ -136,8 +137,7 @@
     }
 
     public Path getCheckpointPath(long checkpointId) {
-        return Paths.get(checkpointDir.getAbsolutePath() + File.separator + CHECKPOINT_FILENAME_PREFIX
-                + Long.toString(checkpointId));
+        return Paths.get(checkpointDir.getAbsolutePath() + File.separator + CHECKPOINT_FILENAME_PREFIX + checkpointId);
     }
 
     protected void capture(long minMCTFirstLSN, boolean sharp) throws HyracksDataException {
@@ -173,7 +173,8 @@
         // Write checkpoint file to disk
         try {
             byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(checkpoint.toJson(persistedResourceRegistry));
-            Files.write(path, bytes);
+            Files.write(path, bytes, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
+            readCheckpoint(path);
         } catch (IOException e) {
             LOGGER.log(Level.ERROR, "Failed to write checkpoint to disk", e);
             throw HyracksDataException.create(e);
@@ -200,16 +201,14 @@
         return Arrays.asList(checkpoints);
     }
 
-    private List<Checkpoint> getOrderedCheckpoints(List<File> checkpoints) {
+    private List<Checkpoint> getOrderedValidCheckpoints(List<File> checkpoints, boolean deleteCorrupted) {
         List<Checkpoint> checkpointObjectList = new ArrayList<>();
         for (File file : checkpoints) {
             try {
                 if (LOGGER.isWarnEnabled()) {
                     LOGGER.log(Level.WARN, "Reading checkpoint file: " + file.getAbsolutePath());
                 }
-                final JsonNode jsonNode =
-                        OBJECT_MAPPER.readValue(Files.readAllBytes(Paths.get(file.getAbsolutePath())), JsonNode.class);
-                Checkpoint cp = (Checkpoint) persistedResourceRegistry.deserialize(jsonNode);
+                Checkpoint cp = readCheckpoint(Paths.get(file.getAbsolutePath()));
                 checkpointObjectList.add(cp);
             } catch (ClosedByInterruptException e) {
                 Thread.currentThread().interrupt();
@@ -222,9 +221,8 @@
                 if (LOGGER.isWarnEnabled()) {
                     LOGGER.log(Level.WARN, "Failed to read checkpoint file: " + file.getAbsolutePath(), e);
                 }
-                file.delete();
-                if (LOGGER.isWarnEnabled()) {
-                    LOGGER.log(Level.WARN, "Deleted corrupted checkpoint file: " + file.getAbsolutePath());
+                if (deleteCorrupted && file.delete()) {
+                    LOGGER.warn("Deleted corrupted checkpoint file: {}", file::getAbsolutePath);
                 }
             }
         }
@@ -234,7 +232,7 @@
 
     private void cleanup() {
         final List<File> checkpointFiles = getCheckpointFiles();
-        final List<Checkpoint> orderedCheckpoints = getOrderedCheckpoints(checkpointFiles);
+        final List<Checkpoint> orderedCheckpoints = getOrderedValidCheckpoints(checkpointFiles, true);
         final int deleteCount = orderedCheckpoints.size() - historyToKeep;
         for (int i = 0; i < deleteCount; i++) {
             final Checkpoint checkpoint = orderedCheckpoints.get(i);
@@ -247,11 +245,20 @@
     }
 
     private long getNextCheckpointId() {
-        final Checkpoint latest = getLatest();
-        if (latest == null) {
+        final List<File> checkpointFiles = getCheckpointFiles();
+        if (checkpointFiles.isEmpty()) {
             return FIRST_CHECKPOINT_ID;
         }
-        return latest.getId() + 1;
+        long maxOnDiskId = -1;
+        for (File checkpointFile : checkpointFiles) {
+            long fileId = Long.parseLong(checkpointFile.getName().substring(CHECKPOINT_FILENAME_PREFIX.length()));
+            maxOnDiskId = Math.max(maxOnDiskId, fileId);
+        }
+        return maxOnDiskId + 1;
     }
 
+    private Checkpoint readCheckpoint(Path checkpointPath) throws IOException {
+        final JsonNode jsonNode = OBJECT_MAPPER.readValue(Files.readAllBytes(checkpointPath), JsonNode.class);
+        return (Checkpoint) persistedResourceRegistry.deserialize(jsonNode);
+    }
 }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointThread.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointThread.java
index 446eec5..cbc5e35 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointThread.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointThread.java
@@ -85,13 +85,12 @@
                     if (currentCheckpointAttemptMinLSN >= targetCheckpointLSN) {
                         lastCheckpointLSN = currentCheckpointAttemptMinLSN;
                     }
-
                 }
             } catch (InterruptedException e) {
                 LOGGER.info("Checkpoint thread interrupted", e);
                 Thread.currentThread().interrupt();
             } catch (Exception e) {
-                LOGGER.error("Error during checkpoint", e);
+                LOGGER.error("checkpoint attempt failed", e);
             }
         }
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
index 0a26097..3f88630 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
@@ -128,7 +128,6 @@
     }
 
     void reportRemoteError(int ecode) {
-        ri.flush();
         ri.getFullBufferAcceptor().error(ecode);
         remoteClose.set(true);
     }