[NO ISSUE][*DB][CLUS] Fix intermittent deadlock on nc join
Eliminate synchronization in ResourceIdManager to avoid deadlocks
between it and ClusterStateManager
Change-Id: I691fc06da3f5641904e02ece8ae1b5fe3fc286a3
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2062
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java
index afa626d..0bb862d 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java
@@ -18,8 +18,8 @@
*/
package org.apache.asterix.runtime.transaction;
-import java.util.HashSet;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.asterix.common.cluster.IClusterStateManager;
@@ -30,8 +30,7 @@
private final IClusterStateManager csm;
private final AtomicLong globalResourceId = new AtomicLong();
- private volatile Set<String> reportedNodes = new HashSet<>();
- private volatile boolean allReported = false;
+ private Set<String> reportedNodes = ConcurrentHashMap.newKeySet();
public ResourceIdManager(IClusterStateManager csm) {
this.csm = csm;
@@ -39,36 +38,19 @@
@Override
public long createResourceId() {
- if (!allReported) {
- synchronized (this) {
- if (!allReported) {
- if (reportedNodes.size() < csm.getNumberOfNodes()) {
- return -1;
- } else {
- reportedNodes = null;
- allReported = true;
- }
- }
- }
- }
- return globalResourceId.incrementAndGet();
+ return csm.isClusterActive() ? globalResourceId.incrementAndGet() : -1;
}
@Override
- public synchronized boolean reported(String nodeId) {
- return allReported || reportedNodes.contains(nodeId);
+ public boolean reported(String nodeId) {
+ return reportedNodes.contains(nodeId);
}
@Override
- public synchronized void report(String nodeId, long maxResourceId) throws HyracksDataException {
- if (!allReported) {
- globalResourceId.set(Math.max(maxResourceId, globalResourceId.get()));
- reportedNodes.add(nodeId);
- if (reportedNodes.size() == csm.getNumberOfNodes()) {
- reportedNodes = null;
- allReported = true;
- csm.refreshState();
- }
+ public void report(String nodeId, long maxResourceId) throws HyracksDataException {
+ globalResourceId.updateAndGet(prev -> Math.max(maxResourceId, prev));
+ if (reportedNodes.add(nodeId)) {
+ csm.refreshState();
}
}