[NO ISSUE][CLUS] Add node active partitions config

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:

- Add a new config (ACTIVE_PARTITIONS) that contains the current
  list of active partitions on a node.
- By default, a node's active partitions list is the same as the
  node's assigned partitions.
- Pass node active partitions to CC during bootstrap tasks.
- Adapt test cases.

Change-Id: Ia91e15897221f512aeeccbbe134f1d91db8aa629
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/12663
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 9aa433f..eb8a92f 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -21,12 +21,10 @@
 import java.io.IOException;
 import java.rmi.RemoteException;
 import java.rmi.server.UnicastRemoteObject;
-import java.util.Arrays;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
-import java.util.stream.Collectors;
 
 import org.apache.asterix.active.ActiveManager;
 import org.apache.asterix.common.api.IConfigValidator;
@@ -38,7 +36,6 @@
 import org.apache.asterix.common.api.IPropertiesFactory;
 import org.apache.asterix.common.api.IReceptionist;
 import org.apache.asterix.common.api.IReceptionistFactory;
-import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.config.ActiveProperties;
 import org.apache.asterix.common.config.BuildProperties;
 import org.apache.asterix.common.config.CompilerProperties;
@@ -225,10 +222,8 @@
                 new DatasetLifecycleManager(storageProperties, localResourceRepository, txnSubsystem.getLogManager(),
                         virtualBufferCache, indexCheckpointManagerProvider, ioManager.getIODevices().size());
         final String nodeId = getServiceContext().getNodeId();
