[ASTERIXDB-3219][REPL] Add timeout to log replication

- user model changes: no
- storage format changes: no
- interface changes: no

Details:

- Do not wait indefinitely for logs to be replicated.

Change-Id: I53b3a0d23514fce09082556e031f822dfe426a35
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17632
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
index 8a1cc65..1c614c9 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.transaction.management.service.logging;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.replication.IReplicationManager;
 import org.apache.asterix.common.replication.IReplicationStrategy;
@@ -26,6 +28,7 @@
 import org.apache.asterix.common.transactions.LogSource;
 import org.apache.asterix.common.transactions.LogType;
 import org.apache.hyracks.api.util.InvokeUtil;
+import org.apache.logging.log4j.Logger;
 
 import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
 import it.unimi.dsi.fastutil.longs.LongSet;
@@ -33,12 +36,16 @@
 
 public class LogManagerWithReplication extends LogManager {
 
+    private static final Logger LOGGER = org.apache.logging.log4j.LogManager.getLogger();
     private IReplicationManager replicationManager;
     private IReplicationStrategy replicationStrategy;
     private final LongSet replicatedTxn = LongSets.synchronize(new LongOpenHashSet());
+    private final long replicationTimeoutMillis;
 
     public LogManagerWithReplication(ITransactionSubsystem txnSubsystem) {
         super(txnSubsystem);
+        replicationTimeoutMillis = TimeUnit.SECONDS
+                .toMillis(txnSubsystem.getApplicationContext().getReplicationProperties().getReplicationTimeOut());
     }
 
     @SuppressWarnings("squid:S2445")
@@ -94,8 +101,18 @@
                     //wait for job Commit/Abort ACK from replicas
                     if (logRecord.isReplicate() && (logRecord.getLogType() == LogType.JOB_COMMIT
                             || logRecord.getLogType() == LogType.ABORT)) {
+                        long replicationTimeOut = replicationTimeoutMillis;
                         while (!logRecord.isReplicated()) {
-                            logRecord.wait();
+                            if (replicationTimeOut <= 0) {
+                                LOGGER.warn(
+                                        "{} ms passed without receiving acks for log {}; setting log as replicated due to timeout",
+                                        replicationTimeoutMillis, logRecord.getLogRecordForDisplay());
+                                logRecord.setReplicated(true);
+                                continue;
+                            }
+                            final long startTime = System.nanoTime();
+                            logRecord.wait(replicationTimeOut);
+                            replicationTimeOut -= TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
                         }
                     }
                 }