Introduce CheckpointManager API

This change includes the following:
- s/CheckpointObject/Checkpoint
- Add AsterixDB storage version to checkpoints.
- Prevent any txn log access when a storage version mismatch is detected.
- Introduce CheckpointManager API and CheckpointProperties.
- Properly stop checkpointing thread on instance shutdown.
- Separate checkpointing logic when replication enabled/disabled.

Change-Id: I36c00ca195b93bbe1e53f39bb4a3b5a344657f0d
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1380
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <hubailmor@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index a3ae9a0..b1ca062 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -278,7 +278,7 @@
         lccm.register((ILifeCycleComponent) datasetLifecycleManager);
         lccm.register((ILifeCycleComponent) txnSubsystem.getTransactionManager());
         lccm.register((ILifeCycleComponent) txnSubsystem.getLockManager());
-
+        lccm.register(txnSubsystem.getCheckpointManager());
     }
 
     @Override
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index 1887f2e..8998c6b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -37,6 +37,7 @@
 import org.apache.asterix.common.config.IPropertiesProvider;
 import org.apache.asterix.common.config.MessagingProperties;
 import org.apache.asterix.common.replication.IRemoteRecoveryManager;
+import org.apache.asterix.common.transactions.ICheckpointManager;
 import org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
 import org.apache.asterix.common.utils.PrintUtil;
@@ -47,7 +48,6 @@
 import org.apache.asterix.messaging.NCMessageBroker;
 import org.apache.asterix.runtime.message.ReportMaxResourceIdMessage;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
-import org.apache.asterix.transaction.management.service.recovery.RecoveryManager;
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.application.INCApplicationContext;
 import org.apache.hyracks.api.application.INCApplicationEntryPoint;
