Fix indefinite wait time for replication Job ACK
Change-Id: I88d2d61270522c766441e16fd996ac975935594b
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1372
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
index 87be768..cd0179d 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
@@ -97,6 +97,7 @@
private static final Logger LOGGER = Logger.getLogger(ReplicationManager.class.getName());
private static final int INITIAL_REPLICATION_FACTOR = 1;
+ private static final int MAX_JOB_COMMIT_ACK_WAIT = 10000;
private final String nodeId;
private ExecutorService replicationListenerThreads;
private final Map<Integer, Set<String>> jobCommitAcks;
@@ -575,8 +576,16 @@
if (logsRepSockets != null) {
synchronized (jobCommitAcks) {
try {
- while (jobCommitAcks.size() != 0) {
- jobCommitAcks.wait();
+ long waitStartTime = System.currentTimeMillis();
+ while (!jobCommitAcks.isEmpty()) {
+ jobCommitAcks.wait(1000);
+ long waitDuration = System.currentTimeMillis() - waitStartTime;
+ if (waitDuration > MAX_JOB_COMMIT_ACK_WAIT) {
+ LOGGER.log(Level.SEVERE,
+ "Timeout before receving all job ACKs from replicas. Pending jobs ("
+ + jobCommitAcks.keySet().toString() + ")");
+ break;
+ }
}
} catch (InterruptedException e) {
if (LOGGER.isLoggable(Level.SEVERE)) {