[NO ISSUE][REPL] Ensure Valid Component ID is Initialized On Replica Sync

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
- Currently, the first time a replica is synchronized from master,
  the valid component id on each replicated index's initial checkpoint
  will be the initial value of a component id (-1). This value is
  fixed when the replica receives a flushed component from
  the index. However, if the master fails before any component is
  flushed to a replica and that replica is promoted to master, it
  will start from an invalid component id. This change ensures that
  the initial checkpoint of replicated indexes is initialized to
  the maximum component id that appears on master. This will ensure
  that if the replica is promoted, it will at least start from
  a component that wasn't previously used on master.
- Replace the assertion that validates component ids with an
  IllegalStateException.

Change-Id: I85395ad823a630725c4cab4bead1c61546dc61ae
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2973
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
index 7b08bad..420585a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
@@ -55,7 +55,8 @@
     }
 
     @Override
-    public synchronized void init(long validComponentSequence, long lsn) throws HyracksDataException {
+    public synchronized void init(long validComponentSequence, long lsn, long validComponentId)
+            throws HyracksDataException {
         List<IndexCheckpoint> checkpoints;
         try {
             checkpoints = getCheckpoints();
@@ -66,7 +67,7 @@
             LOGGER.warn(() -> "Checkpoints found on initializing: " + indexPath);
             delete();
         }
-        IndexCheckpoint firstCheckpoint = IndexCheckpoint.first(validComponentSequence, lsn);
+        IndexCheckpoint firstCheckpoint = IndexCheckpoint.first(validComponentSequence, lsn, validComponentId);
         persist(firstCheckpoint);
     }
 
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
index dd9ede5..2f0eddf 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
@@ -29,9 +29,10 @@
      *
      * @param validComponentSequence
      * @param lsn
+     * @param validComponentId
      * @throws HyracksDataException
      */
-    void init(long validComponentSequence, long lsn) throws HyracksDataException;
+    void init(long validComponentSequence, long lsn, long validComponentId) throws HyracksDataException;
 
     /**
      * Called when a new LSM disk component is flushed. When called, the index checkpoint is updated
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
index 9654473..cb34600 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
@@ -23,7 +23,6 @@
 import java.util.Map;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -42,12 +41,12 @@
     private long lastComponentId;
     private Map<Long, Long> masterNodeFlushMap;
 
-    public static IndexCheckpoint first(long lastComponentSequence, long lowWatermark) {
+    public static IndexCheckpoint first(long lastComponentSequence, long lowWatermark, long validComponentId) {
         IndexCheckpoint firstCheckpoint = new IndexCheckpoint();
         firstCheckpoint.id = INITIAL_CHECKPOINT_ID;
         firstCheckpoint.lowWatermark = lowWatermark;
         firstCheckpoint.validComponentSequence = lastComponentSequence;
-        firstCheckpoint.lastComponentId = LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID.getMaxId();
+        firstCheckpoint.lastComponentId = validComponentId;
         firstCheckpoint.masterNodeFlushMap = new HashMap<>();
         return firstCheckpoint;
     }
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
index 448613b..e778cce 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
@@ -45,9 +45,11 @@
 public class CheckpointPartitionIndexesTask implements IReplicaTask {
 
     private final int partition;
+    private final long maxComponentId;
 
-    public CheckpointPartitionIndexesTask(int partition) {
+    public CheckpointPartitionIndexesTask(int partition, long maxComponentId) {
         this.partition = partition;
+        this.maxComponentId = maxComponentId;
     }
 
     @Override
@@ -75,7 +77,7 @@
                 maxComponentSequence =
                         Math.max(maxComponentSequence, IndexComponentFileReference.of(file).getSequenceEnd());
             }
-            indexCheckpointManager.init(maxComponentSequence, currentLSN);
+            indexCheckpointManager.init(maxComponentSequence, currentLSN, maxComponentId);
         }
         ReplicationProtocol.sendAck(worker.getChannel(), worker.getReusableBuffer());
     }
@@ -90,6 +92,7 @@
         try {
             DataOutputStream dos = new DataOutputStream(out);
             dos.writeInt(partition);
+            dos.writeLong(maxComponentId);
         } catch (IOException e) {
             throw HyracksDataException.create(e);
         }
@@ -98,7 +101,8 @@
     public static CheckpointPartitionIndexesTask create(DataInput input) throws HyracksDataException {
         try {
             int partition = input.readInt();
-            return new CheckpointPartitionIndexesTask(partition);
+            long maxComponentId = input.readLong();
+            return new CheckpointPartitionIndexesTask(partition, maxComponentId);
         } catch (IOException e) {
             throw HyracksDataException.create(e);
         }
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
index f53d448..ae36c13 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
@@ -40,6 +40,7 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -98,7 +99,8 @@
         final IIndexCheckpointManager indexCheckpointManager = checkpointManagerProvider.get(indexRef);
         final long currentLSN = appCtx.getTransactionSubsystem().getLogManager().getAppendLSN();
         indexCheckpointManager.delete();
-        indexCheckpointManager.init(Long.MIN_VALUE, currentLSN);
+        indexCheckpointManager.init(Long.MIN_VALUE, currentLSN,
+                LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID.getMaxId());
         LOGGER.info(() -> "Checkpoint index: " + indexRef);
     }
 
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
index ef85977..09f1205 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
@@ -25,6 +25,8 @@
 import org.apache.asterix.replication.api.PartitionReplica;
 import org.apache.asterix.replication.messaging.CheckpointPartitionIndexesTask;
 import org.apache.asterix.replication.messaging.ReplicationProtocol;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 /**
  * Performs the steps required to ensure any newly added replica
@@ -60,9 +62,17 @@
     }
 
     private void checkpointReplicaIndexes() throws IOException {
+        final int partition = replica.getIdentifier().getPartition();
         CheckpointPartitionIndexesTask task =
-                new CheckpointPartitionIndexesTask(replica.getIdentifier().getPartition());
+                new CheckpointPartitionIndexesTask(partition, getPartitionMaxComponentId(partition));
         ReplicationProtocol.sendTo(replica, task);
         ReplicationProtocol.waitForAck(replica);
     }
+
+    private long getPartitionMaxComponentId(int partition) throws HyracksDataException {
+        final IReplicationStrategy replStrategy = appCtx.getReplicationManager().getReplicationStrategy();
+        final PersistentLocalResourceRepository localResourceRepository =
+                (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
+        return localResourceRepository.getReplicatedIndexesMaxComponentId(partition, replStrategy);
+    }
 }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index c0da095..8f870c0 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -66,8 +66,8 @@
 import org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation;
 import org.apache.hyracks.api.util.IoUtil;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame;
-import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
 import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
 import org.apache.hyracks.storage.common.ILocalResourceRepository;
 import org.apache.hyracks.storage.common.LocalResource;
 import org.apache.hyracks.util.ExitUtil;
@@ -196,7 +196,8 @@
             byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(resource.toJson(persistedResourceRegistry));
             final Path path = Paths.get(resourceFile.getAbsolutePath());
             Files.write(path, bytes);
-            indexCheckpointManagerProvider.get(DatasetResourceReference.of(resource)).init(Long.MIN_VALUE, 0);
+            indexCheckpointManagerProvider.get(DatasetResourceReference.of(resource)).init(Long.MIN_VALUE, 0,
+                    LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID.getMaxId());
             deleteResourceFileMask(resourceFile);
         } catch (Exception e) {
             cleanup(resourceFile);
@@ -393,6 +394,21 @@
         return partitionReplicatedFiles;
     }
 
+    public long getReplicatedIndexesMaxComponentId(int partition, IReplicationStrategy strategy)
+            throws HyracksDataException {
+        long maxComponentId = LSMComponentId.MIN_VALID_COMPONENT_ID;
+        final Map<Long, LocalResource> partitionResources = getPartitionResources(partition);
+        for (LocalResource lr : partitionResources.values()) {
+            DatasetLocalResource datasetLocalResource = (DatasetLocalResource) lr.getResource();
+            if (strategy.isMatch(datasetLocalResource.getDatasetId())) {
+                final IIndexCheckpointManager indexCheckpointManager =
+                        indexCheckpointManagerProvider.get(DatasetResourceReference.of(lr));
+                maxComponentId = Math.max(maxComponentId, indexCheckpointManager.getLatest().getLastComponentId());
+            }
+        }
+        return maxComponentId;
+    }
+
     private List<String> getIndexFiles(File indexDir) {
         final List<String> indexFiles = new ArrayList<>();
         if (indexDir.isDirectory()) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index 3d928a4..9199fbb 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -577,7 +577,7 @@
         if (c != EmptyComponent.INSTANCE) {
             diskComponents.add(0, c);
         }
-        assert checkComponentIds();
+        validateComponentIds();
     }
 
     @Override
@@ -588,7 +588,7 @@
         if (newComponent != EmptyComponent.INSTANCE) {
             diskComponents.add(swapIndex, newComponent);
         }
-        assert checkComponentIds();
+        validateComponentIds();
     }
 
     /**
@@ -597,16 +597,16 @@
      *
      * @throws HyracksDataException
      */
-    private boolean checkComponentIds() throws HyracksDataException {
+    private void validateComponentIds() throws HyracksDataException {
         for (int i = 0; i < diskComponents.size() - 1; i++) {
             ILSMComponentId id1 = diskComponents.get(i).getId();
             ILSMComponentId id2 = diskComponents.get(i + 1).getId();
             IdCompareResult cmp = id1.compareTo(id2);
             if (cmp != IdCompareResult.UNKNOWN && cmp != IdCompareResult.GREATER_THAN) {
-                return false;
+                throw new IllegalStateException(
+                        "found non-decreasing component ids (" + id1 + " -> " + id2 + ") on index " + this);
             }
         }
-        return true;
     }
 
     @Override