[NO ISSUE][STO] Introduce Index Checkpoints

- user model changes: no
- storage format changes: yes
  - Add index checkpoints.
  - Use index checkpoint to determine low watermark
    during recovery.
- interface changes: yes
  - Introduce IIndexCheckpointManager for managing
    indexes checkpoints.
  - Introduce IIndexCheckpointProvider for tracking
    IIndexCheckpointManager references.

Details:
- Unify LSM flush/merge operations completion order.
- Introduce index checkpoints which contains:
   - Index low watermark.
   - Latest valid LSM component
   - Mapping between master replica and local replica.
- Use index checkpoints instead of LSM component metadata
  for identifying low watermark in recovery.
- Use index checkpoints in replication instead of overwriting
  LSN byte offset in replica component metadata.
- Replace LSN_MAP used in replication by index checkpoints.
- Replace NIO Files.find by Commons FileUtils.listFiles to
  avoid no NoSuchFileException on any file deletion.

Change-Id: Ib22800002bf8ea3660242e599b3f5f20678301a8
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2200
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
new file mode 100644
index 0000000..446d04d
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.nc;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.storage.IIndexCheckpointManager;
+import org.apache.asterix.common.storage.IndexCheckpoint;
+import org.apache.asterix.common.utils.StorageConstants;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.util.annotations.ThreadSafe;
+
+@ThreadSafe
+public class IndexCheckpointManager implements IIndexCheckpointManager {
+
+    private static final Logger LOGGER = Logger.getLogger(IndexCheckpointManager.class.getName());
+    private static final int HISTORY_CHECKPOINTS = 1;
+    private static final int MAX_CHECKPOINT_WRITE_ATTEMPTS = 5;
+    private static final FilenameFilter CHECKPOINT_FILE_FILTER =
+            (file, name) -> name.startsWith(StorageConstants.INDEX_CHECKPOINT_FILE_PREFIX);
+    private static final long BULKLOAD_LSN = 0;
+    private final Path indexPath;
+
+    public IndexCheckpointManager(Path indexPath) {
+        this.indexPath = indexPath;
+    }
+
+    @Override
+    public synchronized void init(long lsn) throws HyracksDataException {
+        final List<IndexCheckpoint> checkpoints = getCheckpoints();
+        if (!checkpoints.isEmpty()) {
+            LOGGER.warning(() -> "Checkpoints found on initializing: " + indexPath);
+            delete();
+        }
+        IndexCheckpoint firstCheckpoint = IndexCheckpoint.first(lsn);
+        persist(firstCheckpoint);
+    }
+
+    @Override
+    public synchronized void replicated(String componentTimestamp, long masterLsn) throws HyracksDataException {
+        final Long localLsn = getLatest().getMasterNodeFlushMap().get(masterLsn);
+        if (localLsn == null) {
+            throw new IllegalStateException("Component flushed before lsn mapping was received");
+        }
+        flushed(componentTimestamp, localLsn);
+    }
+
+    @Override
+    public synchronized void flushed(String componentTimestamp, long lsn) throws HyracksDataException {
+        final IndexCheckpoint latest = getLatest();
+        IndexCheckpoint nextCheckpoint = IndexCheckpoint.next(latest, lsn, componentTimestamp);
+        persist(nextCheckpoint);
+        deleteHistory(nextCheckpoint.getId(), HISTORY_CHECKPOINTS);
+    }
+
+    @Override
+    public synchronized void masterFlush(long masterLsn, long localLsn) throws HyracksDataException {
+        final IndexCheckpoint latest = getLatest();
+        latest.getMasterNodeFlushMap().put(masterLsn, localLsn);
+        final IndexCheckpoint next =
+                IndexCheckpoint.next(latest, latest.getLowWatermark(), latest.getValidComponentTimestamp());
+        persist(next);
+        notifyAll();
+    }
+
+    @Override
+    public synchronized long getLowWatermark() throws HyracksDataException {
+        return getLatest().getLowWatermark();
+    }
+
+    @Override
+    public synchronized boolean isFlushed(long masterLsn) throws HyracksDataException {
+        if (masterLsn == BULKLOAD_LSN) {
+            return true;
+        }
+        return getLatest().getMasterNodeFlushMap().containsKey(masterLsn);
+    }
+
+    @Override
+    public synchronized void advanceLowWatermark(long lsn) throws HyracksDataException {
+        flushed(getLatest().getValidComponentTimestamp(), lsn);
+    }
+
+    @Override
+    public synchronized void delete() {
+        deleteHistory(Long.MAX_VALUE, 0);
+    }
+
+    private IndexCheckpoint getLatest() {
+        final List<IndexCheckpoint> checkpoints = getCheckpoints();
+        if (checkpoints.isEmpty()) {
+            throw new IllegalStateException("Couldn't find any checkpoints for resource: " + indexPath);
+        }
+        checkpoints.sort(Comparator.comparingLong(IndexCheckpoint::getId).reversed());
+        return checkpoints.get(0);
+    }
+
+    private List<IndexCheckpoint> getCheckpoints() {
+        List<IndexCheckpoint> checkpoints = new ArrayList<>();
+        final File[] checkpointFiles = indexPath.toFile().listFiles(CHECKPOINT_FILE_FILTER);
+        if (checkpointFiles != null) {
+            for (File checkpointFile : checkpointFiles) {
+                try {
+                    checkpoints.add(read(checkpointFile.toPath()));
+                } catch (IOException e) {
+                    LOGGER.log(Level.WARNING, e, () -> "Couldn't read index checkpoint file: " + e);
+                }
+            }
+        }
+        return checkpoints;
+    }
+
+    private void persist(IndexCheckpoint checkpoint) throws HyracksDataException {
+        final Path checkpointPath = getCheckpointPath(checkpoint);
+        for (int i = 1; i <= MAX_CHECKPOINT_WRITE_ATTEMPTS; i++) {
+            try {
+                // clean up from previous write failure
+                if (checkpointPath.toFile().exists()) {
+                    Files.delete(checkpointPath);
+                }
+                try (BufferedWriter writer = Files.newBufferedWriter(checkpointPath)) {
+                    writer.write(checkpoint.asJson());
+                }
+                // ensure it was written correctly by reading it
+                read(checkpointPath);
+            } catch (IOException e) {
+                if (i == MAX_CHECKPOINT_WRITE_ATTEMPTS) {
+                    throw HyracksDataException.create(e);
+                }
+                LOGGER.log(Level.WARNING, e, () -> "Filed to write checkpoint at: " + indexPath);
+                int nextAttempt = i + 1;
+                LOGGER.info(() -> "Checkpoint write attempt " + nextAttempt + "/" + MAX_CHECKPOINT_WRITE_ATTEMPTS);
+            }
+        }
+    }
+
+    private IndexCheckpoint read(Path checkpointPath) throws IOException {
+        return IndexCheckpoint.fromJson(new String(Files.readAllBytes(checkpointPath)));
+    }
+
+    private void deleteHistory(long latestId, int historyToKeep) {
+        try {
+            final File[] checkpointFiles = indexPath.toFile().listFiles(CHECKPOINT_FILE_FILTER);
+            if (checkpointFiles != null) {
+                for (File checkpointFile : checkpointFiles) {
+                    if (getCheckpointIdFromFileName(checkpointFile.toPath()) < (latestId - historyToKeep)) {
+                        Files.delete(checkpointFile.toPath());
+                    }
+                }
+            }
+        } catch (Exception e) {
+            LOGGER.log(Level.WARNING, e, () -> "Couldn't delete history checkpoints at " + indexPath);
+        }
+    }
+
+    private Path getCheckpointPath(IndexCheckpoint checkpoint) {
+        return Paths.get(indexPath.toString(),
+                StorageConstants.INDEX_CHECKPOINT_FILE_PREFIX + String.valueOf(checkpoint.getId()));
+    }
+
+    private long getCheckpointIdFromFileName(Path checkpointPath) {
+        return Long.valueOf(checkpointPath.getFileName().toString()
+                .substring(StorageConstants.INDEX_CHECKPOINT_FILE_PREFIX.length()));
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManagerProvider.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManagerProvider.java
new file mode 100644
index 0000000..19ad8f6
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManagerProvider.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.nc;
+
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.asterix.common.storage.IIndexCheckpointManager;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
+import org.apache.asterix.common.storage.ResourceReference;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IIOManager;
+
+public class IndexCheckpointManagerProvider implements IIndexCheckpointManagerProvider {
+
+    private final Map<ResourceReference, IndexCheckpointManager> indexCheckpointManagerMap = new HashMap<>();
+    private final IIOManager ioManager;
+
+    public IndexCheckpointManagerProvider(IIOManager ioManager) {
+        this.ioManager = ioManager;
+    }
+
+    @Override
+    public IIndexCheckpointManager get(ResourceReference ref) throws HyracksDataException {
+        synchronized (indexCheckpointManagerMap) {
+            return indexCheckpointManagerMap.computeIfAbsent(ref, this::create);
+        }
+    }
+
+    @Override
+    public void close(ResourceReference ref) {
+        synchronized (indexCheckpointManagerMap) {
+            indexCheckpointManagerMap.remove(ref);
+        }
+    }
+
+    private IndexCheckpointManager create(ResourceReference ref) {
+        try {
+            final Path indexPath = getIndexPath(ref);
+            return new IndexCheckpointManager(indexPath);
+        } catch (HyracksDataException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    private Path getIndexPath(ResourceReference indexRef) throws HyracksDataException {
+        return ioManager.resolve(indexRef.getRelativePath().toString()).getFile().toPath();
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 5cae2d6..b6bf2df 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -60,6 +60,7 @@
 import org.apache.asterix.common.replication.IReplicaResourcesManager;
 import org.apache.asterix.common.replication.IReplicationChannel;
 import org.apache.asterix.common.replication.IReplicationManager;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.asterix.common.storage.IStorageSubsystem;
 import org.apache.asterix.common.transactions.IAppRuntimeContextProvider;
 import org.apache.asterix.common.transactions.IRecoveryManager;
@@ -142,6 +143,7 @@
     private final IStorageComponentProvider componentProvider;
     private IHyracksClientConnection hcc;
     private IStorageSubsystem storageSubsystem;
+    private IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
 
     public NCAppRuntimeContext(INCServiceContext ncServiceContext, List<AsterixExtension> extensions)
             throws AsterixException, InstantiationException, IllegalAccessException, ClassNotFoundException,
@@ -182,11 +184,11 @@
         lsmIOScheduler = AsynchronousScheduler.INSTANCE;
 
         metadataMergePolicyFactory = new PrefixMergePolicyFactory();
+        indexCheckpointManagerProvider = new IndexCheckpointManagerProvider(ioManager);
 
         ILocalResourceRepositoryFactory persistentLocalResourceRepositoryFactory =
                 new PersistentLocalResourceRepositoryFactory(ioManager, getServiceContext().getNodeId(),
-                        metadataProperties);
-
+                        metadataProperties, indexCheckpointManagerProvider);
         localResourceRepository =
                 (PersistentLocalResourceRepository) persistentLocalResourceRepositoryFactory.createRepository();
 
@@ -203,11 +205,10 @@
             }
             localResourceRepository.deleteStorageData();
         }
-
         datasetMemoryManager = new DatasetMemoryManager(storageProperties);
         datasetLifecycleManager =
                 new DatasetLifecycleManager(storageProperties, localResourceRepository, txnSubsystem.getLogManager(),
-                        datasetMemoryManager, ioManager.getIODevices().size());
+                        datasetMemoryManager, indexCheckpointManagerProvider, ioManager.getIODevices().size());
         final String nodeId = getServiceContext().getNodeId();
         final ClusterPartition[] nodePartitions = metadataProperties.getNodePartitions().get(nodeId);
         final Set<Integer> nodePartitionsIds = Arrays.stream(nodePartitions).map(ClusterPartition::getPartitionId)
@@ -220,7 +221,8 @@
 
         if (replicationProperties.isParticipant(getServiceContext().getNodeId())) {
 
-            replicaResourcesManager = new ReplicaResourcesManager(localResourceRepository, metadataProperties);
+            replicaResourcesManager = new ReplicaResourcesManager(localResourceRepository, metadataProperties,
+                    indexCheckpointManagerProvider);
 
             replicationManager = new ReplicationManager(nodeId, replicationProperties, replicaResourcesManager,
                     txnSubsystem.getLogManager(), asterixAppRuntimeContextProvider);
@@ -229,13 +231,13 @@
             //LogManager to replicate logs
             txnSubsystem.getLogManager().setReplicationManager(replicationManager);
 
-            //PersistentLocalResourceRepository to replicate metadata files and delete backups on drop index
+            //PersistentLocalResourceRepository to replicated metadata files and delete backups on drop index
             localResourceRepository.setReplicationManager(replicationManager);
 
             /*
              * add the partitions that will be replicated in this node as inactive partitions
              */
-            //get nodes which replicate to this node
+            //get nodes which replicated to this node
             Set<String> remotePrimaryReplicas = replicationProperties.getRemotePrimaryReplicasIds(nodeId);
             for (String clientId : remotePrimaryReplicas) {
                 //get the partitions of each client
@@ -529,4 +531,9 @@
     public IStorageSubsystem getStorageSubsystem() {
         return storageSubsystem;
     }
+
+    @Override
+    public IIndexCheckpointManagerProvider getIndexCheckpointManagerProvider() {
+        return indexCheckpointManagerProvider;
+    }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index e29e3fe..f0ed5e9 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -43,11 +43,14 @@
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.config.ReplicationProperties;
 import org.apache.asterix.common.dataflow.DatasetLocalResource;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
 import org.apache.asterix.common.replication.IReplicaResourcesManager;
+import org.apache.asterix.common.storage.DatasetResourceReference;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.asterix.common.transactions.Checkpoint;
 import org.apache.asterix.common.transactions.IAppRuntimeContextProvider;
 import org.apache.asterix.common.transactions.ICheckpointManager;
@@ -293,6 +296,8 @@
 
         IAppRuntimeContextProvider appRuntimeContext = txnSubsystem.getAsterixAppRuntimeContextProvider();
         IDatasetLifecycleManager datasetLifecycleManager = appRuntimeContext.getDatasetLifecycleManager();
+        final IIndexCheckpointManagerProvider indexCheckpointManagerProvider =
+                ((INcApplicationContext) (serviceCtx.getApplicationContext())).getIndexCheckpointManagerProvider();
 
         Map<Long, LocalResource> resourcesMap = localResourceRepository.loadAndGetAllResources();
         Map<Long, Long> resourceId2MaxLSNMap = new HashMap<>();
@@ -356,18 +361,15 @@
                                     index = (ILSMIndex) localResourceMetadata.createInstance(serviceCtx);
                                     datasetLifecycleManager.register(localResource.getPath(), index);
                                     datasetLifecycleManager.open(localResource.getPath());
-
-                                    //#. get maxDiskLastLSN
-                                    ILSMIndex lsmIndex = index;
                                     try {
+                                        final DatasetResourceReference resourceReference =
+                                                DatasetResourceReference.of(localResource);
                                         maxDiskLastLsn =
-                                                ((AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback())
-                                                        .getComponentLSN(lsmIndex.getDiskComponents());
+                                                indexCheckpointManagerProvider.get(resourceReference).getLowWatermark();
                                     } catch (HyracksDataException e) {
                                         datasetLifecycleManager.close(localResource.getPath());
                                         throw e;
                                     }
-
                                     //#. set resourceId and maxDiskLastLSN to the map
                                     resourceId2MaxLSNMap.put(resourceId, maxDiskLastLsn);
                                 } else {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
index fea6cd8..4bfc581 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
@@ -22,6 +22,8 @@
 
 import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallback;
 import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
+import org.apache.asterix.common.storage.IIndexCheckpointManager;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
@@ -58,7 +60,7 @@
         // Whenever this is called, it resets the counter
         // However, the counters for the failed operations are never reset since we expect them
         // To be always 0
-        return new TestLsmBtreeIoOpCallback(index, getComponentIdGenerator());
+        return new TestLsmBtreeIoOpCallback(index, getComponentIdGenerator(), getIndexCheckpointManagerProvider());
     }
 
     public int getTotalFlushes() {
@@ -100,8 +102,9 @@
     public class TestLsmBtreeIoOpCallback extends LSMBTreeIOOperationCallback {
         private final TestLsmBtree lsmBtree;
 
-        public TestLsmBtreeIoOpCallback(ILSMIndex index, ILSMComponentIdGenerator idGenerator) {
-            super(index, idGenerator);
+        public TestLsmBtreeIoOpCallback(ILSMIndex index, ILSMComponentIdGenerator idGenerator,
+                IIndexCheckpointManagerProvider checkpointManagerProvider) {
+            super(index, idGenerator, checkpointManagerProvider);
             lsmBtree = (TestLsmBtree) index;
         }
 
@@ -121,7 +124,8 @@
         }
 
         @Override
-        public void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent newComponent) {
+        public void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent newComponent)
+                throws HyracksDataException {
             lsmBtree.afterIoFinalizeCalled();
             super.afterFinalize(opType, newComponent);
             synchronized (TestLsmBtreeIoOpCallbackFactory.this) {
diff --git a/asterixdb/asterix-common/pom.xml b/asterixdb/asterix-common/pom.xml
index ad83d60..b909e91 100644
--- a/asterixdb/asterix-common/pom.xml
+++ b/asterixdb/asterix-common/pom.xml
@@ -225,18 +225,10 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hyracks</groupId>
-      <artifactId>hyracks-storage-am-btree</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hyracks</groupId>
       <artifactId>hyracks-storage-am-bloomfilter</artifactId>
     </dependency>
     <dependency>
       <groupId>org.apache.hyracks</groupId>
-      <artifactId>hyracks-storage-am-rtree</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hyracks</groupId>
       <artifactId>hyracks-api</artifactId>
     </dependency>
     <dependency>
@@ -245,10 +237,6 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hyracks</groupId>
-      <artifactId>hyracks-storage-am-lsm-invertedindex</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hyracks</groupId>
       <artifactId>hyracks-storage-am-lsm-rtree</artifactId>
     </dependency>
     <dependency>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
index 162e693..0503c09 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
@@ -28,7 +28,9 @@
 import org.apache.asterix.common.replication.IReplicaResourcesManager;
 import org.apache.asterix.common.replication.IReplicationChannel;
 import org.apache.asterix.common.replication.IReplicationManager;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.asterix.common.storage.IStorageSubsystem;
+import org.apache.asterix.common.storage.IIndexCheckpointManager;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.application.INCServiceContext;
@@ -118,4 +120,6 @@
     INCServiceContext getServiceContext();
 
     IStorageSubsystem getStorageSubsystem();
+
+    IIndexCheckpointManagerProvider getIndexCheckpointManagerProvider();
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
index 9f57981..9ec13ef 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
@@ -46,7 +46,7 @@
     @Override
     public void afterOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
             IModificationOperationCallback modificationCallback) throws HyracksDataException {
-        if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.REPLICATE) {
+        if (opType == LSMOperationType.REPLICATE) {
             dsInfo.undeclareActiveIOOperation();
         }
     }
@@ -54,14 +54,11 @@
     @Override
     public void completeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
             IModificationOperationCallback modificationCallback) throws HyracksDataException {
-        if (opType == LSMOperationType.MERGE) {
+        if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE) {
             dsInfo.undeclareActiveIOOperation();
         }
     }
 
-    public void exclusiveJobCommitted() throws HyracksDataException {
-    }
-
     @Override
     public void beforeTransaction(long resourceId) {
         /*
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index ce43bca..6a1ebfb 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -35,6 +35,8 @@
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
 import org.apache.asterix.common.replication.IReplicationStrategy;
+import org.apache.asterix.common.storage.DatasetResourceReference;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.asterix.common.transactions.ILogManager;
 import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.utils.StoragePathUtil;
@@ -64,13 +66,16 @@
     private final LogRecord logRecord;
     private final int numPartitions;
     private volatile boolean stopped = false;
+    private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
 
     public DatasetLifecycleManager(StorageProperties storageProperties, ILocalResourceRepository resourceRepository,
-            ILogManager logManager, IDatasetMemoryManager memoryManager, int numPartitions) {
+            ILogManager logManager, IDatasetMemoryManager memoryManager,
+            IIndexCheckpointManagerProvider indexCheckpointManagerProvider, int numPartitions) {
         this.logManager = logManager;
         this.storageProperties = storageProperties;
         this.resourceRepository = resourceRepository;
         this.memoryManager = memoryManager;
+        this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
         this.numPartitions = numPartitions;
         logRecord = new LogRecord();
     }
@@ -149,12 +154,7 @@
         // TODO: use fine-grained counters, one for each index instead of a single counter per dataset.
         DatasetInfo dsInfo = dsr.getDatasetInfo();
         dsInfo.waitForIO();
-        if (iInfo.isOpen()) {
-            ILSMOperationTracker indexOpTracker = iInfo.getIndex().getOperationTracker();
-            synchronized (indexOpTracker) {
-                iInfo.getIndex().deactivate(false);
-            }
-        }
+        closeIndex(iInfo);
         dsInfo.getIndexes().remove(resourceID);
         if (dsInfo.getReferenceCount() == 0 && dsInfo.isOpen() && dsInfo.getIndexes().isEmpty()
                 && !dsInfo.isExternal()) {
@@ -451,13 +451,7 @@
             throw HyracksDataException.create(e);
         }
         for (IndexInfo iInfo : dsInfo.getIndexes().values()) {
-            if (iInfo.isOpen()) {
-                ILSMOperationTracker opTracker = iInfo.getIndex().getOperationTracker();
-                synchronized (opTracker) {
-                    iInfo.getIndex().deactivate(false);
-                }
-                iInfo.setOpen(false);
-            }
+            closeIndex(iInfo);
         }
         removeDatasetFromCache(dsInfo.getDatasetID());
         dsInfo.setOpen(false);
@@ -579,4 +573,15 @@
             }
         }
     }
+
+    private void closeIndex(IndexInfo indexInfo) throws HyracksDataException {
+        if (indexInfo.isOpen()) {
+            ILSMOperationTracker opTracker = indexInfo.getIndex().getOperationTracker();
+            synchronized (opTracker) {
+                indexInfo.getIndex().deactivate(false);
+            }
+            indexCheckpointManagerProvider.close(DatasetResourceReference.of(indexInfo.getLocalResource()));
+            indexInfo.setOpen(false);
+        }
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
index f6e2b0d..c02de7e 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
@@ -108,7 +108,7 @@
         if (index == null) {
             throw new HyracksDataException("Attempt to register a null index");
         }
-        datasetInfo.getIndexes().put(resourceID, new IndexInfo(index, datasetInfo.getDatasetID(), resourceID,
+        datasetInfo.getIndexes().put(resourceID, new IndexInfo(index, datasetInfo.getDatasetID(), resource,
                 ((DatasetLocalResource) resource.getResource()).getPartition()));
     }
 
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IndexInfo.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IndexInfo.java
index 9eb5b6c..b094b6f 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IndexInfo.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IndexInfo.java
@@ -19,17 +19,20 @@
 package org.apache.asterix.common.context;
 
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.common.LocalResource;
 
 public class IndexInfo extends Info {
     private final ILSMIndex index;
     private final int datasetId;
     private final long resourceId;
     private final int partition;
+    private final LocalResource localResource;
 
-    public IndexInfo(ILSMIndex index, int datasetId, long resourceId, int partition) {
+    public IndexInfo(ILSMIndex index, int datasetId, LocalResource localResource, int partition) {
         this.index = index;
         this.datasetId = datasetId;
-        this.resourceId = resourceId;
+        this.localResource = localResource;
+        this.resourceId = localResource.getId();
         this.partition = partition;
     }
 
@@ -48,4 +51,8 @@
     public int getDatasetId() {
         return datasetId;
     }
+
+    public LocalResource getLocalResource() {
+        return localResource;
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index ababe9c..14e91ba 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -73,7 +73,7 @@
     public void afterOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
             IModificationOperationCallback modificationCallback) throws HyracksDataException {
         // Searches are immediately considered complete, because they should not prevent the execution of flushes.
-        if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.REPLICATE) {
+        if (opType == LSMOperationType.REPLICATE) {
             completeOperation(index, opType, searchCallback, modificationCallback);
         }
     }
@@ -160,12 +160,6 @@
         flushLogCreated = false;
     }
 
-    @Override
-    public void exclusiveJobCommitted() throws HyracksDataException {
-        numActiveOperations.set(0);
-        flushIfRequested();
-    }
-
     public int getNumActiveOperations() {
         return numActiveOperations.get();
     }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMIndexUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMIndexUtil.java
index 04090bb..e844192 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMIndexUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMIndexUtil.java
@@ -41,11 +41,4 @@
             }
         }
     }
-
-    public static long getComponentFileLSNOffset(ILSMIndex lsmIndex, ILSMDiskComponent lsmComponent,
-            String componentFilePath) throws HyracksDataException {
-        AbstractLSMIOOperationCallback ioOpCallback =
-                (AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback();
-        return ioOpCallback.getComponentFileLSNOffset(lsmComponent, componentFilePath);
-    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
index 1432f25..c625988 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
@@ -19,8 +19,13 @@
 
 package org.apache.asterix.common.ioopcallbacks;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
+import org.apache.asterix.common.storage.ResourceReference;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.primitive.LongPointable;
 import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
@@ -33,6 +38,7 @@
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
 import org.apache.hyracks.storage.am.lsm.common.impls.DiskComponentMetadata;
 import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils;
 import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils;
@@ -61,10 +67,14 @@
     protected ILSMComponentId[] nextComponentIds;
 
     protected final ILSMComponentIdGenerator idGenerator;
+    private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
+    private final Map<ILSMComponentId, Long> componentLsnMap = new HashMap<>();
 
-    public AbstractLSMIOOperationCallback(ILSMIndex lsmIndex, ILSMComponentIdGenerator idGenerator) {
+    public AbstractLSMIOOperationCallback(ILSMIndex lsmIndex, ILSMComponentIdGenerator idGenerator,
+            IIndexCheckpointManagerProvider indexCheckpointManagerProvider) {
         this.lsmIndex = lsmIndex;
         this.idGenerator = idGenerator;
+        this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
         int count = lsmIndex.getNumberOfAllMemoryComponents();
         mutableLastLSNs = new long[count];
         firstLSNs = new long[count];
@@ -104,42 +114,59 @@
     public void afterOperation(LSMIOOperationType opType, List<ILSMComponent> oldComponents,
             ILSMDiskComponent newComponent) throws HyracksDataException {
         //TODO: Copying Filters and all content of the metadata pages for flush operation should be done here
-        if (newComponent != null) {
-            putLSNIntoMetadata(newComponent, oldComponents);
-            putComponentIdIntoMetadata(opType, newComponent, oldComponents);
-            if (opType == LSMIOOperationType.MERGE) {
-                // In case of merge, oldComponents are never null
-                LongPointable markerLsn =
-                        LongPointable.FACTORY.createPointable(ComponentUtils.getLong(oldComponents.get(0).getMetadata(),
-                                ComponentUtils.MARKER_LSN_KEY, ComponentUtils.NOT_FOUND));
-                newComponent.getMetadata().put(ComponentUtils.MARKER_LSN_KEY, markerLsn);
-            } else if (opType == LSMIOOperationType.FLUSH) {
-                // advance memory component indexes
-                synchronized (this) {
-                    // we've already consumed the specified LSN/component id.
-                    // Now we can advance to the next component
-                    flushRequested[readIndex] = false;
-                    // if the component which just finished flushing is the component that will be modified next,
-                    // we set its first LSN to its previous LSN
-                    if (readIndex == writeIndex) {
-                        firstLSNs[writeIndex] = mutableLastLSNs[writeIndex];
-                    }
-                    readIndex = (readIndex + 1) % mutableLastLSNs.length;
+        if (newComponent == null) {
+            // failed operation. Nothing to do.
+            return;
+        }
+        putLSNIntoMetadata(newComponent, oldComponents);
+        putComponentIdIntoMetadata(opType, newComponent, oldComponents);
+        componentLsnMap.put(newComponent.getId(), getComponentLSN(oldComponents));
+        if (opType == LSMIOOperationType.MERGE) {
+            if (oldComponents == null) {
+                throw new IllegalStateException("Merge must have old components");
+            }
+            LongPointable markerLsn = LongPointable.FACTORY.createPointable(ComponentUtils
+                    .getLong(oldComponents.get(0).getMetadata(), ComponentUtils.MARKER_LSN_KEY,
+                            ComponentUtils.NOT_FOUND));
+            newComponent.getMetadata().put(ComponentUtils.MARKER_LSN_KEY, markerLsn);
+        } else if (opType == LSMIOOperationType.FLUSH) {
+            // advance memory component indexes
+            synchronized (this) {
+                // we've already consumed the specified LSN/component id.
+                // Now we can advance to the next component
+                flushRequested[readIndex] = false;
+                // if the component which just finished flushing is the component that will be modified next,
+                // we set its first LSN to its previous LSN
+                if (readIndex == writeIndex) {
+                    firstLSNs[writeIndex] = mutableLastLSNs[writeIndex];
                 }
+                readIndex = (readIndex + 1) % mutableLastLSNs.length;
             }
         }
     }
 
     @Override
-    public void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent newComponent) {
+    public void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent newComponent) throws HyracksDataException {
         // The operation was complete and the next I/O operation for the LSM index didn't start yet
         if (opType == LSMIOOperationType.FLUSH) {
             hasFlushed = true;
+            if (newComponent != null) {
+                final Long lsn = componentLsnMap.remove(newComponent.getId());
+                if (lsn == null) {
+                    throw new IllegalStateException("Unidentified flushed component: " + newComponent);
+                }
+                // empty component doesn't have any files
+                final Optional<String> componentFile = newComponent.getLSMComponentPhysicalFiles().stream().findAny();
+                if (componentFile.isPresent()) {
+                    final ResourceReference ref = ResourceReference.of(componentFile.get());
+                    final String componentEndTime = AbstractLSMIndexFileManager.getComponentEndTime(ref.getName());
+                    indexCheckpointManagerProvider.get(ref).flushed(componentEndTime, lsn);
+                }
+            }
         }
-
     }
 
-    public void putLSNIntoMetadata(ILSMDiskComponent newComponent, List<ILSMComponent> oldComponents)
+    private void putLSNIntoMetadata(ILSMDiskComponent newComponent, List<ILSMComponent> oldComponents)
             throws HyracksDataException {
         newComponent.getMetadata().put(LSN_KEY, LongPointable.FACTORY.createPointable(getComponentLSN(oldComponents)));
     }
@@ -155,8 +182,8 @@
         if (mergedComponents == null || mergedComponents.isEmpty()) {
             return null;
         }
-        return LSMComponentIdUtils.union(mergedComponents.get(0).getId(),
-                mergedComponents.get(mergedComponents.size() - 1).getId());
+        return LSMComponentIdUtils
+                .union(mergedComponents.get(0).getId(), mergedComponents.get(mergedComponents.size() - 1).getId());
 
     }
 
@@ -186,7 +213,7 @@
         }
     }
 
-    public void setFirstLSN(long firstLSN) {
+    public synchronized void setFirstLSN(long firstLSN) {
         // We make sure that this method is only called on an empty component so the first LSN is not set incorrectly
         firstLSNs[writeIndex] = firstLSN;
     }
@@ -212,8 +239,7 @@
             // Implies a flush IO operation. --> moves the flush pointer
             // Flush operation of an LSM index are executed sequentially.
             synchronized (this) {
-                long lsn = mutableLastLSNs[readIndex];
-                return lsn;
+                return mutableLastLSNs[readIndex];
             }
         }
         // Get max LSN from the diskComponents. Implies a merge IO operation or Recovery operation.
@@ -246,15 +272,4 @@
             component.resetId(componentId);
         }
     }
-
-    /**
-     * @param component
-     * @param componentFilePath
-     * @return The LSN byte offset in the LSM disk component if the index is valid,
-     *         otherwise {@link IMetadataPageManager#INVALID_LSN_OFFSET}.
-     * @throws HyracksDataException
-     */
-    public abstract long getComponentFileLSNOffset(ILSMDiskComponent component, String componentFilePath)
-            throws HyracksDataException;
-
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIndexIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIndexIOOperationCallbackFactory.java
index 5dff7f4..ed56ab1 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIndexIOOperationCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIndexIOOperationCallbackFactory.java
@@ -21,6 +21,8 @@
 
 import java.io.ObjectStreamException;
 
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
@@ -49,6 +51,10 @@
         return idGeneratorFactory.getComponentIdGenerator(ncCtx);
     }
 
+    protected IIndexCheckpointManagerProvider getIndexCheckpointManagerProvider() {
+        return ((INcApplicationContext) ncCtx.getApplicationContext()).getIndexCheckpointManagerProvider();
+    }
+
     private void readObjectNoData() throws ObjectStreamException {
         idGeneratorFactory = new ILSMComponentIdGeneratorFactory() {
             private static final long serialVersionUID = 1L;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
index c1ee03b..db6c609 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
@@ -19,28 +19,14 @@
 
 package org.apache.asterix.common.ioopcallbacks;
 
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.btree.impls.BTree;
-import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
-import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeFileManager;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 
 public class LSMBTreeIOOperationCallback extends AbstractLSMIOOperationCallback {
 
-    public LSMBTreeIOOperationCallback(ILSMIndex index, ILSMComponentIdGenerator idGenerator) {
-        super(index, idGenerator);
-    }
-
-    @Override
-    public long getComponentFileLSNOffset(ILSMDiskComponent diskComponent, String diskComponentFilePath)
-            throws HyracksDataException {
-        if (diskComponentFilePath.endsWith(LSMBTreeFileManager.BTREE_SUFFIX)) {
-            IMetadataPageManager metadataPageManager =
-                    (IMetadataPageManager) ((BTree) diskComponent.getIndex()).getPageManager();
-            return metadataPageManager.getFileOffset(metadataPageManager.createMetadataFrame(), LSN_KEY);
-        }
-        return INVALID;
+    public LSMBTreeIOOperationCallback(ILSMIndex index, ILSMComponentIdGenerator idGenerator,
+            IIndexCheckpointManagerProvider checkpointManagerProvider) {
+        super(index, idGenerator, checkpointManagerProvider);
     }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
index 4ef12ef..95245cb 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
@@ -33,6 +33,6 @@
 
     @Override
     public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) {
-        return new LSMBTreeIOOperationCallback(index, getComponentIdGenerator());
+        return new LSMBTreeIOOperationCallback(index, getComponentIdGenerator(), getIndexCheckpointManagerProvider());
     }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java
index b43fb2f..da1446b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java
@@ -18,28 +18,14 @@
  */
 package org.apache.asterix.common.ioopcallbacks;
 
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.btree.impls.BTree;
-import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
-import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeWithBuddyFileManager;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 
 public class LSMBTreeWithBuddyIOOperationCallback extends AbstractLSMIOOperationCallback {
 
-    public LSMBTreeWithBuddyIOOperationCallback(ILSMIndex lsmIndex, ILSMComponentIdGenerator idGenerator) {
-        super(lsmIndex, idGenerator);
-    }
-
-    @Override
-    public long getComponentFileLSNOffset(ILSMDiskComponent diskComponent, String diskComponentFilePath)
-            throws HyracksDataException {
-        if (diskComponentFilePath.endsWith(LSMBTreeWithBuddyFileManager.BTREE_SUFFIX)) {
-            IMetadataPageManager metadataPageManager =
-                    (IMetadataPageManager) ((BTree) diskComponent.getIndex()).getPageManager();
-            return metadataPageManager.getFileOffset(metadataPageManager.createMetadataFrame(), LSN_KEY);
-        }
-        return INVALID;
+    public LSMBTreeWithBuddyIOOperationCallback(ILSMIndex lsmIndex, ILSMComponentIdGenerator idGenerator,
+            IIndexCheckpointManagerProvider checkpointManagerProvider) {
+        super(lsmIndex, idGenerator, checkpointManagerProvider);
     }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java
index 6727bf6..6c75ed6 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java
@@ -32,6 +32,7 @@
 
     @Override
     public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) {
-        return new LSMBTreeWithBuddyIOOperationCallback(index, getComponentIdGenerator());
+        return new LSMBTreeWithBuddyIOOperationCallback(index, getComponentIdGenerator(),
+                getIndexCheckpointManagerProvider());
     }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
index 015cd38..3ba9bcd 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
@@ -19,29 +19,14 @@
 
 package org.apache.asterix.common.ioopcallbacks;
 
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
-import org.apache.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndexDiskComponent;
-import org.apache.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndexFileManager;
 
 public class LSMInvertedIndexIOOperationCallback extends AbstractLSMIOOperationCallback {
 
-    public LSMInvertedIndexIOOperationCallback(ILSMIndex index, ILSMComponentIdGenerator idGenerator) {
-        super(index, idGenerator);
-    }
-
-    @Override
-    public long getComponentFileLSNOffset(ILSMDiskComponent diskComponent, String diskComponentFilePath)
-            throws HyracksDataException {
-        if (diskComponentFilePath.endsWith(LSMInvertedIndexFileManager.DELETED_KEYS_BTREE_SUFFIX)) {
-            LSMInvertedIndexDiskComponent invIndexComponent = (LSMInvertedIndexDiskComponent) diskComponent;
-            IMetadataPageManager metadataPageManager =
-                    (IMetadataPageManager) invIndexComponent.getBuddyIndex().getPageManager();
-            return metadataPageManager.getFileOffset(metadataPageManager.createMetadataFrame(), LSN_KEY);
-        }
-        return INVALID;
+    public LSMInvertedIndexIOOperationCallback(ILSMIndex index, ILSMComponentIdGenerator idGenerator,
+            IIndexCheckpointManagerProvider checkpointManagerProvider) {
+        super(index, idGenerator, checkpointManagerProvider);
     }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
index a2712d1..fb73d19 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
@@ -33,6 +33,7 @@
 
     @Override
     public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) {
-        return new LSMInvertedIndexIOOperationCallback(index, getComponentIdGenerator());
+        return new LSMInvertedIndexIOOperationCallback(index, getComponentIdGenerator(),
+                getIndexCheckpointManagerProvider());
     }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
index bc79074..f3e80ec 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
@@ -19,28 +19,14 @@
 
 package org.apache.asterix.common.ioopcallbacks;
 
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
-import org.apache.hyracks.storage.am.lsm.rtree.impls.LSMRTreeFileManager;
-import org.apache.hyracks.storage.am.rtree.impls.RTree;
 
 public class LSMRTreeIOOperationCallback extends AbstractLSMIOOperationCallback {
 
-    public LSMRTreeIOOperationCallback(ILSMIndex index, ILSMComponentIdGenerator idGenerator) {
-        super(index, idGenerator);
-    }
-
-    @Override
-    public long getComponentFileLSNOffset(ILSMDiskComponent diskComponent, String diskComponentFilePath)
-            throws HyracksDataException {
-        if (diskComponentFilePath.endsWith(LSMRTreeFileManager.RTREE_SUFFIX)) {
-            IMetadataPageManager metadataPageManager =
-                    (IMetadataPageManager) ((RTree) diskComponent.getIndex()).getPageManager();
-            return metadataPageManager.getFileOffset(metadataPageManager.createMetadataFrame(), LSN_KEY);
-        }
-        return INVALID;
+    public LSMRTreeIOOperationCallback(ILSMIndex index, ILSMComponentIdGenerator idGenerator,
+            IIndexCheckpointManagerProvider checkpointManagerProvodier) {
+        super(index, idGenerator, checkpointManagerProvodier);
     }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
index 087aaae..94be0bb 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
@@ -33,6 +33,6 @@
 
     @Override
     public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) {
-        return new LSMRTreeIOOperationCallback(index, getComponentIdGenerator());
+        return new LSMRTreeIOOperationCallback(index, getComponentIdGenerator(), getIndexCheckpointManagerProvider());
     }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/DatasetResourceReference.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/DatasetResourceReference.java
index d05321e..c488b65 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/DatasetResourceReference.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/DatasetResourceReference.java
@@ -28,6 +28,7 @@
 
     private int datasetId;
     private int partitionId;
+    private long resourceId;
 
     private DatasetResourceReference() {
         super();
@@ -53,6 +54,10 @@
         return partitionId;
     }
 
+    public long getResourceId() {
+        return resourceId;
+    }
+
     private static DatasetResourceReference parse(LocalResource localResource) {
         final DatasetResourceReference datasetResourceReference = new DatasetResourceReference();
         final String filePath = Paths.get(localResource.getPath(), StorageConstants.METADATA_FILE_NAME).toString();
@@ -73,5 +78,28 @@
         final DatasetLocalResource dsResource = (DatasetLocalResource) localResource.getResource();
         lrr.datasetId = dsResource.getDatasetId();
         lrr.partitionId = dsResource.getPartition();
+        lrr.resourceId = localResource.getId();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o != null && o instanceof ResourceReference) {
+            ResourceReference that = (ResourceReference) o;
+            return getRelativePath().toString().equals(that.getRelativePath().toString());
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        return getRelativePath().toString().hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return getRelativePath().toString();
     }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
new file mode 100644
index 0000000..afa3823
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.storage;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface IIndexCheckpointManager {
+
+    /**
+     * Initializes the first checkpoint of an index with low watermark {@code lsn}
+     *
+     * @param lsn
+     * @throws HyracksDataException
+     */
+    void init(long lsn) throws HyracksDataException;
+
+    /**
+     * Called when a new LSM disk component is flushed. When called,  the index checkpoiint is updated
+     * with the latest valid {@code componentTimestamp} and low watermark {@code lsn}
+     *
+     * @param componentTimestamp
+     * @param lsn
+     * @throws HyracksDataException
+     */
+    void flushed(String componentTimestamp, long lsn) throws HyracksDataException;
+
+    /**
+     * Called when a new LSM disk component is replicated from master. When called,  the index checkpoiint is updated
+     * with the latest valid {@code componentTimestamp} and the local lsn mapping of {@code masterLsn} is set as the
+     * new low watermark.
+     *
+     * @param componentTimestamp
+     * @param masterLsn
+     * @throws HyracksDataException
+     */
+    void replicated(String componentTimestamp, long masterLsn) throws HyracksDataException;
+
+    /**
+     * Called when a flush log is received and replicated from master. The mapping between
+     * {@code masterLsn} and {@code localLsn} is updated in the checkpoint.
+     *
+     * @param masterLsn
+     * @param localLsn
+     * @throws HyracksDataException
+     */
+    void masterFlush(long masterLsn, long localLsn) throws HyracksDataException;
+
+    /**
+     * The index low watermark
+     *
+     * @return The low watermark
+     * @throws HyracksDataException
+     */
+    long getLowWatermark() throws HyracksDataException;
+
+    /**
+     * True if a mapping exists between {@code masterLsn} and a localLsn. Otherwise false.
+     *
+     * @param masterLsn
+     * @return True if the mapping exists. Otherwise false.
+     * @throws HyracksDataException
+     */
+    boolean isFlushed(long masterLsn) throws HyracksDataException;
+
+    /**
+     * Advance the index low watermark to {@code lsn}
+     *
+     * @param lsn
+     * @throws HyracksDataException
+     */
+    void advanceLowWatermark(long lsn) throws HyracksDataException;
+
+    /**
+     * Deletes all checkpoints
+     */
+    void delete();
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManagerProvider.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManagerProvider.java
new file mode 100644
index 0000000..e6cef57
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManagerProvider.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.storage;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface IIndexCheckpointManagerProvider {
+
+    /**
+     * Gets {@link IIndexCheckpointManager} for the index referenced by {@code ref}
+     *
+     * @param ref
+     * @return The index checkpoint manager.
+     * @throws HyracksDataException
+     */
+    IIndexCheckpointManager get(ResourceReference ref) throws HyracksDataException;
+
+    /**
+     * Closes any resources used by the index checkpoint manager referenced by {@code ref}
+     *
+     * @param ref
+     */
+    void close(ResourceReference ref);
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
new file mode 100644
index 0000000..6e845e1
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.storage;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class IndexCheckpoint {
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final long INITIAL_CHECKPOINT_ID = 0;
+    private long id;
+    private String validComponentTimestamp;
+    private long lowWatermark;
+    private Map<Long, Long> masterNodeFlushMap;
+
+    public static IndexCheckpoint first(long lowWatermark) {
+        IndexCheckpoint firstCheckpoint = new IndexCheckpoint();
+        firstCheckpoint.id = INITIAL_CHECKPOINT_ID;
+        firstCheckpoint.lowWatermark = lowWatermark;
+        firstCheckpoint.validComponentTimestamp = null;
+        firstCheckpoint.masterNodeFlushMap = new HashMap<>();
+        return firstCheckpoint;
+    }
+
+    public static IndexCheckpoint next(IndexCheckpoint latest, long lowWatermark, String validComponentTimestamp) {
+        if (lowWatermark < latest.getLowWatermark()) {
+            throw new IllegalStateException("Low watermark should always be increasing");
+        }
+        IndexCheckpoint next = new IndexCheckpoint();
+        next.id = latest.getId() + 1;
+        next.lowWatermark = lowWatermark;
+        next.validComponentTimestamp = validComponentTimestamp;
+        next.masterNodeFlushMap = latest.getMasterNodeFlushMap();
+        // remove any lsn from the map that wont be used anymore
+        next.masterNodeFlushMap.values().removeIf(lsn -> lsn <= lowWatermark);
+        return next;
+    }
+
+    @JsonCreator
+    private IndexCheckpoint() {
+    }
+
+    public String getValidComponentTimestamp() {
+        return validComponentTimestamp;
+    }
+
+    public long getLowWatermark() {
+        return lowWatermark;
+    }
+
+    public Map<Long, Long> getMasterNodeFlushMap() {
+        return masterNodeFlushMap;
+    }
+
+    public long getId() {
+        return id;
+    }
+
+    public String asJson() throws HyracksDataException {
+        try {
+            return OBJECT_MAPPER.writeValueAsString(this);
+        } catch (JsonProcessingException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    public static IndexCheckpoint fromJson(String json) throws HyracksDataException {
+        try {
+            return OBJECT_MAPPER.readValue(json, IndexCheckpoint.class);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
index bd057fa..4aa6982 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
@@ -107,4 +107,26 @@
         ref.root = tokens[--offset];
         ref.rebalance = String.valueOf(0);
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o != null && o instanceof ResourceReference) {
+            ResourceReference that = (ResourceReference) o;
+            return getRelativePath().toString().equals(that.getRelativePath().toString());
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        return getRelativePath().toString().hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return getRelativePath().toString();
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
index 6262f71..220b089 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.common.utils;
 
 import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
 
 /**
  * A static class that stores storage constants
@@ -27,6 +28,12 @@
 
     public static final String STORAGE_ROOT_DIR_NAME = "storage";
     public static final String PARTITION_DIR_PREFIX = "partition_";
+    /**
+     * Any file that shares the same directory as the LSM index files must
+     * begin with ".". Otherwise {@link AbstractLSMIndexFileManager} will try to
+     * use them as index files.
+     */
+    public static final String INDEX_CHECKPOINT_FILE_PREFIX = ".idx_checkpoint_";
     public static final String METADATA_FILE_NAME = ".metadata";
     public static final String LEGACY_DATASET_INDEX_NAME_SEPARATOR = "_idx_";
 
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
index 2928d90..3aa7b17 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
@@ -41,6 +41,7 @@
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
 import org.apache.hyracks.storage.common.IIndexAccessParameters;
+import org.apache.hyracks.storage.common.LocalResource;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -237,8 +238,8 @@
 
         Mockito.when(index.createAccessor(Mockito.any(IIndexAccessParameters.class))).thenReturn(accessor);
         Mockito.when(index.isPrimaryIndex()).thenReturn(isPrimary);
-
-        return new IndexInfo(index, DATASET_ID, 0, partition);
+        final LocalResource localResource = Mockito.mock(LocalResource.class);
+        return new IndexInfo(index, DATASET_ID, localResource, partition);
     }
 
 }
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/AbstractLSMIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/AbstractLSMIOOperationCallbackTest.java
index 0f2ea50..1c57d1b 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/AbstractLSMIOOperationCallbackTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/AbstractLSMIOOperationCallbackTest.java
@@ -21,6 +21,8 @@
 
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
 import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallback;
+import org.apache.asterix.common.storage.IIndexCheckpointManager;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
@@ -45,8 +47,8 @@
         ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
         Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
         Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(Mockito.mock(AbstractLSMMemoryComponent.class));
-        LSMBTreeIOOperationCallback callback =
-                new LSMBTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
+        LSMBTreeIOOperationCallback callback = new LSMBTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator(),
+                mockIndexCheckpointManagerProvider());
 
         //request to flush first component
         callback.updateLastLSN(1);
@@ -57,13 +59,14 @@
         callback.beforeOperation(LSMIOOperationType.FLUSH);
 
         Assert.assertEquals(1, callback.getComponentLSN(null));
-        callback.afterOperation(LSMIOOperationType.FLUSH, null, mockDiskComponent());
-        callback.afterFinalize(LSMIOOperationType.FLUSH, mockDiskComponent());
+        final ILSMDiskComponent diskComponent1 = mockDiskComponent();
+        callback.afterOperation(LSMIOOperationType.FLUSH, null, diskComponent1);
+        callback.afterFinalize(LSMIOOperationType.FLUSH, diskComponent1);
 
         Assert.assertEquals(2, callback.getComponentLSN(null));
-
-        callback.afterOperation(LSMIOOperationType.FLUSH, null, mockDiskComponent());
-        callback.afterFinalize(LSMIOOperationType.FLUSH, mockDiskComponent());
+        final ILSMDiskComponent diskComponent2 = mockDiskComponent();
+        callback.afterOperation(LSMIOOperationType.FLUSH, null, diskComponent2);
+        callback.afterFinalize(LSMIOOperationType.FLUSH, diskComponent2);
     }
 
     @Test
@@ -72,8 +75,8 @@
         Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
         Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(Mockito.mock(AbstractLSMMemoryComponent.class));
 
-        LSMBTreeIOOperationCallback callback =
-                new LSMBTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
+        LSMBTreeIOOperationCallback callback = new LSMBTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator(),
+                mockIndexCheckpointManagerProvider());
 
         //request to flush first component
         callback.updateLastLSN(1);
@@ -90,11 +93,13 @@
         //the scheduleFlush request would fail this time
 
         Assert.assertEquals(1, callback.getComponentLSN(null));
-        callback.afterOperation(LSMIOOperationType.FLUSH, null, mockDiskComponent());
-        callback.afterFinalize(LSMIOOperationType.FLUSH, mockDiskComponent());
-
+        final ILSMDiskComponent diskComponent1 = mockDiskComponent();
+        callback.afterOperation(LSMIOOperationType.FLUSH, null, diskComponent1);
+        callback.afterFinalize(LSMIOOperationType.FLUSH, diskComponent1);
+        final ILSMDiskComponent diskComponent2 = mockDiskComponent();
         Assert.assertEquals(2, callback.getComponentLSN(null));
-        callback.afterFinalize(LSMIOOperationType.FLUSH, mockDiskComponent());
+        callback.afterOperation(LSMIOOperationType.FLUSH, null, diskComponent2);
+        callback.afterFinalize(LSMIOOperationType.FLUSH, diskComponent2);
     }
 
     @Test
@@ -103,9 +108,8 @@
         Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
         Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(Mockito.mock(AbstractLSMMemoryComponent.class));
 
-        LSMBTreeIOOperationCallback callback =
-                new LSMBTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
-
+        LSMBTreeIOOperationCallback callback = new LSMBTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator(),
+                mockIndexCheckpointManagerProvider());
         //request to flush first component
         callback.updateLastLSN(1);
         callback.beforeOperation(LSMIOOperationType.FLUSH);
@@ -144,7 +148,8 @@
         ILSMMemoryComponent mockComponent = Mockito.mock(AbstractLSMMemoryComponent.class);
         Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent);
 
-        LSMBTreeIOOperationCallback callback = new LSMBTreeIOOperationCallback(mockIndex, idGenerator);
+        LSMBTreeIOOperationCallback callback =
+                new LSMBTreeIOOperationCallback(mockIndex, idGenerator, mockIndexCheckpointManagerProvider());
 
         ILSMComponentId initialId = idGenerator.getId();
         // simulate a partition is flushed before allocated
@@ -162,7 +167,8 @@
         Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
         ILSMMemoryComponent mockComponent = Mockito.mock(AbstractLSMMemoryComponent.class);
         Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent);
-        LSMBTreeIOOperationCallback callback = new LSMBTreeIOOperationCallback(mockIndex, idGenerator);
+        LSMBTreeIOOperationCallback callback =
+                new LSMBTreeIOOperationCallback(mockIndex, idGenerator, mockIndexCheckpointManagerProvider());
 
         ILSMComponentId id = idGenerator.getId();
         callback.allocated(mockComponent);
@@ -178,8 +184,9 @@
             callback.beforeOperation(LSMIOOperationType.FLUSH);
             callback.recycled(mockComponent, true);
 
-            callback.afterOperation(LSMIOOperationType.FLUSH, null, mockDiskComponent());
-            callback.afterFinalize(LSMIOOperationType.FLUSH, mockDiskComponent());
+            final ILSMDiskComponent diskComponent = mockDiskComponent();
+            callback.afterOperation(LSMIOOperationType.FLUSH, null, diskComponent);
+            callback.afterFinalize(LSMIOOperationType.FLUSH, diskComponent);
             checkMemoryComponent(expectedId, mockComponent);
         }
     }
@@ -191,7 +198,8 @@
         Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
         ILSMMemoryComponent mockComponent = Mockito.mock(AbstractLSMMemoryComponent.class);
         Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent);
-        LSMBTreeIOOperationCallback callback = new LSMBTreeIOOperationCallback(mockIndex, idGenerator);
+        LSMBTreeIOOperationCallback callback =
+                new LSMBTreeIOOperationCallback(mockIndex, idGenerator, mockIndexCheckpointManagerProvider());
 
         ILSMComponentId id = idGenerator.getId();
         callback.allocated(mockComponent);
@@ -216,7 +224,8 @@
         ILSMMemoryComponent mockComponent = Mockito.mock(AbstractLSMMemoryComponent.class);
         Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent);
         Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
-        LSMBTreeIOOperationCallback callback = new LSMBTreeIOOperationCallback(mockIndex, idGenerator);
+        LSMBTreeIOOperationCallback callback =
+                new LSMBTreeIOOperationCallback(mockIndex, idGenerator, mockIndexCheckpointManagerProvider());
 
         ILSMComponentId id = idGenerator.getId();
         callback.allocated(mockComponent);
@@ -230,7 +239,9 @@
 
         callback.updateLastLSN(0);
         callback.beforeOperation(LSMIOOperationType.FLUSH);
-        callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
+        final ILSMDiskComponent diskComponent = mockDiskComponent();
+        callback.afterOperation(LSMIOOperationType.FLUSH, null, diskComponent);
+        callback.afterFinalize(LSMIOOperationType.FLUSH, diskComponent);
 
         // another flush is to be scheduled before the component is recycled
         idGenerator.refresh();
@@ -243,7 +254,9 @@
         // schedule the next flush
         callback.updateLastLSN(0);
         callback.beforeOperation(LSMIOOperationType.FLUSH);
-        callback.afterFinalize(LSMIOOperationType.FLUSH, Mockito.mock(ILSMDiskComponent.class));
+        final ILSMDiskComponent diskComponent2 = mockDiskComponent();
+        callback.afterOperation(LSMIOOperationType.FLUSH, null, diskComponent2);
+        callback.afterFinalize(LSMIOOperationType.FLUSH, diskComponent2);
         callback.recycled(mockComponent, true);
         checkMemoryComponent(nextId, mockComponent);
     }
@@ -263,5 +276,14 @@
         return component;
     }
 
-    protected abstract AbstractLSMIOOperationCallback getIoCallback();
+    protected IIndexCheckpointManagerProvider mockIndexCheckpointManagerProvider() throws HyracksDataException {
+        IIndexCheckpointManagerProvider indexCheckpointManagerProvider =
+                Mockito.mock(IIndexCheckpointManagerProvider.class);
+        IIndexCheckpointManager indexCheckpointManager = Mockito.mock(IIndexCheckpointManager.class);
+        Mockito.doNothing().when(indexCheckpointManager).flushed(Mockito.any(), Mockito.anyLong());
+        Mockito.doReturn(indexCheckpointManager).when(indexCheckpointManagerProvider).get(Mockito.any());
+        return indexCheckpointManagerProvider;
+    }
+
+    protected abstract AbstractLSMIOOperationCallback getIoCallback() throws HyracksDataException;
 }
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeIOOperationCallbackTest.java
index c22e2e3..a4bc399 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeIOOperationCallbackTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeIOOperationCallbackTest.java
@@ -21,6 +21,7 @@
 
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
 import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallback;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator;
 import org.mockito.Mockito;
@@ -28,10 +29,11 @@
 public class LSMBTreeIOOperationCallbackTest extends AbstractLSMIOOperationCallbackTest {
 
     @Override
-    protected AbstractLSMIOOperationCallback getIoCallback() {
+    protected AbstractLSMIOOperationCallback getIoCallback() throws HyracksDataException {
         ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
         Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
-        return new LSMBTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
+        return new LSMBTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator(),
+                mockIndexCheckpointManagerProvider());
     }
 
 }
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackTest.java
index 356c80a..5f37c78 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackTest.java
@@ -21,6 +21,7 @@
 
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
 import org.apache.asterix.common.ioopcallbacks.LSMBTreeWithBuddyIOOperationCallback;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator;
 import org.mockito.Mockito;
@@ -28,10 +29,11 @@
 public class LSMBTreeWithBuddyIOOperationCallbackTest extends AbstractLSMIOOperationCallbackTest {
 
     @Override
-    protected AbstractLSMIOOperationCallback getIoCallback() {
+    protected AbstractLSMIOOperationCallback getIoCallback() throws HyracksDataException {
         ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
         Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
-        return new LSMBTreeWithBuddyIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
+        return new LSMBTreeWithBuddyIOOperationCallback(mockIndex, new LSMComponentIdGenerator(),
+                mockIndexCheckpointManagerProvider());
     }
 
 }
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMInvertedIndexIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMInvertedIndexIOOperationCallbackTest.java
index ac4595e..343bc59 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMInvertedIndexIOOperationCallbackTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMInvertedIndexIOOperationCallbackTest.java
@@ -21,6 +21,7 @@
 
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
 import org.apache.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallback;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator;
 import org.mockito.Mockito;
@@ -28,10 +29,11 @@
 public class LSMInvertedIndexIOOperationCallbackTest extends AbstractLSMIOOperationCallbackTest {
 
     @Override
-    protected AbstractLSMIOOperationCallback getIoCallback() {
+    protected AbstractLSMIOOperationCallback getIoCallback() throws HyracksDataException {
         ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
         Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
-        return new LSMInvertedIndexIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
+        return new LSMInvertedIndexIOOperationCallback(mockIndex, new LSMComponentIdGenerator(),
+                mockIndexCheckpointManagerProvider());
     }
 
 }
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMRTreeIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMRTreeIOOperationCallbackTest.java
index 0131e3f..10d95d8 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMRTreeIOOperationCallbackTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMRTreeIOOperationCallbackTest.java
@@ -21,6 +21,7 @@
 
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
 import org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallback;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentIdGenerator;
 import org.mockito.Mockito;
@@ -28,10 +29,11 @@
 public class LSMRTreeIOOperationCallbackTest extends AbstractLSMIOOperationCallbackTest {
 
     @Override
-    protected AbstractLSMIOOperationCallback getIoCallback() {
+    protected AbstractLSMIOOperationCallback getIoCallback() throws HyracksDataException {
         ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
         Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(2);
-        return new LSMRTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator());
+        return new LSMRTreeIOOperationCallback(mockIndex, new LSMComponentIdGenerator(),
+                mockIndexCheckpointManagerProvider());
     }
 
 }
diff --git a/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixLifecycleIT.java b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixLifecycleIT.java
index 814e109..1434629 100644
--- a/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixLifecycleIT.java
+++ b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/AsterixLifecycleIT.java
@@ -19,43 +19,18 @@
 package org.apache.asterix.installer.test;
 
 import java.io.File;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.logging.Logger;
 
-import org.apache.asterix.event.error.VerificationUtil;
-import org.apache.asterix.event.model.AsterixInstance;
 import org.apache.asterix.event.model.AsterixInstance.State;
-import org.apache.asterix.event.model.AsterixRuntimeState;
-import org.apache.asterix.event.service.ServiceProvider;
-import org.apache.asterix.installer.command.CommandHandler;
-import org.apache.asterix.test.common.TestExecutor;
-import org.apache.asterix.testframework.context.TestCaseContext;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.FixMethodOrder;
-import org.junit.Test;
-import org.junit.runners.MethodSorters;
-import org.junit.runners.Parameterized.Parameters;
-
-@FixMethodOrder(MethodSorters.NAME_ASCENDING)
 
 public class AsterixLifecycleIT {
 
-    private static final int NUM_NC = 2;
-    private static final CommandHandler cmdHandler = new CommandHandler();
-    private static final String PATH_BASE = "src/test/resources/integrationts/lifecycle";
     private static final String PATH_ACTUAL = "target" + File.separator + "ittest" + File.separator;
-    private static final Logger LOGGER = Logger.getLogger(AsterixLifecycleIT.class.getName());
-    private static List<TestCaseContext> testCaseCollection;
-    private final TestExecutor testExecutor = new TestExecutor();
 
     @BeforeClass
     public static void setUp() throws Exception {
         AsterixInstallerIntegrationUtil.init(AsterixInstallerIntegrationUtil.LOCAL_CLUSTER_PATH);
-        TestCaseContext.Builder b = new TestCaseContext.Builder();
-        testCaseCollection = b.build(new File(PATH_BASE));
         File outdir = new File(PATH_ACTUAL);
         outdir.mkdirs();
     }
@@ -70,88 +45,8 @@
         }
     }
 
-    @Parameters
-    public static Collection<Object[]> tests() throws Exception {
-        Collection<Object[]> testArgs = new ArrayList<>();
-        return testArgs;
-    }
-
     public static void restartInstance() throws Exception {
         AsterixInstallerIntegrationUtil.transformIntoRequiredState(State.INACTIVE);
         AsterixInstallerIntegrationUtil.transformIntoRequiredState(State.ACTIVE);
     }
-
-    @Test
-    public void test_1_StopActiveInstance() throws Exception {
-        try {
-            LOGGER.info("Starting test: test_1_StopActiveInstance");
-            AsterixInstallerIntegrationUtil.transformIntoRequiredState(State.ACTIVE);
-            String command = "stop -n " + AsterixInstallerIntegrationUtil.ASTERIX_INSTANCE_NAME;
-            cmdHandler.processCommand(command.split(" "));
-            Thread.sleep(4000);
-            AsterixInstance instance = ServiceProvider.INSTANCE.getLookupService()
-                    .getAsterixInstance(AsterixInstallerIntegrationUtil.ASTERIX_INSTANCE_NAME);
-            AsterixRuntimeState state = VerificationUtil.getAsterixRuntimeState(instance);
-            assert (state.getFailedNCs().size() == NUM_NC && !state.isCcRunning());
-            LOGGER.info("PASSED: test_1_StopActiveInstance");
-        } catch (Exception e) {
-            throw new Exception("Test configure installer " + "\" FAILED!", e);
-        }
-    }
-
-    @Test
-    public void test_2_StartActiveInstance() throws Exception {
-        try {
-            LOGGER.info("Starting test: test_2_StartActiveInstance");
-            AsterixInstallerIntegrationUtil.transformIntoRequiredState(State.INACTIVE);
-            String command = "start -n " + AsterixInstallerIntegrationUtil.ASTERIX_INSTANCE_NAME;
-            cmdHandler.processCommand(command.split(" "));
-            AsterixInstance instance = ServiceProvider.INSTANCE.getLookupService()
-                    .getAsterixInstance(AsterixInstallerIntegrationUtil.ASTERIX_INSTANCE_NAME);
-            AsterixRuntimeState state = VerificationUtil.getAsterixRuntimeState(instance);
-            assert (state.getFailedNCs().size() == 0 && state.isCcRunning());
-            LOGGER.info("PASSED: test_2_StartActiveInstance");
-        } catch (Exception e) {
-            throw new Exception("Test configure installer " + "\" FAILED!", e);
-        }
-    }
-
-    @Test
-    public void test_3_DeleteActiveInstance() throws Exception {
-        try {
-            LOGGER.info("Starting test: test_3_DeleteActiveInstance");
-            AsterixInstallerIntegrationUtil.transformIntoRequiredState(State.INACTIVE);
-            String command = "delete -n " + AsterixInstallerIntegrationUtil.ASTERIX_INSTANCE_NAME;
-            cmdHandler.processCommand(command.split(" "));
-            AsterixInstance instance = ServiceProvider.INSTANCE.getLookupService()
-                    .getAsterixInstance(AsterixInstallerIntegrationUtil.ASTERIX_INSTANCE_NAME);
-            assert (instance == null);
-            LOGGER.info("PASSED: test_3_DeleteActiveInstance");
-        } catch (Exception e) {
-            throw new Exception("Test delete active instance " + "\" FAILED!", e);
-        } finally {
-            // recreate instance
-            AsterixInstallerIntegrationUtil.createInstance();
-        }
-    }
-
-    @Test
-    public void test() throws Exception {
-        for (TestCaseContext testCaseCtx : testCaseCollection) {
-            testExecutor.executeTest(PATH_ACTUAL, testCaseCtx, null, false);
-        }
-    }
-
-    public static void main(String[] args) throws Exception {
-        try {
-            setUp();
-            new AsterixLifecycleIT().test();
-        } catch (Exception e) {
-            e.printStackTrace();
-            LOGGER.info("TEST CASE(S) FAILED");
-        } finally {
-            tearDown();
-        }
-    }
-
 }
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogMapping.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogMapping.java
index 3b2aff7..0444952 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogMapping.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogMapping.java
@@ -27,10 +27,6 @@
     private long localLSN;
     public AtomicInteger numOfFlushedIndexes = new AtomicInteger();
 
-    public String getRemoteNodeID() {
-        return remoteNodeID;
-    }
-
     public void setRemoteNodeID(String remoteNodeID) {
         this.remoteNodeID = remoteNodeID;
     }
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
index 3143284..dfc23d3 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
@@ -27,9 +27,6 @@
 import java.nio.channels.FileChannel;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -39,13 +36,17 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Predicate;
 import java.util.logging.Level;
 import java.util.logging.Logger;
+import java.util.stream.Collectors;
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.config.ReplicationProperties;
 import org.apache.asterix.common.context.IndexInfo;
+import org.apache.asterix.common.dataflow.DatasetLocalResource;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
 import org.apache.asterix.common.replication.IReplicaResourcesManager;
@@ -55,6 +56,9 @@
 import org.apache.asterix.common.replication.IReplicationThread;
 import org.apache.asterix.common.replication.ReplicaEvent;
 import org.apache.asterix.common.storage.DatasetResourceReference;
+import org.apache.asterix.common.storage.IIndexCheckpointManager;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
+import org.apache.asterix.common.storage.ResourceReference;
 import org.apache.asterix.common.transactions.IAppRuntimeContextProvider;
 import org.apache.asterix.common.transactions.ILogManager;
 import org.apache.asterix.common.transactions.LogRecord;
@@ -72,9 +76,13 @@
 import org.apache.asterix.replication.storage.LSMIndexFileProperties;
 import org.apache.asterix.replication.storage.ReplicaResourcesManager;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.asterix.transaction.management.service.logging.LogBuffer;
 import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
+import org.apache.hyracks.storage.common.LocalResource;
 import org.apache.hyracks.util.StorageUtil;
 import org.apache.hyracks.util.StorageUtil.StorageUnit;
 
@@ -89,7 +97,6 @@
     private final String localNodeID;
     private final ILogManager logManager;
     private final ReplicaResourcesManager replicaResourcesManager;
-    private SocketChannel socketChannel = null;
     private ServerSocketChannel serverSocketChannel = null;
     private final IReplicationManager replicationManager;
     private final ReplicationProperties replicationProperties;
@@ -98,6 +105,7 @@
     private final LinkedBlockingQueue<LSMComponentLSNSyncTask> lsmComponentRemoteLSN2LocalLSNMappingTaskQ;
     private final LinkedBlockingQueue<LogRecord> pendingNotificationRemoteLogsQ;
     private final Map<String, LSMComponentProperties> lsmComponentId2PropertiesMap;
+    private final Map<Long, RemoteLogMapping> localLsn2RemoteMapping;
     private final Map<String, RemoteLogMapping> replicaUniqueLSN2RemoteMapping;
     private final LSMComponentsSyncService lsmComponentLSNMappingService;
     private final Set<Integer> nodeHostedPartitions;
@@ -105,6 +113,7 @@
     private final Object flushLogslock = new Object();
     private final IDatasetLifecycleManager dsLifecycleManager;
     private final PersistentLocalResourceRepository localResourceRep;
+    private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
 
     public ReplicationChannel(String nodeId, ReplicationProperties replicationProperties, ILogManager logManager,
             IReplicaResourcesManager replicaResoucesManager, IReplicationManager replicationManager,
@@ -122,6 +131,7 @@
         pendingNotificationRemoteLogsQ = new LinkedBlockingQueue<>();
         lsmComponentId2PropertiesMap = new ConcurrentHashMap<>();
         replicaUniqueLSN2RemoteMapping = new ConcurrentHashMap<>();
+        localLsn2RemoteMapping = new ConcurrentHashMap<>();
         lsmComponentLSNMappingService = new LSMComponentsSyncService();
         replicationNotifier = new ReplicationNotifier();
         replicationThreads = Executors.newCachedThreadPool(ncServiceContext.getThreadFactory());
@@ -136,6 +146,8 @@
         }
         nodeHostedPartitions = new HashSet<>(clientsPartitions.size());
         nodeHostedPartitions.addAll(clientsPartitions);
+        this.indexCheckpointManagerProvider =
+                ((INcApplicationContext) ncServiceContext.getApplicationContext()).getIndexCheckpointManagerProvider();
     }
 
     @Override
@@ -156,7 +168,7 @@
 
             //start accepting replication requests
             while (true) {
-                socketChannel = serverSocketChannel.accept();
+                SocketChannel socketChannel = serverSocketChannel.accept();
                 socketChannel.configureBlocking(true);
                 //start a new thread to handle the request
                 replicationThreads.execute(new ReplicationThread(socketChannel));
@@ -349,16 +361,19 @@
                 }
                 if (afp.isLSMComponentFile()) {
                     String componentId = LSMComponentProperties.getLSMComponentID(afp.getFilePath());
-                    if (afp.getLSNByteOffset() > AbstractLSMIOOperationCallback.INVALID) {
-                        LSMComponentLSNSyncTask syncTask = new LSMComponentLSNSyncTask(componentId,
-                                destFile.getAbsolutePath(), afp.getLSNByteOffset());
+                    final LSMComponentProperties lsmComponentProperties = lsmComponentId2PropertiesMap.get(componentId);
+                    // merge operations do not generate flush logs
+                    if (afp.requiresAck() && lsmComponentProperties.getOpType() == LSMOperationType.FLUSH) {
+                        LSMComponentLSNSyncTask syncTask =
+                                new LSMComponentLSNSyncTask(componentId, destFile.getAbsolutePath());
                         lsmComponentRemoteLSN2LocalLSNMappingTaskQ.offer(syncTask);
                     } else {
                         updateLSMComponentRemainingFiles(componentId);
                     }
                 } else {
                     //index metadata file
-                    replicaResourcesManager.initializeReplicaIndexLSNMap(indexPath, logManager.getAppendLSN());
+                    final ResourceReference indexRef = ResourceReference.of(destFile.getAbsolutePath());
+                    indexCheckpointManagerProvider.get(indexRef).init(logManager.getAppendLSN());
                 }
             }
         }
