[NO ISSUE][CLUS] Add node active partitions config
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- Add a new config (ACTIVE_PARTITIONS) that contains the current
list of active partitions on a node.
- By default, a node's active partitions list is the same as the
node's assigned partitions.
- Pass node active partitions to CC during bootstrap tasks.
- Adapt test cases.
Change-Id: Ia91e15897221f512aeeccbbe134f1d91db8aa629
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/12663
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 9aa433f..eb8a92f 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -21,12 +21,10 @@
import java.io.IOException;
import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;
-import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
-import java.util.stream.Collectors;
import org.apache.asterix.active.ActiveManager;
import org.apache.asterix.common.api.IConfigValidator;
@@ -38,7 +36,6 @@
import org.apache.asterix.common.api.IPropertiesFactory;
import org.apache.asterix.common.api.IReceptionist;
import org.apache.asterix.common.api.IReceptionistFactory;
-import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.config.ActiveProperties;
import org.apache.asterix.common.config.BuildProperties;
import org.apache.asterix.common.config.CompilerProperties;
@@ -225,10 +222,8 @@
new DatasetLifecycleManager(storageProperties, localResourceRepository, txnSubsystem.getLogManager(),
virtualBufferCache, indexCheckpointManagerProvider, ioManager.getIODevices().size());
final String nodeId = getServiceContext().getNodeId();
- final ClusterPartition[] nodePartitions = metadataProperties.getNodePartitions().get(nodeId);
- final Set<Integer> nodePartitionsIds =
- Arrays.stream(nodePartitions).map(ClusterPartition::getPartitionId).collect(Collectors.toSet());
- replicaManager = new ReplicaManager(this, nodePartitionsIds);
+ final Set<Integer> nodePartitions = metadataProperties.getNodeActivePartitions(nodeId);
+ replicaManager = new ReplicaManager(this, nodePartitions);
isShuttingdown = false;
activeManager = new ActiveManager(threadExecutor, getServiceContext().getNodeId(),
activeProperties.getMemoryComponentGlobalBudget(), compilerProperties.getFrameSize(),
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
index c4e4f82..f164773 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
@@ -26,14 +26,12 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.stream.Collectors;
import org.apache.asterix.app.nc.task.BindMetadataNodeTask;
import org.apache.asterix.app.nc.task.CheckpointTask;
@@ -51,7 +49,6 @@
import org.apache.asterix.app.replication.message.RegistrationTasksResponseMessage;
import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
import org.apache.asterix.common.api.INCLifecycleTask;
-import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
@@ -102,7 +99,7 @@
@Override
public void notifyNodeFailure(String nodeId, InetSocketAddress replicaAddress) throws HyracksDataException {
pendingStartupCompletionNodes.remove(nodeId);
- clusterManager.updateNodeState(nodeId, false, null);
+ clusterManager.updateNodeState(nodeId, false, null, null);
if (nodeId.equals(metadataNodeId)) {
clusterManager.updateMetadataNode(metadataNodeId, false);
}
@@ -138,7 +135,8 @@
private void process(RegistrationTasksRequestMessage msg) throws HyracksDataException {
final String nodeId = msg.getNodeId();
nodeSecretsMap.put(nodeId, msg.getSecrets());
- List<INCLifecycleTask> tasks = buildNCRegTasks(msg.getNodeId(), msg.getNodeStatus(), msg.getState());
+ List<INCLifecycleTask> tasks =
+ buildNCRegTasks(msg.getNodeId(), msg.getNodeStatus(), msg.getState(), msg.getActivePartitions());
RegistrationTasksResponseMessage response = new RegistrationTasksResponseMessage(nodeId, tasks);
try {
messageBroker.sendApplicationMessageToNC(response, msg.getNodeId());
@@ -157,7 +155,7 @@
return;
}
if (msg.isSuccess()) {
- clusterManager.updateNodeState(msg.getNodeId(), true, msg.getLocalCounters());
+ clusterManager.updateNodeState(msg.getNodeId(), true, msg.getLocalCounters(), msg.getActivePartitions());
if (msg.getNodeId().equals(metadataNodeId)) {
clusterManager.updateMetadataNode(metadataNodeId, true);
}
@@ -167,7 +165,8 @@
}
}
- protected List<INCLifecycleTask> buildNCRegTasks(String nodeId, NodeStatus nodeStatus, SystemState state) {
+ protected List<INCLifecycleTask> buildNCRegTasks(String nodeId, NodeStatus nodeStatus, SystemState state,
+ Set<Integer> activePartitions) {
LOGGER.info("Building registration tasks for node {} with status {} and system state: {}", nodeId, nodeStatus,
state);
final boolean isMetadataNode = nodeId.equals(metadataNodeId);
@@ -175,7 +174,7 @@
case ACTIVE:
return buildActiveNCRegTasks(isMetadataNode);
case IDLE:
- return buildIdleNcRegTasks(nodeId, isMetadataNode, state);
+ return buildIdleNcRegTasks(nodeId, isMetadataNode, state, activePartitions);
default:
return new ArrayList<>();
}
@@ -210,13 +209,13 @@
}
}
- protected List<INCLifecycleTask> buildIdleNcRegTasks(String newNodeId, boolean metadataNode, SystemState state) {
+ protected List<INCLifecycleTask> buildIdleNcRegTasks(String newNodeId, boolean metadataNode, SystemState state,
+ Set<Integer> activePartitions) {
final List<INCLifecycleTask> tasks = new ArrayList<>();
tasks.add(new UpdateNodeStatusTask(NodeStatus.BOOTING));
if (state == SystemState.CORRUPTED) {
- // need to perform local recovery for node partitions
- LocalRecoveryTask rt = new LocalRecoveryTask(Arrays.stream(clusterManager.getNodePartitions(newNodeId))
- .map(ClusterPartition::getPartitionId).collect(Collectors.toSet()));
+ // need to perform local recovery for node active partitions
+ LocalRecoveryTask rt = new LocalRecoveryTask(activePartitions);
tasks.add(rt);
}
if (replicationEnabled) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
index 79fa7c8..1309369 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.app.replication.message;
+import java.util.Set;
+
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
import org.apache.asterix.common.replication.INCLifecycleMessage;
@@ -31,11 +33,14 @@
private final boolean success;
private Throwable exception;
private final NcLocalCounters localCounters;
+ private final Set<Integer> activePartitions;
- public NCLifecycleTaskReportMessage(String nodeId, boolean success, NcLocalCounters localCounters) {
+ public NCLifecycleTaskReportMessage(String nodeId, boolean success, NcLocalCounters localCounters,
+ Set<Integer> activePartitions) {
this.nodeId = nodeId;
this.success = success;
this.localCounters = localCounters;
+ this.activePartitions = activePartitions;
}
@Override
@@ -67,4 +72,8 @@
public MessageType getType() {
return MessageType.REGISTRATION_TASKS_RESULT;
}
+
+ public Set<Integer> getActivePartitions() {
+ return activePartitions;
+ }
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
index c2cc63c..fb50b3e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
@@ -20,6 +20,7 @@
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
@@ -42,20 +43,22 @@
protected final String nodeId;
protected final NodeStatus nodeStatus;
protected final Map<String, Object> secrets;
+ protected final Set<Integer> activePartitions;
public RegistrationTasksRequestMessage(String nodeId, NodeStatus nodeStatus, SystemState state,
- Map<String, Object> secretsEphemeral) {
+ Map<String, Object> secretsEphemeral, Set<Integer> activePartitions) {
this.state = state;
this.nodeId = nodeId;
this.nodeStatus = nodeStatus;
this.secrets = new HashMap<>(secretsEphemeral);
+ this.activePartitions = activePartitions;
}
public static void send(CcId ccId, NodeControllerService cs, NodeStatus nodeStatus, SystemState systemState,
- Map<String, Object> secretsEphemeral) throws HyracksDataException {
+ Map<String, Object> secretsEphemeral, Set<Integer> activePartitions) throws HyracksDataException {
try {
- RegistrationTasksRequestMessage msg =
- new RegistrationTasksRequestMessage(cs.getId(), nodeStatus, systemState, secretsEphemeral);
+ RegistrationTasksRequestMessage msg = new RegistrationTasksRequestMessage(cs.getId(), nodeStatus,
+ systemState, secretsEphemeral, activePartitions);
((INCMessageBroker) cs.getContext().getMessageBroker()).sendMessageToCC(ccId, msg);
} catch (Exception e) {
LOGGER.log(Level.ERROR, "Unable to send RegistrationTasksRequestMessage to CC", e);
@@ -88,4 +91,8 @@
public Map<String, Object> getSecrets() {
return secrets;
}
+
+ public Set<Integer> getActivePartitions() {
+ return activePartitions;
+ }
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
index 1227a6f..d8e1894 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
@@ -19,6 +19,7 @@
package org.apache.asterix.app.replication.message;
import java.util.List;
+import java.util.Set;
import org.apache.asterix.common.api.INCLifecycleTask;
import org.apache.asterix.common.api.INcApplicationContext;
@@ -72,7 +73,9 @@
}
NcLocalCounters localCounter = success ? NcLocalCounters.collect(getCcId(),
(NodeControllerService) appCtx.getServiceContext().getControllerService()) : null;
- NCLifecycleTaskReportMessage result = new NCLifecycleTaskReportMessage(nodeId, success, localCounter);
+ Set<Integer> nodeActivePartitions = appCtx.getMetadataProperties().getNodeActivePartitions(nodeId);
+ NCLifecycleTaskReportMessage result =
+ new NCLifecycleTaskReportMessage(nodeId, success, localCounter, nodeActivePartitions);
result.setException(exception);
try {
broker.sendMessageToCC(getCcId(), result);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index c148c92..5b10512 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -300,7 +300,8 @@
apiServer != null ? Collections.singletonMap(SYS_AUTH_HEADER, apiServer.ctx().get(SYS_AUTH_HEADER))
: Collections.emptyMap();
RegistrationTasksRequestMessage.send(ccId, (NodeControllerService) ncServiceCtx.getControllerService(),
- currentStatus, systemState, httpSecrets);
+ currentStatus, systemState, httpSecrets,
+ runtimeContext.getMetadataProperties().getNodeActivePartitions(nodeId));
}
@Override
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
index 9cc295e..b80fa30 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
@@ -19,7 +19,9 @@
package org.apache.asterix.runtime;
import java.util.Collections;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
@@ -200,7 +202,8 @@
private void notifyNodeStartupCompletion(CcApplicationContext applicationContext, String nodeId)
throws HyracksDataException {
- NCLifecycleTaskReportMessage msg = new NCLifecycleTaskReportMessage(nodeId, true, mockLocalCounters());
+ NCLifecycleTaskReportMessage msg =
+ new NCLifecycleTaskReportMessage(nodeId, true, mockLocalCounters(), getNodeActivePartitions(nodeId));
applicationContext.getNcLifecycleCoordinator().process(msg);
}
@@ -262,4 +265,20 @@
Mockito.when(localCounters.getMaxTxnId()).thenReturn(1000L);
return localCounters;
}
+
+ private static Set<Integer> getNodeActivePartitions(String nodeId) {
+ Set<Integer> activePartitions = new HashSet<>();
+ switch (nodeId) {
+ case NC1:
+ activePartitions.add(0);
+ break;
+ case NC2:
+ activePartitions.add(1);
+ break;
+ case NC3:
+ activePartitions.add(2);
+ break;
+ }
+ return activePartitions;
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
index a37e6e4..2ee9435 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -62,9 +62,11 @@
* @param nodeId
* @param active
* @param ncLocalCounters
+ * @param activePartitions
* @throws HyracksDataException
*/
- void updateNodeState(String nodeId, boolean active, NcLocalCounters ncLocalCounters) throws HyracksDataException;
+ void updateNodeState(String nodeId, boolean active, NcLocalCounters ncLocalCounters, Set<Integer> activePartitions)
+ throws HyracksDataException;
/**
* Updates the active node and active state of the cluster partition with id {@code partitionNum}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/MetadataProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/MetadataProperties.java
index 7d5ec42..31708d3 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/MetadataProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/MetadataProperties.java
@@ -24,6 +24,7 @@
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.SortedMap;
import org.apache.asterix.common.cluster.ClusterPartition;
@@ -123,6 +124,10 @@
return accessor.getClusterPartitions();
}
+ public Set<Integer> getNodeActivePartitions(String nodeId) {
+ return accessor.getActivePartitions(nodeId);
+ }
+
public Map<String, String> getTransactionLogDirs() {
return accessor.getTransactionLogDirs();
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java
index aaf6316..a03530d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java
@@ -46,7 +46,8 @@
STARTING_PARTITION_ID(
OptionTypes.INTEGER,
-1,
- "The first partition id to assign to iodevices on this node (-1 == auto-assign)");
+ "The first partition id to assign to iodevices on this node (-1 == auto-assign)"),
+ ACTIVE_PARTITIONS(OptionTypes.STRING_ARRAY, null, "List of node active partitions");
private final IOptionType type;
private final Object defaultValue;
@@ -95,7 +96,7 @@
@Override
public boolean hidden() {
- return this == INITIAL_RUN || this == STARTING_PARTITION_ID;
+ return this == INITIAL_RUN || this == STARTING_PARTITION_ID || this == ACTIVE_PARTITIONS;
}
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
index 3b6100c..5ba378d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
@@ -25,6 +25,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -35,6 +36,7 @@
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
+import java.util.stream.Collectors;
import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.exceptions.AsterixException;
@@ -192,6 +194,17 @@
return clusterPartitions;
}
+ public Set<Integer> getActivePartitions(String nodeId) {
+ // by default, node actives partitions are the partitions assigned to the node
+ String[] activePartitions = cfg.getStringArray(NodeProperties.Option.ACTIVE_PARTITIONS);
+ if (activePartitions == null) {
+ ClusterPartition[] nodeClusterPartitions = nodePartitionsMap.get(nodeId);
+ return Arrays.stream(nodeClusterPartitions).map(ClusterPartition::getPartitionId)
+ .collect(Collectors.toSet());
+ }
+ return Arrays.stream(activePartitions).map(Integer::parseInt).collect(Collectors.toSet());
+ }
+
public List<AsterixExtension> getExtensions() {
return extensions;
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 98a97b0..7e50102 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -139,19 +139,15 @@
}
@Override
- public synchronized void updateNodeState(String nodeId, boolean active, NcLocalCounters localCounters) {
+ public synchronized void updateNodeState(String nodeId, boolean active, NcLocalCounters localCounters,
+ Set<Integer> activePartitions) {
if (active) {
updateClusterCounters(nodeId, localCounters);
participantNodes.add(nodeId);
+ activateNodePartitions(nodeId, activePartitions);
} else {
participantNodes.remove(nodeId);
- }
- ClusterPartition[] nodePartitions = node2PartitionsMap.get(nodeId);
- // if this isn't a storage node, it will not have cluster partitions
- if (nodePartitions != null) {
- for (ClusterPartition p : nodePartitions) {
- updateClusterPartition(p.getPartitionId(), nodeId, active);
- }
+ deactivateNodePartitions(nodeId);
}
}
@@ -416,11 +412,10 @@
public synchronized void deregisterNodePartitions(String nodeId) throws HyracksDataException {
ClusterPartition[] nodePartitions = node2PartitionsMap.remove(nodeId);
if (nodePartitions == null) {
- LOGGER.info("deregisterNodePartitions unknown node " + nodeId + " (already removed?)");
+ LOGGER.info("deregisterNodePartitions unknown node {} (already removed?)", nodeId);
} else {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("deregisterNodePartitions for node " + nodeId + ": " + Arrays.toString(nodePartitions));
- }
+ LOGGER.info("deregisterNodePartitions for node {}: {}", () -> nodeId,
+ () -> Arrays.toString(nodePartitions));
for (ClusterPartition nodePartition : nodePartitions) {
clusterPartitions.remove(nodePartition.getPartitionId());
}
@@ -431,12 +426,12 @@
@Override
public synchronized void removePending(String nodeId) {
if (LOGGER.isInfoEnabled()) {
- LOGGER.info("Registering intention to remove node id " + nodeId);
+ LOGGER.info("Registering intention to remove node id {}", nodeId);
}
if (participantNodes.contains(nodeId)) {
pendingRemoval.add(nodeId);
} else {
- LOGGER.warn("Cannot register unknown node " + nodeId + " for pending removal");
+ LOGGER.warn("Cannot register unknown node {} for pending removal", nodeId);
}
}
@@ -496,6 +491,19 @@
});
}
+ private synchronized void activateNodePartitions(String nodeId, Set<Integer> activePartitions) {
+ for (Integer partitionId : activePartitions) {
+ updateClusterPartition(partitionId, nodeId, true);
+ }
+ }
+
+ private synchronized void deactivateNodePartitions(String nodeId) {
+ clusterPartitions.values().stream()
+ .filter(partition -> partition.getActiveNodeId() != null && partition.getActiveNodeId().equals(nodeId))
+ .forEach(nodeActivePartition -> updateClusterPartition(nodeActivePartition.getPartitionId(), nodeId,
+ false));
+ }
+
private static InetSocketAddress getReplicaLocation(IClusterStateManager csm, String nodeId) {
final Map<IOption, Object> ncConfig = csm.getActiveNcConfiguration().get(nodeId);
if (ncConfig == null) {