Merge commit '0561d10' from stabilization-f69489

Change-Id: I18115329ce7ab3501e7fcc9c6dc06d0e87c97688
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
index ac652e3..6926dac 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
@@ -65,4 +65,14 @@
      * @throws HyracksDataException
      */
     void checkpointIdleDatasets() throws HyracksDataException;
-}
+
+    /**
+     * Suspends checkpointing datasets
+     */
+    void suspend();
+
+    /**
+     * Resumes checkpointing datasets
+     */
+    void resume();
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
index 0f0b5bd..123709b 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
@@ -22,6 +22,7 @@
 
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.replication.IReplicationStrategy;
+import org.apache.asterix.common.transactions.ICheckpointManager;
 import org.apache.asterix.replication.api.PartitionReplica;
 import org.apache.asterix.replication.messaging.CheckpointPartitionIndexesTask;
 import org.apache.asterix.replication.messaging.ReplicationProtocol;
@@ -45,21 +46,25 @@
     public void sync() throws IOException {
         final Object syncLock = appCtx.getReplicaManager().getReplicaSyncLock();
         synchronized (syncLock) {
-            syncFiles();
-            checkpointReplicaIndexes();
-            appCtx.getReplicationManager().register(replica);
+            final ICheckpointManager checkpointManager = appCtx.getTransactionSubsystem().getCheckpointManager();
+            try {
+                // suspend checkpointing datasets to prevent async IO operations while sync'ing replicas
+                checkpointManager.suspend();
+                syncFiles();
+                checkpointReplicaIndexes();
+                appCtx.getReplicationManager().register(replica);
+            } finally {
+                checkpointManager.resume();
+            }
         }
     }
 
     private void syncFiles() throws IOException {
         final ReplicaFilesSynchronizer fileSync = new ReplicaFilesSynchronizer(appCtx, replica);
-        waitForReplicatedDatasetsIO();
-        fileSync.sync();
         // flush replicated dataset to generate disk component for any remaining in-memory components
         final IReplicationStrategy replStrategy = appCtx.getReplicationManager().getReplicationStrategy();
         appCtx.getDatasetLifecycleManager().flushDataset(replStrategy);
         waitForReplicatedDatasetsIO();
-        // sync any newly generated files
         fileSync.sync();
     }
 
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
index ba945be..43758d3 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
@@ -45,6 +45,7 @@
     private static final long NO_SECURED_LSN = -1L;
     private final long datasetCheckpointInterval;
     private final Map<TxnId, Long> securedLSNs;
+    private boolean suspended = false;
 
     public CheckpointManager(ITransactionSubsystem txnSubsystem, CheckpointProperties checkpointProperties) {
         super(txnSubsystem, checkpointProperties);
@@ -82,7 +83,7 @@
         }
         final long minFirstLSN = txnSubsystem.getRecoveryManager().getMinFirstLSN();
         boolean checkpointSucceeded = minFirstLSN >= checkpointTargetLSN;
-        if (!checkpointSucceeded) {
+        if (!checkpointSucceeded && !suspended) {
             // Flush datasets with indexes behind target checkpoint LSN
             final IDatasetLifecycleManager dlcm = txnSubsystem.getApplicationContext().getDatasetLifecycleManager();
             dlcm.asyncFlushMatchingIndexes(newLaggingDatasetPredicate(checkpointTargetLSN));
@@ -107,10 +108,23 @@
 
     @Override
     public synchronized void checkpointIdleDatasets() throws HyracksDataException {
+        if (suspended) {
+            return;
+        }
         final IDatasetLifecycleManager dlcm = txnSubsystem.getApplicationContext().getDatasetLifecycleManager();
         dlcm.asyncFlushMatchingIndexes(newIdleDatasetPredicate());
     }
 
+    @Override
+    public synchronized void suspend() {
+        suspended = true;
+    }
+
+    @Override
+    public synchronized void resume() {
+        suspended = false;
+    }
+
     private synchronized long getMinSecuredLSN() {
         return securedLSNs.isEmpty() ? NO_SECURED_LSN : Collections.min(securedLSNs.values());
     }