[NO ISSUE][REP] Persist master last valid seq on index checkpoint

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:

- When a partition owner replicates a component to a replica, the
  replica records the last component sequence received from the master.
  This will be used to ensure that a component that was generated on the
  master, but not replicated before the master failed, is not used when
  the master is re-synced (recovered) from a promoted replica.

Change-Id: I102947712daa07c83b32103b3c58fad46de2dc6d
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/12966
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
index 4acc6d3..2d40ec8 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
@@ -56,7 +56,7 @@
     }
 
     @Override
-    public synchronized void init(long validComponentSequence, long lsn, long validComponentId)
+    public synchronized void init(long validComponentSequence, long lsn, long validComponentId, String masterNodeId)
             throws HyracksDataException {
         List<IndexCheckpoint> checkpoints;
         try {
@@ -68,26 +68,34 @@
             LOGGER.warn(() -> "Checkpoints found on initializing: " + indexPath);
             delete();
         }
-        IndexCheckpoint firstCheckpoint = IndexCheckpoint.first(validComponentSequence, lsn, validComponentId);
+        IndexCheckpoint firstCheckpoint =
+                IndexCheckpoint.first(validComponentSequence, lsn, validComponentId, masterNodeId);
         persist(firstCheckpoint);
     }
 
     @Override
-    public synchronized void replicated(long componentSequence, long masterLsn, long componentId)
+    public synchronized void replicated(long componentSequence, long masterLsn, long componentId, String masterNodeId)
             throws HyracksDataException {
         final Long localLsn = getLatest().getMasterNodeFlushMap().get(masterLsn);
         if (localLsn == null) {
-            throw new IllegalStateException("Component flushed before lsn mapping was received");
+            throw new IllegalStateException("Component replicated before lsn mapping was received");
         }
-        flushed(componentSequence, localLsn, componentId);
+        flushed(componentSequence, localLsn, componentId, masterNodeId);
+    }
+
+    @Override
+    public synchronized void flushed(long componentSequence, long lsn, long componentId, String masterNodeId)
+            throws HyracksDataException {
+        final IndexCheckpoint latest = getLatest();
+        IndexCheckpoint nextCheckpoint =
+                IndexCheckpoint.next(latest, lsn, componentSequence, componentId, masterNodeId);
+        persist(nextCheckpoint);
+        deleteHistory(nextCheckpoint.getId(), HISTORY_CHECKPOINTS);
     }
 
     @Override
     public synchronized void flushed(long componentSequence, long lsn, long componentId) throws HyracksDataException {
-        final IndexCheckpoint latest = getLatest();
-        IndexCheckpoint nextCheckpoint = IndexCheckpoint.next(latest, lsn, componentSequence, componentId);
-        persist(nextCheckpoint);
-        deleteHistory(nextCheckpoint.getId(), HISTORY_CHECKPOINTS);
+        flushed(componentSequence, lsn, componentId, null);
     }
 
     @Override
@@ -95,7 +103,7 @@
         final IndexCheckpoint latest = getLatest();
         latest.getMasterNodeFlushMap().put(masterLsn, localLsn);
         final IndexCheckpoint next = IndexCheckpoint.next(latest, latest.getLowWatermark(),
-                latest.getValidComponentSequence(), latest.getLastComponentId());
+                latest.getValidComponentSequence(), latest.getLastComponentId(), null);
         persist(next);
         notifyAll();
     }
@@ -155,8 +163,8 @@
     @Override
     public synchronized void setLastComponentId(long componentId) throws HyracksDataException {
         final IndexCheckpoint latest = getLatest();
-        final IndexCheckpoint next =
-                IndexCheckpoint.next(latest, latest.getLowWatermark(), latest.getValidComponentSequence(), componentId);
+        final IndexCheckpoint next = IndexCheckpoint.next(latest, latest.getLowWatermark(),
+                latest.getValidComponentSequence(), componentId, null);
         persist(next);
     }
 
@@ -165,7 +173,7 @@
         final IndexCheckpoint latest = getLatest();
         if (componentSequence > latest.getValidComponentSequence()) {
             final IndexCheckpoint next = IndexCheckpoint.next(latest, latest.getLowWatermark(), componentSequence,
-                    latest.getLastComponentId());
+                    latest.getLastComponentId(), null);
             persist(next);
         }
     }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
index 66a69ef..f6de92d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.app.nc;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -30,6 +31,7 @@
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.replication.IPartitionReplica;
 import org.apache.asterix.common.storage.IReplicaManager;
 import org.apache.asterix.common.storage.ReplicaIdentifier;
