Fixed race condition during ncMap lookup

Change-Id: I1bfbe712c100f48011a516c373ac8994028dc3dd
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1792
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/INodeResolver.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/INodeResolver.java
index 99ffdf1..c6e87bd 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/INodeResolver.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/INodeResolver.java
@@ -18,6 +18,10 @@
  */
 package org.apache.asterix.external.api;
 
+import java.net.InetAddress;
+import java.util.Map;
+import java.util.Set;
+
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
 
@@ -35,5 +39,6 @@
      * @return resolved result (a node controller id)
      * @throws AsterixException
      */
-    String resolveNode(ICcApplicationContext appCtx, String value) throws AsterixException;
+    String resolveNode(ICcApplicationContext appCtx, String value, Map<InetAddress, Set<String>> ncMap, Set<String> ncs)
+            throws AsterixException;
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
index 44b0b43..d7afa13 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
@@ -18,13 +18,16 @@
  */
 package org.apache.asterix.external.input.stream.factory;
 
+import java.net.InetAddress;
 import java.nio.file.Path;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.logging.Level;
 import java.util.logging.Logger;
+import java.util.stream.Collectors;
 
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
@@ -37,6 +40,7 @@
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.external.util.FileSystemWatcher;
 import org.apache.asterix.external.util.NodeResolverFactory;
+import org.apache.asterix.runtime.utils.RuntimeUtils;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.api.application.IServiceContext;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -103,6 +107,8 @@
 
     private void configureFileSplits(ICcApplicationContext appCtx, String[] splits) throws AsterixException {
         INodeResolver resolver = getNodeResolver();
+        Map<InetAddress, Set<String>> ncMap = RuntimeUtils.getForcedNodeControllerMap(appCtx);
+        Set<String> ncs = ncMap.values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
         inputFileSplits = new UnmanagedFileSplit[splits.length];
         String node;
         String path;
@@ -114,7 +120,7 @@
                 throw new AsterixException(
                         "Invalid path: " + splitPath + "\nUsage- path=\"Host://Absolute File Path\"");
             }
-            node = resolver.resolveNode(appCtx, trimmedValue.split(":")[0]);
+            node = resolver.resolveNode(appCtx, trimmedValue.split(":")[0], ncMap, ncs);
             path = trimmedValue.split("://")[1];
             inputFileSplits[count++] = new UnmanagedFileSplit(node, path);
         }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/IdentityResolver.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/IdentityResolver.java
index 9a4ddff..651833f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/IdentityResolver.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/IdentityResolver.java
@@ -18,6 +18,10 @@
  */
 package org.apache.asterix.external.util;
 
+import java.net.InetAddress;
+import java.util.Map;
+import java.util.Set;
+
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.external.api.INodeResolver;
 
@@ -28,7 +32,8 @@
 public class IdentityResolver implements INodeResolver {
 
     @Override
-    public String resolveNode(ICcApplicationContext appCtx, String value) {
+    public String resolveNode(ICcApplicationContext appCtx, String value, Map<InetAddress, Set<String>> ncMap,
+            Set<String> ncs) {
         return value;
     }
 
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/NodeResolver.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/NodeResolver.java
index 5b15e9e..c180dea 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/NodeResolver.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/NodeResolver.java
@@ -20,10 +20,7 @@
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Random;
 import java.util.Set;
 
@@ -31,57 +28,30 @@
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.external.api.INodeResolver;
-import org.apache.asterix.runtime.utils.RuntimeUtils;
 
 /**
  * Resolves a value (DNS/IP Address) or a (Node Controller Id) to the id of a Node Controller running at the location.
  */
 public class NodeResolver implements INodeResolver {
     //TODO: change this call and replace by calling AsterixClusterProperties
-    private static final Random random = new Random();
-    private static final Map<InetAddress, Set<String>> ncMap = new HashMap<>();
-    private static final Set<String> ncs = new HashSet<>();
+    private final Random random = new Random();
 
     @Override
-    public String resolveNode(ICcApplicationContext appCtx, String value) throws AsterixException {
+    public String resolveNode(ICcApplicationContext appCtx, String value, Map<InetAddress, Set<String>> ncMap,
+            Set<String> ncs) throws AsterixException {
+        if (ncs.contains(value)) {
+            return value;
+        }
+        InetAddress ipAddress = null;
         try {
-            if (ncMap.isEmpty()) {
-                NodeResolver.updateNCs(appCtx);
-            }
-            if (ncs.contains(value)) {
-                return value;
-            } else {
-                NodeResolver.updateNCs(appCtx);
-                if (ncs.contains(value)) {
-                    return value;
-                }
-            }
-            InetAddress ipAddress = null;
-            try {
-                ipAddress = InetAddress.getByName(value);
-            } catch (UnknownHostException e) {
-                throw new AsterixException(ErrorCode.NODE_RESOLVER_UNABLE_RESOLVE_HOST, e, value);
-            }
-            Set<String> nodeControllers = ncMap.get(ipAddress);
-            if (nodeControllers == null || nodeControllers.isEmpty()) {
-                throw new AsterixException(ErrorCode.NODE_RESOLVER_NO_NODE_CONTROLLERS, value);
-            }
-            return nodeControllers.toArray(new String[] {})[random.nextInt(nodeControllers.size())];
-        } catch (Exception e) {
-            throw new AsterixException(e);
+            ipAddress = InetAddress.getByName(value);
+        } catch (UnknownHostException e) {
+            throw new AsterixException(ErrorCode.NODE_RESOLVER_UNABLE_RESOLVE_HOST, e, value);
         }
-    }
-
-    private static void updateNCs(ICcApplicationContext appCtx) throws Exception {
-        synchronized (ncMap) {
-            ncMap.clear();
-            RuntimeUtils.getNodeControllerMap(appCtx, ncMap);
-            synchronized (ncs) {
-                ncs.clear();
-                for (Entry<InetAddress, Set<String>> entry : ncMap.entrySet()) {
-                    ncs.addAll(entry.getValue());
-                }
-            }
+        Set<String> nodeControllers = ncMap.get(ipAddress);
+        if (nodeControllers == null || nodeControllers.isEmpty()) {
+            throw new AsterixException(ErrorCode.NODE_RESOLVER_NO_NODE_CONTROLLERS, value);
         }
+        return nodeControllers.toArray(new String[] {})[random.nextInt(nodeControllers.size())];
     }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeUtils.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeUtils.java
index 85e93b8..6970af5 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeUtils.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeUtils.java
@@ -60,10 +60,10 @@
         return map;
     }
 
-    public static void getNodeControllerMap(ICcApplicationContext appCtx, Map<InetAddress, Set<String>> map) {
+    public static Map<InetAddress, Set<String>> getForcedNodeControllerMap(ICcApplicationContext appCtx) {
         ClusterControllerService ccs = (ClusterControllerService) appCtx.getServiceContext().getControllerService();
         INodeManager nodeManager = ccs.getNodeManager();
-        map.putAll(nodeManager.getIpAddressNodeNameMap());
+        return nodeManager.getIpAddressNodeNameMap();
     }
 
     public static JobSpecification createJobSpecification(ICcApplicationContext appCtx) {