[NO ISSUE][REPL] Suspend Dataset Checkpointing on Replica Sync
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- Before synchronizing replicas, suspend dataset checkpointing to
prevent new files from being generated by async I/O operations
triggered by checkpointing.
- Instead of sync'ing current files to replicas then scheduling a flush
and sync'ing any newly generated files, just flush datasets before
the initial sync then sync all the files in one go.
Change-Id: I058fd48bc0fb89a1e16448ce516c3410bb4d681d
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3469
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
index 36cea55..954e399 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
@@ -58,4 +58,14 @@
* @param id
*/
void completed(TxnId id);
+
+ /**
+ * Suspends checkpointing of datasets
+ */
+ void suspend();
+
+ /**
+ * Resumes checkpointing of datasets
+ */
+ void resume();
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
index 0f0b5bd..123709b 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
@@ -22,6 +22,7 @@
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.replication.IReplicationStrategy;
+import org.apache.asterix.common.transactions.ICheckpointManager;
import org.apache.asterix.replication.api.PartitionReplica;
import org.apache.asterix.replication.messaging.CheckpointPartitionIndexesTask;
import org.apache.asterix.replication.messaging.ReplicationProtocol;
@@ -45,21 +46,25 @@
public void sync() throws IOException {
final Object syncLock = appCtx.getReplicaManager().getReplicaSyncLock();
synchronized (syncLock) {
- syncFiles();
- checkpointReplicaIndexes();
- appCtx.getReplicationManager().register(replica);
+ final ICheckpointManager checkpointManager = appCtx.getTransactionSubsystem().getCheckpointManager();
+ try {
+ // suspend dataset checkpointing to prevent async I/O operations while sync'ing replicas
+ checkpointManager.suspend();
+ syncFiles();
+ checkpointReplicaIndexes();
+ appCtx.getReplicationManager().register(replica);
+ } finally {
+ checkpointManager.resume();
+ }
}
}
private void syncFiles() throws IOException {
final ReplicaFilesSynchronizer fileSync = new ReplicaFilesSynchronizer(appCtx, replica);
- waitForReplicatedDatasetsIO();
- fileSync.sync();
// flush replicated dataset to generate disk component for any remaining in-memory components
final IReplicationStrategy replStrategy = appCtx.getReplicationManager().getReplicationStrategy();
appCtx.getDatasetLifecycleManager().flushDataset(replStrategy);
waitForReplicatedDatasetsIO();
- // sync any newly generated files
fileSync.sync();
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
index ce523db..b85742e 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
@@ -18,6 +18,10 @@
*/
package org.apache.asterix.transaction.management.service.recovery;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.transactions.CheckpointProperties;
import org.apache.asterix.common.transactions.ICheckpointManager;
@@ -27,10 +31,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
/**
* An implementation of {@link ICheckpointManager} that defines the logic
* of checkpoints.
@@ -40,6 +40,7 @@
private static final Logger LOGGER = LogManager.getLogger();
private static final long NO_SECURED_LSN = -1L;
private final Map<TxnId, Long> securedLSNs;
+ private boolean suspended = false;
public CheckpointManager(ITransactionSubsystem txnSubsystem, CheckpointProperties checkpointProperties) {
super(txnSubsystem, checkpointProperties);
@@ -76,7 +77,7 @@
}
final long minFirstLSN = txnSubsystem.getRecoveryManager().getMinFirstLSN();
boolean checkpointSucceeded = minFirstLSN >= checkpointTargetLSN;
- if (!checkpointSucceeded) {
+ if (!checkpointSucceeded && !suspended) {
// Flush datasets with indexes behind target checkpoint LSN
IDatasetLifecycleManager datasetLifecycleManager =
txnSubsystem.getApplicationContext().getDatasetLifecycleManager();
@@ -100,6 +101,16 @@
securedLSNs.remove(id);
}
+ @Override
+ public synchronized void suspend() {
+ suspended = true;
+ }
+
+ @Override
+ public synchronized void resume() {
+ suspended = false;
+ }
+
private synchronized long getMinSecuredLSN() {
return securedLSNs.isEmpty() ? NO_SECURED_LSN : Collections.min(securedLSNs.values());
}