-        final ClusterPartition[] nodePartitions = metadataProperties.getNodePartitions().get(nodeId);
-        final Set<Integer> nodePartitionsIds =
-                Arrays.stream(nodePartitions).map(ClusterPartition::getPartitionId).collect(Collectors.toSet());
-        replicaManager = new ReplicaManager(this, nodePartitionsIds);
+        final Set<Integer> nodePartitions = metadataProperties.getNodeActivePartitions(nodeId);
+        replicaManager = new ReplicaManager(this, nodePartitions);
         isShuttingdown = false;
         activeManager = new ActiveManager(threadExecutor, getServiceContext().getNodeId(),
                 activeProperties.getMemoryComponentGlobalBudget(), compilerProperties.getFrameSize(),
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
index c4e4f82..f164773 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
@@ -26,14 +26,12 @@
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 import org.apache.asterix.app.nc.task.BindMetadataNodeTask;
 import org.apache.asterix.app.nc.task.CheckpointTask;
@@ -51,7 +49,6 @@
 import org.apache.asterix.app.replication.message.RegistrationTasksResponseMessage;
 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
 import org.apache.asterix.common.api.INCLifecycleTask;
-import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
@@ -102,7 +99,7 @@
     @Override
     public void notifyNodeFailure(String nodeId, InetSocketAddress replicaAddress) throws HyracksDataException {
         pendingStartupCompletionNodes.remove(nodeId);
-        clusterManager.updateNodeState(nodeId, false, null);
+        clusterManager.updateNodeState(nodeId, false, null, null);
         if (nodeId.equals(metadataNodeId)) {
             clusterManager.updateMetadataNode(metadataNodeId, false);
         }
@@ -138,7 +135,8 @@
     private void process(RegistrationTasksRequestMessage msg) throws HyracksDataException {
         final String nodeId = msg.getNodeId();
         nodeSecretsMap.put(nodeId, msg.getSecrets());
-        List<INCLifecycleTask> tasks = buildNCRegTasks(msg.getNodeId(), msg.getNodeStatus(), msg.getState());
+        List<INCLifecycleTask> tasks =
+                buildNCRegTasks(msg.getNodeId(), msg.getNodeStatus(), msg.getState(), msg.getActivePartitions());
         RegistrationTasksResponseMessage response = new RegistrationTasksResponseMessage(nodeId, tasks);
         try {
             messageBroker.sendApplicationMessageToNC(response, msg.getNodeId());
@@ -157,7 +155,7 @@
             return;
         }
         if (msg.isSuccess()) {
-            clusterManager.updateNodeState(msg.getNodeId(), true, msg.getLocalCounters());
+            clusterManager.updateNodeState(msg.getNodeId(), true, msg.getLocalCounters(), msg.getActivePartitions());
             if (msg.getNodeId().equals(metadataNodeId)) {
                 clusterManager.updateMetadataNode(metadataNodeId, true);
             }
@@ -167,7 +165,8 @@
         }
     }
 
-    protected List<INCLifecycleTask> buildNCRegTasks(String nodeId, NodeStatus nodeStatus, SystemState state) {
+    protected List<INCLifecycleTask> buildNCRegTasks(String nodeId, NodeStatus nodeStatus, SystemState state,
+            Set<Integer> activePartitions) {
         LOGGER.info("Building registration tasks for node {} with status {} and system state: {}", nodeId, nodeStatus,
                 state);
         final boolean isMetadataNode = nodeId.equals(metadataNodeId);
@@ -175,7 +174,7 @@
             case ACTIVE:
                 return buildActiveNCRegTasks(isMetadataNode);
             case IDLE:
-                return buildIdleNcRegTasks(nodeId, isMetadataNode, state);
+                return buildIdleNcRegTasks(nodeId, isMetadataNode, state, activePartitions);
             default:
                 return new ArrayList<>();
         }
@@ -210,13 +209,13 @@
         }
     }
 
-    protected List<INCLifecycleTask> buildIdleNcRegTasks(String newNodeId, boolean metadataNode, SystemState state) {
+    protected List<INCLifecycleTask> buildIdleNcRegTasks(String newNodeId, boolean metadataNode, SystemState state,
+            Set<Integer> activePartitions) {
         final List<INCLifecycleTask> tasks = new ArrayList<>();
         tasks.add(new UpdateNodeStatusTask(NodeStatus.BOOTING));
         if (state == SystemState.CORRUPTED) {
-            // need to perform local recovery for node partitions
-            LocalRecoveryTask rt = new LocalRecoveryTask(Arrays.stream(clusterManager.getNodePartitions(newNodeId))
-                    .map(ClusterPartition::getPartitionId).collect(Collectors.toSet()));
+            // need to perform local recovery for node active partitions
+            LocalRecoveryTask rt = new LocalRecoveryTask(activePartitions);
             tasks.add(rt);
         }
         if (replicationEnabled) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
index 79fa7c8..1309369 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.app.replication.message;
 
+import java.util.Set;
+
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
 import org.apache.asterix.common.replication.INCLifecycleMessage;
@@ -31,11 +33,14 @@
     private final boolean success;
     private Throwable exception;
     private final NcLocalCounters localCounters;
+    private final Set<Integer> activePartitions;
 
-    public NCLifecycleTaskReportMessage(String nodeId, boolean success, NcLocalCounters localCounters) {
+    public NCLifecycleTaskReportMessage(String nodeId, boolean success, NcLocalCounters localCounters,
+            Set<Integer> activePartitions) {
         this.nodeId = nodeId;
         this.success = success;
         this.localCounters = localCounters;
+        this.activePartitions = activePartitions;
     }
 
     @Override
@@ -67,4 +72,8 @@
     public MessageType getType() {
         return MessageType.REGISTRATION_TASKS_RESULT;
     }
+
+    public Set<Integer> getActivePartitions() {
+        return activePartitions;
+    }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
index c2cc63c..fb50b3e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
@@ -20,6 +20,7 @@
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
@@ -42,20 +43,22 @@
     protected final String nodeId;
     protected final NodeStatus nodeStatus;
     protected final Map<String, Object> secrets;
+    protected final Set<Integer> activePartitions;
 
     public RegistrationTasksRequestMessage(String nodeId, NodeStatus nodeStatus, SystemState state,
-            Map<String, Object> secretsEphemeral) {
+            Map<String, Object> secretsEphemeral, Set<Integer> activePartitions) {
         this.state = state;
         this.nodeId = nodeId;
         this.nodeStatus = nodeStatus;
         this.secrets = new HashMap<>(secretsEphemeral);
+        this.activePartitions = activePartitions;
     }
 
     public static void send(CcId ccId, NodeControllerService cs, NodeStatus nodeStatus, SystemState systemState,
-            Map<String, Object> secretsEphemeral) throws HyracksDataException {
+            Map<String, Object> secretsEphemeral, Set<Integer> activePartitions) throws HyracksDataException {
         try {
-            RegistrationTasksRequestMessage msg =
-                    new RegistrationTasksRequestMessage(cs.getId(), nodeStatus, systemState, secretsEphemeral);
+            RegistrationTasksRequestMessage msg = new RegistrationTasksRequestMessage(cs.getId(), nodeStatus,
+                    systemState, secretsEphemeral, activePartitions);
             ((INCMessageBroker) cs.getContext().getMessageBroker()).sendMessageToCC(ccId, msg);
         } catch (Exception e) {
             LOGGER.log(Level.ERROR, "Unable to send RegistrationTasksRequestMessage to CC", e);
@@ -88,4 +91,8 @@
     public Map<String, Object> getSecrets() {
         return secrets;
     }
+
+    public Set<Integer> getActivePartitions() {
+        return activePartitions;
+    }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
index 1227a6f..d8e1894 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.app.replication.message;
 
 import java.util.List;
+import java.util.Set;
 
 import org.apache.asterix.common.api.INCLifecycleTask;
 import org.apache.asterix.common.api.INcApplicationContext;
@@ -72,7 +73,9 @@
             }
             NcLocalCounters localCounter = success ? NcLocalCounters.collect(getCcId(),
                     (NodeControllerService) appCtx.getServiceContext().getControllerService()) : null;
-            NCLifecycleTaskReportMessage result = new NCLifecycleTaskReportMessage(nodeId, success, localCounter);
+            Set<Integer> nodeActivePartitions = appCtx.getMetadataProperties().getNodeActivePartitions(nodeId);
+            NCLifecycleTaskReportMessage result =
+                    new NCLifecycleTaskReportMessage(nodeId, success, localCounter, nodeActivePartitions);
             result.setException(exception);
             try {
                 broker.sendMessageToCC(getCcId(), result);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index c148c92..5b10512 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -300,7 +300,8 @@
                 apiServer != null ? Collections.singletonMap(SYS_AUTH_HEADER, apiServer.ctx().get(SYS_AUTH_HEADER))
                         : Collections.emptyMap();
         RegistrationTasksRequestMessage.send(ccId, (NodeControllerService) ncServiceCtx.getControllerService(),
-                currentStatus, systemState, httpSecrets);
+                currentStatus, systemState, httpSecrets,
+                runtimeContext.getMetadataProperties().getNodeActivePartitions(nodeId));
     }
 
     @Override
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
index 9cc295e..b80fa30 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
@@ -19,7 +19,9 @@
 package org.apache.asterix.runtime;
 
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
@@ -200,7 +202,8 @@
 
     private void notifyNodeStartupCompletion(CcApplicationContext applicationContext, String nodeId)
             throws HyracksDataException {
-        NCLifecycleTaskReportMessage msg = new NCLifecycleTaskReportMessage(nodeId, true, mockLocalCounters());
+        NCLifecycleTaskReportMessage msg =
+                new NCLifecycleTaskReportMessage(nodeId, true, mockLocalCounters(), getNodeActivePartitions(nodeId));
         applicationContext.getNcLifecycleCoordinator().process(msg);
     }
 
@@ -262,4 +265,20 @@
         Mockito.when(localCounters.getMaxTxnId()).thenReturn(1000L);
         return localCounters;
     }
+
+    private static Set<Integer> getNodeActivePartitions(String nodeId) {
+        Set<Integer> activePartitions = new HashSet<>();
+        switch (nodeId) {
+            case NC1:
+                activePartitions.add(0);
+                break;
+            case NC2:
+                activePartitions.add(1);
+                break;
+            case NC3:
+                activePartitions.add(2);
+                break;
+        }
+        return activePartitions;
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
index a37e6e4..2ee9435 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -62,9 +62,11 @@
      * @param nodeId
      * @param active
      * @param ncLocalCounters
+     * @param activePartitions
      * @throws HyracksDataException
      */
-    void updateNodeState(String nodeId, boolean active, NcLocalCounters ncLocalCounters) throws HyracksDataException;
+    void updateNodeState(String nodeId, boolean active, NcLocalCounters ncLocalCounters, Set<Integer> activePartitions)
+            throws HyracksDataException;
 
     /**
      * Updates the active node and active state of the cluster partition with id {@code partitionNum}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/MetadataProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/MetadataProperties.java
index 7d5ec42..31708d3 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/MetadataProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/MetadataProperties.java
@@ -24,6 +24,7 @@
 
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.SortedMap;
 
 import org.apache.asterix.common.cluster.ClusterPartition;
@@ -123,6 +124,10 @@
         return accessor.getClusterPartitions();
     }
 
+    public Set<Integer> getNodeActivePartitions(String nodeId) {
+        return accessor.getActivePartitions(nodeId);
+    }
+
     public Map<String, String> getTransactionLogDirs() {
         return accessor.getTransactionLogDirs();
     }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java
index aaf6316..a03530d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java
@@ -46,7 +46,8 @@
         STARTING_PARTITION_ID(
                 OptionTypes.INTEGER,
                 -1,
-                "The first partition id to assign to iodevices on this node (-1 == auto-assign)");
+                "The first partition id to assign to iodevices on this node (-1 == auto-assign)"),
+        ACTIVE_PARTITIONS(OptionTypes.STRING_ARRAY, null, "List of node active partitions");
 
         private final IOptionType type;
         private final Object defaultValue;
@@ -95,7 +96,7 @@
 
         @Override
         public boolean hidden() {
-            return this == INITIAL_RUN || this == STARTING_PARTITION_ID;
+            return this == INITIAL_RUN || this == STARTING_PARTITION_ID || this == ACTIVE_PARTITIONS;
         }
 
     }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
index 3b6100c..5ba378d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
@@ -25,6 +25,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -35,6 +36,7 @@
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Predicate;
+import java.util.stream.Collectors;
 
 import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.exceptions.AsterixException;
@@ -192,6 +194,17 @@
         return clusterPartitions;
     }
 
+    public Set<Integer> getActivePartitions(String nodeId) {
+        // by default, node actives partitions are the partitions assigned to the node
+        String[] activePartitions = cfg.getStringArray(NodeProperties.Option.ACTIVE_PARTITIONS);
+        if (activePartitions == null) {
+            ClusterPartition[] nodeClusterPartitions = nodePartitionsMap.get(nodeId);
+            return Arrays.stream(nodeClusterPartitions).map(ClusterPartition::getPartitionId)
+                    .collect(Collectors.toSet());
+        }
+        return Arrays.stream(activePartitions).map(Integer::parseInt).collect(Collectors.toSet());
+    }
+
     public List<AsterixExtension> getExtensions() {
         return extensions;
     }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 98a97b0..7e50102 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -139,19 +139,15 @@
     }
 
     @Override
-    public synchronized void updateNodeState(String nodeId, boolean active, NcLocalCounters localCounters) {
+    public synchronized void updateNodeState(String nodeId, boolean active, NcLocalCounters localCounters,
+            Set<Integer> activePartitions) {
         if (active) {
             updateClusterCounters(nodeId, localCounters);
             participantNodes.add(nodeId);
+            activateNodePartitions(nodeId, activePartitions);
         } else {
             participantNodes.remove(nodeId);
-        }
-        ClusterPartition[] nodePartitions = node2PartitionsMap.get(nodeId);
-        // if this isn't a storage node, it will not have cluster partitions
-        if (nodePartitions != null) {
-            for (ClusterPartition p : nodePartitions) {
-                updateClusterPartition(p.getPartitionId(), nodeId, active);
-            }
+            deactivateNodePartitions(nodeId);
         }
     }
 
@@ -416,11 +412,10 @@
     public synchronized void deregisterNodePartitions(String nodeId) throws HyracksDataException {
         ClusterPartition[] nodePartitions = node2PartitionsMap.remove(nodeId);
         if (nodePartitions == null) {
-            LOGGER.info("deregisterNodePartitions unknown node " + nodeId + " (already removed?)");
+            LOGGER.info("deregisterNodePartitions unknown node {} (already removed?)", nodeId);
         } else {
-            if (LOGGER.isInfoEnabled()) {
-                LOGGER.info("deregisterNodePartitions for node " + nodeId + ": " + Arrays.toString(nodePartitions));
-            }
+            LOGGER.info("deregisterNodePartitions for node {}: {}", () -> nodeId,
+                    () -> Arrays.toString(nodePartitions));
             for (ClusterPartition nodePartition : nodePartitions) {
                 clusterPartitions.remove(nodePartition.getPartitionId());
             }
@@ -431,12 +426,12 @@
     @Override
     public synchronized void removePending(String nodeId) {
         if (LOGGER.isInfoEnabled()) {
-            LOGGER.info("Registering intention to remove node id " + nodeId);
+            LOGGER.info("Registering intention to remove node id {}", nodeId);
         }
         if (participantNodes.contains(nodeId)) {
             pendingRemoval.add(nodeId);
         } else {
-            LOGGER.warn("Cannot register unknown node " + nodeId + " for pending removal");
+            LOGGER.warn("Cannot register unknown node {} for pending removal", nodeId);
         }
     }
 
@@ -496,6 +491,19 @@
         });
     }
 
+    private synchronized void activateNodePartitions(String nodeId, Set<Integer> activePartitions) {
+        for (Integer partitionId : activePartitions) {
+            updateClusterPartition(partitionId, nodeId, true);
+        }
+    }
+
+    private synchronized void deactivateNodePartitions(String nodeId) {
+        clusterPartitions.values().stream()
+                .filter(partition -> partition.getActiveNodeId() != null && partition.getActiveNodeId().equals(nodeId))
+                .forEach(nodeActivePartition -> updateClusterPartition(nodeActivePartition.getPartitionId(), nodeId,
+                        false));
+    }
+
     private static InetSocketAddress getReplicaLocation(IClusterStateManager csm, String nodeId) {
         final Map<IOption, Object> ncConfig = csm.getActiveNcConfiguration().get(nodeId);
         if (ncConfig == null) {