[NO ISSUE][REP] Notify nodes on replica failure

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:

- Whenever a node fails, notify remaining active nodes
  that the failed node's replica has failed.

Change-Id: I12551bd543cd4b664101e8f4d4f44f3124de3d54
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/9944
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
index 7576e0d..d22e9fc 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
@@ -18,6 +18,9 @@
  */
 package org.apache.asterix.app.replication;
 
+import static org.apache.hyracks.api.exceptions.ErrorCode.NODE_FAILED;
+
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -49,6 +52,7 @@
 import org.apache.asterix.common.replication.INcLifecycleCoordinator;
 import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
 import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.replication.messaging.ReplicaFailedMessage;
 import org.apache.hyracks.api.application.ICCServiceContext;
 import org.apache.hyracks.api.client.NodeStatus;
 import org.apache.hyracks.api.control.IGatekeeper;
@@ -80,12 +84,15 @@
     }
 
     @Override
-    public void notifyNodeFailure(String nodeId) throws HyracksDataException {
+    public void notifyNodeFailure(String nodeId, InetSocketAddress replicaAddress) throws HyracksDataException {
         pendingStartupCompletionNodes.remove(nodeId);
         clusterManager.updateNodeState(nodeId, false, null);
         if (nodeId.equals(metadataNodeId)) {
             clusterManager.updateMetadataNode(metadataNodeId, false);
         }
+        if (replicaAddress != null) {
+            notifyFailedReplica(clusterManager, nodeId, replicaAddress);
+        }
         clusterManager.refreshState();
     }
 
@@ -229,4 +236,28 @@
             throw HyracksDataException.create(e);
         }
     }
+
+    /**
+     * Informs the participant nodes that the replica at {@code replicaAddress} of the failed node has failed.
+     * Delivery is best effort: a failure to reach one node is logged and does not prevent notifying the rest.
+     *
+     * @param clusterManager the cluster state manager used to look up the nodes to notify
+     * @param nodeID the id of the node that failed
+     * @param replicaAddress the replication address of the failed node
+     */
+    private void notifyFailedReplica(IClusterStateManager clusterManager, String nodeID,
+            InetSocketAddress replicaAddress) {
+        LOGGER.info("notify replica failure of nodeId {} at {}", nodeID, replicaAddress);
+        Set<String> ncs = clusterManager.getParticipantNodes(true);
+        ReplicaFailedMessage message =
+                new ReplicaFailedMessage(replicaAddress, HyracksDataException.create(NODE_FAILED, nodeID));
+        for (String targetNodeId : ncs) {
+            try {
+                messageBroker.sendApplicationMessageToNC(message, targetNodeId);
+            } catch (Exception e) {
+                // report the node that could not be reached (not the failed node) and keep the cause
+                LOGGER.warn("failed to notify replica failure to node {}", targetNodeId, e);
+            }
+        }
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INcLifecycleCoordinator.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INcLifecycleCoordinator.java
index 1a7c3c8..9a3b125 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INcLifecycleCoordinator.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INcLifecycleCoordinator.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.common.replication;
 
+import java.net.InetSocketAddress;
+
 import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
@@ -35,9 +37,10 @@
      * Defines the logic of a {@link INcLifecycleCoordinator} when a node leaves the cluster.
      *
      * @param nodeId
+     * @param replicaAddress the failed node's replication address, or {@code null} if it is not known
      * @throws HyracksDataException
      */
-    void notifyNodeFailure(String nodeId) throws HyracksDataException;
+    void notifyNodeFailure(String nodeId, InetSocketAddress replicaAddress) throws HyracksDataException;
 
     /**
      * Binds the coordinator to {@code cluserManager}.
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicaFailedMessage.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicaFailedMessage.java
new file mode 100644
index 0000000..2a39eaf
--- /dev/null
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicaFailedMessage.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.messaging;
+
+import java.net.InetSocketAddress;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.asterix.replication.api.ReplicationDestination;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.util.NetworkUtil;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class ReplicaFailedMessage implements INcAddressedMessage {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final InetSocketAddress replicaAddress;
+    private final Exception failure;
+
+    public ReplicaFailedMessage(InetSocketAddress replicaAddress, Exception failure) {
+        this.replicaAddress = replicaAddress;
+        this.failure = failure;
+    }
+
+    @Override
+    public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
+        LOGGER.info("replica at {} failed", replicaAddress);
+        ReplicationDestination dest = ReplicationDestination.at(NetworkUtil.ensureUnresolved(replicaAddress));
+        appCtx.getReplicationManager().notifyFailure(dest, failure);
+    }
+
+    @Override
+    public String toString() {
+        return ReplicaFailedMessage.class.getSimpleName();
+    }
+}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 2c05c53..bf7ca34 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.runtime.utils;
 
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -49,6 +50,7 @@
 import org.apache.hyracks.control.common.application.ConfigManagerApplicationConfig;
 import org.apache.hyracks.control.common.config.ConfigManager;
 import org.apache.hyracks.control.common.controllers.NCConfig;
+import org.apache.hyracks.util.NetworkUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -91,13 +93,13 @@
 
     @Override
     public synchronized void notifyNodeFailure(String nodeId) throws HyracksException {
-        if (LOGGER.isInfoEnabled()) {
-            LOGGER.info("Removing configuration parameters for node id " + nodeId);
-        }
+        LOGGER.info("Removing configuration parameters for node id {}", nodeId);
         failedNodes.add(nodeId);
+        // before removing the node config, get its replica location
+        InetSocketAddress replicaAddress = getReplicaLocation(this, nodeId);
         ncConfigMap.remove(nodeId);
         pendingRemoval.remove(nodeId);
-        lifecycleCoordinator.notifyNodeFailure(nodeId);
+        lifecycleCoordinator.notifyNodeFailure(nodeId, replicaAddress);
     }
 
     @Override
@@ -496,4 +498,17 @@
         });
     }
 
+    private static InetSocketAddress getReplicaLocation(IClusterStateManager csm, String nodeId) {
+        final Map<IOption, Object> ncConfig = csm.getActiveNcConfiguration().get(nodeId);
+        if (ncConfig == null) {
+            return null;
+        }
+        Object destIP = ncConfig.get(NCConfig.Option.REPLICATION_PUBLIC_ADDRESS);
+        Object destPort = ncConfig.get(NCConfig.Option.REPLICATION_PUBLIC_PORT);
+        if (destIP == null || destPort == null) {
+            return null;
+        }
+        String replicaLocation = NetworkUtil.toHostPort(String.valueOf(destIP), String.valueOf(destPort));
+        return NetworkUtil.parseInetSocketAddress(replicaLocation);
+    }
 }