[ASTERIXDB-3219][REPL] Add timeout to log replication
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Do not wait indefinitely for logs to be replicated.
Change-Id: I53b3a0d23514fce09082556e031f822dfe426a35
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17632
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
index 8a1cc65..1c614c9 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.transaction.management.service.logging;
+import java.util.concurrent.TimeUnit;
+
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.replication.IReplicationManager;
import org.apache.asterix.common.replication.IReplicationStrategy;
@@ -26,6 +28,7 @@
import org.apache.asterix.common.transactions.LogSource;
import org.apache.asterix.common.transactions.LogType;
import org.apache.hyracks.api.util.InvokeUtil;
+import org.apache.logging.log4j.Logger;
import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
import it.unimi.dsi.fastutil.longs.LongSet;
@@ -33,12 +36,16 @@
public class LogManagerWithReplication extends LogManager {
+ private static final Logger LOGGER = org.apache.logging.log4j.LogManager.getLogger();
private IReplicationManager replicationManager;
private IReplicationStrategy replicationStrategy;
private final LongSet replicatedTxn = LongSets.synchronize(new LongOpenHashSet());
+ private final long replicationTimeoutMillis;
public LogManagerWithReplication(ITransactionSubsystem txnSubsystem) {
super(txnSubsystem);
+ replicationTimeoutMillis = TimeUnit.SECONDS
+ .toMillis(txnSubsystem.getApplicationContext().getReplicationProperties().getReplicationTimeOut());
}
@SuppressWarnings("squid:S2445")
@@ -94,8 +101,18 @@
//wait for job Commit/Abort ACK from replicas
if (logRecord.isReplicate() && (logRecord.getLogType() == LogType.JOB_COMMIT
|| logRecord.getLogType() == LogType.ABORT)) {
+ long replicationTimeOut = replicationTimeoutMillis;
while (!logRecord.isReplicated()) {
- logRecord.wait();
+ if (replicationTimeOut <= 0) {
+ LOGGER.warn(
+ "{} ms passed without receiving acks for log {}; setting log as replicated due to timeout",
+ replicationTimeoutMillis, logRecord.getLogRecordForDisplay());
+ logRecord.setReplicated(true);
+ continue;
+ }
+ final long startTime = System.nanoTime();
+ logRecord.wait(replicationTimeOut);
+ replicationTimeOut -= TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
}
}
}