@@ -58,10 +60,12 @@
      */
     private final Map<ReplicaIdentifier, PartitionReplica> replicas = new HashMap<>();
     private final Object replicaSyncLock = new Object();
+    private final Set<Integer> nodeOwnedPartitions = new HashSet<>();
 
     public ReplicaManager(INcApplicationContext appCtx, Set<Integer> partitions) {
         this.appCtx = appCtx;
         this.partitions.addAll(partitions);
+        setNodeOwnedPartitions(appCtx);
     }
 
     @Override
@@ -154,6 +158,11 @@
         return new ArrayList<>(replicas.values());
     }
 
+    @Override
+    public boolean isPartitionOwner(int partition) {
+        return nodeOwnedPartitions.contains(partition);
+    }
+
     public void closePartitionResources(int partition) throws HyracksDataException {
         final IDatasetLifecycleManager datasetLifecycleManager = appCtx.getDatasetLifecycleManager();
         //TODO(mhubail) we can flush only datasets of the requested partition
@@ -171,4 +180,13 @@
         String nodeId = appCtx.getServiceContext().getNodeId();
         return id.getNodeId().equals(nodeId);
     }
+
+    private void setNodeOwnedPartitions(INcApplicationContext appCtx) {
+        ClusterPartition[] clusterPartitions =
+                appCtx.getMetadataProperties().getNodePartitions().get(appCtx.getServiceContext().getNodeId());
+        if (clusterPartitions != null) {
+            nodeOwnedPartitions.addAll(Arrays.stream(clusterPartitions).map(ClusterPartition::getPartitionId)
+                    .collect(Collectors.toList()));
+        }
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
index 801ca0b..f9230dd 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
@@ -28,9 +28,23 @@
      * @param validComponentSequence
      * @param lsn
      * @param validComponentId
+     * @param masterNodeId
      * @throws HyracksDataException
      */
-    void init(long validComponentSequence, long lsn, long validComponentId) throws HyracksDataException;
+    void init(long validComponentSequence, long lsn, long validComponentId, String masterNodeId)
+            throws HyracksDataException;
+
+    /**
+     * Called when a new LSM disk component is flushed due to a replicated component.
+     * When called, the index checkpoint is updated with the latest valid {@code componentSequence}
+     * and low watermark {@code lsn}
+     *
+     * @param componentSequence
+     * @param lsn
+     * @param masterNodeId
+     * @throws HyracksDataException
+     */
+    void flushed(long componentSequence, long lsn, long componentId, String masterNodeId) throws HyracksDataException;
 
     /**
      * Called when a new LSM disk component is flushed. When called, the index checkpoint is updated
@@ -50,9 +64,11 @@
      * @param componentSequence
      * @param masterLsn
      * @param componentId
+     * @param masterNodeId
      * @throws HyracksDataException
      */
-    void replicated(long componentSequence, long masterLsn, long componentId) throws HyracksDataException;
+    void replicated(long componentSequence, long masterLsn, long componentId, String masterNodeId)
+            throws HyracksDataException;
 
     /**
      * Called when a flush log is received and replicated from master. The mapping between
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
index 4f46227..88d3113 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
@@ -100,4 +100,12 @@
      * @return the list of replicas
      */
     List<IPartitionReplica> getReplicas();
+
+    /**
+     * Returns true if {@code partition} is owned by this node, otherwise false.
+     *
+     * @param partition
+     * @return true if the partition is owned by this node, otherwise false.
+     */
+    boolean isPartitionOwner(int partition);
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
index 24b9ae69..a2cf531 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
@@ -44,8 +44,11 @@
     private long lowWatermark;
     private long lastComponentId;
     private Map<Long, Long> masterNodeFlushMap;
+    private String masterNodeId;
+    private long masterValidSeq;
 
-    public static IndexCheckpoint first(long lastComponentSequence, long lowWatermark, long validComponentId) {
+    public static IndexCheckpoint first(long lastComponentSequence, long lowWatermark, long validComponentId,
+            String masterNodeId) {
         IndexCheckpoint firstCheckpoint = new IndexCheckpoint();
         firstCheckpoint.id = INITIAL_CHECKPOINT_ID;
         firstCheckpoint.lowWatermark = lowWatermark;
@@ -53,11 +56,13 @@
         firstCheckpoint.lastComponentId = validComponentId;
         firstCheckpoint.masterNodeFlushMap = new HashMap<>();
         firstCheckpoint.masterNodeFlushMap.put(HAS_NULL_MISSING_VALUES_FIX, HAS_NULL_MISSING_VALUES_FIX);
+        firstCheckpoint.masterNodeId = masterNodeId;
+        firstCheckpoint.masterValidSeq = lastComponentSequence;
         return firstCheckpoint;
     }
 
     public static IndexCheckpoint next(IndexCheckpoint latest, long lowWatermark, long validComponentSequence,
-            long lastComponentId) {
+            long lastComponentId, String masterNodeId) {
         if (lowWatermark < latest.getLowWatermark()) {
             if (LOGGER.isErrorEnabled()) {
                 LOGGER.error("low watermark {} less than the latest checkpoint low watermark {}", lowWatermark, latest);
@@ -70,6 +75,13 @@
         next.lastComponentId = lastComponentId;
         next.validComponentSequence = validComponentSequence;
         next.masterNodeFlushMap = latest.getMasterNodeFlushMap();
+        if (masterNodeId != null) {
+            next.masterNodeId = masterNodeId;
+            next.masterValidSeq = validComponentSequence;
+        } else {
+            next.masterNodeId = latest.getMasterNodeId();
+            next.masterValidSeq = latest.getMasterValidSeq();
+        }
         // remove any lsn from the map that wont be used anymore
         next.masterNodeFlushMap.values().removeIf(lsn -> lsn < lowWatermark && lsn != HAS_NULL_MISSING_VALUES_FIX);
         return next;
@@ -111,6 +123,14 @@
         }
     }
 
+    public String getMasterNodeId() {
+        return masterNodeId;
+    }
+
+    public long getMasterValidSeq() {
+        return masterValidSeq;
+    }
+
     public static IndexCheckpoint fromJson(String json) throws HyracksDataException {
         try {
             return OBJECT_MAPPER.readValue(json, IndexCheckpoint.class);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
index ebf212b..7065767 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
@@ -147,6 +147,10 @@
         return ResourceReference.ofIndex(relativePath.getParent().resolve(dataset).toFile().getPath());
     }
 
+    public boolean isMetadataResource() {
+        return getName().equals(StorageConstants.METADATA_FILE_NAME);
+    }
+
     public Path getFileRelativePath() {
         return relativePath.resolve(name);
     }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
index 6a080bd..2d231d3 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
@@ -37,6 +37,7 @@
      * begin with ".". Otherwise {@link AbstractLSMIndexFileManager} will try to
      * use them as index files.
      */
+    public static final String INDEX_NON_DATA_FILES_PREFIX = ".";
     public static final String INDEX_CHECKPOINT_FILE_PREFIX = ".idx_checkpoint_";
     public static final String METADATA_FILE_NAME = ".metadata";
     public static final String MASK_FILE_PREFIX = ".mask_";
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
index dac4a70..97b6556 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
@@ -48,10 +48,12 @@
 
     private final int partition;
     private final long maxComponentId;
+    private final String masterNodeId;
 
-    public CheckpointPartitionIndexesTask(int partition, long maxComponentId) {
+    public CheckpointPartitionIndexesTask(int partition, long maxComponentId, String masterNodeId) {
         this.partition = partition;
         this.maxComponentId = maxComponentId;
+        this.masterNodeId = masterNodeId;
     }
 
     @Override
@@ -66,7 +68,6 @@
         for (LocalResource ls : partitionResources) {
             DatasetResourceReference ref = DatasetResourceReference.of(ls);
             final IIndexCheckpointManager indexCheckpointManager = indexCheckpointManagerProvider.get(ref);
-            indexCheckpointManager.delete();
             // Get most recent sequence of existing files to avoid deletion
             Path indexPath = StoragePathUtil.getIndexPath(ioManager, ref);
             String[] files = indexPath.toFile().list(AbstractLSMIndexFileManager.COMPONENT_FILES_FILTER);
@@ -79,7 +80,11 @@
                 maxComponentSequence =
                         Math.max(maxComponentSequence, IndexComponentFileReference.of(file).getSequenceEnd());
             }
-            indexCheckpointManager.init(maxComponentSequence, currentLSN, maxComponentId);
+            if (indexCheckpointManager.getCheckpointCount() > 0) {
+                indexCheckpointManager.flushed(maxComponentSequence, currentLSN, maxComponentId, masterNodeId);
+            } else {
+                indexCheckpointManager.init(maxComponentSequence, currentLSN, maxComponentId, masterNodeId);
+            }
         }
         ReplicationProtocol.sendAck(worker.getChannel(), worker.getReusableBuffer());
     }
@@ -95,6 +100,11 @@
             DataOutputStream dos = new DataOutputStream(out);
             dos.writeInt(partition);
             dos.writeLong(maxComponentId);
+            boolean hasMaster = masterNodeId != null;
+            dos.writeBoolean(hasMaster);
+            if (hasMaster) {
+                dos.writeUTF(masterNodeId);
+            }
         } catch (IOException e) {
             throw HyracksDataException.create(e);
         }
@@ -104,7 +114,9 @@
         try {
             int partition = input.readInt();
             long maxComponentId = input.readLong();
-            return new CheckpointPartitionIndexesTask(partition, maxComponentId);
+            final boolean hasMaster = input.readBoolean();
+            final String masterNodeId = hasMaster ? input.readUTF() : null;
+            return new CheckpointPartitionIndexesTask(partition, maxComponentId, masterNodeId);
         } catch (IOException e) {
             throw HyracksDataException.create(e);
         }
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
index b8f61d0..c92a527 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
@@ -46,11 +46,13 @@
     private final long masterLsn;
     private final long lastComponentId;
     private final String file;
+    private final String masterNodeId;
 
-    public MarkComponentValidTask(String file, long masterLsn, long lastComponentId) {
+    public MarkComponentValidTask(String file, long masterLsn, long lastComponentId, String masterNodeId) {
         this.file = file;
         this.lastComponentId = lastComponentId;
         this.masterLsn = masterLsn;
+        this.masterNodeId = masterNodeId;
     }
 
     @Override
@@ -95,7 +97,7 @@
                 replicationTimeOut -= TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
             }
             final long componentSequence = IndexComponentFileReference.of(indexRef.getName()).getSequenceEnd();
-            indexCheckpointManager.replicated(componentSequence, masterLsn, lastComponentId);
+            indexCheckpointManager.replicated(componentSequence, masterLsn, lastComponentId, masterNodeId);
         }
     }
 
