[NO ISSUE][REP] Notify nodes on replica failure
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- Whenever a node fails, notify remaining active nodes
that the failed node's replica has failed.
Change-Id: I12551bd543cd4b664101e8f4d4f44f3124de3d54
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/9944
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
index 7576e0d..d22e9fc 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
@@ -18,6 +18,9 @@
*/
package org.apache.asterix.app.replication;
+import static org.apache.hyracks.api.exceptions.ErrorCode.NODE_FAILED;
+
+import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -49,6 +52,7 @@
import org.apache.asterix.common.replication.INcLifecycleCoordinator;
import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.replication.messaging.ReplicaFailedMessage;
import org.apache.hyracks.api.application.ICCServiceContext;
import org.apache.hyracks.api.client.NodeStatus;
import org.apache.hyracks.api.control.IGatekeeper;
@@ -80,12 +84,15 @@
}
@Override
- public void notifyNodeFailure(String nodeId) throws HyracksDataException {
+ public void notifyNodeFailure(String nodeId, InetSocketAddress replicaAddress) throws HyracksDataException {
pendingStartupCompletionNodes.remove(nodeId);
clusterManager.updateNodeState(nodeId, false, null);
if (nodeId.equals(metadataNodeId)) {
clusterManager.updateMetadataNode(metadataNodeId, false);
}
+ if (replicaAddress != null) {
+ notifyFailedReplica(clusterManager, nodeId, replicaAddress);
+ }
clusterManager.refreshState();
}
@@ -229,4 +236,19 @@
throw HyracksDataException.create(e);
}
}
+
+ private void notifyFailedReplica(IClusterStateManager clusterManager, String nodeID,
+ InetSocketAddress replicaAddress) {
+ LOGGER.info("notify replica failure of nodeId {} at {}", nodeID, replicaAddress);
+ Set<String> ncs = clusterManager.getParticipantNodes(true);
+ ReplicaFailedMessage message =
+ new ReplicaFailedMessage(replicaAddress, HyracksDataException.create(NODE_FAILED, nodeID));
+ for (String nodeId : ncs) {
+ try {
+ messageBroker.sendApplicationMessageToNC(message, nodeId);
+ } catch (Exception e) {
+ LOGGER.info("failed to notify replica failure to node {}", nodeID);
+ }
+ }
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INcLifecycleCoordinator.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INcLifecycleCoordinator.java
index 1a7c3c8..9a3b125 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INcLifecycleCoordinator.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INcLifecycleCoordinator.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.common.replication;
+import java.net.InetSocketAddress;
+
import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -35,9 +37,10 @@
* Defines the logic of a {@link INcLifecycleCoordinator} when a node leaves the cluster.
*
* @param nodeId
+ * @param replicaAddress
* @throws HyracksDataException
*/
- void notifyNodeFailure(String nodeId) throws HyracksDataException;
+ void notifyNodeFailure(String nodeId, InetSocketAddress replicaAddress) throws HyracksDataException;
/**
* Binds the coordinator to {@code cluserManager}.
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicaFailedMessage.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicaFailedMessage.java
new file mode 100644
index 0000000..2a39eaf
--- /dev/null
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicaFailedMessage.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.messaging;
+
+import java.net.InetSocketAddress;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.asterix.replication.api.ReplicationDestination;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.util.NetworkUtil;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class ReplicaFailedMessage implements INcAddressedMessage {
+
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOGGER = LogManager.getLogger();
+ private final InetSocketAddress replicaAddress;
+ private final Exception failure;
+
+ public ReplicaFailedMessage(InetSocketAddress replicaAddress, Exception failure) {
+ this.replicaAddress = replicaAddress;
+ this.failure = failure;
+ }
+
+ @Override
+ public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
+ LOGGER.info("replica at {} failed", replicaAddress);
+ ReplicationDestination dest = ReplicationDestination.at(NetworkUtil.ensureUnresolved(replicaAddress));
+ appCtx.getReplicationManager().notifyFailure(dest, failure);
+ }
+
+ @Override
+ public String toString() {
+ return ReplicaFailedMessage.class.getSimpleName();
+ }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 2c05c53..bf7ca34 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.runtime.utils;
+import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -49,6 +50,7 @@
import org.apache.hyracks.control.common.application.ConfigManagerApplicationConfig;
import org.apache.hyracks.control.common.config.ConfigManager;
import org.apache.hyracks.control.common.controllers.NCConfig;
+import org.apache.hyracks.util.NetworkUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -91,13 +93,13 @@
@Override
public synchronized void notifyNodeFailure(String nodeId) throws HyracksException {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("Removing configuration parameters for node id " + nodeId);
- }
+ LOGGER.info("Removing configuration parameters for node id {}", nodeId);
failedNodes.add(nodeId);
+ // before removing the node config, get its replica location
+ InetSocketAddress replicaAddress = getReplicaLocation(this, nodeId);
ncConfigMap.remove(nodeId);
pendingRemoval.remove(nodeId);
- lifecycleCoordinator.notifyNodeFailure(nodeId);
+ lifecycleCoordinator.notifyNodeFailure(nodeId, replicaAddress);
}
@Override
@@ -496,4 +498,17 @@
});
}
+ private static InetSocketAddress getReplicaLocation(IClusterStateManager csm, String nodeId) {
+ final Map<IOption, Object> ncConfig = csm.getActiveNcConfiguration().get(nodeId);
+ if (ncConfig == null) {
+ return null;
+ }
+ Object destIP = ncConfig.get(NCConfig.Option.REPLICATION_PUBLIC_ADDRESS);
+ Object destPort = ncConfig.get(NCConfig.Option.REPLICATION_PUBLIC_PORT);
+ if (destIP == null || destPort == null) {
+ return null;
+ }
+ String replicaLocation = NetworkUtil.toHostPort(String.valueOf(destIP), String.valueOf(destPort));
+ return NetworkUtil.parseInetSocketAddress(replicaLocation);
+ }
}