[ASTERIXDB-2213] Guard against concurrent config updates
Change-Id: If7dffb1b502b9331118ad344e6f4ef0d625f4c8f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2467
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Michael Blow <mblow@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
index ba4f82d..c60e673 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
@@ -24,6 +24,9 @@
import java.util.concurrent.TimeUnit;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.util.IOInterruptibleAction;
+import org.apache.hyracks.util.InterruptibleAction;
+import org.apache.hyracks.util.ThrowingAction;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -231,19 +234,4 @@
}
}
}
-
- @FunctionalInterface
- public interface InterruptibleAction {
- void run() throws InterruptedException;
- }
-
- @FunctionalInterface
- public interface ThrowingAction {
- void run() throws Exception; // NOSONAR
- }
-
- @FunctionalInterface
- public interface IOInterruptibleAction {
- void run() throws IOException, InterruptedException;
- }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/pom.xml b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/pom.xml
index e13e23b..2f9903a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/pom.xml
@@ -83,5 +83,10 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java
index 9564922..8dd95e1 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java
@@ -31,6 +31,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
@@ -69,27 +70,32 @@
private static final Logger LOGGER = LogManager.getLogger();
private HashSet<IOption> registeredOptions = new HashSet<>();
- private HashMap<IOption, Object> definedMap = new HashMap<>();
- private HashMap<IOption, Object> defaultMap = new HashMap<>();
- private CompositeMap<IOption, Object> configurationMap =
- new CompositeMap<>(definedMap, defaultMap, new NoOpMapMutator());
+ @SuppressWarnings("squid:S1948") // HashMap is serializable, and therefore so is its synchronized map
+ private Map<IOption, Object> definedMap = Collections.synchronizedMap(new HashMap<>());
+ @SuppressWarnings("squid:S1948") // HashMap is serializable, and therefore so is its synchronized map
+ private Map<IOption, Object> defaultMap = Collections.synchronizedMap(new HashMap<>());
+ @SuppressWarnings("squid:S1948") // CompositeMap and his encapsulated maps are serializable, therefore so is this
+ private Map<IOption, Object> configurationMap =
+ Collections.synchronizedMap(new CompositeMap<>(definedMap, defaultMap, new NoOpMapMutator()));
private EnumMap<Section, Map<String, IOption>> sectionMap = new EnumMap<>(Section.class);
@SuppressWarnings("squid:S1948") // TreeMap is serializable, and therefore so is its synchronized map
private Map<String, Map<IOption, Object>> nodeSpecificDefinedMap = Collections.synchronizedMap(new TreeMap<>());
@SuppressWarnings("squid:S1948") // TreeMap is serializable, and therefore so is its synchronized map
private Map<String, Map<IOption, Object>> nodeSpecificDefaultMap = Collections.synchronizedMap(new TreeMap<>());
+ @SuppressWarnings("squid:S1948") // TreeMap is serializable, and therefore so is its synchronized map
+ private Map<String, Map<IOption, Object>> nodeEffectiveMaps = Collections.synchronizedMap(new HashMap<>());
private transient ArrayListValuedHashMap<IOption, IConfigSetter> optionSetters = new ArrayListValuedHashMap<>();
private final String[] args;
private ConfigManagerApplicationConfig appConfig = new ConfigManagerApplicationConfig(this);
private Set<String> allSections = new HashSet<>();
private transient Collection<Consumer<List<String>>> argListeners = new ArrayList<>();
private transient Collection<IOption> iniPointerOptions = new ArrayList<>();
- private transient Collection<Section> cmdLineSections = new ArrayList<>();;
+ private transient Collection<Section> cmdLineSections = new ArrayList<>();
private transient OptionHandlerFilter usageFilter;
private transient SortedMap<Integer, List<IConfigurator>> configurators = new TreeMap<>();
private boolean configured;
private String versionString = "version undefined";
- private transient Map<String, Set<Map.Entry<String, String>>> extensionOptions = new TreeMap();
+ private transient Map<String, Set<Map.Entry<String, String>>> extensionOptions = new TreeMap<>();
public ConfigManager() {
this(null);
@@ -171,15 +177,28 @@
if (node == null) {
return isDefault ? defaultMap : definedMap;
} else {
- ensureNode(node);
- return isDefault ? nodeSpecificDefaultMap.get(node) : nodeSpecificDefinedMap.get(node);
+ synchronized (this) {
+ ensureNode(node);
+ return isDefault ? nodeSpecificDefaultMap.get(node) : nodeSpecificDefinedMap.get(node);
+ }
}
}
- public void ensureNode(String nodeId) {
+ public synchronized void ensureNode(String nodeId) {
LOGGER.debug("ensureNode: " + nodeId);
- nodeSpecificDefinedMap.computeIfAbsent(nodeId, this::createNodeSpecificMap);
- nodeSpecificDefaultMap.computeIfAbsent(nodeId, this::createNodeSpecificMap);
+ Map<IOption, Object> nodeDefinedMap =
+ nodeSpecificDefinedMap.computeIfAbsent(nodeId, this::createNodeSpecificMap);
+ Map<IOption, Object> nodeDefaultMap =
+ nodeSpecificDefaultMap.computeIfAbsent(nodeId, this::createNodeSpecificMap);
+ nodeEffectiveMaps.computeIfAbsent(nodeId, id -> Collections
+ .synchronizedMap(compositeFrom(Stream.of(nodeDefinedMap, nodeDefaultMap, definedMap))));
+ }
+
+ public synchronized void forgetNode(String nodeId) {
+ LOGGER.debug("forgetNode: " + nodeId);
+ nodeSpecificDefinedMap.remove(nodeId);
+ nodeSpecificDefaultMap.remove(nodeId);
+ nodeEffectiveMaps.remove(nodeId);
}
private Map<IOption, Object> createNodeSpecificMap(String nodeId) {
@@ -236,7 +255,7 @@
invokeSetters(option, option.type().parse(String.valueOf(value)), null);
}
- private void invokeSetters(IOption option, Object value, String nodeId) {
+ private synchronized void invokeSetters(IOption option, Object value, String nodeId) {
optionSetters.get(option).forEach(setter -> setter.set(nodeId, value, false));
}
@@ -369,7 +388,7 @@
});
}
- private Object getOrDefault(Map<IOption, Object> map, IOption option, String nodeId) {
+ private synchronized Object getOrDefault(Map<IOption, Object> map, IOption option, String nodeId) {
if (map.containsKey(option)) {
return map.get(option);
} else {
@@ -426,7 +445,7 @@
@Override
public Set<IOption> getOptions(Section section) {
- return getSectionOptionMap(section).values().stream().collect(Collectors.toSet());
+ return new HashSet<>(getSectionOptionMap(section).values());
}
private Map<String, IOption> getSectionOptionMap(Section section) {
@@ -438,7 +457,7 @@
return Collections.unmodifiableList(new ArrayList<>(nodeSpecificDefinedMap.keySet()));
}
- public IApplicationConfig getNodeEffectiveConfig(String nodeId) {
+ public synchronized IApplicationConfig getNodeEffectiveConfig(String nodeId) {
ensureNode(nodeId);
final Map<IOption, Object> nodeMap = nodeSpecificDefaultMap.get(nodeId);
Map<IOption, Object> nodeEffectiveMap = getNodeEffectiveMap(nodeId);
@@ -454,15 +473,22 @@
};
}
- private Map<IOption, Object> getNodeEffectiveMap(String nodeId) {
+ private synchronized Map<IOption, Object> getNodeEffectiveMap(String nodeId) {
ensureNode(nodeId);
- return new CompositeMap<>(
- Stream.of(nodeSpecificDefinedMap.get(nodeId), nodeSpecificDefaultMap.get(nodeId), definedMap)
- .toArray(Map[]::new),
- new NoOpMapMutator());
+ return nodeEffectiveMaps.get(nodeId);
}
- public Ini toIni(boolean includeDefaults) {
+ private synchronized CompositeMap<IOption, Object> compositeFrom(Stream<Map<IOption, Object>> stream) {
+ List<Map<IOption, Object>> list = stream.collect(Collectors.toList());
+ CompositeMap<IOption, Object> map = new CompositeMap<>();
+ map.setMutator(new NoOpMapMutator());
+ for (ListIterator<Map<IOption, Object>> iter = list.listIterator(list.size()); iter.hasPrevious();) {
+ map.addComposited(iter.previous());
+ }
+ return map;
+ }
+
+ public synchronized Ini toIni(boolean includeDefaults) {
Ini ini = new Ini();
(includeDefaults ? configurationMap : definedMap).forEach((option, value) -> {
if (value != null) {
@@ -474,12 +500,10 @@
ensureNode(key);
Map<IOption, Object> nodeValueMap =
includeDefaults ? getNodeEffectiveMap(key) : nodeSpecificDefinedMap.get(key);
- synchronized (nodeValueMap) {
- for (Map.Entry<IOption, Object> entry : nodeValueMap.entrySet()) {
- if (entry.getValue() != null) {
- final IOption option = entry.getKey();
- ini.add(section, option.ini(), option.type().serializeToIni(entry.getValue()));
- }
+ for (Map.Entry<IOption, Object> entry : nodeValueMap.entrySet()) {
+ if (entry.getValue() != null) {
+ final IOption option = entry.getKey();
+ ini.add(section, option.ini(), option.type().serializeToIni(entry.getValue()));
}
}
}
@@ -492,7 +516,7 @@
set(null, option, value);
}
- public void set(String nodeId, IOption option, Object value) {
+ public synchronized void set(String nodeId, IOption option, Object value) {
invokeSetters(option, copyIfArray(value), nodeId);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/test/java/org/apache/hyracks/control/common/config/ConfigManagerTest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/test/java/org/apache/hyracks/control/common/config/ConfigManagerTest.java
new file mode 100644
index 0000000..ef5f69c
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/test/java/org/apache/hyracks/control/common/config/ConfigManagerTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.common.config;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.api.config.IOption;
+import org.apache.hyracks.api.config.IOptionType;
+import org.apache.hyracks.api.config.Section;
+import org.apache.hyracks.util.Span;
+import org.junit.Test;
+
+public class ConfigManagerTest {
+
+ public enum Option implements IOption {
+ OPTION1,
+ OPTION2,
+ OPTION3,
+ OPTION4,
+ OPTION5;
+
+ @Override
+ public Section section() {
+ return Section.values()[this.ordinal() % Section.values().length];
+ }
+
+ @Override
+ public String description() {
+ return "Description for " + name();
+ }
+
+ @Override
+ public IOptionType type() {
+ return OptionTypes.INTEGER;
+ }
+
+ @Override
+ public Object defaultValue() {
+ return name() + " default value";
+ }
+ }
+
+ private static final Random RANDOM = new Random();
+
+ @Test
+ public void testConcurrentUpdates() throws Exception {
+ ConfigManager configManager = new ConfigManager();
+ configManager.register(Option.class);
+ ExecutorService executor = Executors.newCachedThreadPool();
+ List<Future<Void>> futures = new ArrayList<>();
+ IntStream.range(0, 20).forEach(a -> futures.add(executor.submit(() -> {
+ Span.start(30, TimeUnit.SECONDS).loopUntilExhausted(() -> {
+ String node = "node" + RANDOM.nextInt(5);
+ IntStream.range(0, 20).parallel().forEach(a1 -> {
+ if (RANDOM.nextBoolean()) {
+ configManager.set(node, randomOption(), RANDOM.nextInt());
+ } else {
+ configManager.getNodeEffectiveConfig(node).get(randomOption());
+ }
+ if (RANDOM.nextBoolean()) {
+ configManager.forgetNode(node);
+ }
+ });
+ });
+ return null;
+ })));
+ MutableObject<Exception> failure = new MutableObject<>();
+ futures.forEach(f -> {
+ try {
+ f.get();
+ } catch (Exception e) {
+ if (failure.getValue() == null) {
+ failure.setValue(e);
+ } else {
+ failure.getValue().addSuppressed(e);
+ }
+ }
+ });
+ if (failure.getValue() != null) {
+ throw failure.getValue();
+ }
+ }
+
+ private static Option randomOption() {
+ return Option.values()[RANDOM.nextInt(Option.values().length)];
+ }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IoRequest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IoRequest.java
index 4235d19..8c81d41 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IoRequest.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IoRequest.java
@@ -24,7 +24,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.IAsyncRequest;
import org.apache.hyracks.api.io.IFileHandle;
-import org.apache.hyracks.api.util.InvokeUtil.InterruptibleAction;
+import org.apache.hyracks.util.InterruptibleAction;
public class IoRequest implements IAsyncRequest, InterruptibleAction {
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IOInterruptibleAction.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IOInterruptibleAction.java
new file mode 100644
index 0000000..1fca227
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/IOInterruptibleAction.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.util;
+
+import java.io.IOException;
+
+@FunctionalInterface
+public interface IOInterruptibleAction {
+ void run() throws IOException, InterruptedException;
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/InterruptibleAction.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/InterruptibleAction.java
new file mode 100644
index 0000000..8a3787e
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/InterruptibleAction.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.util;
+
+@FunctionalInterface
+public interface InterruptibleAction {
+ void run() throws InterruptedException;
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/Span.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/Span.java
new file mode 100644
index 0000000..95db604
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/Span.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.util;
+
+import java.util.concurrent.TimeUnit;
+
+public class Span {
+ private final long startNanos;
+ private final long spanNanos;
+
+ private Span(long span, TimeUnit unit) {
+ startNanos = System.nanoTime();
+ spanNanos = unit.toNanos(span);
+ }
+
+ public static Span start(long span, TimeUnit unit) {
+ return new Span(span, unit);
+ }
+
+ public boolean elapsed() {
+ return remaining(TimeUnit.NANOSECONDS) > spanNanos;
+ }
+
+ public long remaining(TimeUnit unit) {
+ return unit.convert(System.nanoTime() - startNanos, TimeUnit.NANOSECONDS);
+ }
+
+ public void sleep(long sleep, TimeUnit unit) throws InterruptedException {
+ TimeUnit.NANOSECONDS.sleep(Math.min(remaining(TimeUnit.NANOSECONDS), unit.toNanos(sleep)));
+ }
+
+ public void loopUntilExhausted(ThrowingAction action) throws Exception {
+ loopUntilExhausted(action, 0, TimeUnit.NANOSECONDS);
+ }
+
+ public void loopUntilExhausted(ThrowingAction action, long delay, TimeUnit delayUnit) throws Exception {
+ while (!elapsed()) {
+ action.run();
+ if (remaining(delayUnit) < delay) {
+ break;
+ }
+ delayUnit.sleep(delay);
+ }
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingAction.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingAction.java
new file mode 100644
index 0000000..d675179
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingAction.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.util;
+
+@FunctionalInterface
+public interface ThrowingAction {
+ void run() throws Exception; // NOSONAR
+}