Fix indefinite wait time for replication Job ACK

Change-Id: I88d2d61270522c766441e16fd996ac975935594b
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1372
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
index 87be768..cd0179d 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
@@ -97,6 +97,7 @@
 
     private static final Logger LOGGER = Logger.getLogger(ReplicationManager.class.getName());
     private static final int INITIAL_REPLICATION_FACTOR = 1;
+    private static final int MAX_JOB_COMMIT_ACK_WAIT = 10000;
     private final String nodeId;
     private ExecutorService replicationListenerThreads;
     private final Map<Integer, Set<String>> jobCommitAcks;
@@ -575,8 +576,16 @@
         if (logsRepSockets != null) {
             synchronized (jobCommitAcks) {
                 try {
-                    while (jobCommitAcks.size() != 0) {
-                        jobCommitAcks.wait();
+                    long waitStartTime = System.currentTimeMillis();
+                    while (!jobCommitAcks.isEmpty()) {
+                        jobCommitAcks.wait(1000);
+                        long waitDuration = System.currentTimeMillis() - waitStartTime;
+                        if (waitDuration > MAX_JOB_COMMIT_ACK_WAIT) {
+                            LOGGER.log(Level.SEVERE,
+                                    "Timeout before receving all job ACKs from replicas. Pending jobs ("
+                                            + jobCommitAcks.keySet().toString() + ")");
+                            break;
+                        }
                     }
                 } catch (InterruptedException e) {
                     if (LOGGER.isLoggable(Level.SEVERE)) {