@@ -111,6 +113,11 @@
             dos.writeUTF(file);
             dos.writeLong(masterLsn);
             dos.writeLong(lastComponentId);
+            boolean hasMaster = masterNodeId != null;
+            dos.writeBoolean(hasMaster);
+            if (hasMaster) {
+                dos.writeUTF(masterNodeId);
+            }
         } catch (IOException e) {
             throw HyracksDataException.create(e);
         }
@@ -120,6 +127,8 @@
         final String indexFile = input.readUTF();
         final long lsn = input.readLong();
         final long lastComponentId = input.readLong();
-        return new MarkComponentValidTask(indexFile, lsn, lastComponentId);
+        final boolean hasMaster = input.readBoolean();
+        final String masterNodeId = hasMaster ? input.readUTF() : null;
+        return new MarkComponentValidTask(indexFile, lsn, lastComponentId, masterNodeId);
     }
 }
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListResponse.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListResponse.java
index 9c3902e..a9921c6 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListResponse.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListResponse.java
@@ -23,7 +23,9 @@
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.asterix.replication.api.IReplicationMessage;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -31,11 +33,16 @@
 public class PartitionResourcesListResponse implements IReplicationMessage {
 
     private final int partition;
-    private final List<String> resources;
+    private Map<String, Long> partitionReplicatedResources;
+    private final List<String> files;
+    private final boolean owner;
 
-    public PartitionResourcesListResponse(int partition, List<String> resources) {
+    public PartitionResourcesListResponse(int partition, Map<String, Long> partitionReplicatedResources,
+            List<String> files, boolean owner) {
         this.partition = partition;
-        this.resources = resources;
+        this.partitionReplicatedResources = partitionReplicatedResources;
+        this.files = files;
+        this.owner = owner;
     }
 
     @Override
@@ -48,17 +55,23 @@
         try {
             DataOutputStream dos = new DataOutputStream(out);
             dos.writeInt(partition);
-            dos.writeInt(resources.size());
-            for (String file : resources) {
+            dos.writeInt(files.size());
+            for (String file : files) {
                 dos.writeUTF(file);
             }
+            dos.writeBoolean(owner);
+            dos.writeInt(partitionReplicatedResources.size());
+            for (Map.Entry<String, Long> stringLongEntry : partitionReplicatedResources.entrySet()) {
+                dos.writeUTF(stringLongEntry.getKey());
+                dos.writeLong(stringLongEntry.getValue());
+            }
         } catch (IOException e) {
             throw HyracksDataException.create(e);
         }
     }
 
-    public List<String> getResources() {
-        return resources;
+    public List<String> getFiles() {
+        return files;
     }
 
     public static PartitionResourcesListResponse create(DataInput input) throws IOException {
@@ -68,6 +81,20 @@
         for (int i = 0; i < size; i++) {
             resources.add(input.readUTF());
         }
-        return new PartitionResourcesListResponse(partition, resources);
+        boolean owner = input.readBoolean();
+        int resourceSize = input.readInt();
+        Map<String, Long> partitionReplicatedResources = new HashMap<>();
+        for (int i = 0; i < resourceSize; i++) {
+            partitionReplicatedResources.put(input.readUTF(), input.readLong());
+        }
+        return new PartitionResourcesListResponse(partition, partitionReplicatedResources, resources, owner);
+    }
+
+    public boolean isOwner() {
+        return owner;
+    }
+
+    public Map<String, Long> getPartitionReplicatedResources() {
+        return partitionReplicatedResources;
     }
 }
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
index cff12de..3ea252f 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
@@ -23,6 +23,7 @@
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 import org.apache.asterix.common.api.INcApplicationContext;
@@ -50,11 +51,15 @@
                 (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
         localResourceRepository.cleanup(partition);
         final IReplicationStrategy replicationStrategy = appCtx.getReplicationManager().getReplicationStrategy();
-        final List<String> partitionResources =
+        // .metadata file -> resource id
+        Map<String, Long> partitionReplicatedResources =
+                localResourceRepository.getPartitionReplicatedResources(partition, replicationStrategy);
+        // all data files in partitions + .metadata files
+        final List<String> partitionFiles =
                 localResourceRepository.getPartitionReplicatedFiles(partition, replicationStrategy).stream()
                         .map(StoragePathUtil::getFileRelativePath).collect(Collectors.toList());
-        final PartitionResourcesListResponse response =
-                new PartitionResourcesListResponse(partition, partitionResources);
+        final PartitionResourcesListResponse response = new PartitionResourcesListResponse(partition,
+                partitionReplicatedResources, partitionFiles, appCtx.getReplicaManager().isPartitionOwner(partition));
         ReplicationProtocol.sendTo(worker.getChannel(), response, worker.getReusableBuffer());
     }
 
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
index 44c7bea..3ee3094 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
@@ -55,11 +55,13 @@
     private final String file;
     private final long size;
     private final boolean indexMetadata;
+    private final String masterNodeId;
 
-    public ReplicateFileTask(String file, long size, boolean indexMetadata) {
+    public ReplicateFileTask(String file, long size, boolean indexMetadata, String masterNodeId) {
         this.file = file;
         this.size = size;
         this.indexMetadata = indexMetadata;
+        this.masterNodeId = masterNodeId;
     }
 
     @Override
@@ -103,7 +105,7 @@
         final long currentLSN = appCtx.getTransactionSubsystem().getLogManager().getAppendLSN();
         indexCheckpointManager.delete();
         indexCheckpointManager.init(UNINITIALIZED_COMPONENT_SEQ, currentLSN,
-                LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID.getMaxId());
+                LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID.getMaxId(), masterNodeId);
         LOGGER.info(() -> "Checkpoint index: " + indexRef);
     }
 
@@ -119,6 +121,11 @@
             dos.writeUTF(file);
             dos.writeLong(size);
             dos.writeBoolean(indexMetadata);
+            boolean hasMaster = masterNodeId != null;
+            dos.writeBoolean(hasMaster);
+            if (hasMaster) {
+                dos.writeUTF(masterNodeId);
+            }
         } catch (IOException e) {
             throw HyracksDataException.create(e);
         }