@@ -402,8 +417,7 @@
                         try (RandomAccessFile fromFile = new RandomAccessFile(filePath, "r");
                                 FileChannel fileChannel = fromFile.getChannel();) {
                             long fileSize = fileChannel.size();
-                            fileProperties.initialize(filePath, fileSize, partition.getNodeId(), false,
-                                    AbstractLSMIOOperationCallback.INVALID, false);
+                            fileProperties.initialize(filePath, fileSize, partition.getNodeId(), false, false);
                             outBuffer = ReplicationProtocol.writeFileReplicationRequest(outBuffer, fileProperties,
                                     ReplicationRequestType.REPLICATE_FILE);
 
@@ -462,7 +476,7 @@
                 switch (remoteLog.getLogType()) {
                     case LogType.UPDATE:
                     case LogType.ENTITY_COMMIT:
-                        //if the log partition belongs to a partitions hosted on this node, replicate it
+                        //if the log partition belongs to a partitions hosted on this node, replicated it
                         if (nodeHostedPartitions.contains(remoteLog.getResourcePartition())) {
                             logManager.log(remoteLog);
                         }
@@ -479,13 +493,21 @@
                     case LogType.FLUSH:
                         //store mapping information for flush logs to use them in incoming LSM components.
                         RemoteLogMapping flushLogMap = new RemoteLogMapping();
+                        LogRecord flushLog = new LogRecord();
+                        TransactionUtil.formFlushLogRecord(flushLog, remoteLog.getDatasetId(), null,
+                                remoteLog.getNodeId(), remoteLog.getNumOfFlushedIndexes());
+                        flushLog.setReplicationThread(this);
+                        flushLog.setLogSource(LogSource.REMOTE);
                         flushLogMap.setRemoteNodeID(remoteLog.getNodeId());
                         flushLogMap.setRemoteLSN(remoteLog.getLSN());
-                        logManager.log(remoteLog);
-                        //the log LSN value is updated by logManager.log(.) to a local value
-                        flushLogMap.setLocalLSN(remoteLog.getLSN());
-                        flushLogMap.numOfFlushedIndexes.set(remoteLog.getNumOfFlushedIndexes());
-                        replicaUniqueLSN2RemoteMapping.put(flushLogMap.getNodeUniqueLSN(), flushLogMap);
+                        synchronized (localLsn2RemoteMapping) {
+                            logManager.log(flushLog);
+                            //the log LSN value is updated by logManager.log(.) to a local value
+                            flushLogMap.setLocalLSN(flushLog.getLSN());
+                            flushLogMap.numOfFlushedIndexes.set(flushLog.getNumOfFlushedIndexes());
+                            replicaUniqueLSN2RemoteMapping.put(flushLogMap.getNodeUniqueLSN(), flushLogMap);
+                            localLsn2RemoteMapping.put(flushLog.getLSN(), flushLogMap);
+                        }
                         synchronized (flushLogslock) {
                             flushLogslock.notify();
                         }
@@ -497,18 +519,55 @@
         }
 
         /**
-         * this method is called sequentially by LogPage (notifyReplicationTerminator)
-         * for JOB_COMMIT and JOB_ABORT log types.
+         * this method is called sequentially by {@link LogBuffer#notifyReplicationTermination()}
+         * for JOB_COMMIT, JOB_ABORT, and FLUSH log types.
          */
         @Override
         public void notifyLogReplicationRequester(LogRecord logRecord) {
-            pendingNotificationRemoteLogsQ.offer(logRecord);
+            switch (logRecord.getLogType()) {
+                case LogType.JOB_COMMIT:
+                case LogType.ABORT:
+                    pendingNotificationRemoteLogsQ.offer(logRecord);
+                    break;
+                case LogType.FLUSH:
+                    final RemoteLogMapping remoteLogMapping;
+                    synchronized (localLsn2RemoteMapping) {
+                        remoteLogMapping = localLsn2RemoteMapping.remove(logRecord.getLSN());
+                    }
+                    checkpointReplicaIndexes(remoteLogMapping, logRecord.getDatasetId());
+                    break;
+                default:
+                    throw new IllegalStateException("Unexpected log type: " + logRecord.getLogType());
+            }
         }
 
         @Override
         public SocketChannel getReplicationClientSocket() {
             return socketChannel;
         }
+
+        private void checkpointReplicaIndexes(RemoteLogMapping remoteLogMapping, int datasetId) {
+            try {
+                Predicate<LocalResource> replicaIndexesPredicate = lr -> {
+                    DatasetLocalResource dls = (DatasetLocalResource) lr.getResource();
+                    return dls.getDatasetId() == datasetId && !localResourceRep.getActivePartitions()
+                            .contains(dls.getPartition());
+                };
+                final Map<Long, LocalResource> resources = localResourceRep.getResources(replicaIndexesPredicate);
+                final List<DatasetResourceReference> replicaIndexesRef =
+                        resources.values().stream().map(DatasetResourceReference::of).collect(Collectors.toList());
+                for (DatasetResourceReference replicaIndexRef : replicaIndexesRef) {
+                    final IIndexCheckpointManager indexCheckpointManager =
+                            indexCheckpointManagerProvider.get(replicaIndexRef);
+                    synchronized (indexCheckpointManager) {
+                        indexCheckpointManager
+                                .masterFlush(remoteLogMapping.getRemoteLSN(), remoteLogMapping.getLocalLSN());
+                    }
+                }
+            } catch (Exception e) {
+                LOGGER.log(Level.SEVERE, "Failed to checkpoint replica indexes", e);
+            }
+        }
     }
 
     /**
@@ -541,7 +600,6 @@
      * the received LSM components to a local LSN.
      */
     private class LSMComponentsSyncService extends Thread {
-        private static final int BULKLOAD_LSN = 0;
 
         @Override
         public void run() {
@@ -560,90 +618,22 @@
                         LOGGER.log(Level.SEVERE, "Unexpected exception during LSN synchronization", e);
                     }
                 }
