[ASTERIXDB-2213] Guard against concurrent config updates

Change-Id: If7dffb1b502b9331118ad344e6f4ef0d625f4c8f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2467
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Michael Blow <mblow@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
index ba4f82d..c60e673 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
@@ -24,6 +24,9 @@
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.util.IOInterruptibleAction;
+import org.apache.hyracks.util.InterruptibleAction;
+import org.apache.hyracks.util.ThrowingAction;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -231,19 +234,4 @@
             }
         }
     }
-
-    @FunctionalInterface
-    public interface InterruptibleAction {
-        void run() throws InterruptedException;
-    }
-
-    @FunctionalInterface
-    public interface ThrowingAction {
-        void run() throws Exception; // NOSONAR
-    }
-
-    @FunctionalInterface
-    public interface IOInterruptibleAction {
-        void run() throws IOException, InterruptedException;
-    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/pom.xml b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/pom.xml
index e13e23b..2f9903a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/pom.xml
@@ -83,5 +83,10 @@
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-lang3</artifactId>
     </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java
index 9564922..8dd95e1 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java
@@ -31,6 +31,7 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.ListIterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
@@ -69,27 +70,32 @@
     private static final Logger LOGGER = LogManager.getLogger();
 
     private HashSet<IOption> registeredOptions = new HashSet<>();
-    private HashMap<IOption, Object> definedMap = new HashMap<>();
-    private HashMap<IOption, Object> defaultMap = new HashMap<>();
-    private CompositeMap<IOption, Object> configurationMap =
-            new CompositeMap<>(definedMap, defaultMap, new NoOpMapMutator());
+    @SuppressWarnings("squid:S1948") // HashMap is serializable, and therefore so is its synchronized map
+    private Map<IOption, Object> definedMap = Collections.synchronizedMap(new HashMap<>());
+    @SuppressWarnings("squid:S1948") // HashMap is serializable, and therefore so is its synchronized map
+    private Map<IOption, Object> defaultMap = Collections.synchronizedMap(new HashMap<>());
+    @SuppressWarnings("squid:S1948") // CompositeMap and his encapsulated maps are serializable, therefore so is this
+    private Map<IOption, Object> configurationMap =
+            Collections.synchronizedMap(new CompositeMap<>(definedMap, defaultMap, new NoOpMapMutator()));
     private EnumMap<Section, Map<String, IOption>> sectionMap = new EnumMap<>(Section.class);
     @SuppressWarnings("squid:S1948") // TreeMap is serializable, and therefore so is its synchronized map
     private Map<String, Map<IOption, Object>> nodeSpecificDefinedMap = Collections.synchronizedMap(new TreeMap<>());
     @SuppressWarnings("squid:S1948") // TreeMap is serializable, and therefore so is its synchronized map
     private Map<String, Map<IOption, Object>> nodeSpecificDefaultMap = Collections.synchronizedMap(new TreeMap<>());
+    @SuppressWarnings("squid:S1948") // TreeMap is serializable, and therefore so is its synchronized map
+    private Map<String, Map<IOption, Object>> nodeEffectiveMaps = Collections.synchronizedMap(new HashMap<>());
     private transient ArrayListValuedHashMap<IOption, IConfigSetter> optionSetters = new ArrayListValuedHashMap<>();
     private final String[] args;
     private ConfigManagerApplicationConfig appConfig = new ConfigManagerApplicationConfig(this);
     private Set<String> allSections = new HashSet<>();
     private transient Collection<Consumer<List<String>>> argListeners = new ArrayList<>();
     private transient Collection<IOption> iniPointerOptions = new ArrayList<>();
-    private transient Collection<Section> cmdLineSections = new ArrayList<>();;
+    private transient Collection<Section> cmdLineSections = new ArrayList<>();
     private transient OptionHandlerFilter usageFilter;
     private transient SortedMap<Integer, List<IConfigurator>> configurators = new TreeMap<>();
     private boolean configured;
     private String versionString = "version undefined";
-    private transient Map<String, Set<Map.Entry<String, String>>> extensionOptions = new TreeMap();
+    private transient Map<String, Set<Map.Entry<String, String>>> extensionOptions = new TreeMap<>();
 
     public ConfigManager() {
         this(null);
@@ -171,15 +177,28 @@
         if (node == null) {
             return isDefault ? defaultMap : definedMap;
         } else {
-            ensureNode(node);
-            return isDefault ? nodeSpecificDefaultMap.get(node) : nodeSpecificDefinedMap.get(node);
+            synchronized (this) {
+                ensureNode(node);
+                return isDefault ? nodeSpecificDefaultMap.get(node) : nodeSpecificDefinedMap.get(node);
+            }
         }
     }
 
-    public void ensureNode(String nodeId) {
+    public synchronized void ensureNode(String nodeId) {
         LOGGER.debug("ensureNode: " + nodeId);
-        nodeSpecificDefinedMap.computeIfAbsent(nodeId, this::createNodeSpecificMap);
-        nodeSpecificDefaultMap.computeIfAbsent(nodeId, this::createNodeSpecificMap);
+        Map<IOption, Object> nodeDefinedMap =
+                nodeSpecificDefinedMap.computeIfAbsent(nodeId, this::createNodeSpecificMap);
+        Map<IOption, Object> nodeDefaultMap =
+                nodeSpecificDefaultMap.computeIfAbsent(nodeId, this::createNodeSpecificMap);
+        nodeEffectiveMaps.computeIfAbsent(nodeId, id -> Collections
+                .synchronizedMap(compositeFrom(Stream.of(nodeDefinedMap, nodeDefaultMap, definedMap))));
+    }
+
+    public synchronized void forgetNode(String nodeId) {
+        LOGGER.debug("forgetNode: " + nodeId);
+        nodeSpecificDefinedMap.remove(nodeId);
+        nodeSpecificDefaultMap.remove(nodeId);
+        nodeEffectiveMaps.remove(nodeId);
     }
 
     private Map<IOption, Object> createNodeSpecificMap(String nodeId) {
@@ -236,7 +255,7 @@
         invokeSetters(option, option.type().parse(String.valueOf(value)), null);
     }
 
-    private void invokeSetters(IOption option, Object value, String nodeId) {
+    private synchronized void invokeSetters(IOption option, Object value, String nodeId) {
         optionSetters.get(option).forEach(setter -> setter.set(nodeId, value, false));
     }
 
@@ -369,7 +388,7 @@
         });
     }
 