@@ -128,7 +135,9 @@
         final String s = input.readUTF();
         final long i = input.readLong();
         final boolean isMetadata = input.readBoolean();
-        return new ReplicateFileTask(s, i, isMetadata);
+        final boolean hasMaster = input.readBoolean();
+        final String masterNodeId = hasMaster ? input.readUTF() : null;
+        return new ReplicateFileTask(s, i, isMetadata, masterNodeId);
     }
 
     @Override
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/FileSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/FileSynchronizer.java
index 7bb2858..813b293 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/FileSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/FileSynchronizer.java
@@ -55,7 +55,9 @@
             final IIOManager ioManager = appCtx.getIoManager();
             final ISocketChannel channel = replica.getChannel();
             final FileReference filePath = ioManager.resolve(file);
-            ReplicateFileTask task = new ReplicateFileTask(file, filePath.getFile().length(), metadata);
+            String masterNode = appCtx.getReplicaManager().isPartitionOwner(replica.getIdentifier().getPartition())
+                    ? appCtx.getServiceContext().getNodeId() : null;
+            ReplicateFileTask task = new ReplicateFileTask(file, filePath.getFile().length(), metadata, masterNode);
             LOGGER.info("attempting to replicate {} to replica {}", task, replica);
             ReplicationProtocol.sendTo(replica, task);
             // send the file itself
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/IndexSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/IndexSynchronizer.java
index 47f872c..48ad5a5 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/IndexSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/IndexSynchronizer.java
@@ -94,8 +94,10 @@
         final FileSynchronizer fileSynchronizer = new FileSynchronizer(appCtx, replica);
         job.getJobFiles().stream().map(StoragePathUtil::getFileRelativePath).forEach(fileSynchronizer::replicate);
         // send mark component valid
