[NO ISSUE][REP] Pass NC active partitions from CC

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:

- Pass NC active partitions from CC and set them
  during node bootstrap.
- Move local storage cleanup to LocalStorageCleanupTask
  so that it runs after all node partitions have been set.
- Maintain the last LSN (the one equal to the low watermark)
  in the master flush LSN map to account for cases where the
  first LSN received from master is the same as the low
  watermark LSN.
- Reduce replica failure logging at error level.

Change-Id: I3782bb2be61f8a57ac45dd6dd6ae0942e83ddc40
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/12903
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Tested-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
index 1d901bd..66a69ef 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
@@ -112,10 +112,17 @@
     }
 
     @Override
+    public synchronized void setActivePartitions(Set<Integer> activePartitions) {
+        partitions.clear();
+        partitions.addAll(activePartitions);
+    }
+
+    @Override
     public synchronized void promote(int partition) throws HyracksDataException {
         if (partitions.contains(partition)) {
             return;
         }
+        LOGGER.warn("promoting partition {}", partition);
         final PersistentLocalResourceRepository localResourceRepository =
                 (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
         localResourceRepository.cleanup(partition);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalStorageCleanupTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalStorageCleanupTask.java
index 5471fb8..a926c66 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalStorageCleanupTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalStorageCleanupTask.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.app.nc.task;
 
+import java.util.Set;
+
 import org.apache.asterix.common.api.INCLifecycleTask;
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.dataflow.DatasetLocalResource;
@@ -41,6 +43,17 @@
         INcApplicationContext appContext = (INcApplicationContext) cs.getApplicationContext();
         PersistentLocalResourceRepository localResourceRepository =
                 (PersistentLocalResourceRepository) appContext.getLocalResourceRepository();
+        deleteInvalidMetadataIndexes(localResourceRepository);
+        final Set<Integer> nodePartitions = appContext.getReplicaManager().getPartitions();
+        localResourceRepository.deleteCorruptedResources();
+        // TODO: optimize this to clean up all active partitions at once
+        for (Integer partition : nodePartitions) {
+            localResourceRepository.cleanup(partition);
+        }
+    }
+
+    private void deleteInvalidMetadataIndexes(PersistentLocalResourceRepository localResourceRepository)
+            throws HyracksDataException {
         localResourceRepository.deleteInvalidIndexes(r -> {
             DatasetLocalResource lr = (DatasetLocalResource) r.getResource();
             return MetadataIndexImmutableProperties.isMetadataDataset(lr.getDatasetId())
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/UpdateNodeStatusTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/UpdateNodeStatusTask.java
index a1e11c2..fe579ad 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/UpdateNodeStatusTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/UpdateNodeStatusTask.java
@@ -18,25 +18,45 @@
  */
 package org.apache.asterix.app.nc.task;
 
+import java.util.Set;
+
 import org.apache.asterix.common.api.INCLifecycleTask;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.storage.IReplicaManager;
 import org.apache.hyracks.api.client.NodeStatus;
 import org.apache.hyracks.api.control.CcId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 public class UpdateNodeStatusTask implements INCLifecycleTask {
 
-    private static final long serialVersionUID = 1L;
+    private static final Logger LOGGER = LogManager.getLogger();
+    private static final long serialVersionUID = 2L;
     private final NodeStatus status;
+    private Set<Integer> activePartitions;
 
-    public UpdateNodeStatusTask(NodeStatus status) {
+    public UpdateNodeStatusTask(NodeStatus status, Set<Integer> activePartitions) {
         this.status = status;
+        this.activePartitions = activePartitions;
     }
 
     @Override
-    public void perform(CcId ccId, IControllerService cs) {
+    public void perform(CcId ccId, IControllerService cs) throws HyracksDataException {
         NodeControllerService ncs = (NodeControllerService) cs;
         ncs.setNodeStatus(status);
+        if (status != NodeStatus.ACTIVE) {
+            updateNodeActivePartitions(cs);
+        }
+    }
+
+    private void updateNodeActivePartitions(IControllerService cs) {
+        INcApplicationContext appCtx = (INcApplicationContext) cs.getApplicationContext();
+        IReplicaManager replicaManager = appCtx.getReplicaManager();
+        LOGGER.info("updating node active partitions to {}", activePartitions);
+        replicaManager.setActivePartitions(activePartitions);
     }
 
     @Override
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
index 97e1a56..22a0a84 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
@@ -213,12 +213,13 @@
     protected List<INCLifecycleTask> buildIdleNcRegTasks(String newNodeId, boolean metadataNode, SystemState state,
             Set<Integer> activePartitions) {
         final List<INCLifecycleTask> tasks = new ArrayList<>();
-        tasks.add(new UpdateNodeStatusTask(NodeStatus.BOOTING));
+        Set<Integer> nodeActivePartitions = getNodeActivePartitions(newNodeId, activePartitions, metadataNode);
+        tasks.add(new UpdateNodeStatusTask(NodeStatus.BOOTING, nodeActivePartitions));
         int metadataPartitionId = clusterManager.getMetadataPartition().getPartitionId();
         tasks.add(new LocalStorageCleanupTask(metadataPartitionId));
         if (state == SystemState.CORRUPTED) {
             // need to perform local recovery for node active partitions
-            LocalRecoveryTask rt = new LocalRecoveryTask(activePartitions);
+            LocalRecoveryTask rt = new LocalRecoveryTask(nodeActivePartitions);
             tasks.add(rt);
         }
         if (replicationEnabled) {
@@ -243,7 +244,7 @@
             tasks.add(new ExportMetadataNodeTask(true));
             tasks.add(new BindMetadataNodeTask());
         }
-        tasks.add(new UpdateNodeStatusTask(NodeStatus.ACTIVE));
+        tasks.add(new UpdateNodeStatusTask(NodeStatus.ACTIVE, nodeActivePartitions));
         return tasks;
     }
 
@@ -300,6 +301,13 @@
         return true;
     }
 
+    protected Set<Integer> getNodeActivePartitions(String nodeId, Set<Integer> nodePartitions, boolean metadataNode) {
+        if (metadataNode) {
+            nodePartitions.add(clusterManager.getMetadataPartition().getPartitionId());
+        }
+        return nodePartitions;
+    }
+
     private void notifyFailedReplica(IClusterStateManager clusterManager, String nodeID,
             InetSocketAddress replicaAddress) {
         LOGGER.info("notify replica failure of nodeId {} at {}", nodeID, replicaAddress);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
index d8e1894..b3546cc 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
@@ -73,7 +73,7 @@
             }
             NcLocalCounters localCounter = success ? NcLocalCounters.collect(getCcId(),
                     (NodeControllerService) appCtx.getServiceContext().getControllerService()) : null;
-            Set<Integer> nodeActivePartitions = appCtx.getMetadataProperties().getNodeActivePartitions(nodeId);
+            Set<Integer> nodeActivePartitions = appCtx.getReplicaManager().getPartitions();
             NCLifecycleTaskReportMessage result =
                     new NCLifecycleTaskReportMessage(nodeId, success, localCounter, nodeActivePartitions);
             result.setException(exception);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index 18e8e65..be1cc7c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -35,7 +35,6 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.asterix.algebra.base.ILangExtension;
 import org.apache.asterix.api.http.server.BasicAuthServlet;
@@ -82,7 +81,6 @@
 import org.apache.asterix.compiler.provider.ILangCompilationProvider;
 import org.apache.asterix.messaging.MessagingChannelInterfaceFactory;
 import org.apache.asterix.messaging.NCMessageBroker;
-import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.asterix.translator.Receptionist;
 import org.apache.asterix.util.MetadataBuiltinFunctions;
 import org.apache.asterix.utils.RedactionUtil;
@@ -319,16 +317,7 @@
     }
 
     private void performLocalCleanUp() throws HyracksDataException {
-        //Delete working area files from failed jobs
         runtimeContext.getIoManager().deleteWorkspaceFiles();
-        // Reclaim storage for orphaned index artifacts in NCs.
-        final Set<Integer> nodePartitions = runtimeContext.getReplicaManager().getPartitions();
-        final PersistentLocalResourceRepository localResourceRepository =
-                (PersistentLocalResourceRepository) runtimeContext.getLocalResourceRepository();
-        localResourceRepository.deleteCorruptedResources();
-        for (Integer partition : nodePartitions) {
-            localResourceRepository.cleanup(partition);
-        }
     }
 
     private void updateOnNodeJoin() {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index 806a6d4..ec05702 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -135,7 +135,7 @@
                 for (ILSMIndex lsmIndex : indexes) {
                     if (lsmIndex.isPrimaryIndex()) {
                         if (lsmIndex.isCurrentMutableComponentEmpty()) {
-                            LOGGER.info("Primary index on dataset {} and partition {} is empty... skipping flush",
+                            LOGGER.debug("Primary index on dataset {} and partition {} is empty... skipping flush",
                                     dsInfo.getDatasetID(), partition);
                             return;
                         }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
index 9630303..4f46227 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
@@ -57,6 +57,13 @@
     Set<Integer> getPartitions();
 
     /**
+     * Sets the node active partitions
+     *
+     * @param activePartitions the partitions to set as this node's active partitions
+     */
+    void setActivePartitions(Set<Integer> activePartitions);
+
+    /**
      * Promotes a partition by making this node its master replica
      *
      * @param partition
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
index 832f3e9..24b9ae69 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
@@ -71,7 +71,7 @@
         next.validComponentSequence = validComponentSequence;
         next.masterNodeFlushMap = latest.getMasterNodeFlushMap();
         // remove any lsn from the map that wont be used anymore
-        next.masterNodeFlushMap.values().removeIf(lsn -> lsn <= lowWatermark && lsn != HAS_NULL_MISSING_VALUES_FIX);
+        next.masterNodeFlushMap.values().removeIf(lsn -> lsn < lowWatermark && lsn != HAS_NULL_MISSING_VALUES_FIX);
         return next;
     }
 
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
index 6b97306..282d475 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
@@ -77,13 +77,17 @@
     }
 
     public synchronized void sync() {
+        sync(true);
+    }
+
+    public synchronized void sync(boolean register) {
         if (status == IN_SYNC || status == CATCHING_UP) {
             return;
         }
         setStatus(CATCHING_UP);
         appCtx.getThreadExecutor().execute(() -> {
             try {
-                new ReplicaSynchronizer(appCtx, this).sync();
+                new ReplicaSynchronizer(appCtx, this).sync(register);
                 setStatus(IN_SYNC);
             } catch (Exception e) {
                 LOGGER.error(() -> "Failed to sync replica " + this, e);
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java
index 440f8ef..57463cb 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java
@@ -70,7 +70,7 @@
                                         + System.lineSeparator()).getBytes());
                         break;
                     case LogType.FLUSH:
-                        checkpointReplicaIndexes(logRecord, logRecord.getDatasetId());
+                        checkpointReplicaIndexes(logRecord, logRecord.getDatasetId(), logRecord.getResourcePartition());
                         break;
                     default:
                         throw new IllegalStateException("Unexpected log type: " + logRecord.getLogType());
@@ -83,11 +83,13 @@
         }
     }
 
-    private void checkpointReplicaIndexes(RemoteLogRecord remoteLogMapping, int datasetId) throws HyracksDataException {
+    private void checkpointReplicaIndexes(RemoteLogRecord remoteLogMapping, int datasetId, int resourcePartition)
+            throws HyracksDataException {
         final Set<Integer> masterPartitions = appCtx.getReplicaManager().getPartitions();
         final Predicate<LocalResource> replicaIndexesPredicate = lr -> {
             DatasetLocalResource dls = (DatasetLocalResource) lr.getResource();
-            return dls.getDatasetId() == datasetId && !masterPartitions.contains(dls.getPartition());
+            return dls.getDatasetId() == datasetId && dls.getPartition() == resourcePartition
+                    && !masterPartitions.contains(dls.getPartition());
         };
         final Map<Long, LocalResource> resources = localResourceRep.getResources(replicaIndexesPredicate);
         final List<DatasetResourceReference> replicaIndexesRef =
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java
index dd953c4..48eb8e3 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java
@@ -83,8 +83,9 @@
             if (failedDest.contains(dest)) {
                 return;
             }
-            LOGGER.error("Replica failed", e);
+            LOGGER.debug("Replica failed", e);
             if (destinations.contains(dest)) {
+                LOGGER.error("replica at {} failed", dest);
                 failedDest.add(dest);
             }
             replicationManager.notifyFailure(dest, e);
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java
index 6a23ae6..f1d8d4d 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java
@@ -215,11 +215,15 @@
         if (failedSockets.contains(replicaSocket)) {
             return;
         }
-        LOGGER.error("Replica failed", e);
+        LOGGER.debug("Replica failed", e);
         failedSockets.add(replicaSocket);
         Optional<ReplicationDestination> socketDest = destinations.entrySet().stream()
                 .filter(entry -> entry.getValue().equals(replicaSocket)).map(Map.Entry::getKey).findFirst();
-        socketDest.ifPresent(dest -> replicationManager.notifyFailure(dest, e));
+        if (socketDest.isPresent()) {
+            ReplicationDestination dest = socketDest.get();
+            LOGGER.error("replica at {} failed", dest);
+            replicationManager.notifyFailure(dest, e);
+        }
     }
 
     private class TxnAckListener implements Runnable {
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
index 261236c..3209a98 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
@@ -43,7 +43,7 @@
         this.replica = replica;
     }
 
-    public void sync() throws IOException {
+    public void sync(boolean register) throws IOException {
         synchronized (appCtx.getReplicaManager().getReplicaSyncLock()) {
             final ICheckpointManager checkpointManager = appCtx.getTransactionSubsystem().getCheckpointManager();
             try {
@@ -51,7 +51,9 @@
                 checkpointManager.suspend();
                 syncFiles();
                 checkpointReplicaIndexes();
-                appCtx.getReplicationManager().register(replica);
+                if (register) {
+                    appCtx.getReplicationManager().register(replica);
+                }
             } finally {
                 checkpointManager.resume();
             }