Merge commit '0561d10' from stabilization-f69489
Change-Id: I18115329ce7ab3501e7fcc9c6dc06d0e87c97688
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
index ac652e3..6926dac 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
@@ -65,4 +65,14 @@
* @throws HyracksDataException
*/
void checkpointIdleDatasets() throws HyracksDataException;
-}
+
+ /**
+ * Suspends the checkpointing of datasets until {@link #resume()} is called
+ */
+ void suspend();
+
+ /**
+ * Resumes the checkpointing of datasets after it was suspended by {@link #suspend()}
+ */
+ void resume();
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
index 0f0b5bd..123709b 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
@@ -22,6 +22,7 @@
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.replication.IReplicationStrategy;
+import org.apache.asterix.common.transactions.ICheckpointManager;
import org.apache.asterix.replication.api.PartitionReplica;
import org.apache.asterix.replication.messaging.CheckpointPartitionIndexesTask;
import org.apache.asterix.replication.messaging.ReplicationProtocol;
@@ -45,21 +46,25 @@
public void sync() throws IOException {
final Object syncLock = appCtx.getReplicaManager().getReplicaSyncLock();
synchronized (syncLock) {
- syncFiles();
- checkpointReplicaIndexes();
- appCtx.getReplicationManager().register(replica);
+ final ICheckpointManager checkpointManager = appCtx.getTransactionSubsystem().getCheckpointManager();
+ try {
+ // suspend checkpointing datasets to prevent async IO operations while sync'ing replicas
+ checkpointManager.suspend();
+ syncFiles();
+ checkpointReplicaIndexes();
+ appCtx.getReplicationManager().register(replica);
+ } finally {
+ checkpointManager.resume();
+ }
}
}
private void syncFiles() throws IOException {
final ReplicaFilesSynchronizer fileSync = new ReplicaFilesSynchronizer(appCtx, replica);
- waitForReplicatedDatasetsIO();
- fileSync.sync();
// flush replicated dataset to generate disk component for any remaining in-memory components
final IReplicationStrategy replStrategy = appCtx.getReplicationManager().getReplicationStrategy();
appCtx.getDatasetLifecycleManager().flushDataset(replStrategy);
waitForReplicatedDatasetsIO();
- // sync any newly generated files
fileSync.sync();
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
index ba945be..43758d3 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
@@ -45,6 +45,7 @@
private static final long NO_SECURED_LSN = -1L;
private final long datasetCheckpointInterval;
private final Map<TxnId, Long> securedLSNs;
+ private boolean suspended = false;
public CheckpointManager(ITransactionSubsystem txnSubsystem, CheckpointProperties checkpointProperties) {
super(txnSubsystem, checkpointProperties);
@@ -82,7 +83,7 @@
}
final long minFirstLSN = txnSubsystem.getRecoveryManager().getMinFirstLSN();
boolean checkpointSucceeded = minFirstLSN >= checkpointTargetLSN;
- if (!checkpointSucceeded) {
+ if (!checkpointSucceeded && !suspended) {
// Flush datasets with indexes behind target checkpoint LSN
final IDatasetLifecycleManager dlcm = txnSubsystem.getApplicationContext().getDatasetLifecycleManager();
dlcm.asyncFlushMatchingIndexes(newLaggingDatasetPredicate(checkpointTargetLSN));
@@ -107,10 +108,23 @@
@Override
public synchronized void checkpointIdleDatasets() throws HyracksDataException {
+ if (suspended) {
+ return;
+ }
final IDatasetLifecycleManager dlcm = txnSubsystem.getApplicationContext().getDatasetLifecycleManager();
dlcm.asyncFlushMatchingIndexes(newIdleDatasetPredicate());
}
+ @Override
+ public synchronized void suspend() {
+ suspended = true;
+ }
+
+ @Override
+ public synchronized void resume() {
+ suspended = false;
+ }
+
private synchronized long getMinSecuredLSN() {
return securedLSNs.isEmpty() ? NO_SECURED_LSN : Collections.min(securedLSNs.values());
}