-        MarkComponentValidTask markValidTask =
-                new MarkComponentValidTask(indexFile, getReplicatedComponentLsn(), getReplicatedComponentId());
+        String masterNode = appCtx.getReplicaManager().isPartitionOwner(replica.getIdentifier().getPartition())
+                ? appCtx.getServiceContext().getNodeId() : null;
+        MarkComponentValidTask markValidTask = new MarkComponentValidTask(indexFile, getReplicatedComponentLsn(),
+                getReplicatedComponentId(), masterNode);
         ReplicationProtocol.sendTo(replica, markValidTask);
         ReplicationProtocol.waitForAck(replica);
         LOGGER.debug("Replicated component ({}) to replica {}", indexFile, replica);
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
index b47fd39..3c93d17 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
@@ -20,26 +20,39 @@
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.replication.IReplicationStrategy;
+import org.apache.asterix.common.storage.IIndexCheckpointManager;
+import org.apache.asterix.common.storage.IndexCheckpoint;
+import org.apache.asterix.common.storage.ResourceReference;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.replication.api.PartitionReplica;
 import org.apache.asterix.replication.messaging.PartitionResourcesListResponse;
 import org.apache.asterix.replication.messaging.PartitionResourcesListTask;
 import org.apache.asterix.replication.messaging.ReplicationProtocol;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.network.ISocketChannel;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