-    private Object getOrDefault(Map<IOption, Object> map, IOption option, String nodeId) {
+    private synchronized Object getOrDefault(Map<IOption, Object> map, IOption option, String nodeId) {
         if (map.containsKey(option)) {
             return map.get(option);
         } else {
@@ -426,7 +445,7 @@
 
     @Override
     public Set<IOption> getOptions(Section section) {
-        return getSectionOptionMap(section).values().stream().collect(Collectors.toSet());
+        return new HashSet<>(getSectionOptionMap(section).values());
     }
 
     private Map<String, IOption> getSectionOptionMap(Section section) {
@@ -438,7 +457,7 @@
         return Collections.unmodifiableList(new ArrayList<>(nodeSpecificDefinedMap.keySet()));
     }
 
-    public IApplicationConfig getNodeEffectiveConfig(String nodeId) {
+    public synchronized IApplicationConfig getNodeEffectiveConfig(String nodeId) {
         ensureNode(nodeId);
         final Map<IOption, Object> nodeMap = nodeSpecificDefaultMap.get(nodeId);
         Map<IOption, Object> nodeEffectiveMap = getNodeEffectiveMap(nodeId);
@@ -454,15 +473,22 @@
         };
     }
 
-    private Map<IOption, Object> getNodeEffectiveMap(String nodeId) {
+    private synchronized Map<IOption, Object> getNodeEffectiveMap(String nodeId) {
         ensureNode(nodeId);
-        return new CompositeMap<>(
-                Stream.of(nodeSpecificDefinedMap.get(nodeId), nodeSpecificDefaultMap.get(nodeId), definedMap)
-                        .toArray(Map[]::new),
-                new NoOpMapMutator());
+        return nodeEffectiveMaps.get(nodeId);
     }
 
-    public Ini toIni(boolean includeDefaults) {
+    private synchronized CompositeMap<IOption, Object> compositeFrom(Stream<Map<IOption, Object>> stream) {
+        List<Map<IOption, Object>> list = stream.collect(Collectors.toList());
+        CompositeMap<IOption, Object> map = new CompositeMap<>();
+        map.setMutator(new NoOpMapMutator());
+        for (ListIterator<Map<IOption, Object>> iter = list.listIterator(list.size()); iter.hasPrevious();) {
+            map.addComposited(iter.previous());
+        }
+        return map;
+    }
+
+    public synchronized Ini toIni(boolean includeDefaults) {
         Ini ini = new Ini();
         (includeDefaults ? configurationMap : definedMap).forEach((option, value) -> {
             if (value != null) {
@@ -474,12 +500,10 @@
             ensureNode(key);
             Map<IOption, Object> nodeValueMap =
                     includeDefaults ? getNodeEffectiveMap(key) : nodeSpecificDefinedMap.get(key);
-            synchronized (nodeValueMap) {
-                for (Map.Entry<IOption, Object> entry : nodeValueMap.entrySet()) {
-                    if (entry.getValue() != null) {
-                        final IOption option = entry.getKey();
-                        ini.add(section, option.ini(), option.type().serializeToIni(entry.getValue()));
-                    }
+            for (Map.Entry<IOption, Object> entry : nodeValueMap.entrySet()) {
+                if (entry.getValue() != null) {
+                    final IOption option = entry.getKey();
+                    ini.add(section, option.ini(), option.type().serializeToIni(entry.getValue()));
                 }
             }
         }
@@ -492,7 +516,7 @@
         set(null, option, value);
     }
 
-    public void set(String nodeId, IOption option, Object value) {
+    public synchronized void set(String nodeId, IOption option, Object value) {
         invokeSetters(option, copyIfArray(value), nodeId);
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/test/java/org/apache/hyracks/control/common/config/ConfigManagerTest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/test/java/org/apache/hyracks/control/common/config/ConfigManagerTest.java
new file mode 100644
index 0000000..ef5f69c
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/test/java/org/apache/hyracks/control/common/config/ConfigManagerTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.common.config;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.api.config.IOption;
+import org.apache.hyracks.api.config.IOptionType;
+import org.apache.hyracks.api.config.Section;
+import org.apache.hyracks.util.Span;
+import org.junit.Test;
+
+public class ConfigManagerTest {
+
+    public enum Option implements IOption {
+        OPTION1,
+        OPTION2,
+        OPTION3,
+        OPTION4,
+        OPTION5;
+
+        @Override
+        public Section section() {
+            return Section.values()[this.ordinal() % Section.values().length];
+        }
+
+        @Override
+        public String description() {
+            return "Description for " + name();
+        }
+
+        @Override
+        public IOptionType type() {
+            return OptionTypes.INTEGER;
+        }
+
+        @Override
+        public Object defaultValue() {
+            return name() + " default value";
+        }
+    }
+
+    private static final Random RANDOM = new Random();
+
+    @Test
+    public void testConcurrentUpdates() throws Exception {
+        ConfigManager configManager = new ConfigManager();
+        configManager.register(Option.class);
+        ExecutorService executor = Executors.newCachedThreadPool();
+        List<Future<Void>> futures = new ArrayList<>();
+        IntStream.range(0, 20).forEach(a -> futures.add(executor.submit(() -> {
+            Span.start(30, TimeUnit.SECONDS).loopUntilExhausted(() -> {
+                String node = "node" + RANDOM.nextInt(5);
+                IntStream.range(0, 20).parallel().forEach(a1 -> {
+                    if (RANDOM.nextBoolean()) {
+                        configManager.set(node, randomOption(), RANDOM.nextInt());
+                    } else {
+                        configManager.getNodeEffectiveConfig(node).get(randomOption());
+                    }
+                    if (RANDOM.nextBoolean()) {
+                        configManager.forgetNode(node);
+                    }
+                });
+            });
+            return null;
+        })));
+        MutableObject<Exception> failure = new MutableObject<>();
+        futures.forEach(f -> {
+            try {
+                f.get();
+            } catch (Exception e) {
+                if (failure.getValue() == null) {
+                    failure.setValue(e);
+                } else {
+                    failure.getValue().addSuppressed(e);
+                }
+            }
+        });
+        if (failure.getValue() != null) {
+            throw failure.getValue();
+        }
+    }
+
+    private static Option randomOption() {
+        return Option.values()[RANDOM.nextInt(Option.values().length)];
+    }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IoRequest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IoRequest.java
index 4235d19..8c81d41 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IoRequest.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IoRequest.java
@@ -24,7 +24,7 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IAsyncRequest;
 import org.apache.hyracks.api.io.IFileHandle;
-import org.apache.hyracks.api.util.InvokeUtil.InterruptibleAction;
+import org.apache.hyracks.util.InterruptibleAction;
 
 public class IoRequest implements IAsyncRequest, InterruptibleAction {
 
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IOInterruptibleAction.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IOInterruptibleAction.java
new file mode 100644
index 0000000..1fca227
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IOInterruptibleAction.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.util;
+
+import java.io.IOException;
+
+@FunctionalInterface
+public interface IOInterruptibleAction {
+    void run() throws IOException, InterruptedException;
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/InterruptibleAction.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/InterruptibleAction.java
new file mode 100644
index 0000000..8a3787e
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/InterruptibleAction.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.util;
+
+@FunctionalInterface
+public interface InterruptibleAction {
+    void run() throws InterruptedException;
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/Span.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/Span.java
new file mode 100644
index 0000000..95db604
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/Span.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.util;
+
+import java.util.concurrent.TimeUnit;
+
+public class Span {
+    private final long startNanos;
+    private final long spanNanos;
+
+    private Span(long span, TimeUnit unit) {
+        startNanos = System.nanoTime();
+        spanNanos = unit.toNanos(span);
+    }
+
+    public static Span start(long span, TimeUnit unit) {
+        return new Span(span, unit);
+    }
+
+    public boolean elapsed() {
+        return remaining(TimeUnit.NANOSECONDS) > spanNanos;
+    }
+
+    public long remaining(TimeUnit unit) {
+        return unit.convert(System.nanoTime() - startNanos, TimeUnit.NANOSECONDS);
+    }
+
+    public void sleep(long sleep, TimeUnit unit) throws InterruptedException {
+        TimeUnit.NANOSECONDS.sleep(Math.min(remaining(TimeUnit.NANOSECONDS), unit.toNanos(sleep)));
+    }
+
+    public void loopUntilExhausted(ThrowingAction action) throws Exception {
+        loopUntilExhausted(action, 0, TimeUnit.NANOSECONDS);
+    }
+
+    public void loopUntilExhausted(ThrowingAction action, long delay, TimeUnit delayUnit) throws Exception {
+        while (!elapsed()) {
+            action.run();
+            if (remaining(delayUnit) < delay) {
+                break;
+            }
+            delayUnit.sleep(delay);
+        }
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingAction.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingAction.java
new file mode 100644
index 0000000..d675179
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingAction.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.util;
+
+@FunctionalInterface
+public interface ThrowingAction {
+    void run() throws Exception; // NOSONAR
+}