Add Checkpoint Test

This change adds a unit test case which validates that
checkpoints do not delete log files that are still required
for recovery, and delete those that are no longer needed.

Change-Id: I4cb4743fe488deb5ad10f65604adc2231948795e
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1270
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ian Maxon <imaxon@apache.org>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index a806532..88ca736 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -32,11 +32,8 @@
 import org.apache.asterix.common.context.TransactionSubsystemProvider;
 import org.apache.asterix.common.dataflow.AsterixLSMInsertDeleteOperatorNodePushable;
 import org.apache.asterix.common.dataflow.AsterixLSMTreeInsertDeleteOperatorDescriptor;
-import org.apache.asterix.common.exceptions.ACIDException;
-import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
 import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
-import org.apache.asterix.runtime.util.AsterixRuntimeComponentsProvider;
 import org.apache.asterix.common.transactions.ITransactionManager;
 import org.apache.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
 import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
@@ -46,6 +43,7 @@
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
+import org.apache.asterix.runtime.util.AsterixRuntimeComponentsProvider;
 import org.apache.asterix.test.runtime.ExecutionTestUtil;
 import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallbackFactory;
 import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerProvider;
@@ -67,7 +65,6 @@
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.util.HyracksConstants;
@@ -115,8 +112,12 @@
     private JobId jobId;
     private long jobCounter = 0L;
     private IHyracksJobletContext jobletCtx;
+    private final String testConfigFileName;
+    private final boolean runHDFS;
 
-    public TestNodeController() throws AsterixException, HyracksException, ACIDException {
+    public TestNodeController(String testConfigFileName, boolean runHDFS) {
+        this.testConfigFileName = testConfigFileName;
+        this.runHDFS = runHDFS;
     }
 
     public void init() throws Exception {
@@ -125,7 +126,9 @@
             outdir.mkdirs();
             // remove library directory
             TestLibrarian.removeLibraryDir();
-            ExecutionTestUtil.setUp(cleanupOnStart);
+            ExecutionTestUtil.setUp(cleanupOnStart,
+                    testConfigFileName == null ? TEST_CONFIG_FILE_NAME : testConfigFileName,
+                    ExecutionTestUtil.integrationUtil, runHDFS);
         } catch (Throwable th) {
             th.printStackTrace();
             throw th;
@@ -299,7 +302,7 @@
         PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType,
                 mergePolicyFactory, mergePolicyProperties, filterFields);
         TreeIndexCreateOperatorDescriptor indexOpDesc = getIndexCreateOpDesc(primaryIndexInfo);
-        return getPrimaryIndexDataflowHelper(createTestContext(), primaryIndexInfo, indexOpDesc);
+        return getPrimaryIndexDataflowHelper(createTestContext(true), primaryIndexInfo, indexOpDesc);
     }
 
     public void createPrimaryIndex(Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType,
@@ -308,7 +311,7 @@
         PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType,
                 mergePolicyFactory, mergePolicyProperties, filterFields);
         TreeIndexCreateOperatorDescriptor indexOpDesc = getIndexCreateOpDesc(primaryIndexInfo);
-        LSMBTreeDataflowHelper dataflowHelper = getPrimaryIndexDataflowHelper(createTestContext(), primaryIndexInfo,
+        LSMBTreeDataflowHelper dataflowHelper = getPrimaryIndexDataflowHelper(createTestContext(true), primaryIndexInfo,
                 indexOpDesc);
         dataflowHelper.create();
     }
@@ -359,9 +362,11 @@
         return primaryIndexTypeTraits;
     }
 