+import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
+import org.apache.hyracks.storage.common.LocalResource;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 /**
  * Ensures that the files between master and a replica are synchronized
  */
 public class ReplicaFilesSynchronizer {
 
+    private static final Logger LOGGER = LogManager.getLogger();
     private final PartitionReplica replica;
     private final INcApplicationContext appCtx;
 
@@ -50,31 +63,42 @@
 
     public void sync() throws IOException {
         final int partition = replica.getIdentifier().getPartition();
-        final Set<String> replicaFiles = getReplicaFiles(partition);
+        PartitionResourcesListResponse replicaResourceResponse = getReplicaFiles(partition);
+        Map<ResourceReference, Long> resourceReferenceLongMap = getValidReplicaResources(
+                replicaResourceResponse.getPartitionReplicatedResources(), replicaResourceResponse.isOwner());
+        // clean up files for invalid resources (deleted or recreated while the replica was down)
+        Set<String> deletedReplicaFiles =
+                cleanupReplicaInvalidResources(replicaResourceResponse, resourceReferenceLongMap);
         final PersistentLocalResourceRepository localResourceRepository =
                 (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
         final IReplicationStrategy replicationStrategy = appCtx.getReplicationManager().getReplicationStrategy();
         final Set<String> masterFiles =
                 localResourceRepository.getPartitionReplicatedFiles(partition, replicationStrategy).stream()
                         .map(StoragePathUtil::getFileRelativePath).collect(Collectors.toSet());
-        // find files on master and not on replica
-        final List<String> replicaMissingFiles =
-                masterFiles.stream().filter(file -> !replicaFiles.contains(file)).collect(Collectors.toList());
-        replicateMissingFiles(replicaMissingFiles);
-        // find files on replica and not on master
-        final List<String> replicaInvalidFiles =
-                replicaFiles.stream().filter(file -> !masterFiles.contains(file)).collect(Collectors.toList());
-        deleteInvalidFiles(replicaInvalidFiles);
+        // exclude from the replica files the list of invalid deleted files
+        final Set<String> replicaFiles = new HashSet<>(replicaResourceResponse.getFiles());
+        replicaFiles.removeAll(deletedReplicaFiles);
+        syncMissingFiles(replicaFiles, masterFiles);
+        deleteReplicaExtraFiles(replicaFiles, masterFiles);
     }
 
-    private Set<String> getReplicaFiles(int partition) throws IOException {
-        final PartitionResourcesListTask replicaFilesRequest = new PartitionResourcesListTask(partition);
-        final ISocketChannel channel = replica.getChannel();
-        final ByteBuffer reusableBuffer = replica.getReusableBuffer();
-        ReplicationProtocol.sendTo(replica, replicaFilesRequest);
-        final PartitionResourcesListResponse response =
-                (PartitionResourcesListResponse) ReplicationProtocol.read(channel, reusableBuffer);
-        return new HashSet<>(response.getResources());
+    /** Deletes from the replica every file in {@code replicaFiles} that is not in {@code masterFiles}. */
+    private void deleteReplicaExtraFiles(Set<String> replicaFiles, Set<String> masterFiles) {
+        final List<String> extraFiles = new ArrayList<>(replicaFiles);
+        extraFiles.removeIf(masterFiles::contains);
+        if (!extraFiles.isEmpty()) {
+            LOGGER.debug("deleting files not on current master {} on replica {}", extraFiles, replica.getIdentifier());
+            deleteInvalidFiles(extraFiles);
+        }
+    }
+
+    private void syncMissingFiles(Set<String> replicaFiles, Set<String> masterFiles) {
+        final List<String> missingFiles = new ArrayList<>(masterFiles);
+        missingFiles.removeIf(replicaFiles::contains); // keep only the master files the replica lacks
+        if (!missingFiles.isEmpty()) {
+            LOGGER.debug("replicating missing files {} on replica {}", missingFiles, replica.getIdentifier());
+            replicateMissingFiles(missingFiles);
+        }
     }
 
     private void replicateMissingFiles(List<String> files) {
@@ -88,4 +112,73 @@
         final FileSynchronizer sync = new FileSynchronizer(appCtx, replica);
         files.forEach(sync::delete);
     }
+
+    /** Returns the master valid seq of the latest checkpoint of {@code rr}, or the uninitialized seq if none. */
+    private long getResourceMasterValidSeq(ResourceReference rr) throws HyracksDataException {
+        final IIndexCheckpointManager checkpointManager = appCtx.getIndexCheckpointManagerProvider().get(rr);
+        if (checkpointManager.getCheckpointCount() <= 0) {
+            return AbstractLSMIndexFileManager.UNINITIALIZED_COMPONENT_SEQ;
+        }
+        final IndexCheckpoint latestCheckpoint = checkpointManager.getLatest();
+        final long validSeq = latestCheckpoint.getMasterValidSeq();
+        LOGGER.info("setting resource {} valid component seq to {}", rr, validSeq);
+        return validSeq;
+    }
+
+    /** Collects replica files that must not be kept, asks the replica to delete them, and returns them. */
+    private Set<String> cleanupReplicaInvalidResources(PartitionResourcesListResponse replicaResourceResponse,
+            Map<ResourceReference, Long> validReplicaResources) {
+        final Set<String> filesToDelete = new HashSet<>();
+        for (String filePath : replicaResourceResponse.getFiles()) {
+            final ResourceReference fileRef = ResourceReference.of(filePath);
+            if (!validReplicaResources.containsKey(fileRef)) {
+                LOGGER.debug("replica invalid file {} to be deleted", fileRef.getFileRelativePath());
+                filesToDelete.add(filePath);
+            } else if (replicaResourceResponse.isOwner() && !fileRef.isMetadataResource()) {
+                // the replica is the owner: find component files it generated but failed to replicate
+                final Long validSeq = validReplicaResources.get(fileRef);
+                final IndexComponentFileReference component = IndexComponentFileReference.of(fileRef.getName());
+                if (component.getSequenceStart() > validSeq || component.getSequenceEnd() > validSeq) {
+                    LOGGER.debug("will ask replica {} to delete file {} based on valid master valid seq {}",
+                            replica.getIdentifier(), filePath, validSeq);
+                    filesToDelete.add(filePath);
+                }
+            }
+        }
+        // nothing is sent to the replica when every file is valid
+        if (!filesToDelete.isEmpty()) {
+            LOGGER.info("will delete the following files from replica {}", filesToDelete);
+            deleteInvalidFiles(new ArrayList<>(filesToDelete));
+        }
+        return filesToDelete;
+    }
+
+    /** Sends a resources-list request for {@code partition} to the replica and reads back its response. */
+    private PartitionResourcesListResponse getReplicaFiles(int partition) throws IOException {
+        final ISocketChannel replicaChannel = replica.getChannel();
+        final ByteBuffer readBuffer = replica.getReusableBuffer();
+        ReplicationProtocol.sendTo(replica, new PartitionResourcesListTask(partition));
+        return (PartitionResourcesListResponse) ReplicationProtocol.read(replicaChannel, readBuffer);
+    }
+
+    /** Maps each replica resource also present locally with the same id to its max valid component seq. */
+    private Map<ResourceReference, Long> getValidReplicaResources(Map<String, Long> partitionReplicatedResources,
+            boolean owner) throws HyracksDataException {
+        // the repository does not change across entries; resolve it once instead of on every iteration
+        final PersistentLocalResourceRepository localResourceRepository =
+                (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
+        Map<ResourceReference, Long> resource2ValidSeqMap = new HashMap<>();
+        for (Map.Entry<String, Long> resourceEntry : partitionReplicatedResources.entrySet()) {
+            ResourceReference rr = ResourceReference.of(resourceEntry.getKey());
+            LocalResource localResource = localResourceRepository.get(rr.getRelativePath().toString());
+            if (localResource != null && localResource.getId() != resourceEntry.getValue()) {
+                LOGGER.info("replica has resource {} but with different resource id; ours {}, theirs {}", rr,
+                        localResource.getId(), resourceEntry.getValue());
+            } else if (localResource != null) {
+                // component sequences are longs, so "no limit" for a non-owner replica must be Long.MAX_VALUE
+                resource2ValidSeqMap.put(rr, owner ? getResourceMasterValidSeq(rr) : Long.MAX_VALUE);
+            }
+        }
+        return resource2ValidSeqMap;
+    }
 }
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
index 3209a98..6030245 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
@@ -71,8 +71,10 @@
 
     private void checkpointReplicaIndexes() throws IOException {
         final int partition = replica.getIdentifier().getPartition();
+        String masterNode =
+                appCtx.getReplicaManager().isPartitionOwner(partition) ? appCtx.getServiceContext().getNodeId() : null;
         CheckpointPartitionIndexesTask task =
-                new CheckpointPartitionIndexesTask(partition, getPartitionMaxComponentId(partition));
+                new CheckpointPartitionIndexesTask(partition, getPartitionMaxComponentId(partition), masterNode);
         ReplicationProtocol.sendTo(replica, task);
         ReplicationProtocol.waitForAck(replica);
     }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index a73b71a..2494893 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -19,7 +19,7 @@
 package org.apache.asterix.transaction.management.resource;
 
 import static org.apache.asterix.common.storage.ResourceReference.getComponentSequence;
-import static org.apache.asterix.common.utils.StorageConstants.INDEX_CHECKPOINT_FILE_PREFIX;
+import static org.apache.asterix.common.utils.StorageConstants.INDEX_NON_DATA_FILES_PREFIX;
 import static org.apache.asterix.common.utils.StorageConstants.METADATA_FILE_NAME;
 import static org.apache.hyracks.api.exceptions.ErrorCode.CANNOT_CREATE_FILE;
 import static org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager.COMPONENT_FILES_FILTER;
@@ -90,7 +90,7 @@
     private static final String METADATA_FILE_MASK_NAME =
             StorageConstants.MASK_FILE_PREFIX + StorageConstants.METADATA_FILE_NAME;
     private static final FilenameFilter LSM_INDEX_FILES_FILTER =
-            (dir, name) -> !name.startsWith(INDEX_CHECKPOINT_FILE_PREFIX);
+            (dir, name) -> name.startsWith(METADATA_FILE_NAME) || !name.startsWith(INDEX_NON_DATA_FILES_PREFIX);
     private static final FilenameFilter MASK_FILES_FILTER =
             (dir, name) -> name.startsWith(StorageConstants.MASK_FILE_PREFIX);
     private static final int MAX_CACHED_RESOURCES = 1000;
@@ -200,7 +200,7 @@
             byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(resource.toJson(persistedResourceRegistry));
             FileUtil.writeAndForce(Paths.get(resourceFile.getAbsolutePath()), bytes);
             indexCheckpointManagerProvider.get(DatasetResourceReference.of(resource)).init(UNINITIALIZED_COMPONENT_SEQ,
-                    0, LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID.getMaxId());
+                    0, LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID.getMaxId(), null);
             deleteResourceFileMask(resourceFile);
         } catch (Exception e) {
             cleanup(resourceFile);
@@ -400,6 +400,20 @@
         });
     }
 
+    /** Maps the file relative path of each partition resource matched by the strategy to its resource id. */
+    public Map<String, Long> getPartitionReplicatedResources(int partition, IReplicationStrategy strategy)
+            throws HyracksDataException {
+        final Map<String, Long> replicatedPath2Id = new HashMap<>();
+        for (LocalResource resource : getPartitionResources(partition).values()) {
+            // only resources whose dataset is matched by the replication strategy are reported
+            if (strategy.isMatch(((DatasetLocalResource) resource.getResource()).getDatasetId())) {
+                final String path = DatasetResourceReference.of(resource).getFileRelativePath().toString();
+                replicatedPath2Id.put(path, resource.getId());
+            }
+        }
+        return replicatedPath2Id;
+    }
+
     public List<String> getPartitionReplicatedFiles(int partition, IReplicationStrategy strategy)
             throws HyracksDataException {
         final List<String> partitionReplicatedFiles = new ArrayList<>();