diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
index f6de92d..7c4b59c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
@@ -54,17 +54,18 @@
     /**
      * the partitions to which the current node is master
      */
-    private final Set<Integer> partitions = new HashSet<>();
+    private final Map<Integer, Object> partitions = new HashMap<>();
     /**
      * current replicas
      */
     private final Map<ReplicaIdentifier, PartitionReplica> replicas = new HashMap<>();
-    private final Object replicaSyncLock = new Object();
     private final Set<Integer> nodeOwnedPartitions = new HashSet<>();
 
     public ReplicaManager(INcApplicationContext appCtx, Set<Integer> partitions) {
         this.appCtx = appCtx;
-        this.partitions.addAll(partitions);
+        for (Integer partition : partitions) {
+            this.partitions.put(partition, new Object());
+        }
         setNodeOwnedPartitions(appCtx);
     }
 
@@ -77,7 +78,7 @@
             LOGGER.warn("Ignoring request to add replica. Node is not ACTIVE yet. Current status: {}", nodeStatus);
             return;
         }
-        if (!partitions.contains(id.getPartition())) {
+        if (!partitions.containsKey(id.getPartition())) {
             throw new IllegalStateException(
                     "This node is not the current master of partition(" + id.getPartition() + ")");
         }
@@ -96,7 +97,6 @@
         }
         PartitionReplica replica = replicas.remove(id);
         appCtx.getReplicationManager().unregister(replica);
-
     }
 
     @Override