@@ -249,8 +249,8 @@
         lccm.startAll();
 
         if (!pendingFailbackCompletion) {
-            IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager();
-            recoveryMgr.checkpoint(true, RecoveryManager.NON_SHARP_CHECKPOINT_TARGET_LSN);
+            ICheckpointManager checkpointMgr = runtimeContext.getTransactionSubsystem().getCheckpointManager();
+            checkpointMgr.doSharpCheckpoint();
 
             if (isMetadataNode) {
                 runtimeContext.exportMetadataNodeStub();
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
index deb53ad..9fbf850 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
@@ -30,6 +30,7 @@
 import org.apache.asterix.common.configuration.Property;
 import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
 import org.apache.asterix.common.transactions.DatasetId;
+import org.apache.asterix.common.transactions.ICheckpointManager;
 import org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.external.util.DataflowUtils;
@@ -125,6 +126,7 @@
                 FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
 
                 IRecoveryManager recoveryManager = nc.getTransactionSubsystem().getRecoveryManager();
+                ICheckpointManager checkpointManager = nc.getTransactionSubsystem().getCheckpointManager();
                 LogManager logManager = (LogManager) nc.getTransactionSubsystem().getLogManager();
                 // Number of log files after node startup should be one
                 int numberOfLogFiles = logManager.getLogFileIds().size();
@@ -154,7 +156,7 @@
                      * recovery)
                      */
                     int numberOfLogFilesBeforeCheckpoint = logManager.getLogFileIds().size();
-                    recoveryManager.checkpoint(false, logManager.getAppendLSN());
+                    checkpointManager.tryCheckpoint(logManager.getAppendLSN());
                     int numberOfLogFilesAfterCheckpoint = logManager.getLogFileIds().size();
                     Assert.assertEquals(numberOfLogFilesBeforeCheckpoint, numberOfLogFilesAfterCheckpoint);
 
@@ -175,7 +177,7 @@
                  * At this point, the low-water mark is not in the initialLowWaterMarkFileId, so
                  * a checkpoint should delete it.
                  */
-                recoveryManager.checkpoint(false, recoveryManager.getMinFirstLSN());
+                checkpointManager.tryCheckpoint(recoveryManager.getMinFirstLSN());
 
                 // Validate initialLowWaterMarkFileId was deleted
                 for (Long fileId : logManager.getLogFileIds()) {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java
new file mode 100644
index 0000000..8bbdab7
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.transactions;
+
+import java.io.Serializable;
+
+public class Checkpoint implements Serializable, Comparable<Checkpoint> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final long checkpointLsn;
+    private final long minMCTFirstLsn;
+    private final int maxJobId;
+    private final long timeStamp;
+    private final boolean sharp;
+    private final int storageVersion;
+
+    public Checkpoint(long checkpointLsn, long minMCTFirstLsn, int maxJobId, long timeStamp, boolean sharp,
+            int storageVersion) {
+        this.checkpointLsn = checkpointLsn;
+        this.minMCTFirstLsn = minMCTFirstLsn;
+        this.maxJobId = maxJobId;
+        this.timeStamp = timeStamp;
+        this.sharp = sharp;
+        this.storageVersion = storageVersion;
+    }
+
+    public long getCheckpointLsn() {
+        return checkpointLsn;
+    }
+
+    public long getMinMCTFirstLsn() {
+        return minMCTFirstLsn;
+    }
+
+    public int getMaxJobId() {
+        return maxJobId;
+    }
+
+    public long getTimeStamp() {
+        return timeStamp;
+    }
+
+    public boolean isSharp() {
+        return sharp;
+    }
+
+    public int getStorageVersion() {
+        return storageVersion;
+    }
+
+    @Override
+    public int compareTo(Checkpoint checkpoint) {
+        long compareTimeStamp = checkpoint.getTimeStamp();
+
+        // Descending order
+        long diff = compareTimeStamp - this.timeStamp;
+        if (diff > 0) {
+            return 1;
+        } else if (diff == 0) {
+            return 0;
+        } else {
+            return -1;
+        }
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (!(obj instanceof Checkpoint)) {
+            return false;
+        }
+        Checkpoint other = (Checkpoint) obj;
+        return compareTo(other) == 0;
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + (int) (checkpointLsn ^ (checkpointLsn >>> 32));
+        result = prime * result + maxJobId;
+        result = prime * result + (int) (minMCTFirstLsn ^ (minMCTFirstLsn >>> 32));
+        result = prime * result + (sharp ? 1231 : 1237);
+        result = prime * result + storageVersion;
+        result = prime * result + (int) (timeStamp ^ (timeStamp >>> 32));
+        return result;
+    }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/CheckpointProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/CheckpointProperties.java
new file mode 100644
index 0000000..b8af3a6
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/CheckpointProperties.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.transactions;
+
+import org.apache.asterix.common.config.TransactionProperties;
+
+public class CheckpointProperties {
+
+    private final String checkpointDirPath;
+    private final int lsnThreshold;
+    private final int pollFrequency;
+    private final int historyToKeep;
+
+    public CheckpointProperties(TransactionProperties txnProperties, String nodeId) {
+        // Currently we use the log files directory for checkpoints
+        checkpointDirPath = txnProperties.getLogDirectory(nodeId);
+        lsnThreshold = txnProperties.getCheckpointLSNThreshold();
+        pollFrequency = txnProperties.getCheckpointPollFrequency();
+        historyToKeep = txnProperties.getCheckpointHistory();
+    }
+
+    public int getLsnThreshold() {
+        return lsnThreshold;
+    }
+
+    public int getPollFrequency() {
+        return pollFrequency;
+    }
+
+    public int getHistoryToKeep() {
+        return historyToKeep;
+    }
+
+    public String getCheckpointDirPath() {
+        return checkpointDirPath;
+    }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
new file mode 100644
index 0000000..9e7eb0d
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.transactions;
+
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
+
+public interface ICheckpointManager extends ILifeCycleComponent {
+
+    /**
+     * @return The latest checkpoint on disk if any exists. Otherwise null.
+     * @throws ACIDException
+     *             when a checkpoint file cannot be read.
+     */
+    Checkpoint getLatest() throws ACIDException;
+
+    /**
+     * Performs a sharp checkpoint.
+     *
+     * @throws HyracksDataException
+     */
+    void doSharpCheckpoint() throws HyracksDataException;
+
+    /**
+     * Attempts to perform a soft checkpoint at the specified {@code checkpointTargetLSN}.
+     *
+     * @param checkpointTargetLSN
+     * @return The LSN recorded on the captured checkpoint.
+     * @throws HyracksDataException
+     */
+    long tryCheckpoint(long checkpointTargetLSN) throws HyracksDataException;
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java
index 97d4897..aa018ba 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java
@@ -105,4 +105,9 @@
      * @throws IOException
      */
     public void closeLogFile(TxnLogFile logFileRef, FileChannel fileChannel) throws IOException;
+
+    /**
+     * Deletes all current log files and start the next log file partition
+     */
+    void renewLogFiles();
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
index a3115e7..6816116 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
@@ -82,20 +82,6 @@
     public void rollbackTransaction(ITransactionContext txnContext) throws ACIDException;
 
     /**
-     * Makes a system checkpoint.
-     *
-     * @param isSharpCheckpoint
-     *            a flag indicating whether to perform a sharp or non-sharp checkpoint.
-     * @param nonSharpCheckpointTargetLSN
-     *            if a non-sharp checkpoint to be performed, what is the minimum LSN it should target.
-     * @return the LSN at which the checkpoint was performed.
-     * @throws ACIDException
-     * @throws HyracksDataException
-     */
-    public long checkpoint(boolean isSharpCheckpoint, long nonSharpCheckpointTargetLSN)
-            throws ACIDException, HyracksDataException;
-
-    /**
      * @return min first LSN of the open indexes (including remote indexes if replication is enabled)
      * @throws HyracksDataException
      */
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
index c81faf0..af056ae 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
@@ -124,4 +124,9 @@
      */
     public ITransactionSubsystem getTransactionProvider();
 
+    /**
+     * @return The current max job id.
+     */
+    int getMaxJobId();
+
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionSubsystem.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionSubsystem.java
index dc1e6ed..b3a3eba 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionSubsystem.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionSubsystem.java
@@ -18,7 +18,6 @@
  */
 package org.apache.asterix.common.transactions;
 
-
 public interface ITransactionSubsystem {
 
     public ILogManager getLogManager();
@@ -33,4 +32,5 @@
 
     public String getId();
 
+    public ICheckpointManager getCheckpointManager();
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
index 382c94b..a885e93 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
@@ -18,12 +18,19 @@
  */
 package org.apache.asterix.common.utils;
 
+import org.apache.hyracks.storage.am.common.api.ITreeIndexMetaDataFrame;
+
 /**
  * A static class that stores storage constants
  */
 public class StorageConstants {
     public static final String METADATA_ROOT = "root_metadata";
+    /** The storage version of AsterixDB related artifacts (e.g. log files, checkpoint files, etc..). */
+    private static final int LOCAL_STORAGE_VERSION = 1;
+
+    /** The storage version of AsterixDB stack. */
+    public static final int VERSION = LOCAL_STORAGE_VERSION + ITreeIndexMetaDataFrame.VERSION;
 
     private StorageConstants() {
     }
-}
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index 57d5c39..25096f6 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -351,7 +351,8 @@
         return logFileSize * fileId + offset;
     }
 
-    public void renewLogFiles() throws IOException {
+    @Override
+    public void renewLogFiles() {
         terminateLogFlusher();
         long lastMaxLogFileId = deleteAllLogFiles();
         initializeLogManager(lastMaxLogFileId + 1);
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java
new file mode 100644
index 0000000..0b86ea5
--- /dev/null
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/AbstractCheckpointManager.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.service.recovery;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.Checkpoint;
+import org.apache.asterix.common.transactions.CheckpointProperties;
+import org.apache.asterix.common.transactions.ICheckpointManager;
+import org.apache.asterix.common.transactions.ILogManager;
+import org.apache.asterix.common.transactions.ITransactionManager;
+import org.apache.asterix.common.transactions.ITransactionSubsystem;
+import org.apache.asterix.common.utils.StorageConstants;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * An abstract implementation of {@link ICheckpointManager}.
+ * The AbstractCheckpointManager contains the implementation of
+ * the base operations on checkpoints such as persisting and deleting them.
+ */
+public abstract class AbstractCheckpointManager implements ICheckpointManager {
+
+    private static final Logger LOGGER = Logger.getLogger(AbstractCheckpointManager.class.getName());
+    private static final String CHECKPOINT_FILENAME_PREFIX = "checkpoint_";
+    public static final long SHARP_CHECKPOINT_LSN = -1;
+    private static final FilenameFilter filter = (File dir, String name) -> name.startsWith(CHECKPOINT_FILENAME_PREFIX);
+    private final File checkpointDir;
+    private final int historyToKeep;
+    private final int lsnThreshold;
+    private final int pollFrequency;
+    protected final ITransactionSubsystem txnSubsystem;
+    private CheckpointThread checkpointer;
+
+    public AbstractCheckpointManager(ITransactionSubsystem txnSubsystem, CheckpointProperties checkpointProperties) {
+        this.txnSubsystem = txnSubsystem;
+        String checkpointDirPath = checkpointProperties.getCheckpointDirPath();
+        if (!checkpointDirPath.endsWith(File.separator)) {
+            checkpointDirPath += File.separator;
+        }
+        checkpointDir = new File(checkpointDirPath);
+        // Create the checkpoint directory if missing
+        if (!checkpointDir.exists()) {
+            (new File(checkpointDirPath)).mkdir();
+        }
+        lsnThreshold = checkpointProperties.getLsnThreshold();
+        pollFrequency = checkpointProperties.getPollFrequency();
+        // We must keep at least the latest checkpoint
+        historyToKeep = checkpointProperties.getHistoryToKeep() == 0 ? 1 : checkpointProperties.getHistoryToKeep();
+    }
+
+    @Override
+    public Checkpoint getLatest() throws ACIDException {
+        // Read all checkpointObjects from the existing checkpoint files
+        File[] checkpoints = checkpointDir.listFiles(filter);
+        if (checkpoints == null || checkpoints.length == 0) {
+            return null;
+        }
+
+        Checkpoint checkpointObject;
+        List<Checkpoint> checkpointObjectList = new ArrayList<>();
+        for (File file : checkpoints) {
+            try (FileInputStream fis = new FileInputStream(file);
+                    ObjectInputStream oisFromFis = new ObjectInputStream(fis)) {
+                checkpointObject = (Checkpoint) oisFromFis.readObject();
+                checkpointObjectList.add(checkpointObject);
+            } catch (IOException | ClassNotFoundException e) {
+                throw new ACIDException("Failed to read a checkpoint file", e);
+            }
+        }
+        // Sort checkpointObjects in descending order by timeStamp to find out the most recent one.
+        Collections.sort(checkpointObjectList);
+
+        // Return the most recent one (the first one in sorted list)
+        return checkpointObjectList.get(0);
+    }
+
+    @Override
+    public void start() {
+        checkpointer = new CheckpointThread(this, txnSubsystem.getLogManager(), lsnThreshold, pollFrequency);
+        checkpointer.start();
+    }
+
+    @Override
+    public void stop(boolean dumpState, OutputStream ouputStream) throws IOException {
+        checkpointer.shutdown();
+        checkpointer.interrupt();
+        try {
+            // Wait until checkpoint thread stops
+            checkpointer.join();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    @Override
+    public void dumpState(OutputStream os) throws IOException {
+        // Nothing to dump
+    }
+
+    protected void capture(long minMCTFirstLSN, boolean sharp) throws HyracksDataException {
+        ILogManager logMgr = txnSubsystem.getLogManager();
+        ITransactionManager txnMgr = txnSubsystem.getTransactionManager();
+        Checkpoint checkpointObject = new Checkpoint(logMgr.getAppendLSN(), minMCTFirstLSN, txnMgr.getMaxJobId(),
+                System.currentTimeMillis(), sharp, StorageConstants.VERSION);
+        persist(checkpointObject);
+        cleanup();
+    }
+
+    private void persist(Checkpoint checkpoint) throws HyracksDataException {
+        // Construct checkpoint file name
+        String fileName = checkpointDir.getAbsolutePath() + File.separator + CHECKPOINT_FILENAME_PREFIX
+                + Long.toString(checkpoint.getTimeStamp());
+        //TODO: replace java serialization
+        // Write checkpoint file to disk
+        try (FileOutputStream fos = new FileOutputStream(fileName);
+                ObjectOutputStream oosToFos = new ObjectOutputStream(fos)) {
+            oosToFos.writeObject(checkpoint);
+            oosToFos.flush();
+        } catch (IOException e) {
+            throw new HyracksDataException("Failed to write checkpoint to disk", e);
+        }
+    }
+
+    private void cleanup() {
+        File[] checkpointFiles = checkpointDir.listFiles(filter);
+        // Sort the filenames lexicographically to keep the latest checkpoint history files.
+        Arrays.sort(checkpointFiles);
+        for (int i = 0; i < checkpointFiles.length - historyToKeep; i++) {
+            if (!checkpointFiles[i].delete()) {
+                LOGGER.warning("Could not delete checkpoint file at: " + checkpointFiles[i].getAbsolutePath());
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
new file mode 100644
index 0000000..ea711a5
--- /dev/null
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.service.recovery;
+
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.transactions.CheckpointProperties;
+import org.apache.asterix.common.transactions.ICheckpointManager;
+import org.apache.asterix.common.transactions.ITransactionSubsystem;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * An implementation of {@link ICheckpointManager} that defines the logic
+ * of checkpoints.
+ */
+public class CheckpointManager extends AbstractCheckpointManager {
+
+    private static final Logger LOGGER = Logger.getLogger(CheckpointManager.class.getName());
+
+    public CheckpointManager(ITransactionSubsystem txnSubsystem, CheckpointProperties checkpointProperties) {
+        super(txnSubsystem, checkpointProperties);
+    }
+
+    /**
+     * Performs a sharp checkpoint. All datasets are flushed and all transaction
+     * log files are deleted.
+     */
+    @Override
+    public synchronized void doSharpCheckpoint() throws HyracksDataException {
+        LOGGER.info("Starting sharp checkpoint...");
+        final IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
+                .getDatasetLifecycleManager();
+        datasetLifecycleManager.flushAllDatasets();
+        capture(SHARP_CHECKPOINT_LSN, true);
+        txnSubsystem.getLogManager().renewLogFiles();
+        LOGGER.info("Completed sharp checkpoint.");
+    }
+
+    /***
+     * Attempts to perform a soft checkpoint at the specified {@code checkpointTargetLSN}.
+     * If a checkpoint cannot be captured due to datasets having LSN < {@code checkpointTargetLSN},
+     * an asynchronous flush is triggered on them. When a checkpoint is successful, all transaction
+     * log files that end with LSN < {@code checkpointTargetLSN} are deleted.
+     */
+    @Override
+    public synchronized long tryCheckpoint(long checkpointTargetLSN) throws HyracksDataException {
+        LOGGER.info("Attemping soft checkpoint...");
+        final long minFirstLSN = txnSubsystem.getRecoveryManager().getMinFirstLSN();
+        boolean checkpointSucceeded = minFirstLSN >= checkpointTargetLSN;
+        if (!checkpointSucceeded) {
+            // Flush datasets with indexes behind target checkpoint LSN
+            IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
+                    .getDatasetLifecycleManager();
+            datasetLifecycleManager.scheduleAsyncFlushForLaggingDatasets(checkpointTargetLSN);
+        }
+        capture(minFirstLSN, false);
+        if (checkpointSucceeded) {
+            txnSubsystem.getLogManager().deleteOldLogFiles(minFirstLSN);
+            LOGGER.info(String.format("soft checkpoint succeeded at LSN(%s)", minFirstLSN));
+        }
+        return minFirstLSN;
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManagerFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManagerFactory.java
new file mode 100644
index 0000000..68c5ce1
--- /dev/null
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManagerFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.service.recovery;
+
+import org.apache.asterix.common.transactions.CheckpointProperties;
+import org.apache.asterix.common.transactions.ICheckpointManager;
+import org.apache.asterix.common.transactions.ITransactionSubsystem;
+
+public class CheckpointManagerFactory {
+
+    private CheckpointManagerFactory() {
+        throw new AssertionError();
+    }
+
+    public static ICheckpointManager create(ITransactionSubsystem txnSubsystem,
+            CheckpointProperties checkpointProperties, boolean replicationEnabled) {
+        if (!replicationEnabled) {
+            return new CheckpointManager(txnSubsystem, checkpointProperties);
+        } else {
+            return new ReplicationCheckpointManager(txnSubsystem, checkpointProperties);
+        }
+    }
+}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointObject.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointObject.java
deleted file mode 100644
index 3356298..0000000
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointObject.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.transaction.management.service.recovery;
-
-import java.io.Serializable;
-
-public class CheckpointObject implements Serializable, Comparable<CheckpointObject> {
-
-    private static final long serialVersionUID = 1L;
-
-    private final long checkpointLsn;
-    private final long minMCTFirstLsn;
-    private final int maxJobId;
-    private final long timeStamp;
-    private final boolean sharp;
-
-    public CheckpointObject(long checkpointLsn, long minMCTFirstLsn, int maxJobId, long timeStamp, boolean sharp) {
-        this.checkpointLsn = checkpointLsn;
-        this.minMCTFirstLsn = minMCTFirstLsn;
-        this.maxJobId = maxJobId;
-        this.timeStamp = timeStamp;
-        this.sharp = sharp;
-    }
-
-    public long getCheckpointLsn() {
-        return checkpointLsn;
-    }
-
-    public long getMinMCTFirstLsn() {
-        return minMCTFirstLsn;
-    }
-
-    public int getMaxJobId() {
-        return maxJobId;
-    }
-
-    public long getTimeStamp() {
-        return timeStamp;
-    }
-
-    public boolean isSharp() {
-        return sharp;
-    }
-
-    @Override
-    public int compareTo(CheckpointObject checkpointObject) {
-        long compareTimeStamp = checkpointObject.getTimeStamp();
-
-        //decending order
-        long diff = compareTimeStamp - this.timeStamp;
-        if (diff > 0) {
-            return 1;
-        } else if (diff == 0) {
-            return 0;
-        } else {
-            return -1;
-        }
-
-        //ascending order
-        //return this.timeStamp - compareTimeStamp;
-    }
-}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointThread.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointThread.java
index 851289e..39c7c98 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointThread.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointThread.java
@@ -18,15 +18,18 @@
  */
 package org.apache.asterix.transaction.management.service.recovery;
 
-import java.io.IOError;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.ICheckpointManager;
 import org.apache.asterix.common.transactions.ILogManager;
-import org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
+/**
+ * A daemon thread that periodically attempts to perform checkpoints.
+ * A checkpoint attempt is made when the volume of transaction logs written
+ * since the last successful checkpoint exceeds a certain threshold.
+ */
 public class CheckpointThread extends Thread {
 
     private static final Logger LOGGER = Logger.getLogger(CheckpointThread.class.getName());
@@ -34,33 +37,34 @@
     private long checkpointTermInSecs;
 
     private final ILogManager logManager;
-    private final IRecoveryManager recoveryMgr;
+    private final ICheckpointManager checkpointManager;
+    private volatile boolean shouldRun = true;
 
-    public CheckpointThread(IRecoveryManager recoveryMgr, ILogManager logManager,
-            long lsnThreshold, long checkpointTermInSecs) {
-        this.recoveryMgr = recoveryMgr;
+    public CheckpointThread(ICheckpointManager checkpointManager, ILogManager logManager, long lsnThreshold,
+            long checkpointTermInSecs) {
+        this.checkpointManager = checkpointManager;
         this.logManager = logManager;
         this.lsnThreshold = lsnThreshold;
         this.checkpointTermInSecs = checkpointTermInSecs;
+        setDaemon(true);
     }
 
     @Override
     public void run() {
-
         Thread.currentThread().setName("Checkpoint Thread");
-
         long currentCheckpointAttemptMinLSN;
         long lastCheckpointLSN = -1;
         long currentLogLSN;
         long targetCheckpointLSN;
-        while (true) {
+        while (shouldRun) {
             try {
                 sleep(checkpointTermInSecs * 1000);
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
-                //ignore
             }
-
+            if (!shouldRun) {
+                return;
+            }
             if (lastCheckpointLSN == -1) {
                 try {
                     //Since the system just started up after sharp checkpoint,
@@ -84,18 +88,20 @@
                     //3. next time checkpoint comes, it will be able to remove log files which have end range less than current targetCheckpointLSN
 
                     targetCheckpointLSN = lastCheckpointLSN + lsnThreshold;
-                    currentCheckpointAttemptMinLSN = recoveryMgr.checkpoint(false, targetCheckpointLSN);
+                    currentCheckpointAttemptMinLSN = checkpointManager.tryCheckpoint(targetCheckpointLSN);
 
                     //checkpoint was completed at target LSN or above
                     if (currentCheckpointAttemptMinLSN >= targetCheckpointLSN) {
                         lastCheckpointLSN = currentCheckpointAttemptMinLSN;
                     }
-
-                } catch (ACIDException | HyracksDataException e) {
-                    throw new IOError(e);
+                } catch (HyracksDataException e) {
+                    LOGGER.log(Level.SEVERE, "Error during checkpoint", e);
                 }
             }
         }
     }
 
+    public void shutdown() {
+        shouldRun = false;
+    }
 }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
index 825e5c9..f8b6384 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -20,13 +20,9 @@
 
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
-import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
@@ -34,7 +30,6 @@
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -48,15 +43,13 @@
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
-import org.apache.asterix.common.cluster.ClusterPartition;
-import org.apache.asterix.common.config.MetadataProperties;
 import org.apache.asterix.common.config.ClusterProperties;
-import org.apache.asterix.common.config.IPropertiesProvider;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
 import org.apache.asterix.common.replication.IReplicaResourcesManager;
-import org.apache.asterix.common.replication.IReplicationManager;
+import org.apache.asterix.common.transactions.Checkpoint;
 import org.apache.asterix.common.transactions.IAppRuntimeContextProvider;
+import org.apache.asterix.common.transactions.ICheckpointManager;
 import org.apache.asterix.common.transactions.ILogReader;
 import org.apache.asterix.common.transactions.ILogRecord;
 import org.apache.asterix.common.transactions.IRecoveryManager;
@@ -66,7 +59,6 @@
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.asterix.transaction.management.service.logging.LogManager;
 import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants;
-import org.apache.asterix.transaction.management.service.transaction.TransactionManager;
 import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem;
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -89,30 +81,22 @@
     private static final Logger LOGGER = Logger.getLogger(RecoveryManager.class.getName());
     private final TransactionSubsystem txnSubsystem;
     private final LogManager logMgr;
-    private final int checkpointHistory;
-    private final long SHARP_CHECKPOINT_LSN = -1;
     private final boolean replicationEnabled;
-    public static final long NON_SHARP_CHECKPOINT_TARGET_LSN = -1;
     private static final String RECOVERY_FILES_DIR_NAME = "recovery_temp";
     private Map<Integer, JobEntityCommits> jobId2WinnerEntitiesMap = null;
     private final long cachedEntityCommitsPerJobSize;
     private final PersistentLocalResourceRepository localResourceRepository;
-
-    /**
-     * A file at a known location that contains the LSN of the last log record
-     * traversed doing a successful checkpoint.
-     */
-    private static final String CHECKPOINT_FILENAME_PREFIX = "checkpoint_";
+    private final ICheckpointManager checkpointManager;
     private SystemState state;
 
     public RecoveryManager(TransactionSubsystem txnSubsystem) {
         this.txnSubsystem = txnSubsystem;
         logMgr = (LogManager) txnSubsystem.getLogManager();
-        checkpointHistory = txnSubsystem.getTransactionProperties().getCheckpointHistory();
         replicationEnabled = ClusterProperties.INSTANCE.isReplicationEnabled();
         localResourceRepository = (PersistentLocalResourceRepository) txnSubsystem.getAsterixAppRuntimeContextProvider()
                 .getLocalResourceRepository();
         cachedEntityCommitsPerJobSize = txnSubsystem.getTransactionProperties().getJobRecoveryMemorySize();
+        checkpointManager = txnSubsystem.getCheckpointManager();
     }
 
     /**
@@ -126,10 +110,8 @@
     @Override
     public SystemState getSystemState() throws ACIDException {
         //read checkpoint file
-        CheckpointObject checkpointObject = null;
-        try {
-            checkpointObject = readCheckpoint();
-        } catch (FileNotFoundException e) {
+        Checkpoint checkpointObject = checkpointManager.getLatest();
+        if (checkpointObject == null) {
             //The checkpoint file doesn't exist => Failure happened during NC initialization.
             //Retry to initialize the NC by setting the state to NEW_UNIVERSE
             state = SystemState.NEW_UNIVERSE;
@@ -140,7 +122,7 @@
         }
 
         if (replicationEnabled) {
-            if (checkpointObject.getMinMCTFirstLsn() == SHARP_CHECKPOINT_LSN) {
+            if (checkpointObject.getMinMCTFirstLsn() == AbstractCheckpointManager.SHARP_CHECKPOINT_LSN) {
                 //no logs exist
                 state = SystemState.HEALTHY;
                 return state;
@@ -156,14 +138,14 @@
         } else {
             long readableSmallestLSN = logMgr.getReadableSmallestLSN();
             if (logMgr.getAppendLSN() == readableSmallestLSN) {
-                if (checkpointObject.getMinMCTFirstLsn() != SHARP_CHECKPOINT_LSN) {
+                if (checkpointObject.getMinMCTFirstLsn() != AbstractCheckpointManager.SHARP_CHECKPOINT_LSN) {
                     LOGGER.warning("Some(or all) of transaction log files are lost.");
                     //No choice but continuing when the log files are lost.
                 }
                 state = SystemState.HEALTHY;
                 return state;
             } else if (checkpointObject.getCheckpointLsn() == logMgr.getAppendLSN()
-                    && checkpointObject.getMinMCTFirstLsn() == SHARP_CHECKPOINT_LSN) {
+                    && checkpointObject.getMinMCTFirstLsn() == AbstractCheckpointManager.SHARP_CHECKPOINT_LSN) {
                 state = SystemState.HEALTHY;
                 return state;
             } else {
@@ -180,7 +162,7 @@
         LOGGER.log(Level.INFO, "starting recovery ...");
 
         long readableSmallestLSN = logMgr.getReadableSmallestLSN();
-        CheckpointObject checkpointObject = readCheckpoint();
+        Checkpoint checkpointObject = checkpointManager.getLatest();
         long lowWaterMarkLSN = checkpointObject.getMinMCTFirstLsn();
         if (lowWaterMarkLSN < readableSmallestLSN) {
             lowWaterMarkLSN = readableSmallestLSN;
@@ -372,8 +354,8 @@
                                     //#. get maxDiskLastLSN
                                     ILSMIndex lsmIndex = index;
                                     try {
-                                        maxDiskLastLsn =
-                                                ((AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback())
+                                        maxDiskLastLsn = ((AbstractLSMIOOperationCallback) lsmIndex
+                                                .getIOOperationCallback())
                                                         .getComponentLSN(lsmIndex.getImmutableComponents());
                                     } catch (HyracksDataException e) {
                                         datasetLifecycleManager.close(localResource.getPath());
@@ -422,129 +404,6 @@
     }
 
     @Override
-    public synchronized long checkpoint(boolean isSharpCheckpoint, long nonSharpCheckpointTargetLSN)
-            throws ACIDException, HyracksDataException {
-        long minMCTFirstLSN;
-        boolean nonSharpCheckpointSucceeded = false;
-
-        if (isSharpCheckpoint) {
-            LOGGER.log(Level.INFO, "Starting sharp checkpoint ... ");
-        }
-
-        TransactionManager txnMgr = (TransactionManager) txnSubsystem.getTransactionManager();
-        String logDir = logMgr.getLogManagerProperties().getLogDir();
-
-        //get the filename of the previous checkpoint files which are about to be deleted
-        //right after the new checkpoint file is written.
-        File[] prevCheckpointFiles = getPreviousCheckpointFiles();
-
-        IDatasetLifecycleManager datasetLifecycleManager =
-                txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager();
-        //flush all in-memory components if it is the sharp checkpoint
-        if (isSharpCheckpoint) {
-            datasetLifecycleManager.flushAllDatasets();
-            if (!replicationEnabled) {
-                minMCTFirstLSN = SHARP_CHECKPOINT_LSN;
-            } else {
-                //if is shutting down, need to check if we need to keep any remote logs for dead replicas
-                if (txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext().isShuttingdown()) {
-                    Set<String> deadReplicaIds = txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext()
-                            .getReplicationManager().getDeadReplicasIds();
-                    if (deadReplicaIds.isEmpty()) {
-                        minMCTFirstLSN = SHARP_CHECKPOINT_LSN;
-                    } else {
-                        //get min LSN of dead replicas remote resources
-                        IReplicaResourcesManager remoteResourcesManager = txnSubsystem
-                                .getAsterixAppRuntimeContextProvider().getAppContext().getReplicaResourcesManager();
-                        IPropertiesProvider propertiesProvider = (IPropertiesProvider) txnSubsystem
-                                .getAsterixAppRuntimeContextProvider().getAppContext();
-                        MetadataProperties metadataProperties = propertiesProvider.getMetadataProperties();
-                        Set<Integer> deadReplicasPartitions = new HashSet<>();
-                        //get partitions of the dead replicas that are not active on this node
-                        for (String deadReplicaId : deadReplicaIds) {
-                            ClusterPartition[] nodePartitons =
-                                    metadataProperties.getNodePartitions().get(deadReplicaId);
-                            for (ClusterPartition partition : nodePartitons) {
-                                if (!localResourceRepository.getActivePartitions()
-                                        .contains(partition.getPartitionId())) {
-                                    deadReplicasPartitions.add(partition.getPartitionId());
-                                }
-                            }
-                        }
-                        minMCTFirstLSN = remoteResourcesManager.getPartitionsMinLSN(deadReplicasPartitions);
-                    }
-                } else {
-                    //start up complete checkpoint. Avoid deleting remote recovery logs.
-                    minMCTFirstLSN = getMinFirstLSN();
-                }
-            }
-        } else {
-            minMCTFirstLSN = getMinFirstLSN();
-            if (minMCTFirstLSN >= nonSharpCheckpointTargetLSN) {
-                nonSharpCheckpointSucceeded = true;
-            } else {
-                //flush datasets with indexes behind target checkpoint LSN
-                datasetLifecycleManager.scheduleAsyncFlushForLaggingDatasets(nonSharpCheckpointTargetLSN);
-                if (replicationEnabled) {
-                    //request remote replicas to flush lagging indexes
-                    IReplicationManager replicationManager =
-                            txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext().getReplicationManager();
-                    try {
-                        replicationManager.requestFlushLaggingReplicaIndexes(nonSharpCheckpointTargetLSN);
-                    } catch (IOException e) {
-                        e.printStackTrace();
-                    }
-                }
-            }
-        }
-
-        CheckpointObject checkpointObject = new CheckpointObject(logMgr.getAppendLSN(), minMCTFirstLSN,
-                txnMgr.getMaxJobId(), System.currentTimeMillis(), isSharpCheckpoint);
-
-        String fileName = getCheckpointFileName(logDir, Long.toString(checkpointObject.getTimeStamp()));
-
-        try (FileOutputStream fos = new FileOutputStream(fileName);
-                ObjectOutputStream oosToFos = new ObjectOutputStream(fos)) {
-            oosToFos.writeObject(checkpointObject);
-            oosToFos.flush();
-        } catch (IOException e) {
-            throw new ACIDException("Failed to checkpoint", e);
-        }
-
-        //#. delete the previous checkpoint files
-        if (prevCheckpointFiles != null) {
-            // sort the filenames lexicographically to keep the latest checkpointHistory files.
-            Arrays.sort(prevCheckpointFiles);
-            for (int i = 0; i < prevCheckpointFiles.length - this.checkpointHistory; ++i) {
-                prevCheckpointFiles[i].delete();
-            }
-        }
-
-        if (isSharpCheckpoint) {
-            try {
-                if (minMCTFirstLSN == SHARP_CHECKPOINT_LSN) {
-                    logMgr.renewLogFiles();
-                } else {
-                    logMgr.deleteOldLogFiles(minMCTFirstLSN);
-                }
-            } catch (IOException e) {
-                throw new HyracksDataException(e);
-            }
-        }
-
-        if (nonSharpCheckpointSucceeded) {
-            logMgr.deleteOldLogFiles(minMCTFirstLSN);
-        }
-
-        if (isSharpCheckpoint) {
-            LOGGER.info("Completed sharp checkpoint.");
-        }
-
-        //return the min LSN that was recorded in the checkpoint
-        return minMCTFirstLSN;
-    }
-
-    @Override
     public long getMinFirstLSN() throws HyracksDataException {
         long minFirstLSN = getLocalMinFirstLSN();
 
@@ -559,16 +418,16 @@
 
     @Override
     public long getLocalMinFirstLSN() throws HyracksDataException {
-        IDatasetLifecycleManager datasetLifecycleManager =
-                txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager();
+        IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
+                .getDatasetLifecycleManager();
         List<IIndex> openIndexList = datasetLifecycleManager.getOpenResources();
         long firstLSN;
         //the min first lsn can only be the current append or smaller
         long minFirstLSN = logMgr.getAppendLSN();
         if (openIndexList.size() > 0) {
             for (IIndex index : openIndexList) {
-                AbstractLSMIOOperationCallback ioCallback =
-                        (AbstractLSMIOOperationCallback) ((ILSMIndex) index).getIOOperationCallback();
+                AbstractLSMIOOperationCallback ioCallback = (AbstractLSMIOOperationCallback) ((ILSMIndex) index)
+                        .getIOOperationCallback();
 
                 if (!((AbstractLSMIndex) index).isCurrentMutableComponentEmpty() || ioCallback.hasPendingFlush()) {
                     firstLSN = ioCallback.getFirstLSN();
@@ -580,60 +439,12 @@
     }
 
     private long getRemoteMinFirstLSN() {
-        IReplicaResourcesManager remoteResourcesManager =
-                txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext().getReplicaResourcesManager();
+        IReplicaResourcesManager remoteResourcesManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
+                .getAppContext().getReplicaResourcesManager();
         long minRemoteLSN = remoteResourcesManager.getPartitionsMinLSN(localResourceRepository.getInactivePartitions());
         return minRemoteLSN;
     }
 
-    private CheckpointObject readCheckpoint() throws ACIDException, FileNotFoundException {
-        CheckpointObject checkpointObject = null;
-
-        //read all checkpointObjects from the existing checkpoint files
-        File[] prevCheckpointFiles = getPreviousCheckpointFiles();
-        if (prevCheckpointFiles == null || prevCheckpointFiles.length == 0) {
-            throw new FileNotFoundException("Checkpoint file is not found");
-        }
-
-        List<CheckpointObject> checkpointObjectList = new ArrayList<>();
-        for (File file : prevCheckpointFiles) {
-            try (FileInputStream fis = new FileInputStream(file);
-                    ObjectInputStream oisFromFis = new ObjectInputStream(fis)) {
-                checkpointObject = (CheckpointObject) oisFromFis.readObject();
-                checkpointObjectList.add(checkpointObject);
-            } catch (Exception e) {
-                throw new ACIDException("Failed to read a checkpoint file", e);
-            }
-        }
-
-        //sort checkpointObjects in descending order by timeStamp to find out the most recent one.
-        Collections.sort(checkpointObjectList);
-
-        //return the most recent one (the first one in sorted list)
-        return checkpointObjectList.get(0);
-    }
-
-    private File[] getPreviousCheckpointFiles() {
-        String logDir = ((LogManager) txnSubsystem.getLogManager()).getLogManagerProperties().getLogDir();
-        File parentDir = new File(logDir);
-
-        FilenameFilter filter = new FilenameFilter() {
-            @Override
-            public boolean accept(File dir, String name) {
-                return name.contains(CHECKPOINT_FILENAME_PREFIX);
-            }
-        };
-
-        return parentDir.listFiles(filter);
-    }
-
-    private static String getCheckpointFileName(String baseDir, String suffix) {
-        if (!baseDir.endsWith(System.getProperty("file.separator"))) {
-            baseDir += System.getProperty("file.separator");
-        }
-        return baseDir + CHECKPOINT_FILENAME_PREFIX + suffix;
-    }
-
     @Override
     public File createJobRecoveryFile(int jobId, String fileName) throws IOException {
         String recoveryDirPath = getRecoveryDirPath();
@@ -794,8 +605,8 @@
             //undo loserTxn's effect
             LOGGER.log(Level.INFO, "undoing loser transaction's effect");
 
-            IDatasetLifecycleManager datasetLifecycleManager =
-                    txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager();
+            IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
+                    .getDatasetLifecycleManager();
             //TODO sort loser entities by smallest LSN to undo in one pass.
             Iterator<Entry<TxnId, List<Long>>> iter = jobLoserEntity2LSNsMap.entrySet().iterator();
             int undoCount = 0;
@@ -836,12 +647,8 @@
 
     @Override
     public void stop(boolean dumpState, OutputStream os) throws IOException {
-        try {
-            checkpoint(true, NON_SHARP_CHECKPOINT_TARGET_LSN);
-        } catch (HyracksDataException | ACIDException e) {
-            e.printStackTrace();
-            throw new IOException(e);
-        }
+        // Shutdown checkpoint
+        checkpointManager.doSharpCheckpoint();
     }
 
     @Override
@@ -851,10 +658,10 @@
 
     private static void undo(ILogRecord logRecord, IDatasetLifecycleManager datasetLifecycleManager) {
         try {
-            ILSMIndex index =
-                    (ILSMIndex) datasetLifecycleManager.getIndex(logRecord.getDatasetId(), logRecord.getResourceId());
-            ILSMIndexAccessor indexAccessor =
-                    index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+            ILSMIndex index = (ILSMIndex) datasetLifecycleManager.getIndex(logRecord.getDatasetId(),
+                    logRecord.getResourceId());
+            ILSMIndexAccessor indexAccessor = index.createAccessor(NoOpOperationCallback.INSTANCE,
+                    NoOpOperationCallback.INSTANCE);
             if (logRecord.getNewOp() == IndexOperation.INSERT.ordinal()) {
                 indexAccessor.forceDelete(logRecord.getNewValue());
             } else if (logRecord.getNewOp() == IndexOperation.DELETE.ordinal()) {
@@ -871,10 +678,9 @@
         try {
             int datasetId = logRecord.getDatasetId();
             long resourceId = logRecord.getResourceId();
-            ILSMIndex index =
-                    (ILSMIndex) datasetLifecycleManager.getIndex(datasetId, resourceId);
-            ILSMIndexAccessor indexAccessor =
-                    index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+            ILSMIndex index = (ILSMIndex) datasetLifecycleManager.getIndex(datasetId, resourceId);
+            ILSMIndexAccessor indexAccessor = index.createAccessor(NoOpOperationCallback.INSTANCE,
+                    NoOpOperationCallback.INSTANCE);
             if (logRecord.getNewOp() == IndexOperation.INSERT.ordinal()) {
                 indexAccessor.forceInsert(logRecord.getNewValue());
             } else if (logRecord.getNewOp() == IndexOperation.DELETE.ordinal()) {
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/ReplicationCheckpointManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/ReplicationCheckpointManager.java
new file mode 100644
index 0000000..6fdee33
--- /dev/null
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/ReplicationCheckpointManager.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.service.recovery;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.config.IPropertiesProvider;
+import org.apache.asterix.common.config.MetadataProperties;
+import org.apache.asterix.common.replication.IReplicaResourcesManager;
+import org.apache.asterix.common.replication.IReplicationManager;
+import org.apache.asterix.common.transactions.CheckpointProperties;
+import org.apache.asterix.common.transactions.ICheckpointManager;
+import org.apache.asterix.common.transactions.ITransactionSubsystem;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * An implementation of {@link ICheckpointManager} that defines the logic
+ * of checkpoints when replication is enabled..
+ */
+public class ReplicationCheckpointManager extends AbstractCheckpointManager {
+
+    private static final Logger LOGGER = Logger.getLogger(ReplicationCheckpointManager.class.getName());
+
+    public ReplicationCheckpointManager(ITransactionSubsystem txnSubsystem, CheckpointProperties checkpointProperties) {
+        super(txnSubsystem, checkpointProperties);
+    }
+
+    /**
+     * Performs a sharp checkpoint. All datasets are flushed and all transaction
+     * log files are deleted except the files that are needed for dead replicas.
+     */
+    @Override
+    public synchronized void doSharpCheckpoint() throws HyracksDataException {
+        LOGGER.info("Starting sharp checkpoint...");
+        final IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
+                .getDatasetLifecycleManager();
+        datasetLifecycleManager.flushAllDatasets();
+        long minFirstLSN;
+        // If shutting down, need to check if we need to keep any remote logs for dead replicas
+        if (txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext().isShuttingdown()) {
+            final Set<String> deadReplicaIds = txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext()
+                    .getReplicationManager().getDeadReplicasIds();
+            if (deadReplicaIds.isEmpty()) {
+                // No dead replicas => no need to keep any log
+                minFirstLSN = SHARP_CHECKPOINT_LSN;
+            } else {
+                // Get min LSN of dead replicas remote resources
+                minFirstLSN = getDeadReplicasMinFirstLSN(deadReplicaIds);
+            }
+        } else {
+            // Start up complete checkpoint. Avoid deleting remote recovery logs.
+            minFirstLSN = txnSubsystem.getRecoveryManager().getMinFirstLSN();
+        }
+        capture(minFirstLSN, true);
+        if (minFirstLSN == SHARP_CHECKPOINT_LSN) {
+            // No need to keep any logs
+            txnSubsystem.getLogManager().renewLogFiles();
+        } else {
+            // Delete only log files with LSNs < any dead replica partition minimum LSN
+            txnSubsystem.getLogManager().deleteOldLogFiles(minFirstLSN);
+        }
+        LOGGER.info("Completed sharp checkpoint.");
+    }
+
+    /***
+     * Attempts to perform a soft checkpoint at the specified {@code checkpointTargetLSN}.
+     * If a checkpoint cannot be captured due to datasets having LSN < {@code checkpointTargetLSN},
+     * an asynchronous flush is triggered on them. If the checkpoint fails due to a replica index,
+     * a request is sent to the primary replica of the index to flush it.
+     * When a checkpoint is successful, all transaction log files that end with
+     * LSN < {@code checkpointTargetLSN} are deleted.
+     */
+    @Override
+    public synchronized long tryCheckpoint(long checkpointTargetLSN) throws HyracksDataException {
+        LOGGER.info("Attemping soft checkpoint...");
+        final long minFirstLSN = txnSubsystem.getRecoveryManager().getMinFirstLSN();
+        boolean checkpointSucceeded = minFirstLSN >= checkpointTargetLSN;
+        if (!checkpointSucceeded) {
+            // Flush datasets with indexes behind target checkpoint LSN
+            final IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
+                    .getDatasetLifecycleManager();
+            datasetLifecycleManager.scheduleAsyncFlushForLaggingDatasets(checkpointTargetLSN);
+            // Request remote replicas to flush lagging indexes
+            final IReplicationManager replicationManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
+                    .getAppContext().getReplicationManager();
+            try {
+                replicationManager.requestFlushLaggingReplicaIndexes(checkpointTargetLSN);
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+        capture(minFirstLSN, false);
+        if (checkpointSucceeded) {
+            txnSubsystem.getLogManager().deleteOldLogFiles(minFirstLSN);
+            LOGGER.info(String.format("soft checkpoint succeeded with at LSN(%s)", minFirstLSN));
+        }
+        return minFirstLSN;
+    }
+
+    private long getDeadReplicasMinFirstLSN(Set<String> deadReplicaIds) {
+        final IReplicaResourcesManager remoteResourcesManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
+                .getAppContext().getReplicaResourcesManager();
+        final IPropertiesProvider propertiesProvider = (IPropertiesProvider) txnSubsystem
+                .getAsterixAppRuntimeContextProvider().getAppContext();
+        final MetadataProperties metadataProperties = propertiesProvider.getMetadataProperties();
+        final PersistentLocalResourceRepository localResourceRepository =
+                (PersistentLocalResourceRepository) txnSubsystem
+                .getAsterixAppRuntimeContextProvider().getLocalResourceRepository();
+        // Get partitions of the dead replicas that are not active on this node
+        final Set<Integer> deadReplicasPartitions = new HashSet<>();
+        for (String deadReplicaId : deadReplicaIds) {
+            final ClusterPartition[] nodePartitons = metadataProperties.getNodePartitions().get(deadReplicaId);
+            for (ClusterPartition partition : nodePartitons) {
+                if (!localResourceRepository.getActivePartitions().contains(partition.getPartitionId())) {
+                    deadReplicasPartitions.add(partition.getPartitionId());
+                }
+            }
+        }
+        return remoteResourcesManager.getPartitionsMinLSN(deadReplicasPartitions);
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
index f035029..b08ecbb 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -145,6 +145,7 @@
         }
     }
 
+    @Override
     public int getMaxJobId() {
         return maxJobId.get();
     }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
index ce1752a..09183fe 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
@@ -22,21 +22,25 @@
 import java.util.concurrent.Future;
 import java.util.logging.Logger;
 
-import org.apache.asterix.common.config.ReplicationProperties;
-import org.apache.asterix.common.config.TransactionProperties;
 import org.apache.asterix.common.config.ClusterProperties;
 import org.apache.asterix.common.config.IPropertiesProvider;
+import org.apache.asterix.common.config.ReplicationProperties;
+import org.apache.asterix.common.config.TransactionProperties;
 import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.Checkpoint;
+import org.apache.asterix.common.transactions.CheckpointProperties;
 import org.apache.asterix.common.transactions.IAppRuntimeContextProvider;
+import org.apache.asterix.common.transactions.ICheckpointManager;
 import org.apache.asterix.common.transactions.ILockManager;
 import org.apache.asterix.common.transactions.ILogManager;
 import org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.asterix.common.transactions.ITransactionManager;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
+import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.asterix.transaction.management.service.locking.ConcurrentLockManager;
 import org.apache.asterix.transaction.management.service.logging.LogManager;
 import org.apache.asterix.transaction.management.service.logging.LogManagerWithReplication;
-import org.apache.asterix.transaction.management.service.recovery.CheckpointThread;
+import org.apache.asterix.transaction.management.service.recovery.CheckpointManagerFactory;
 import org.apache.asterix.transaction.management.service.recovery.RecoveryManager;
 
 /**
@@ -50,8 +54,8 @@
     private final ITransactionManager transactionManager;
     private final IRecoveryManager recoveryManager;
     private final IAppRuntimeContextProvider asterixAppRuntimeContextProvider;
-    private final CheckpointThread checkpointThread;
     private final TransactionProperties txnProperties;
+    private final ICheckpointManager checkpointManager;
 
     //for profiling purpose
     public static final boolean IS_PROFILE_MODE = false;//true
@@ -66,6 +70,15 @@
         this.txnProperties = txnProperties;
         this.transactionManager = new TransactionManager(this);
         this.lockManager = new ConcurrentLockManager(txnProperties.getLockManagerShrinkTimer());
+        final boolean replicationEnabled = ClusterProperties.INSTANCE.isReplicationEnabled();
+        final CheckpointProperties checkpointProperties = new CheckpointProperties(txnProperties, id);
+        checkpointManager = CheckpointManagerFactory.create(this, checkpointProperties, replicationEnabled);
+        final Checkpoint latestCheckpoint = checkpointManager.getLatest();
+        if (latestCheckpoint != null && latestCheckpoint.getStorageVersion() != StorageConstants.VERSION) {
+            throw new IllegalStateException(
+                    String.format("Storage version mismatch. Current version (%s). On disk version: (%s)",
+                            latestCheckpoint.getStorageVersion(), StorageConstants.VERSION));
+        }
 
         ReplicationProperties asterixReplicationProperties = null;
         if (asterixAppRuntimeContextProvider != null) {
@@ -73,22 +86,13 @@
                     .getAppContext()).getReplicationProperties();
         }
 
-        if (asterixReplicationProperties != null && ClusterProperties.INSTANCE.isReplicationEnabled()) {
+        if (asterixReplicationProperties != null && replicationEnabled) {
             this.logManager = new LogManagerWithReplication(this);
         } else {
             this.logManager = new LogManager(this);
         }
-
         this.recoveryManager = new RecoveryManager(this);
 
-        if (asterixAppRuntimeContextProvider != null) {
-            this.checkpointThread = new CheckpointThread(recoveryManager, logManager,
-                    this.txnProperties.getCheckpointLSNThreshold(), this.txnProperties.getCheckpointPollFrequency());
-            this.checkpointThread.start();
-        } else {
-            this.checkpointThread = null;
-        }
-
         if (IS_PROFILE_MODE) {
             ecp = new EntityCommitProfiler(this, this.txnProperties.getCommitProfilerReportInterval());
             fecp = (Future<Object>) getAsterixAppRuntimeContextProvider().getThreadExecutor().submit(ecp);
@@ -133,6 +137,11 @@
         ++profilerEntityCommitLogCount;
     }
 
+    @Override
+    public ICheckpointManager getCheckpointManager() {
+        return checkpointManager;
+    }
+
     /**
      * Thread for profiling entity level commit count
      * This thread takes a report interval (in seconds) parameter and