-
             }
         }
 
         private void syncLSMComponentFlushLSN(LSMComponentProperties lsmCompProp, LSMComponentLSNSyncTask syncTask)
                 throws InterruptedException, IOException {
-            long remoteLSN = lsmCompProp.getOriginalLSN();
-            //LSN=0 (bulkload) does not need to be updated and there is no flush log corresponding to it
-            if (remoteLSN == BULKLOAD_LSN) {
-                //since this is the first LSM component of this index,
-                //then set the mapping in the LSN_MAP to the current log LSN because
-                //no other log could've been received for this index since bulkload replication is synchronous.
-                lsmCompProp.setReplicaLSN(logManager.getAppendLSN());
-                return;
-            }
-
-            //path to the LSM component file
-            Path path = Paths.get(syncTask.getComponentFilePath());
-            if (lsmCompProp.getReplicaLSN() == null) {
-                if (lsmCompProp.getOpType() == LSMOperationType.FLUSH) {
-                    //need to look up LSN mapping from memory
-                    RemoteLogMapping remoteLogMap = replicaUniqueLSN2RemoteMapping.get(lsmCompProp.getNodeUniqueLSN());
-                    //wait until flush log arrives, and verify the LSM component file still exists
-                    //The component file could be deleted if its NC fails.
-                    while (remoteLogMap == null && Files.exists(path)) {
-                        synchronized (flushLogslock) {
-                            flushLogslock.wait();
-                        }
-                        remoteLogMap = replicaUniqueLSN2RemoteMapping.get(lsmCompProp.getNodeUniqueLSN());
-                    }
-
-                    /**
-                     * file has been deleted due to its remote primary replica failure
-                     * before its LSN could've been synchronized.
-                     */
-                    if (remoteLogMap == null) {
-                        return;
-                    }
-                    lsmCompProp.setReplicaLSN(remoteLogMap.getLocalLSN());
-                } else if (lsmCompProp.getOpType() == LSMOperationType.MERGE) {
-                    //need to load the LSN mapping from disk
-                    Map<Long, Long> lsmMap = replicaResourcesManager
-                            .getReplicaIndexLSNMap(lsmCompProp.getReplicaComponentPath(replicaResourcesManager));
-                    Long mappingLSN = lsmMap.get(lsmCompProp.getOriginalLSN());
-                    if (mappingLSN == null) {
-                        /**
-                         * this shouldn't happen unless this node just recovered and
-                         * the first component it received is a merged component due
-                         * to an on-going merge operation while recovery on the remote
-                         * replica. In this case, we use the current append LSN since
-                         * no new records exist for this index, otherwise they would've
-                         * been flushed. This could be prevented by waiting for any IO
-                         * to finish on the remote replica during recovery.
-                         */
-                        mappingLSN = logManager.getAppendLSN();
-                    }
-                    lsmCompProp.setReplicaLSN(mappingLSN);
+            final String componentFilePath = syncTask.getComponentFilePath();
+            final ResourceReference indexRef = ResourceReference.of(componentFilePath);
+            final IIndexCheckpointManager indexCheckpointManager = indexCheckpointManagerProvider.get(indexRef);
+            synchronized (indexCheckpointManager) {
+                long masterLsn = lsmCompProp.getOriginalLSN();
+                // wait until the lsn mapping is flushed to disk
+                while (!indexCheckpointManager.isFlushed(masterLsn)) {
+                    indexCheckpointManager.wait();
                 }
-            }
-
-            if (Files.notExists(path)) {
-                /**
-                 * This could happen when a merged component arrives and deletes
-                 * the flushed component (which we are trying to update) before
-                 * its flush log arrives since logs and components are received
-                 * on different threads.
-                 */
-                return;
-            }
-
-            File destFile = new File(syncTask.getComponentFilePath());
-            //prepare local LSN buffer
-            ByteBuffer metadataBuffer = ByteBuffer.allocate(Long.BYTES);
-            metadataBuffer.putLong(lsmCompProp.getReplicaLSN());
-            metadataBuffer.flip();
-
-            //replace the remote LSN value by the local one
-            try (RandomAccessFile fileOutputStream = new RandomAccessFile(destFile, "rw");
-                    FileChannel fileChannel = fileOutputStream.getChannel()) {
-                long lsnStartOffset = syncTask.getLSNByteOffset();
-                while (metadataBuffer.hasRemaining()) {
-                    lsnStartOffset += fileChannel.write(metadataBuffer, lsnStartOffset);
-                }
-                fileChannel.force(true);
+                indexCheckpointManager
+                        .replicated(AbstractLSMIndexFileManager.getComponentEndTime(indexRef.getName()), masterLsn);
             }
         }
     }
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
index 48c7083..5cf7eab 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
@@ -54,7 +54,6 @@
 
 import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.config.ReplicationProperties;
-import org.apache.asterix.common.dataflow.LSMIndexUtil;
 import org.apache.asterix.common.replication.IReplicaResourcesManager;
 import org.apache.asterix.common.replication.IReplicationManager;
 import org.apache.asterix.common.replication.IReplicationStrategy;
@@ -63,6 +62,8 @@
 import org.apache.asterix.common.replication.ReplicaEvent;
 import org.apache.asterix.common.replication.ReplicationJob;
 import org.apache.asterix.common.storage.DatasetResourceReference;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
+import org.apache.asterix.common.storage.ResourceReference;
 import org.apache.asterix.common.transactions.IAppRuntimeContextProvider;
 import org.apache.asterix.common.transactions.ILogManager;
 import org.apache.asterix.common.transactions.ILogRecord;
@@ -84,7 +85,6 @@
 import org.apache.hyracks.api.replication.IReplicationJob.ReplicationExecutionType;
 import org.apache.hyracks.api.replication.IReplicationJob.ReplicationJobType;
 import org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexReplicationJob;
 import org.apache.hyracks.util.StorageUtil;
 import org.apache.hyracks.util.StorageUtil.StorageUnit;
@@ -136,6 +136,7 @@
     private final ByteBuffer txnLogsBatchSizeBuffer = ByteBuffer.allocate(Integer.BYTES);
     private final IReplicationStrategy replicationStrategy;
     private final PersistentLocalResourceRepository localResourceRepo;
+    private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
 
     //TODO this class needs to be refactored by moving its private classes to separate files
     //and possibly using MessageBroker to send/receive remote replicas events.
@@ -148,6 +149,8 @@
         this.replicaResourcesManager = (ReplicaResourcesManager) remoteResoucesManager;
         this.asterixAppRuntimeContextProvider = asterixAppRuntimeContextProvider;
         this.logManager = logManager;
+        this.indexCheckpointManagerProvider =
+                asterixAppRuntimeContextProvider.getAppContext().getIndexCheckpointManagerProvider();
         localResourceRepo =
                 (PersistentLocalResourceRepository) asterixAppRuntimeContextProvider.getLocalResourceRepository();
         this.hostIPAddressFirstOctet = replicationProperties.getReplicaIPAddress(nodeId).substring(0, 3);
@@ -284,7 +287,6 @@
             if (!replicationStrategy.isMatch(indexFileRef.getDatasetId())) {
                 return;
             }
-
             int jobPartitionId = indexFileRef.getPartitionId();
 
             ByteBuffer responseBuffer = null;
@@ -327,25 +329,10 @@
                                 FileChannel fileChannel = fromFile.getChannel();) {
 
                             long fileSize = fileChannel.size();
-
-                            if (LSMComponentJob != null) {
-                                /**
-                                 * since this is LSM_COMPONENT REPLICATE job, the job will contain
-                                 * only the component being replicated.
-                                 */
-                                ILSMDiskComponent diskComponent = LSMComponentJob.getLSMIndexOperationContext()
-                                        .getComponentsToBeReplicated().get(0);
-                                long lsnOffset = LSMIndexUtil.getComponentFileLSNOffset(LSMComponentJob.getLSMIndex(),
-                                        diskComponent, filePath);
-                                asterixFileProperties.initialize(filePath, fileSize, nodeId, isLSMComponentFile,
-                                        lsnOffset, remainingFiles == 0);
-                            } else {
-                                asterixFileProperties.initialize(filePath, fileSize, nodeId, isLSMComponentFile, -1L,
-                                        remainingFiles == 0);
-                            }
+                            asterixFileProperties.initialize(filePath, fileSize, nodeId, isLSMComponentFile,
+                                    remainingFiles == 0);
                             requestBuffer = ReplicationProtocol.writeFileReplicationRequest(requestBuffer,
                                     asterixFileProperties, ReplicationRequestType.REPLICATE_FILE);
-
                             Iterator<Map.Entry<String, SocketChannel>> iterator = replicasSockets.entrySet().iterator();
                             while (iterator.hasNext()) {
                                 Map.Entry<String, SocketChannel> entry = iterator.next();
@@ -378,8 +365,7 @@
                 } else if (job.getOperation() == ReplicationOperation.DELETE) {
                     for (String filePath : job.getJobFiles()) {
                         remainingFiles--;
-                        asterixFileProperties.initialize(filePath, -1, nodeId, isLSMComponentFile, -1L,
-                                remainingFiles == 0);
+                        asterixFileProperties.initialize(filePath, -1, nodeId, isLSMComponentFile, remainingFiles == 0);
                         ReplicationProtocol.writeFileReplicationRequest(requestBuffer, asterixFileProperties,
                                 ReplicationRequestType.DELETE_FILE);
 
@@ -1026,10 +1012,10 @@
         ByteBuffer requestBuffer = ByteBuffer.allocate(INITIAL_BUFFER_SIZE);
         for (String replicaId : replicaIds) {
             //1. identify replica indexes with LSN less than nonSharpCheckpointTargetLSN.
-            Map<Long, String> laggingIndexes =
+            Map<Long, DatasetResourceReference> laggingIndexes =
                     replicaResourcesManager.getLaggingReplicaIndexesId2PathMap(replicaId, nonSharpCheckpointTargetLSN);
 
-            if (laggingIndexes.size() > 0) {
+            if (!laggingIndexes.isEmpty()) {
                 //2. send request to remote replicas that have lagging indexes.
                 ReplicaIndexFlushRequest laggingIndexesResponse = null;
                 try (SocketChannel socketChannel = getReplicaSocket(replicaId)) {
@@ -1052,16 +1038,14 @@
                 }
 
                 /*
-                 * 4. update the LSN_MAP for indexes that were not flushed
+                 * 4. update checkpoints for indexes that were not flushed
                  * to the current append LSN to indicate no operations happened
                  * since the checkpoint start.
                  */
                 if (laggingIndexesResponse != null) {
                     for (Long resouceId : laggingIndexesResponse.getLaggingRescouresIds()) {
-                        String indexPath = laggingIndexes.get(resouceId);
-                        Map<Long, Long> indexLSNMap = replicaResourcesManager.getReplicaIndexLSNMap(indexPath);
-                        indexLSNMap.put(ReplicaResourcesManager.REPLICA_INDEX_CREATION_LSN, startLSN);
-                        replicaResourcesManager.updateReplicaIndexLSNMap(indexPath, indexLSNMap);
+                        final DatasetResourceReference datasetResourceReference = laggingIndexes.get(resouceId);
+                        indexCheckpointManagerProvider.get(datasetResourceReference).advanceLowWatermark(startLSN);
                     }
                 }
             }
@@ -1141,12 +1125,11 @@
                     fileChannel.force(true);
                 }
 
-                //we need to create LSN map for .metadata files that belong to remote replicas
+                //we need to create initial map for .metadata files that belong to remote replicas
                 if (!fileProperties.isLSMComponentFile() && !fileProperties.getNodeId().equals(nodeId)) {
-                    //replica index
-                    replicaResourcesManager.initializeReplicaIndexLSNMap(indexPath, logManager.getAppendLSN());
+                    final ResourceReference indexRef = ResourceReference.of(destFile.getAbsolutePath());
+                    indexCheckpointManagerProvider.get(indexRef).init(logManager.getAppendLSN());
                 }
-
                 responseFunction = ReplicationProtocol.getRequestType(socketChannel, dataBuffer);
             }
 
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentLSNSyncTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentLSNSyncTask.java
index f11adc2..08c0ec7 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentLSNSyncTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentLSNSyncTask.java
@@ -19,33 +19,21 @@
 package org.apache.asterix.replication.storage;
 
 public class LSMComponentLSNSyncTask {
+
     private String componentFilePath;
     private String componentId;
-    private long LSNByteOffset;
 
-    public LSMComponentLSNSyncTask(String componentId, String componentFilePath, long LSNByteOffset) {
+    public LSMComponentLSNSyncTask(String componentId, String componentFilePath) {
         this.componentId = componentId;
         this.componentFilePath = componentFilePath;
-        this.LSNByteOffset = LSNByteOffset;
     }
 
     public String getComponentFilePath() {
         return componentFilePath;
     }
 
-    public void setComponentFilePath(String componentFilePath) {
-        this.componentFilePath = componentFilePath;
-    }
-
     public String getComponentId() {
         return componentId;
     }
 
-    public void setComponentId(String componentId) {
-        this.componentId = componentId;
-    }
-
-    public long getLSNByteOffset() {
-        return LSNByteOffset;
-    }
 }
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java
index 7ca6f2f..bf987d0 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java
@@ -52,9 +52,10 @@
         this.nodeId = nodeId;
         componentId = LSMComponentProperties.getLSMComponentID((String) job.getJobFiles().toArray()[0]);
         numberOfFiles = new AtomicInteger(job.getJobFiles().size());
-        originalLSN = LSMComponentProperties.getLSMComponentLSN((AbstractLSMIndex) job.getLSMIndex(),
-                job.getLSMIndexOperationContext());
         opType = job.getLSMOpType();
+        originalLSN = opType == LSMOperationType.FLUSH ?
+                LSMComponentProperties.getLSMComponentLSN((AbstractLSMIndex) job.getLSMIndex(),
+                        job.getLSMIndexOperationContext()) : 0;
     }
 
     public LSMComponentProperties() {
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java
index f2747fe..2ebf2cb 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java
@@ -31,27 +31,25 @@
     private boolean lsmComponentFile;
     private String filePath;
     private boolean requiresAck = false;
-    private long LSNByteOffset;
 
     public LSMIndexFileProperties() {
     }
 
     public LSMIndexFileProperties(String filePath, long fileSize, String nodeId, boolean lsmComponentFile,
-            long LSNByteOffset, boolean requiresAck) {
-        initialize(filePath, fileSize, nodeId, lsmComponentFile, LSNByteOffset, requiresAck);
+            boolean requiresAck) {
+        initialize(filePath, fileSize, nodeId, lsmComponentFile, requiresAck);
     }
 
     public LSMIndexFileProperties(LSMComponentProperties lsmComponentProperties) {
-        initialize(lsmComponentProperties.getComponentId(), -1, lsmComponentProperties.getNodeId(), false, -1L, false);
+        initialize(lsmComponentProperties.getComponentId(), -1, lsmComponentProperties.getNodeId(), false, false);
     }
 
-    public void initialize(String filePath, long fileSize, String nodeId, boolean lsmComponentFile, long LSNByteOffset,
+    public void initialize(String filePath, long fileSize, String nodeId, boolean lsmComponentFile,
             boolean requiresAck) {
         this.filePath = filePath;
         this.fileSize = fileSize;
         this.nodeId = nodeId;
         this.lsmComponentFile = lsmComponentFile;
-        this.LSNByteOffset = LSNByteOffset;
         this.requiresAck = requiresAck;
     }
 
@@ -61,7 +59,6 @@
         dos.writeUTF(filePath);
         dos.writeLong(fileSize);
         dos.writeBoolean(lsmComponentFile);
-        dos.writeLong(LSNByteOffset);
         dos.writeBoolean(requiresAck);
     }
 
@@ -70,10 +67,9 @@
         String filePath = input.readUTF();
         long fileSize = input.readLong();
         boolean lsmComponentFile = input.readBoolean();
-        long LSNByteOffset = input.readLong();
         boolean requiresAck = input.readBoolean();
-        LSMIndexFileProperties fileProp = new LSMIndexFileProperties(filePath, fileSize, nodeId, lsmComponentFile,
-                LSNByteOffset, requiresAck);
+        LSMIndexFileProperties fileProp =
+                new LSMIndexFileProperties(filePath, fileSize, nodeId, lsmComponentFile, requiresAck);
         return fileProp;
     }
 
@@ -108,11 +104,6 @@
         sb.append("File Size: " + fileSize + "  ");
         sb.append("Node ID: " + nodeId + "  ");
         sb.append("isLSMComponentFile : " + lsmComponentFile + "  ");
-        sb.append("LSN Byte Offset: " + LSNByteOffset);
         return sb.toString();
     }
-
-    public long getLSNByteOffset() {
-        return LSNByteOffset;
-    }
 }
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
index 7eea4a4..2ff74a8 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
@@ -19,27 +19,26 @@
 package org.apache.asterix.replication.storage;
 
 import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.FilenameFilter;
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
+import java.util.stream.Collectors;
 
 import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.config.MetadataProperties;
+import org.apache.asterix.common.dataflow.DatasetLocalResource;
 import org.apache.asterix.common.replication.IReplicaResourcesManager;
+import org.apache.asterix.common.storage.DatasetResourceReference;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
@@ -50,16 +49,15 @@
 import org.apache.hyracks.storage.common.LocalResource;
 
 public class ReplicaResourcesManager implements IReplicaResourcesManager {
-    private static final Logger LOGGER = Logger.getLogger(ReplicaResourcesManager.class.getName());
-    public final static String LSM_COMPONENT_MASK_SUFFIX = "_mask";
-    private final static String REPLICA_INDEX_LSN_MAP_NAME = ".LSN_MAP";
-    public static final long REPLICA_INDEX_CREATION_LSN = -1;
+    public static final String LSM_COMPONENT_MASK_SUFFIX = "_mask";
     private final PersistentLocalResourceRepository localRepository;
     private final Map<String, ClusterPartition[]> nodePartitions;
+    private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
 
-    public ReplicaResourcesManager(ILocalResourceRepository localRepository,
-            MetadataProperties metadataProperties) {
+    public ReplicaResourcesManager(ILocalResourceRepository localRepository, MetadataProperties metadataProperties,
+            IIndexCheckpointManagerProvider indexCheckpointManagerProvider) {
         this.localRepository = (PersistentLocalResourceRepository) localRepository;
+        this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
         nodePartitions = metadataProperties.getNodePartitions();
     }
 
@@ -86,12 +84,6 @@
         return indexPath.toString();
     }
 
-    public void initializeReplicaIndexLSNMap(String indexPath, long currentLSN) throws IOException {
-        HashMap<Long, Long> lsnMap = new HashMap<Long, Long>();
-        lsnMap.put(REPLICA_INDEX_CREATION_LSN, currentLSN);
-        updateReplicaIndexLSNMap(indexPath, lsnMap);
-    }
-
     public void createRemoteLSMComponentMask(LSMComponentProperties lsmComponentProperties) throws IOException {
         String maskPath = lsmComponentProperties.getMaskPath(this);
         Path path = Paths.get(maskPath);
@@ -106,13 +98,6 @@
         String maskPath = lsmComponentProperties.getMaskPath(this);
         Path path = Paths.get(maskPath);
         Files.deleteIfExists(path);
-
-        //add component LSN to the index LSNs map
-        Map<Long, Long> lsnMap = getReplicaIndexLSNMap(lsmComponentProperties.getReplicaComponentPath(this));
-        lsnMap.put(lsmComponentProperties.getOriginalLSN(), lsmComponentProperties.getReplicaLSN());
-
-        //update map on disk
-        updateReplicaIndexLSNMap(lsmComponentProperties.getReplicaComponentPath(this), lsnMap);
     }
 
     public Set<File> getReplicaIndexes(String replicaId) throws HyracksDataException {
@@ -128,58 +113,37 @@
     public long getPartitionsMinLSN(Set<Integer> partitions) throws HyracksDataException {
         long minRemoteLSN = Long.MAX_VALUE;
         for (Integer partition : partitions) {
-            //for every index in replica
-            Set<File> remoteIndexes = localRepository.getPartitionIndexes(partition);
-            for (File indexFolder : remoteIndexes) {
-                //read LSN map
-                try {
-                    //get max LSN per index
-                    long remoteIndexMaxLSN = getReplicaIndexMaxLSN(indexFolder);
-
-                    //get min of all maximums
-                    minRemoteLSN = Math.min(minRemoteLSN, remoteIndexMaxLSN);
-                } catch (IOException e) {
-                    LOGGER.log(Level.INFO,
-                            indexFolder.getAbsolutePath() + " Couldn't read LSN map for index " + indexFolder);
-                    continue;
-                }
+            final List<DatasetResourceReference> partitionResources = localRepository.getResources(resource -> {
+                DatasetLocalResource dsResource = (DatasetLocalResource) resource.getResource();
+                return dsResource.getPartition() == partition;
+            }).values().stream().map(DatasetResourceReference::of).collect(Collectors.toList());
+            for (DatasetResourceReference indexRef : partitionResources) {
+                long remoteIndexMaxLSN = indexCheckpointManagerProvider.get(indexRef).getLowWatermark();
+                minRemoteLSN = Math.min(minRemoteLSN, remoteIndexMaxLSN);
             }
         }
         return minRemoteLSN;
     }
 
-    public Map<Long, String> getLaggingReplicaIndexesId2PathMap(String replicaId, long targetLSN) throws IOException {
-        Map<Long, String> laggingReplicaIndexes = new HashMap<Long, String>();
-        try {
-            //for every index in replica
-            Set<File> remoteIndexes = getReplicaIndexes(replicaId);
-            for (File indexFolder : remoteIndexes) {
-                if (getReplicaIndexMaxLSN(indexFolder) < targetLSN) {
-                    File localResource = new File(
-                            indexFolder + File.separator + StorageConstants.METADATA_FILE_NAME);
-                    LocalResource resource = PersistentLocalResourceRepository.readLocalResource(localResource);
-                    laggingReplicaIndexes.put(resource.getId(), indexFolder.getAbsolutePath());
+    public Map<Long, DatasetResourceReference> getLaggingReplicaIndexesId2PathMap(String replicaId, long targetLSN)
+            throws HyracksDataException {
+        Map<Long, DatasetResourceReference> laggingReplicaIndexes = new HashMap<>();
+        final List<Integer> replicaPartitions =
+                Arrays.stream(nodePartitions.get(replicaId)).map(ClusterPartition::getPartitionId)
+                        .collect(Collectors.toList());
+        for (int patition : replicaPartitions) {
+            final Map<Long, LocalResource> partitionResources = localRepository.getPartitionResources(patition);
+            final List<DatasetResourceReference> indexesRefs =
+                    partitionResources.values().stream().map(DatasetResourceReference::of).collect(Collectors.toList());
+            for (DatasetResourceReference ref : indexesRefs) {
+                if (indexCheckpointManagerProvider.get(ref).getLowWatermark() < targetLSN) {
+                    laggingReplicaIndexes.put(ref.getResourceId(), ref);
                 }
             }
-        } catch (HyracksDataException e) {
-            e.printStackTrace();
         }
-
         return laggingReplicaIndexes;
     }
 
-    private long getReplicaIndexMaxLSN(File indexFolder) throws IOException {
-        long remoteIndexMaxLSN = 0;
-        //get max LSN per index
-        Map<Long, Long> lsnMap = getReplicaIndexLSNMap(indexFolder.getAbsolutePath());
-        if (lsnMap != null) {
-            for (Long lsn : lsnMap.values()) {
-                remoteIndexMaxLSN = Math.max(remoteIndexMaxLSN, lsn);
-            }
-        }
-        return remoteIndexMaxLSN;
-    }
-
     public void cleanInvalidLSMComponents(String replicaId) {
         //for every index in replica
         Set<File> remoteIndexes = null;
@@ -214,28 +178,6 @@
         }
     }
 
-    @SuppressWarnings({ "unchecked" })
-    public synchronized Map<Long, Long> getReplicaIndexLSNMap(String indexPath) throws IOException {
-        try (FileInputStream fis = new FileInputStream(indexPath + File.separator + REPLICA_INDEX_LSN_MAP_NAME);
-                ObjectInputStream oisFromFis = new ObjectInputStream(fis)) {
-            Map<Long, Long> lsnMap = null;
-            try {
-                lsnMap = (Map<Long, Long>) oisFromFis.readObject();
-            } catch (ClassNotFoundException e) {
-                e.printStackTrace();
-            }
-            return lsnMap;
-        }
-    }
-
-    public synchronized void updateReplicaIndexLSNMap(String indexPath, Map<Long, Long> lsnMap) throws IOException {
-        try (FileOutputStream fos = new FileOutputStream(indexPath + File.separator + REPLICA_INDEX_LSN_MAP_NAME);
-                ObjectOutputStream oosToFos = new ObjectOutputStream(fos)) {
-            oosToFos.writeObject(lsnMap);
-            oosToFos.flush();
-        }
-    }
-
     /**
      * @param partition
      * @return Absolute paths to all partition files
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index e87a39b..ba43fb7 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -29,6 +29,7 @@
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -38,8 +39,6 @@
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Predicate;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.config.MetadataProperties;
@@ -48,10 +47,12 @@
 import org.apache.asterix.common.replication.IReplicationManager;
 import org.apache.asterix.common.replication.ReplicationJob;
 import org.apache.asterix.common.storage.DatasetResourceReference;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.asterix.common.storage.ResourceReference;
 import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.IOFileFilter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IIOManager;
@@ -72,7 +73,29 @@
     public static final Predicate<Path> INDEX_COMPONENTS = path -> !path.endsWith(StorageConstants.METADATA_FILE_NAME);
     // Private constants
     private static final int MAX_CACHED_RESOURCES = 1000;
-    private static final int RESOURCES_TREE_DEPTH_FROM_STORAGE_ROOT = 6;
+    private static final IOFileFilter METADATA_FILES_FILTER = new IOFileFilter() {
+        @Override
+        public boolean accept(File file) {
+            return file.getName().equals(StorageConstants.METADATA_FILE_NAME);
+        }
+
+        @Override
+        public boolean accept(File dir, String name) {
+            return false;
+        }
+    };
+
+    private static final IOFileFilter ALL_DIR_FILTER = new IOFileFilter() {
+        @Override
+        public boolean accept(File file) {
+            return true;
+        }
+
+        @Override
+        public boolean accept(File dir, String name) {
+            return true;
+        }
+    };
 
     // Finals
     private final IIOManager ioManager;
@@ -85,15 +108,17 @@
     private IReplicationManager replicationManager;
     private Set<Integer> nodeInactivePartitions;
     private final Path[] storageRoots;
+    private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
 
-    public PersistentLocalResourceRepository(IIOManager ioManager, String nodeId,
-            MetadataProperties metadataProperties) {
+    public PersistentLocalResourceRepository(IIOManager ioManager, String nodeId, MetadataProperties metadataProperties,
+            IIndexCheckpointManagerProvider indexCheckpointManagerProvider) {
         this.ioManager = ioManager;
+        this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
         storageRoots = new Path[ioManager.getIODevices().size()];
         final List<IODeviceHandle> ioDevices = ioManager.getIODevices();
         for (int i = 0; i < ioDevices.size(); i++) {
-            storageRoots[i] =
-                    Paths.get(ioDevices.get(i).getMount().getAbsolutePath(), StorageConstants.STORAGE_ROOT_DIR_NAME);
+            storageRoots[i] = Paths.get(ioDevices.get(i).getMount().getAbsolutePath(),
+                    StorageConstants.STORAGE_ROOT_DIR_NAME);
         }
         createStorageRoots();
         resourceCache = CacheBuilder.newBuilder().maximumSize(MAX_CACHED_RESOURCES).build();
@@ -119,7 +144,7 @@
     }
 
     @Override
-    public LocalResource get(String relativePath) throws HyracksDataException {
+    public synchronized LocalResource get(String relativePath) throws HyracksDataException {
         LocalResource resource = resourceCache.getIfPresent(relativePath);
         if (resource == null) {
             FileReference resourceFile = getLocalResourceFileByName(ioManager, relativePath);
@@ -153,7 +178,7 @@
         }
 
         resourceCache.put(resource.getPath(), resource);
-
+        indexCheckpointManagerProvider.get(DatasetResourceReference.of(resource)).init(0);
         //if replication enabled, send resource metadata info to remote nodes
         if (isReplicationEnabled) {
             createReplicationJob(ReplicationOperation.REPLICATE, resourceFile);
@@ -164,18 +189,16 @@
     public synchronized void delete(String relativePath) throws HyracksDataException {
         FileReference resourceFile = getLocalResourceFileByName(ioManager, relativePath);
         if (resourceFile.getFile().exists()) {
-            try {
-                // Invalidate before deleting the file just in case file deletion throws some exception.
-                // Since it's just a cache invalidation, it should not affect correctness.
-                resourceCache.invalidate(relativePath);
-                IoUtil.delete(resourceFile);
-            } finally {
-                // Regardless of successfully deleted or not, the operation should be replicated.
-                //if replication enabled, delete resource from remote replicas
-                if (isReplicationEnabled) {
-                    createReplicationJob(ReplicationOperation.DELETE, resourceFile);
-                }
+            if (isReplicationEnabled) {
+                createReplicationJob(ReplicationOperation.DELETE, resourceFile);
             }
+            // delete all checkpoints
+            final LocalResource localResource = readLocalResource(resourceFile.getFile());
+            indexCheckpointManagerProvider.get(DatasetResourceReference.of(localResource)).delete();
+            // Invalidate before deleting the file just in case file deletion throws some exception.
+            // Since it's just a cache invalidation, it should not affect correctness.
+            resourceCache.invalidate(relativePath);
+            IoUtil.delete(resourceFile);
         } else {
             throw HyracksDataException
                     .create(org.apache.hyracks.api.exceptions.ErrorCode.RESOURCE_DOES_NOT_EXIST, relativePath);
@@ -188,13 +211,13 @@
         return ioManager.resolve(fileName);
     }
 
-    public Map<Long, LocalResource> getResources(Predicate<LocalResource> filter) throws HyracksDataException {
+    public synchronized Map<Long, LocalResource> getResources(Predicate<LocalResource> filter)
+            throws HyracksDataException {
         Map<Long, LocalResource> resourcesMap = new HashMap<>();
         for (Path root : storageRoots) {
-            try (Stream<Path> stream = Files.find(root, RESOURCES_TREE_DEPTH_FROM_STORAGE_ROOT,
-                    (path, attr) -> path.getFileName().toString().equals(StorageConstants.METADATA_FILE_NAME))) {
-                final List<File> resourceMetadataFiles = stream.map(Path::toFile).collect(Collectors.toList());
-                for (File file : resourceMetadataFiles) {
+            final Collection<File> files = FileUtils.listFiles(root.toFile(), METADATA_FILES_FILTER, ALL_DIR_FILTER);
+            try {
+                for (File file : files) {
                     final LocalResource localResource = PersistentLocalResourceRepository.readLocalResource(file);
                     if (filter.test(localResource)) {
                         resourcesMap.put(localResource.getId(), localResource);
@@ -321,6 +344,13 @@
         return indexes;
     }
 
+    public Map<Long, LocalResource> getPartitionResources(int partition) throws HyracksDataException {
+        return getResources(resource -> {
+            DatasetLocalResource dsResource = (DatasetLocalResource) resource.getResource();
+            return dsResource.getPartition() == partition;
+        });
+    }
+
     /**
      * Given any index file, an absolute {@link FileReference} is returned which points to where the index of
      * {@code indexFile} is located.
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java
index 43024b6..33c6260 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.transaction.management.resource;
 
 import org.apache.asterix.common.config.MetadataProperties;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.storage.common.ILocalResourceRepository;
@@ -28,16 +29,19 @@
     private final IIOManager ioManager;
     private final String nodeId;
     private final MetadataProperties metadataProperties;
+    private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
 
     public PersistentLocalResourceRepositoryFactory(IIOManager ioManager, String nodeId,
-            MetadataProperties metadataProperties) {
+            MetadataProperties metadataProperties, IIndexCheckpointManagerProvider indexCheckpointManagerProvider) {
         this.ioManager = ioManager;
         this.nodeId = nodeId;
         this.metadataProperties = metadataProperties;
+        this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
     }
 
     @Override
     public ILocalResourceRepository createRepository() throws HyracksDataException {
-        return new PersistentLocalResourceRepository(ioManager, nodeId, metadataProperties);
+        return new PersistentLocalResourceRepository(ioManager, nodeId, metadataProperties,
+                indexCheckpointManagerProvider);
     }
 }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
index 6ebf52c..96a31c6 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
@@ -111,8 +111,8 @@
                     logRecord.isFlushed(false);
                     flushQ.add(logRecord);
                 }
-            } else if (logRecord.getLogSource() == LogSource.REMOTE
-                    && (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT)) {
+            } else if (logRecord.getLogSource() == LogSource.REMOTE && (logRecord.getLogType() == LogType.JOB_COMMIT
+                    || logRecord.getLogType() == LogType.ABORT || logRecord.getLogType() == LogType.FLUSH)) {
                 remoteJobsQ.add(logRecord);
             }
             this.notify();
@@ -260,10 +260,9 @@
                     } else if (logRecord.getLogType() == LogType.WAIT) {
                         notifyWaitTermination();
                     }
-                } else if (logRecord.getLogSource() == LogSource.REMOTE) {
-                    if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) {
-                        notifyReplicationTermination();
-                    }
+                } else if (logRecord.getLogSource() == LogSource.REMOTE && (logRecord.getLogType() == LogType.JOB_COMMIT
+                        || logRecord.getLogType() == LogType.ABORT || logRecord.getLogType() == LogType.FLUSH)) {
+                    notifyReplicationTermination();
                 }
                 logRecord = logBufferTailReader.next();
             }
@@ -295,10 +294,12 @@
 
     public void notifyFlushTermination() throws ACIDException {
         LogRecord logRecord = null;
-        try {
-            logRecord = (LogRecord) flushQ.take();
-        } catch (InterruptedException e) {
-            //ignore
+        while (logRecord == null) {
+            try {
+                logRecord = (LogRecord) flushQ.take();
+            } catch (InterruptedException e) { //NOSONAR LogFlusher should survive interrupts
+                //ignore
+            }
         }
         synchronized (logRecord) {
             logRecord.isFlushed(true);
@@ -316,10 +317,12 @@
 
     public void notifyReplicationTermination() {
         LogRecord logRecord = null;
-        try {
-            logRecord = (LogRecord) remoteJobsQ.take();
-        } catch (InterruptedException e) {
-            //ignore
+        while (logRecord == null) {
+            try {
+                logRecord = (LogRecord) remoteJobsQ.take();
+            } catch (InterruptedException e) { //NOSONAR LogFlusher should survive interrupts
+                //ignore
+            }
         }
         logRecord.isFlushed(true);
         IReplicationThread replicationThread = logRecord.getReplicationThread();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
index d3c056d..a6ceba8 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
@@ -411,4 +411,8 @@
         prevTimestamp = ts;
         return ts;
     }
+
+    public static String getComponentEndTime(String fileName) {
+        return fileName.split(DELIMITER)[1];
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 393aa6a..5368591 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -560,22 +560,31 @@
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Started a flush operation for index: " + lsmIndex + " ...");
         }
-
-        ILSMDiskComponent newComponent = null;
-        boolean failedOperation = false;
         try {
-            newComponent = lsmIndex.flush(operation);
-            operation.getCallback().afterOperation(LSMIOOperationType.FLUSH, null, newComponent);
-            newComponent.markAsValid(lsmIndex.isDurable());
-        } catch (Throwable e) { // NOSONAR Log and re-throw
-            failedOperation = true;
-            if (LOGGER.isLoggable(Level.SEVERE)) {
-                LOGGER.log(Level.SEVERE, "Flush failed on " + lsmIndex, e);
+            ILSMDiskComponent newComponent = null;
+            boolean failedOperation = false;
+            try {
+                newComponent = lsmIndex.flush(operation);
+                operation.getCallback().afterOperation(LSMIOOperationType.FLUSH, null, newComponent);
+                newComponent.markAsValid(lsmIndex.isDurable());
+            } catch (Throwable e) { // NOSONAR Log and re-throw
+                failedOperation = true;
+                if (LOGGER.isLoggable(Level.SEVERE)) {
+                    LOGGER.log(Level.SEVERE, "Flush failed on " + lsmIndex, e);
+                }
+                throw e;
+            } finally {
+                exitComponents(ctx, LSMOperationType.FLUSH, newComponent, failedOperation);
+                operation.getCallback().afterFinalize(LSMIOOperationType.FLUSH, newComponent);
+
             }
-            throw e;
         } finally {
-            exitComponents(ctx, LSMOperationType.FLUSH, newComponent, failedOperation);
-            operation.getCallback().afterFinalize(LSMIOOperationType.FLUSH, newComponent);
+            /*
+             * Completion of flush/merge operations is done explicitly here to make sure all generated files during
+             * io operations is completed before the io operation is declared complete
+             */
+            opTracker.completeOperation(lsmIndex, LSMOperationType.FLUSH, ctx.getSearchOperationCallback(),
+                    ctx.getModificationCallback());
         }
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Finished the flush operation for index: " + lsmIndex);
@@ -612,37 +621,42 @@
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Started a merge operation for index: " + lsmIndex + " ...");
         }
-
-        ILSMDiskComponent newComponent = null;
-        boolean failedOperation = false;
         try {
-            newComponent = lsmIndex.merge(operation);
-            operation.getCallback().afterOperation(LSMIOOperationType.MERGE, ctx.getComponentHolder(), newComponent);
-            newComponent.markAsValid(lsmIndex.isDurable());
-        } catch (Throwable e) { // NOSONAR: Log and re-throw
-            failedOperation = true;
-            if (LOGGER.isLoggable(Level.SEVERE)) {
-                LOGGER.log(Level.SEVERE, "Failed merge operation on " + lsmIndex, e);
+            ILSMDiskComponent newComponent = null;
+            boolean failedOperation = false;
+            try {
+                newComponent = lsmIndex.merge(operation);
+                operation.getCallback()
+                        .afterOperation(LSMIOOperationType.MERGE, ctx.getComponentHolder(), newComponent);
+                newComponent.markAsValid(lsmIndex.isDurable());
+            } catch (Throwable e) { // NOSONAR: Log and re-throw
+                failedOperation = true;
+                if (LOGGER.isLoggable(Level.SEVERE)) {
+                    LOGGER.log(Level.SEVERE, "Failed merge operation on " + lsmIndex, e);
+                }
+                throw e;
+            } finally {
+                exitComponents(ctx, LSMOperationType.MERGE, newComponent, failedOperation);
+                operation.getCallback().afterFinalize(LSMIOOperationType.MERGE, newComponent);
             }
-            throw e;
         } finally {
-            exitComponents(ctx, LSMOperationType.MERGE, newComponent, failedOperation);
-            // Completion of the merge operation is called here to and not on afterOperation because
-            // Deletion of the old components comes after afterOperation is called and the number of
-            // io operation should not be decremented before the operation is complete to avoid
-            // index destroy from competing with the merge on deletion of the files.
-            // The order becomes:
-            // 1. scheduleMerge
-            // 2. enterComponents
-            // 3. beforeOperation (increment the numOfIoOperations)
-            // 4. merge
-            // 5. exitComponents
-            // 6. afterOperation (no op)
-            // 7. delete components
-            // 8. completeOperation (decrement the numOfIoOperations)
+            /*
+             * Completion of the merge operation is called here to and not on afterOperation because
+             * deletion of old components comes after afterOperation is called and the number of
+             * io operation should not be decremented before the operation is complete to avoid
+             * index destroy from competing with the merge on deletion of the files.
+             * The order becomes:
+             * 1. scheduleMerge
+             * 2. enterComponents
+             * 3. beforeOperation (increment the numOfIoOperations)
+             * 4. merge
+             * 5. exitComponents
+             * 6. afterOperation (no op)
+             * 7. delete components
+             * 8. completeOperation (decrement the numOfIoOperations)
+             */
             opTracker.completeOperation(lsmIndex, LSMOperationType.MERGE, ctx.getSearchOperationCallback(),
                     ctx.getModificationCallback());
-            operation.getCallback().afterFinalize(LSMIOOperationType.MERGE, newComponent);
         }
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Finished the merge operation for index: " + lsmIndex);