-    public IHyracksTaskContext createTestContext() throws HyracksDataException {
+    public IHyracksTaskContext createTestContext(boolean withMessaging) throws HyracksDataException {
         IHyracksTaskContext ctx = TestUtils.create(KB32);
-        TaskUtils.putInSharedMap(HyracksConstants.KEY_MESSAGE, new VSizeFrame(ctx), ctx);
+        if (withMessaging) {
+            TaskUtils.putInSharedMap(HyracksConstants.KEY_MESSAGE, new VSizeFrame(ctx), ctx);
+        }
         ctx = Mockito.spy(ctx);
         Mockito.when(ctx.getJobletContext()).thenReturn(jobletCtx);
         Mockito.when(ctx.getIOManager())
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestHelper.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestHelper.java
index 5661258..c1399fb 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestHelper.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestHelper.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.test.common;
 
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -28,11 +29,22 @@
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipFile;
 
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Marshaller;
+import javax.xml.bind.Unmarshaller;
+
+import org.apache.asterix.common.configuration.AsterixConfiguration;
+import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.commons.compress.utils.IOUtils;
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 
 public final class TestHelper {
 
+    private static final String TEST_DIR_BASE_PATH = System.getProperty("user.dir") + File.separator + "target";
+    private static final String[] TEST_DIRS = new String[] { "txnLogDir", "IODevice", "spill_area", "config" };
+
     public static boolean isInPrefixList(List<String> prefixList, String s) {
         for (String s2 : prefixList) {
             if (s.startsWith(s2)) {
@@ -52,11 +64,11 @@
                 Enumeration<? extends ZipEntry> entries = zipFile.entries();
                 while (entries.hasMoreElements()) {
                     ZipEntry entry = entries.nextElement();
-                    File entryDestination = new File(outputDir,  entry.getName());
+                    File entryDestination = new File(outputDir, entry.getName());
                     if (!entry.isDirectory()) {
                         entryDestination.getParentFile().mkdirs();
                         try (InputStream in = zipFile.getInputStream(entry);
-                             OutputStream out = new FileOutputStream(entryDestination)) {
+                                OutputStream out = new FileOutputStream(entryDestination)) {
                             IOUtils.copy(in, out);
                         }
                     }
@@ -72,4 +84,43 @@
             }
         }
     }
-}
+
+    public static AsterixConfiguration getConfigurations(String fileName)
+            throws IOException, JAXBException, AsterixException {
+        try (InputStream is = TestHelper.class.getClassLoader().getResourceAsStream(fileName)) {
+            if (is != null) {
+                JAXBContext ctx = JAXBContext.newInstance(AsterixConfiguration.class);
+                Unmarshaller unmarshaller = ctx.createUnmarshaller();
+                return (AsterixConfiguration) unmarshaller.unmarshal(is);
+            } else {
+                throw new AsterixException("Could not find configuration file " + fileName);
+            }
+        }
+    }
+
+    public static void writeConfigurations(AsterixConfiguration ac, String fileName)
+            throws FileNotFoundException, IOException, JAXBException {
+        File configFile = new File(fileName);
+        if (!configFile.exists()) {
+            configFile.getParentFile().mkdirs();
+            configFile.createNewFile();
+        } else {
+            configFile.delete();
+        }
+        try (FileOutputStream os = new FileOutputStream(fileName)) {
+            JAXBContext ctx = JAXBContext.newInstance(AsterixConfiguration.class);
+            Marshaller marshaller = ctx.createMarshaller();
+            marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);
+            marshaller.marshal(ac, os);
+        }
+    }
+
+    public static void deleteExistingInstanceFiles() {
+        for (String dirName : TEST_DIRS) {
+            File f = new File(joinPath(TEST_DIR_BASE_PATH, dirName));
+            if (FileUtils.deleteQuietly(f)) {
+                System.out.println("Dir " + f.getName() + " deleted");
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
index a0ef31e..f41666d 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
@@ -18,7 +18,6 @@
  */
 package org.apache.asterix.test.dataflow;
 
-import java.io.File;
 import java.util.Collection;
 import java.util.Collections;
 
@@ -27,7 +26,6 @@
 import org.apache.asterix.app.data.gen.TupleGenerator;
 import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.dataflow.AsterixLSMInsertDeleteOperatorNodePushable;
 import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ILogRecord;
@@ -39,8 +37,8 @@
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.test.common.TestHelper;
 import org.apache.asterix.transaction.management.service.logging.LogReader;
-import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -63,12 +61,11 @@
 
 public class LogMarkerTest {
 
-    private static final String TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml";
     private static final IAType[] KEY_TYPES = { BuiltinType.AINT32 };
     private static final ARecordType RECORD_TYPE = new ARecordType("TestRecordType", new String[] { "key", "value" },
             new IAType[] { BuiltinType.AINT32, BuiltinType.AINT64 }, false);
-    private static final GenerationFunction[] RECORD_GEN_FUNCTION =
-            { GenerationFunction.DETERMINISTIC, GenerationFunction.DETERMINISTIC };
+    private static final GenerationFunction[] RECORD_GEN_FUNCTION = { GenerationFunction.DETERMINISTIC,
+            GenerationFunction.DETERMINISTIC };
     private static final boolean[] UNIQUE_RECORD_FIELDS = { true, false };
     private static final ARecordType META_TYPE = null;
     private static final GenerationFunction[] META_GEN_FUNCTION = null;
@@ -78,7 +75,6 @@
     private static final int NUM_OF_RECORDS = 100000;
     private static final int SNAPSHOT_SIZE = 1000;
     private static final int DATASET_ID = 101;
-    private static final String SPILL_AREA = "target" + File.separator + "spill_area";
     private static final String DATAVERSE_NAME = "TestDV";
     private static final String DATASET_NAME = "TestDS";
     private static final String DATA_TYPE_NAME = "DUMMY";
@@ -86,37 +82,20 @@
 
     @Before
     public void setUp() throws Exception {
-        System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, TEST_CONFIG_FILE_NAME);
         System.out.println("SetUp: ");
-        File f = new File(System.getProperty("user.dir") + File.separator + "target" + File.separator + "txnLogDir");
-        FileUtils.deleteQuietly(f);
-        System.out.println("Dir " + f.getName() + " deleted");
-        f = new File(System.getProperty("user.dir") + File.separator + "target" + File.separator + "IODevice");
-        FileUtils.deleteQuietly(f);
-        System.out.println("Dir " + f.getName() + " deleted");
-        f = new File(System.getProperty("user.dir") + File.separator + SPILL_AREA);
-        FileUtils.deleteQuietly(f);
-        System.out.println("Dir " + f.getName() + " deleted");
+        TestHelper.deleteExistingInstanceFiles();
     }
 
     @After
     public void tearDown() throws Exception {
         System.out.println("TearDown");
-        File f = new File(System.getProperty("user.dir") + File.separator + "target" + File.separator + "txnLogDir");
-        FileUtils.deleteQuietly(f);
-        System.out.println("Dir " + f.getName() + " deleted");
-        f = new File(System.getProperty("user.dir") + File.separator + "target" + File.separator + "IODevice");
-        FileUtils.deleteQuietly(f);
-        System.out.println("Dir " + f.getName() + " deleted");
-        f = new File(System.getProperty("user.dir") + File.separator + SPILL_AREA);
-        FileUtils.deleteQuietly(f);
-        System.out.println("Dir " + f.getName() + " deleted");
+        TestHelper.deleteExistingInstanceFiles();
     }
 
     @Test
     public void testInsertWithSnapshot() {
         try {
-            TestNodeController nc = new TestNodeController();
+            TestNodeController nc = new TestNodeController(null, false);
             nc.init();
             Dataset dataset = new Dataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME,
                     NODE_GROUP_NAME, null, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
@@ -125,7 +104,7 @@
             try {
                 nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(), null,
                         null);
-                IHyracksTaskContext ctx = nc.createTestContext();
+                IHyracksTaskContext ctx = nc.createTestContext(true);
                 nc.newJobId();
                 ITransactionContext txnCtx = nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(), true);
                 AsterixLSMInsertDeleteOperatorNodePushable insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES,
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
new file mode 100644
index 0000000..e3932ca
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.logging;
+
+import java.io.File;
+import java.util.Collections;
+
+import org.apache.asterix.app.bootstrap.TestNodeController;
+import org.apache.asterix.app.data.gen.TupleGenerator;
+import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
+import org.apache.asterix.common.config.AsterixTransactionProperties;
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.configuration.AsterixConfiguration;
+import org.apache.asterix.common.configuration.Property;
+import org.apache.asterix.common.dataflow.AsterixLSMInsertDeleteOperatorNodePushable;
+import org.apache.asterix.common.transactions.DatasetId;
+import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.external.util.DataflowUtils;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails.PartitioningStrategy;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.test.common.TestHelper;
+import org.apache.asterix.transaction.management.service.logging.LogManager;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
+import org.apache.hyracks.util.StorageUtil;
+import org.apache.hyracks.util.StorageUtil.StorageUnit;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class CheckpointingTest {
+
+    private static final String DEFAULT_TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml";
+    private static final String TEST_CONFIG_FILE_NAME = "asterix-test-configuration.xml";
+    private static final String TEST_CONFIG_PATH = System.getProperty("user.dir") + File.separator + "target"
+            + File.separator + "config";
+    private static final String TEST_CONFIG_FILE_PATH = TEST_CONFIG_PATH + File.separator + TEST_CONFIG_FILE_NAME;
+    private static final IAType[] KEY_TYPES = { BuiltinType.AINT32 };
+    private static final ARecordType RECORD_TYPE = new ARecordType("TestRecordType", new String[] { "key", "value" },
+            new IAType[] { BuiltinType.AINT32, BuiltinType.AINT64 }, false);
+    private static final GenerationFunction[] RECORD_GEN_FUNCTION = { GenerationFunction.DETERMINISTIC,
+            GenerationFunction.DETERMINISTIC };
+    private static final boolean[] UNIQUE_RECORD_FIELDS = { true, false };
+    private static final ARecordType META_TYPE = null;
+    private static final GenerationFunction[] META_GEN_FUNCTION = null;
+    private static final boolean[] UNIQUE_META_FIELDS = null;
+    private static final int[] KEY_INDEXES = { 0 };
+    private static final int[] KEY_INDICATORS = { 0 };
+    private static final int DATASET_ID = 101;
+    private static final String DATAVERSE_NAME = "TestDV";
+    private static final String DATASET_NAME = "TestDS";
+    private static final String DATA_TYPE_NAME = "DUMMY";
+    private static final String NODE_GROUP_NAME = "DEFAULT";
+    private static final int TXN_LOG_PARTITION_SIZE = StorageUtil.getSizeInBytes(2, StorageUnit.MEGABYTE);
+
+    @Before
+    public void setUp() throws Exception {
+        System.out.println("SetUp: ");
+        TestHelper.deleteExistingInstanceFiles();
+        // Read default test configurations
+        AsterixConfiguration ac = TestHelper.getConfigurations(DEFAULT_TEST_CONFIG_FILE_NAME);
+        // Set log file size to 2MB
+        ac.getProperty().add(new Property(AsterixTransactionProperties.TXN_LOG_PARTITIONSIZE_KEY,
+                String.valueOf(TXN_LOG_PARTITION_SIZE), ""));
+        // Disable checkpointing by making checkpoint thread wait max wait time
+        ac.getProperty().add(new Property(AsterixTransactionProperties.TXN_LOG_CHECKPOINT_POLLFREQUENCY_KEY,
+                String.valueOf(Integer.MAX_VALUE), ""));
+        // Write test config file
+        TestHelper.writeConfigurations(ac, TEST_CONFIG_FILE_PATH);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        System.out.println("TearDown");
+        TestHelper.deleteExistingInstanceFiles();
+    }
+
+    @Test
+    public void testDeleteOldLogFiles() {
+        try {
+            TestNodeController nc = new TestNodeController(new File(TEST_CONFIG_FILE_PATH).getAbsolutePath(), false);
+            nc.init();
+            Dataset dataset = new Dataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME,
+                    NODE_GROUP_NAME, null, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
+                            Collections.emptyList(), null, null, null, false, null, false),
+                    null, DatasetType.INTERNAL, DATASET_ID, 0);
+            try {
+                nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(), null,
+                        null);
+                IHyracksTaskContext ctx = nc.createTestContext(false);
+                nc.newJobId();
+                ITransactionContext txnCtx = nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(), true);
+                // Prepare insert operation
+                AsterixLSMInsertDeleteOperatorNodePushable insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES,
+                        RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(), null, null);
+                insertOp.open();
+                TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
+                        RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+                VSizeFrame frame = new VSizeFrame(ctx);
+                FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
+
+                LogManager logManager = (LogManager) nc.getTransactionSubsystem().getLogManager();
+                // Number of log files after node startup should be one
+                int numberOfLogFiles = logManager.getLogFileIds().size();
+                Assert.assertEquals(1, numberOfLogFiles);
+
+                // Low-water mark LSN
+                long lowWaterMarkLSN = nc.getTransactionSubsystem().getRecoveryManager().getMinFirstLSN();
+                // Low-water mark log file id
+                long initialLowWaterMarkFileId = logManager.getLogFileId(lowWaterMarkLSN);
+                // Initial Low-water mark should be in the only available log file
+                Assert.assertEquals(initialLowWaterMarkFileId, logManager.getLogFileIds().get(0).longValue());
+
+                // Insert records until a new log file is created
+                while (logManager.getLogFileIds().size() == 1) {
+                    ITupleReference tuple = tupleGenerator.next();
+                    DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
+                }
+
+                // Check if the new low-water mark is still in the initial low-water mark log file
+                lowWaterMarkLSN = nc.getTransactionSubsystem().getRecoveryManager().getMinFirstLSN();
+                long currentLowWaterMarkLogFileId = logManager.getLogFileId(lowWaterMarkLSN);
+
+                if (currentLowWaterMarkLogFileId == initialLowWaterMarkFileId) {
+                    /*
+                     * Make sure checkpoint will not delete the initial log file since
+                     * the low-water mark is still in it (i.e. it is still required for
+                     * recovery)
+                     */
+                    int numberOfLogFilesBeforeCheckpoint = logManager.getLogFileIds().size();
+                    nc.getTransactionSubsystem().getRecoveryManager().checkpoint(false, logManager.getAppendLSN());
+                    int numberOfLogFilesAfterCheckpoint = logManager.getLogFileIds().size();
+                    Assert.assertEquals(numberOfLogFilesBeforeCheckpoint, numberOfLogFilesAfterCheckpoint);
+
+                    /*
+                     * Insert records until the low-water mark is not in the initialLowWaterMarkFileId
+                     * either because of the asynchronous flush caused by the previous checkpoint or a flush
+                     * due to the dataset memory budget getting full.
+                     */
+                    while (currentLowWaterMarkLogFileId == initialLowWaterMarkFileId) {
+                        ITupleReference tuple = tupleGenerator.next();
+                        DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
+                        lowWaterMarkLSN = nc.getTransactionSubsystem().getRecoveryManager().getMinFirstLSN();
+                        currentLowWaterMarkLogFileId = logManager.getLogFileId(lowWaterMarkLSN);
+                    }
+                }
+
+                /*
+                 * At this point, the low-water mark is not in the initialLowWaterMarkFileId, so
+                 * a checkpoint should delete it.
+                 */
+                nc.getTransactionSubsystem().getRecoveryManager().checkpoint(false, logManager.getAppendLSN());
+
+                // Validate initialLowWaterMarkFileId was deleted
+                for (Long fileId : logManager.getLogFileIds()) {
+                    Assert.assertNotEquals(initialLowWaterMarkFileId, fileId.longValue());
+                }
+
+                if (tupleAppender.getTupleCount() > 0) {
+                    tupleAppender.write(insertOp, true);
+                }
+                insertOp.close();
+                nc.getTransactionManager().completedTransaction(txnCtx, new DatasetId(-1), -1, true);
+            } finally {
+                nc.deInit();
+            }
+        } catch (Throwable e) {
+            e.printStackTrace();
+            Assert.fail(e.getMessage());
+        }
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixTransactionProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixTransactionProperties.java
index afc9e4f..0480887 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixTransactionProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixTransactionProperties.java
@@ -18,13 +18,13 @@
  */
 package org.apache.asterix.common.config;
 
+import static org.apache.hyracks.util.StorageUtil.StorageUnit.KILOBYTE;
+import static org.apache.hyracks.util.StorageUtil.StorageUnit.MEGABYTE;
+
 import java.util.Map;
 
 import org.apache.hyracks.util.StorageUtil;
 
-import static org.apache.hyracks.util.StorageUtil.StorageUnit.KILOBYTE;
-import static org.apache.hyracks.util.StorageUtil.StorageUnit.MEGABYTE;
-
 public class AsterixTransactionProperties extends AbstractAsterixProperties {
 
     private static final String TXN_LOG_BUFFER_NUMPAGES_KEY = "txn.log.buffer.numpages";
@@ -33,13 +33,13 @@
     private static final String TXN_LOG_BUFFER_PAGESIZE_KEY = "txn.log.buffer.pagesize";
     private static final int TXN_LOG_BUFFER_PAGESIZE_DEFAULT = StorageUtil.getSizeInBytes(128, KILOBYTE);
 
-    private static final String TXN_LOG_PARTITIONSIZE_KEY = "txn.log.partitionsize";
+    public static final String TXN_LOG_PARTITIONSIZE_KEY = "txn.log.partitionsize";
     private static final long TXN_LOG_PARTITIONSIZE_DEFAULT = StorageUtil.getSizeInBytes(256L, MEGABYTE);
 
     private static final String TXN_LOG_CHECKPOINT_LSNTHRESHOLD_KEY = "txn.log.checkpoint.lsnthreshold";
     private static final int TXN_LOG_CHECKPOINT_LSNTHRESHOLD_DEFAULT = StorageUtil.getSizeInBytes(64, MEGABYTE);
 
-    private static final String TXN_LOG_CHECKPOINT_POLLFREQUENCY_KEY = "txn.log.checkpoint.pollfrequency";
+    public static final String TXN_LOG_CHECKPOINT_POLLFREQUENCY_KEY = "txn.log.checkpoint.pollfrequency";
     private static final int TXN_LOG_CHECKPOINT_POLLFREQUENCY_DEFAULT = 120; // 120s
 
     private static final String TXN_LOG_CHECKPOINT_HISTORY_KEY = "txn.log.checkpoint.history";
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index 7d61462..947ebc7 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -423,7 +423,7 @@
         }
     }
 
-    private List<Long> getLogFileIds() {
+    public List<Long> getLogFileIds() {
         File fileLogDir = new File(logDir);
         String[] logFileNames = null;
         List<Long> logFileIds = null;