Fixed race condition during ncMap lookup
Change-Id: I1bfbe712c100f48011a516c373ac8994028dc3dd
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1792
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/INodeResolver.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/INodeResolver.java
index 99ffdf1..c6e87bd 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/INodeResolver.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/INodeResolver.java
@@ -18,6 +18,10 @@
*/
package org.apache.asterix.external.api;
+import java.net.InetAddress;
+import java.util.Map;
+import java.util.Set;
+
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.AsterixException;
@@ -35,5 +39,6 @@
* @return resolved result (a node controller id)
* @throws AsterixException
*/
- String resolveNode(ICcApplicationContext appCtx, String value) throws AsterixException;
+ String resolveNode(ICcApplicationContext appCtx, String value, Map<InetAddress, Set<String>> ncMap, Set<String> ncs)
+ throws AsterixException;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
index 44b0b43..d7afa13 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
@@ -18,13 +18,16 @@
*/
package org.apache.asterix.external.input.stream.factory;
+import java.net.InetAddress;
import java.nio.file.Path;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.logging.Level;
import java.util.logging.Logger;
+import java.util.stream.Collectors;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.AsterixException;
@@ -37,6 +40,7 @@
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.external.util.FileSystemWatcher;
import org.apache.asterix.external.util.NodeResolverFactory;
+import org.apache.asterix.runtime.utils.RuntimeUtils;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -103,6 +107,8 @@
private void configureFileSplits(ICcApplicationContext appCtx, String[] splits) throws AsterixException {
INodeResolver resolver = getNodeResolver();
+ Map<InetAddress, Set<String>> ncMap = RuntimeUtils.getForcedNodeControllerMap(appCtx);
+ Set<String> ncs = ncMap.values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
inputFileSplits = new UnmanagedFileSplit[splits.length];
String node;
String path;
@@ -114,7 +120,7 @@
throw new AsterixException(
"Invalid path: " + splitPath + "\nUsage- path=\"Host://Absolute File Path\"");
}
- node = resolver.resolveNode(appCtx, trimmedValue.split(":")[0]);
+ node = resolver.resolveNode(appCtx, trimmedValue.split(":")[0], ncMap, ncs);
path = trimmedValue.split("://")[1];
inputFileSplits[count++] = new UnmanagedFileSplit(node, path);
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/IdentityResolver.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/IdentityResolver.java
index 9a4ddff..651833f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/IdentityResolver.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/IdentityResolver.java
@@ -18,6 +18,10 @@
*/
package org.apache.asterix.external.util;
+import java.net.InetAddress;
+import java.util.Map;
+import java.util.Set;
+
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.external.api.INodeResolver;
@@ -28,7 +32,8 @@
public class IdentityResolver implements INodeResolver {
@Override
- public String resolveNode(ICcApplicationContext appCtx, String value) {
+ public String resolveNode(ICcApplicationContext appCtx, String value, Map<InetAddress, Set<String>> ncMap,
+ Set<String> ncs) {
return value;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/NodeResolver.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/NodeResolver.java
index 5b15e9e..c180dea 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/NodeResolver.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/NodeResolver.java
@@ -20,10 +20,7 @@
import java.net.InetAddress;
import java.net.UnknownHostException;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
@@ -31,57 +28,30 @@
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.external.api.INodeResolver;
-import org.apache.asterix.runtime.utils.RuntimeUtils;
/**
* Resolves a value (DNS/IP Address) or a (Node Controller Id) to the id of a Node Controller running at the location.
*/
public class NodeResolver implements INodeResolver {
//TODO: change this call and replace by calling AsterixClusterProperties
- private static final Random random = new Random();
- private static final Map<InetAddress, Set<String>> ncMap = new HashMap<>();
- private static final Set<String> ncs = new HashSet<>();
+ private final Random random = new Random();
@Override
- public String resolveNode(ICcApplicationContext appCtx, String value) throws AsterixException {
+ public String resolveNode(ICcApplicationContext appCtx, String value, Map<InetAddress, Set<String>> ncMap,
+ Set<String> ncs) throws AsterixException {
+ if (ncs.contains(value)) {
+ return value;
+ }
+ InetAddress ipAddress = null;
try {
- if (ncMap.isEmpty()) {
- NodeResolver.updateNCs(appCtx);
- }
- if (ncs.contains(value)) {
- return value;
- } else {
- NodeResolver.updateNCs(appCtx);
- if (ncs.contains(value)) {
- return value;
- }
- }
- InetAddress ipAddress = null;
- try {
- ipAddress = InetAddress.getByName(value);
- } catch (UnknownHostException e) {
- throw new AsterixException(ErrorCode.NODE_RESOLVER_UNABLE_RESOLVE_HOST, e, value);
- }
- Set<String> nodeControllers = ncMap.get(ipAddress);
- if (nodeControllers == null || nodeControllers.isEmpty()) {
- throw new AsterixException(ErrorCode.NODE_RESOLVER_NO_NODE_CONTROLLERS, value);
- }
- return nodeControllers.toArray(new String[] {})[random.nextInt(nodeControllers.size())];
- } catch (Exception e) {
- throw new AsterixException(e);
+ ipAddress = InetAddress.getByName(value);
+ } catch (UnknownHostException e) {
+ throw new AsterixException(ErrorCode.NODE_RESOLVER_UNABLE_RESOLVE_HOST, e, value);
}
- }
-
- private static void updateNCs(ICcApplicationContext appCtx) throws Exception {
- synchronized (ncMap) {
- ncMap.clear();
- RuntimeUtils.getNodeControllerMap(appCtx, ncMap);
- synchronized (ncs) {
- ncs.clear();
- for (Entry<InetAddress, Set<String>> entry : ncMap.entrySet()) {
- ncs.addAll(entry.getValue());
- }
- }
+ Set<String> nodeControllers = ncMap.get(ipAddress);
+ if (nodeControllers == null || nodeControllers.isEmpty()) {
+ throw new AsterixException(ErrorCode.NODE_RESOLVER_NO_NODE_CONTROLLERS, value);
}
+ return nodeControllers.toArray(new String[] {})[random.nextInt(nodeControllers.size())];
}
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeUtils.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeUtils.java
index 85e93b8..6970af5 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeUtils.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeUtils.java
@@ -60,10 +60,10 @@
return map;
}
- public static void getNodeControllerMap(ICcApplicationContext appCtx, Map<InetAddress, Set<String>> map) {
+ public static Map<InetAddress, Set<String>> getForcedNodeControllerMap(ICcApplicationContext appCtx) {
ClusterControllerService ccs = (ClusterControllerService) appCtx.getServiceContext().getControllerService();
INodeManager nodeManager = ccs.getNodeManager();
- map.putAll(nodeManager.getIpAddressNodeNameMap());
+ return nodeManager.getIpAddressNodeNameMap();
}
public static JobSpecification createJobSpecification(ICcApplicationContext appCtx) {