@@ -112,18 +112,20 @@
 
     @Override
     public synchronized Set<Integer> getPartitions() {
-        return Collections.unmodifiableSet(partitions);
+        return Collections.unmodifiableSet(partitions.keySet());
     }
 
     @Override
     public synchronized void setActivePartitions(Set<Integer> activePartitions) {
         partitions.clear();
-        partitions.addAll(activePartitions);
+        for (Integer partition : activePartitions) {
+            partitions.put(partition, new Object());
+        }
     }
 
     @Override
     public synchronized void promote(int partition) throws HyracksDataException {
-        if (partitions.contains(partition)) {
+        if (partitions.containsKey(partition)) {
             return;
         }
         LOGGER.warn("promoting partition {}", partition);
@@ -132,12 +134,12 @@
         localResourceRepository.cleanup(partition);
         final IRecoveryManager recoveryManager = appCtx.getTransactionSubsystem().getRecoveryManager();
         recoveryManager.replayReplicaPartitionLogs(Stream.of(partition).collect(Collectors.toSet()), true);
-        partitions.add(partition);
+        partitions.put(partition, new Object());
     }
 
     @Override
     public synchronized void release(int partition) throws HyracksDataException {
-        if (!partitions.contains(partition)) {
+        if (!partitions.containsKey(partition)) {
             return;
         }
         closePartitionResources(partition);
@@ -149,8 +151,12 @@
     }
 
     @Override
-    public Object getReplicaSyncLock() {
-        return replicaSyncLock;
+    public synchronized Object getPartitionSyncLock(int partition) {
+        Object syncLock = partitions.get(partition);
+        if (syncLock == null) {
+            throw new IllegalStateException("partition " + partition + " is not active on this node");
+        }
+        return syncLock;
     }
 
     @Override
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
index b3546cc..f0a4a7c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.app.replication.message;
 
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
@@ -73,7 +74,8 @@
             }
             NcLocalCounters localCounter = success ? NcLocalCounters.collect(getCcId(),
                     (NodeControllerService) appCtx.getServiceContext().getControllerService()) : null;
-            Set<Integer> nodeActivePartitions = appCtx.getReplicaManager().getPartitions();
+            // wrap the returned partitions in a hash set to make it serializable
+            Set<Integer> nodeActivePartitions = new HashSet<>(appCtx.getReplicaManager().getPartitions());
             NCLifecycleTaskReportMessage result =
                     new NCLifecycleTaskReportMessage(nodeId, success, localCounter, nodeActivePartitions);
             result.setException(exception);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
index 88d3113..a4d56ce 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
@@ -80,12 +80,14 @@
     void release(int partition) throws HyracksDataException;
 
     /**
-     * A lock that can be used to ensure a single replica is being synchronized at a time
+     * A lock that can be used to ensure a single partition replica is being synchronized at a time
      * by this {@link IReplicaManager}
      *
+     * @param partition partition
+     *
      * @return the synchronization lock
      */
-    Object getReplicaSyncLock();
+    Object getPartitionSyncLock(int partition);
 
     /**
      * Gets the partition replicas matching {@code id}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
index 3b10700..c49bb7b 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
@@ -24,6 +24,8 @@
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.exceptions.ReplicationException;
@@ -57,6 +59,7 @@
     private ByteBuffer reusbaleBuf;
     private PartitionReplicaStatus status = DISCONNECTED;
     private ISocketChannel sc;
+    private Future<?> syncFuture;
 
     public PartitionReplica(ReplicaIdentifier id, INcApplicationContext appCtx) {
         this.id = id;
@@ -87,7 +90,8 @@
             return;
         }
         setStatus(CATCHING_UP);
-        appCtx.getThreadExecutor().execute(() -> {
+        ExecutorService threadExecutor = (ExecutorService) appCtx.getThreadExecutor();
+        syncFuture = threadExecutor.submit(() -> {
             try {
                 new ReplicaSynchronizer(appCtx, this).sync(register, deltaRecovery);
                 setStatus(IN_SYNC);
@@ -100,6 +104,13 @@
         });
     }
 
+    public synchronized void abort() {
+        if (syncFuture != null) {
+            syncFuture.cancel(true);
+        }
+        syncFuture = null;
+    }
+
     public synchronized ISocketChannel getChannel() {
         try {
             if (!NetworkingUtil.isHealthy(sc)) {
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
index 05e2e75..2434686 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
@@ -44,7 +44,8 @@
     }
 
     public void sync(boolean register, boolean deltaRecovery) throws IOException {
-        synchronized (appCtx.getReplicaManager().getReplicaSyncLock()) {
+        Object partitionLock = appCtx.getReplicaManager().getPartitionSyncLock(replica.getIdentifier().getPartition());
+        synchronized (partitionLock) {
             final ICheckpointManager checkpointManager = appCtx.getTransactionSubsystem().getCheckpointManager();
             try {
                 // suspend checkpointing datasets to prevent async IO operations while sync'ing replicas
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
index 6582670..f09248f 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
@@ -46,7 +46,7 @@
     private static final long NO_SECURED_LSN = -1L;
     private final long datasetCheckpointIntervalNanos;
     private final Map<TxnId, Long> securedLSNs;
-    private boolean suspended = false;
+    private int suspendCount = 0;
 
     public CheckpointManager(ITransactionSubsystem txnSubsystem, CheckpointProperties checkpointProperties) {
         super(txnSubsystem, checkpointProperties);
@@ -84,7 +84,7 @@
         }
         final long minFirstLSN = txnSubsystem.getRecoveryManager().getMinFirstLSN();
         boolean checkpointSucceeded = minFirstLSN >= checkpointTargetLSN;
-        if (!checkpointSucceeded && !suspended) {
+        if (!checkpointSucceeded && !isSuspended()) {
             // Flush datasets with indexes behind target checkpoint LSN
             final IDatasetLifecycleManager dlcm = txnSubsystem.getApplicationContext().getDatasetLifecycleManager();
             dlcm.asyncFlushMatchingIndexes(newLaggingDatasetPredicate(checkpointTargetLSN));
@@ -109,21 +109,25 @@
 
     @Override
     public synchronized void checkpointIdleDatasets() throws HyracksDataException {
-        if (suspended) {
+        if (isSuspended()) {
             return;
         }
         final IDatasetLifecycleManager dlcm = txnSubsystem.getApplicationContext().getDatasetLifecycleManager();
         dlcm.asyncFlushMatchingIndexes(newIdleDatasetPredicate());
     }
 
+    private synchronized boolean isSuspended() {
+        return suspendCount != 0;
+    }
+
     @Override
     public synchronized void suspend() {
-        suspended = true;
+        suspendCount++;
     }
 
     @Override
     public synchronized void resume() {
-        suspended = false;
+        suspendCount--;
     }
 
     private synchronized long getMinSecuredLSN() {
