[ASTERIXDB-2107][CLUS] Prevent Invalid UNUSABLE State in Dynamic Topology
- user model changes: no
- storage format changes: no
- interface changes: yes
Renamed IClusterStateManager methods:
addNCConfiguration -> notifyNodeJoin,
removeNCConfiguration -> notifyNodeFailure,
getActiveNcConfiguration -> getNcConfiguration.
Details:
- Mark a node as a participant when it completes its startup,
not when it joins the cluster.
- Allow partitions to be added in a pending-activation state.
- Remove the use of static MetadataProperties for reporting the number of nodes.
- Add test cases.
Change-Id: I7a0db2d66cf44650dcc673b3f2de537816cb84c7
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2029
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
index db26c3a..80fdbd6 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
@@ -186,8 +186,8 @@
private boolean addActiveReplica(String replica, ClusterPartition partition,
Map<String, List<Integer>> partitionRecoveryPlan) {
- Map<String, Map<IOption, Object>> activeNcConfiguration = clusterManager.getActiveNcConfiguration();
- if (activeNcConfiguration.containsKey(replica) && !failedNodes.contains(replica)) {
+ final Set<String> participantNodes = clusterManager.getParticipantNodes();
+ if (participantNodes.contains(replica) && !failedNodes.contains(replica)) {
if (!partitionRecoveryPlan.containsKey(replica)) {
List<Integer> replicaPartitions = new ArrayList<>();
replicaPartitions.add(partition.getPartitionId());
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
index 0583508..84f841c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
@@ -71,7 +71,7 @@
LOGGER.info("NC: " + nodeId + " joined");
}
IClusterStateManager csm = appCtx.getClusterStateManager();
- csm.addNCConfiguration(nodeId, ncConfiguration);
+ csm.notifyNodeJoin(nodeId, ncConfiguration);
//if metadata node rejoining, we need to rebind the proxy connection when it is active again.
if (!csm.isMetadataNodeActive()) {
@@ -101,7 +101,7 @@
LOGGER.info("NC: " + deadNode + " left");
}
IClusterStateManager csm = appCtx.getClusterStateManager();
- csm.removeNCConfiguration(deadNode);
+ csm.notifyNodeFailure(deadNode);
//if metadata node failed, we need to rebind the proxy connection when it is active again
if (!csm.isMetadataNodeActive()) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FaultToleranceUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FaultToleranceUtil.java
index 241cd65..9887c57 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FaultToleranceUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/FaultToleranceUtil.java
@@ -20,6 +20,7 @@
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
@@ -48,15 +49,16 @@
List<String> primaryRemoteReplicas = replicationStrategy.getRemotePrimaryReplicas(nodeId).stream()
.map(Replica::getId).collect(Collectors.toList());
String nodeIdAddress = StringUtils.EMPTY;
- Map<String, Map<IOption, Object>> activeNcConfiguration = clusterManager.getActiveNcConfiguration();
+ Map<String, Map<IOption, Object>> ncConfiguration = clusterManager.getNcConfiguration();
// In case the node joined with a new IP address, we need to send it to the other replicas
if (event == ClusterEventType.NODE_JOIN) {
- nodeIdAddress = (String)activeNcConfiguration.get(nodeId).get(NCConfig.Option.CLUSTER_PUBLIC_ADDRESS);
+ nodeIdAddress = (String) ncConfiguration.get(nodeId).get(NCConfig.Option.CLUSTER_PUBLIC_ADDRESS);
}
+ final Set<String> participantNodes = clusterManager.getParticipantNodes();
ReplicaEventMessage msg = new ReplicaEventMessage(nodeId, nodeIdAddress, event);
for (String replica : primaryRemoteReplicas) {
// If the remote replica is alive, send the event
- if (activeNcConfiguration.containsKey(replica)) {
+ if (participantNodes.contains(replica)) {
try {
messageBroker.sendApplicationMessageToNC(msg, replica);
} catch (Exception e) {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
new file mode 100644
index 0000000..6c11139
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.asterix.app.replication.NoFaultToleranceStrategy;
+import org.apache.asterix.app.replication.message.NCLifecycleTaskReportMessage;
+import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
+import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
+import org.apache.asterix.common.config.MetadataProperties;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.metadata.IMetadataBootstrap;
+import org.apache.asterix.runtime.transaction.ResourceIdManager;
+import org.apache.asterix.runtime.utils.CcApplicationContext;
+import org.apache.asterix.runtime.utils.ClusterStateManager;
+import org.apache.hyracks.api.application.ICCServiceContext;
+import org.apache.hyracks.api.config.IApplicationConfig;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.control.cc.application.CCServiceContext;
+import org.apache.hyracks.control.common.application.ConfigManagerApplicationConfig;
+import org.apache.hyracks.control.common.config.ConfigManager;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class ClusterStateManagerTest {
+
+ private static final String NC1 = "NC1";
+ private static final String NC2 = "NC2";
+ private static final String NC3 = "NC3";
+ private static final String METADATA_NODE = NC1;
+
+ /**
+ * Ensures that a cluster with a fixed topology will not be active until
+ * all partitions are active.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void fixedTopologyState() throws Exception {
+ ClusterStateManager csm = new ClusterStateManager();
+ CcApplicationContext ccAppCtx = ccAppContext(csm);
+ // prepare fixed topology
+ ccAppCtx.getMetadataProperties().getClusterPartitions().put(0, new ClusterPartition(0, NC1, 0));
+ ccAppCtx.getMetadataProperties().getClusterPartitions().put(1, new ClusterPartition(1, NC2, 0));
+ ccAppCtx.getMetadataProperties().getClusterPartitions().put(2, new ClusterPartition(2, NC3, 0));
+ for (ClusterPartition cp : ccAppCtx.getMetadataProperties().getClusterPartitions().values()) {
+ ccAppCtx.getMetadataProperties().getNodePartitions().put(cp.getNodeId(), new ClusterPartition[] { cp });
+ }
+ csm.setCcAppCtx(ccAppCtx);
+
+ // notify NC1 joined and completed startup
+ notifyNodeJoined(csm, NC1, 0, false);
+ notifyNodeStartupCompletion(ccAppCtx, NC1);
+ // cluster should be unusable
+ Assert.assertTrue(!csm.isClusterActive());
+ // notify NC2 joined
+ notifyNodeJoined(csm, NC2, 1, false);
+ // notify NC3 joined
+ notifyNodeJoined(csm, NC3, 2, false);
+ // notify NC2 completed startup
+ notifyNodeStartupCompletion(ccAppCtx, NC2);
+ // cluster should still be unusable
+ Assert.assertTrue(!csm.isClusterActive());
+ // notify NC3 completed startup
+ notifyNodeStartupCompletion(ccAppCtx, NC3);
+ // cluster should now be active
+ Assert.assertTrue(csm.isClusterActive());
+ // NC2 failed
+ csm.notifyNodeFailure(NC2);
+ // cluster should now be unusable
+ Assert.assertTrue(!csm.isClusterActive());
+ }
+
+ /**
+ * Ensures that a cluster with a dynamic topology will not go into unusable state while
+ * new partitions are dynamically added.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void dynamicTopologyState() throws Exception {
+ ClusterStateManager csm = new ClusterStateManager();
+ CcApplicationContext ccApplicationContext = ccAppContext(csm);
+ csm.setCcAppCtx(ccApplicationContext);
+
+ // notify NC1 joined and completed startup
+ notifyNodeJoined(csm, NC1, 0, true);
+ notifyNodeStartupCompletion(ccApplicationContext, NC1);
+ // cluster should now be active
+ Assert.assertTrue(csm.isClusterActive());
+ // notify NC2 joined
+ notifyNodeJoined(csm, NC2, 1, true);
+ // notify NC3 joined
+ notifyNodeJoined(csm, NC3, 2, true);
+ // cluster should still be active
+ Assert.assertTrue(csm.isClusterActive());
+ // notify NC2 completed startup
+ notifyNodeStartupCompletion(ccApplicationContext, NC2);
+ // cluster should still be active
+ Assert.assertTrue(csm.isClusterActive());
+ // notify NC3 completed startup
+ notifyNodeStartupCompletion(ccApplicationContext, NC3);
+ // cluster should still be active
+ Assert.assertTrue(csm.isClusterActive());
+ // NC2 failed
+ csm.notifyNodeFailure(NC2);
+ // cluster should now be unusable
+ Assert.assertTrue(!csm.isClusterActive());
+ }
+
+ /**
+ * Ensures that a cluster with a dynamic topology will not go into unusable state if
+ * a newly added node fails before completing its startup
+ *
+ * @throws Exception
+ */
+ @Test
+ public void dynamicTopologyNodeFailure() throws Exception {
+ ClusterStateManager csm = new ClusterStateManager();
+ CcApplicationContext ccApplicationContext = ccAppContext(csm);
+ csm.setCcAppCtx(ccApplicationContext);
+
+ // notify NC1 joined and completed startup
+ notifyNodeJoined(csm, NC1, 0, true);
+ notifyNodeStartupCompletion(ccApplicationContext, NC1);
+ // cluster should now be active
+ Assert.assertTrue(csm.isClusterActive());
+ // notify NC2 joined
+ notifyNodeJoined(csm, NC2, 1, true);
+ // notify NC3 joined
+ notifyNodeJoined(csm, NC3, 2, true);
+ // cluster should still be active
+ Assert.assertTrue(csm.isClusterActive());
+ // notify NC2 completed startup
+ notifyNodeStartupCompletion(ccApplicationContext, NC2);
+ // cluster should still be active
+ Assert.assertTrue(csm.isClusterActive());
+ // NC3 failed before completing startup
+ csm.notifyNodeFailure(NC3);
+ // cluster should still be active
+ Assert.assertTrue(csm.isClusterActive());
+ }
+
+ /**
+ * Ensures that a cluster with a dynamic topology will be in an unusable state
+ * if all partitions are pending activation
+ *
+ * @throws Exception
+ */
+ @Test
+ public void dynamicTopologyNoActivePartitions() throws Exception {
+ ClusterStateManager csm = new ClusterStateManager();
+ CcApplicationContext ccApplicationContext = ccAppContext(csm);
+ csm.setCcAppCtx(ccApplicationContext);
+
+ // notify NC1 joined
+ notifyNodeJoined(csm, NC1, 0, true);
+ // notify NC1 failed before completing startup
+ csm.notifyNodeFailure(NC1);
+ Assert.assertTrue(csm.getState() == ClusterState.UNUSABLE);
+ }
+
+ private void notifyNodeJoined(ClusterStateManager csm, String nodeId, int partitionId, boolean registerPartitions)
+ throws HyracksException, AsterixException {
+ csm.notifyNodeJoin(nodeId, Collections.emptyMap());
+ if (registerPartitions) {
+ csm.registerNodePartitions(nodeId, new ClusterPartition[] { new ClusterPartition(partitionId, nodeId, 0) });
+ }
+ }
+
+ private void notifyNodeStartupCompletion(CcApplicationContext applicationContext, String nodeId)
+ throws HyracksDataException {
+ NCLifecycleTaskReportMessage msg = new NCLifecycleTaskReportMessage(nodeId, true);
+ applicationContext.getResourceIdManager().report(nodeId, 0);
+ applicationContext.getFaultToleranceStrategy().process(msg);
+ }
+
+ private CcApplicationContext ccAppContext(ClusterStateManager csm) throws HyracksDataException {
+ CcApplicationContext ccApplicationContext = Mockito.mock(CcApplicationContext.class);
+ ConfigManager configManager = new ConfigManager(null);
+ IApplicationConfig applicationConfig = new ConfigManagerApplicationConfig(configManager);
+ ICCServiceContext iccServiceContext = Mockito.mock(CCServiceContext.class);
+ Mockito.when(iccServiceContext.getAppConfig()).thenReturn(applicationConfig);
+ Mockito.when(ccApplicationContext.getServiceContext()).thenReturn(iccServiceContext);
+
+ NoFaultToleranceStrategy fts = new NoFaultToleranceStrategy();
+ fts.bindTo(csm);
+ Mockito.when(ccApplicationContext.getFaultToleranceStrategy()).thenReturn(fts);
+
+ MetadataProperties metadataProperties = mockMetadataProperties();
+ Mockito.when(ccApplicationContext.getMetadataProperties()).thenReturn(metadataProperties);
+
+ ResourceIdManager resourceIdManager = new ResourceIdManager(csm);
+ Mockito.when(ccApplicationContext.getResourceIdManager()).thenReturn(resourceIdManager);
+
+ IMetadataBootstrap metadataBootstrap = Mockito.mock(IMetadataBootstrap.class);
+ Mockito.doNothing().when(metadataBootstrap).init();
+ Mockito.when(ccApplicationContext.getMetadataBootstrap()).thenReturn(metadataBootstrap);
+
+ IGlobalRecoveryManager globalRecoveryManager = Mockito.mock(IGlobalRecoveryManager.class);
+ Mockito.when(globalRecoveryManager.isRecoveryCompleted()).thenReturn(true);
+ Mockito.when(ccApplicationContext.getGlobalRecoveryManager()).thenReturn(globalRecoveryManager);
+ return ccApplicationContext;
+ }
+
+ private MetadataProperties mockMetadataProperties() {
+ SortedMap<Integer, ClusterPartition> clusterPartitions = Collections.synchronizedSortedMap(new TreeMap<>());
+ Map<String, ClusterPartition[]> nodePartitionsMap = new ConcurrentHashMap<>();
+ MetadataProperties metadataProperties = Mockito.mock(MetadataProperties.class);
+ Mockito.when(metadataProperties.getMetadataNodeName()).thenReturn(METADATA_NODE);
+ Mockito.when(metadataProperties.getClusterPartitions()).thenReturn(clusterPartitions);
+ Mockito.when(metadataProperties.getNodePartitions()).thenReturn(nodePartitionsMap);
+ return metadataProperties;
+ }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
index cc27fbb..cc99421 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/ClusterPartition.java
@@ -24,6 +24,8 @@
private final int ioDeviceNum;
private String activeNodeId = null;
private boolean active = false;
+ /* whether the partition was dynamically added to the cluster and is pending its first activation */
+ private boolean pendingActivation = false;
public ClusterPartition(int partitionId, String nodeId, int ioDeviceNum) {
this.partitionId = partitionId;
@@ -55,6 +57,18 @@
this.active = active;
}
+ public boolean isActive() {
+ return active;
+ }
+
+ public boolean isPendingActivation() {
+ return pendingActivation;
+ }
+
+ public void setPendingActivation(boolean pendingActivation) {
+ this.pendingActivation = pendingActivation;
+ }
+
@Override
public ClusterPartition clone() {
return new ClusterPartition(partitionId, nodeId, ioDeviceNum);
@@ -67,10 +81,7 @@
sb.append(", Original Node: " + nodeId);
sb.append(", IODevice: " + ioDeviceNum);
sb.append(", Active Node: " + activeNodeId);
+ sb.append(", Pending Activation: " + pendingActivation);
return sb.toString();
}
-
- public boolean isActive() {
- return active;
- }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
index b368c3b..3948ea6 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -76,7 +76,7 @@
/**
* @return a map of nodeId and NC Configuration for active nodes.
*/
- Map<String, Map<IOption, Object>> getActiveNcConfiguration();
+ Map<String, Map<IOption, Object>> getNcConfiguration();
/**
* @return The current metadata node Id.
@@ -187,13 +187,13 @@
int getNumberOfNodes();
/**
- * Add node configuration
+ * Notifies {@link IClusterStateManager} that a node has joined
*
* @param nodeId
* @param ncConfiguration
* @throws HyracksException
*/
- void addNCConfiguration(String nodeId, Map<IOption, Object> ncConfiguration) throws HyracksException;
+ void notifyNodeJoin(String nodeId, Map<IOption, Object> ncConfiguration) throws HyracksException;
/**
* @return true if metadata node is active, false otherwise
@@ -201,12 +201,12 @@
boolean isMetadataNodeActive();
/**
- * Remove configuration of a dead node
+ * Notifies {@link IClusterStateManager} that a node has failed
*
* @param deadNode
* @throws HyracksException
*/
- void removeNCConfiguration(String deadNode) throws HyracksException;
+ void notifyNodeFailure(String deadNode) throws HyracksException;
/**
* @return a substitution node or null
diff --git a/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/results/metadata_recovery/metadata_node_recovery/metadata_node_recovery.cluster_state.4.adm b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/results/metadata_recovery/metadata_node_recovery/metadata_node_recovery.cluster_state.4.adm
index d6ea4b7..06e4d80 100644
--- a/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/results/metadata_recovery/metadata_node_recovery/metadata_node_recovery.cluster_state.4.adm
+++ b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/results/metadata_recovery/metadata_node_recovery/metadata_node_recovery.cluster_state.4.adm
@@ -6,6 +6,7 @@
"nodeId" : "asterix_nc1",
"activeNodeId" : "asterix_nc1",
"active" : false,
+ "pendingActivation" : false,
"iodeviceNum" : 0
},
"1" : {
@@ -13,6 +14,7 @@
"nodeId" : "asterix_nc1",
"activeNodeId" : "asterix_nc1",
"active" : false,
+ "pendingActivation" : false,
"iodeviceNum" : 1
},
"2" : {
@@ -20,6 +22,7 @@
"nodeId" : "asterix_nc2",
"activeNodeId" : "asterix_nc2",
"active" : true,
+ "pendingActivation" : false,
"iodeviceNum" : 0
},
"3" : {
@@ -27,6 +30,7 @@
"nodeId" : "asterix_nc2",
"activeNodeId" : "asterix_nc2",
"active" : true,
+ "pendingActivation" : false,
"iodeviceNum" : 1
}
},
diff --git a/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/results/metadata_recovery/metadata_node_recovery/metadata_node_recovery.cluster_state.7.adm b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/results/metadata_recovery/metadata_node_recovery/metadata_node_recovery.cluster_state.7.adm
index 579caac..e0ec010 100644
--- a/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/results/metadata_recovery/metadata_node_recovery/metadata_node_recovery.cluster_state.7.adm
+++ b/asterixdb/asterix-installer/src/test/resources/integrationts/metadata_only_replication/results/metadata_recovery/metadata_node_recovery/metadata_node_recovery.cluster_state.7.adm
@@ -6,6 +6,7 @@
"nodeId" : "asterix_nc1",
"activeNodeId" : "asterix_nc1",
"active" : true,
+ "pendingActivation" : false,
"iodeviceNum" : 0
},
"1" : {
@@ -13,6 +14,7 @@
"nodeId" : "asterix_nc1",
"activeNodeId" : "asterix_nc1",
"active" : true,
+ "pendingActivation" : false,
"iodeviceNum" : 1
},
"2" : {
@@ -20,6 +22,7 @@
"nodeId" : "asterix_nc2",
"activeNodeId" : "asterix_nc2",
"active" : true,
+ "pendingActivation" : false,
"iodeviceNum" : 0
},
"3" : {
@@ -27,6 +30,7 @@
"nodeId" : "asterix_nc2",
"activeNodeId" : "asterix_nc2",
"active" : true,
+ "pendingActivation" : false,
"iodeviceNum" : 1
}
},
diff --git a/asterixdb/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.cluster_state.10.adm b/asterixdb/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.cluster_state.10.adm
index 579caac..e0ec010 100644
--- a/asterixdb/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.cluster_state.10.adm
+++ b/asterixdb/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.cluster_state.10.adm
@@ -6,6 +6,7 @@
"nodeId" : "asterix_nc1",
"activeNodeId" : "asterix_nc1",
"active" : true,
+ "pendingActivation" : false,
"iodeviceNum" : 0
},
"1" : {
@@ -13,6 +14,7 @@
"nodeId" : "asterix_nc1",
"activeNodeId" : "asterix_nc1",
"active" : true,
+ "pendingActivation" : false,
"iodeviceNum" : 1
},
"2" : {
@@ -20,6 +22,7 @@
"nodeId" : "asterix_nc2",
"activeNodeId" : "asterix_nc2",
"active" : true,
+ "pendingActivation" : false,
"iodeviceNum" : 0
},
"3" : {
@@ -27,6 +30,7 @@
"nodeId" : "asterix_nc2",
"activeNodeId" : "asterix_nc2",
"active" : true,
+ "pendingActivation" : false,
"iodeviceNum" : 1
}
},
diff --git a/asterixdb/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.cluster_state.5.adm b/asterixdb/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.cluster_state.5.adm
index 5f58ff7..2de5a55 100644
--- a/asterixdb/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.cluster_state.5.adm
+++ b/asterixdb/asterix-installer/src/test/resources/integrationts/replication/results/failback/node_failback/node_failback.cluster_state.5.adm
@@ -6,6 +6,7 @@
"nodeId" : "asterix_nc1",
"activeNodeId" : "asterix_nc2",
"active" : true,
+ "pendingActivation" : false,
"iodeviceNum" : 0
},
"1" : {
@@ -13,6 +14,7 @@
"nodeId" : "asterix_nc1",
"activeNodeId" : "asterix_nc2",
"active" : true,
+ "pendingActivation" : false,
"iodeviceNum" : 1
},
"2" : {
@@ -20,6 +22,7 @@
"nodeId" : "asterix_nc2",
"activeNodeId" : "asterix_nc2",
"active" : true,
+ "pendingActivation" : false,
"iodeviceNum" : 0
},
"3" : {
@@ -27,6 +30,7 @@
"nodeId" : "asterix_nc2",
"activeNodeId" : "asterix_nc2",
"active" : true,
+ "pendingActivation" : false,
"iodeviceNum" : 1
}
},
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 51c87b4..6e55fd2 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -68,7 +68,7 @@
*/
private static final Logger LOGGER = Logger.getLogger(ClusterStateManager.class.getName());
- private final Map<String, Map<IOption, Object>> activeNcConfiguration = new HashMap<>();
+ private final Map<String, Map<IOption, Object>> ncConfigMap = new HashMap<>();
private Set<String> pendingRemoval = new HashSet<>();
private final Cluster cluster;
private ClusterState state = ClusterState.UNUSABLE;
@@ -78,6 +78,7 @@
private String currentMetadataNode = null;
private boolean metadataNodeActive = false;
private Set<String> failedNodes = new HashSet<>();
+ private Set<String> participantNodes = new HashSet<>();
private IFaultToleranceStrategy ftStrategy;
private ICcApplicationContext appCtx;
@@ -96,25 +97,25 @@
}
@Override
- public synchronized void removeNCConfiguration(String nodeId) throws HyracksException {
+ public synchronized void notifyNodeFailure(String nodeId) throws HyracksException {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Removing configuration parameters for node id " + nodeId);
}
failedNodes.add(nodeId);
- ftStrategy.notifyNodeFailure(nodeId);
+ ncConfigMap.remove(nodeId);
pendingRemoval.remove(nodeId);
+ ftStrategy.notifyNodeFailure(nodeId);
}
@Override
- public synchronized void addNCConfiguration(String nodeId, Map<IOption, Object> configuration)
- throws HyracksException {
+ public synchronized void notifyNodeJoin(String nodeId, Map<IOption, Object> configuration) throws HyracksException {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Registering configuration parameters for node id " + nodeId);
}
- activeNcConfiguration.put(nodeId, configuration);
failedNodes.remove(nodeId);
- ftStrategy.notifyNodeJoin(nodeId);
+ ncConfigMap.put(nodeId, configuration);
updateNodeConfig(nodeId, configuration);
+ ftStrategy.notifyNodeJoin(nodeId);
}
@Override
@@ -142,6 +143,11 @@
@Override
public synchronized void updateNodePartitions(String nodeId, boolean active) throws HyracksDataException {
+ if (active) {
+ participantNodes.add(nodeId);
+ } else {
+ participantNodes.remove(nodeId);
+ }
ClusterPartition[] nodePartitions = node2PartitionsMap.get(nodeId);
// if this isn't a storage node, it will not have cluster partitions
if (nodePartitions != null) {
@@ -159,6 +165,7 @@
clusterPartition.setActive(active);
if (active) {
clusterPartition.setActiveNodeId(activeNode);
+ clusterPartition.setPendingActivation(false);
}
}
}
@@ -170,19 +177,22 @@
return;
}
resetClusterPartitionConstraint();
- if (clusterPartitions.isEmpty()) {
+ // if the cluster has no registered partitions or all partitions are pending activation -> UNUSABLE
+ if (clusterPartitions.isEmpty() || clusterPartitions.values().stream()
+ .allMatch(ClusterPartition::isPendingActivation)) {
LOGGER.info("Cluster does not have any registered partitions");
setState(ClusterState.UNUSABLE);
return;
}
- for (ClusterPartition p : clusterPartitions.values()) {
- if (!p.isActive()) {
- setState(ClusterState.UNUSABLE);
- return;
- }
+
+ // exclude partitions that are pending activation
+ if (clusterPartitions.values().stream().anyMatch(p -> !p.isActive() && !p.isPendingActivation())) {
+ setState(ClusterState.UNUSABLE);
+ return;
}
+
IResourceIdManager resourceIdManager = appCtx.getResourceIdManager();
- for (String node : activeNcConfiguration.keySet()) {
+ for (String node : participantNodes) {
if (!resourceIdManager.reported(node)) {
LOGGER.log(Level.INFO, "Partitions are ready but %s has not yet registered its max resource id...",
node);
@@ -234,7 +244,7 @@
@Override
public synchronized String[] getIODevices(String nodeId) {
- Map<IOption, Object> ncConfig = activeNcConfiguration.get(nodeId);
+ Map<IOption, Object> ncConfig = ncConfigMap.get(nodeId);
if (ncConfig == null) {
if (LOGGER.isLoggable(Level.WARNING)) {
LOGGER.warning("Configuration parameters for nodeId " + nodeId
@@ -258,20 +268,16 @@
@Override
public synchronized Set<String> getParticipantNodes() {
- Set<String> participantNodes = new HashSet<>();
- for (String pNode : activeNcConfiguration.keySet()) {
- participantNodes.add(pNode);
- }
- return participantNodes;
+ return new HashSet<>(participantNodes);
}
@Override
public synchronized Set<String> getParticipantNodes(boolean excludePendingRemoval) {
- Set<String> participantNodes = getParticipantNodes();
+ final Set<String> participantNodesCopy = getParticipantNodes();
if (excludePendingRemoval) {
- participantNodes.removeAll(pendingRemoval);
+ participantNodesCopy.removeAll(pendingRemoval);
}
- return participantNodes;
+ return participantNodesCopy;
}
@Override
@@ -303,8 +309,8 @@
}
@Override
- public int getNumberOfNodes() {
- return appCtx.getMetadataProperties().getNodeNames().size();
+ public synchronized int getNumberOfNodes() {
+ return participantNodes.size();
}
@Override
@@ -379,8 +385,8 @@
}
@Override
- public Map<String, Map<IOption, Object>> getActiveNcConfiguration() {
- return Collections.unmodifiableMap(activeNcConfiguration);
+ public Map<String, Map<IOption, Object>> getNcConfiguration() {
+ return Collections.unmodifiableMap(ncConfigMap);
}
@Override
@@ -402,6 +408,7 @@
}
}
for (ClusterPartition nodePartition : nodePartitions) {
+ nodePartition.setPendingActivation(true);
clusterPartitions.put(nodePartition.getPartitionId(), nodePartition);
}
node2PartitionsMap.put(nodeId, nodePartitions);
@@ -419,6 +426,7 @@
for (ClusterPartition nodePartition : nodePartitions) {
clusterPartitions.remove(nodePartition.getPartitionId());
}
+ participantNodes.remove(nodeId);
}
}
@@ -427,7 +435,7 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Registering intention to remove node id " + nodeId);
}
- if (activeNcConfiguration.containsKey(nodeId)) {
+ if (participantNodes.contains(nodeId)) {
pendingRemoval.add(nodeId);
} else {
LOGGER.warning("Cannot register unknown node " + nodeId + " for pending removal");