[NO ISSUE][REP] Sync replicas for different partitions concurrently
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- Get sync lock per partition.
Change-Id: I6693fbca51d8e13f9740941f797f63fae7b1d2d0
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/13244
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
index f6de92d..7c4b59c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
@@ -54,17 +54,18 @@
/**
* the partitions to which the current node is master
*/
- private final Set<Integer> partitions = new HashSet<>();
+ private final Map<Integer, Object> partitions = new HashMap<>();
/**
* current replicas
*/
private final Map<ReplicaIdentifier, PartitionReplica> replicas = new HashMap<>();
- private final Object replicaSyncLock = new Object();
private final Set<Integer> nodeOwnedPartitions = new HashSet<>();
public ReplicaManager(INcApplicationContext appCtx, Set<Integer> partitions) {
this.appCtx = appCtx;
- this.partitions.addAll(partitions);
+ for (Integer partition : partitions) {
+ this.partitions.put(partition, new Object());
+ }
setNodeOwnedPartitions(appCtx);
}
@@ -77,7 +78,7 @@
LOGGER.warn("Ignoring request to add replica. Node is not ACTIVE yet. Current status: {}", nodeStatus);
return;
}
- if (!partitions.contains(id.getPartition())) {
+ if (!partitions.containsKey(id.getPartition())) {
throw new IllegalStateException(
"This node is not the current master of partition(" + id.getPartition() + ")");
}
@@ -96,7 +97,6 @@
}
PartitionReplica replica = replicas.remove(id);
appCtx.getReplicationManager().unregister(replica);
-
}
@Override
@@ -112,18 +112,20 @@
@Override
public synchronized Set<Integer> getPartitions() {
- return Collections.unmodifiableSet(partitions);
+ return Collections.unmodifiableSet(partitions.keySet());
}
@Override
public synchronized void setActivePartitions(Set<Integer> activePartitions) {
partitions.clear();
- partitions.addAll(activePartitions);
+ for (Integer partition : activePartitions) {
+ partitions.put(partition, new Object());
+ }
}
@Override
public synchronized void promote(int partition) throws HyracksDataException {
- if (partitions.contains(partition)) {
+ if (partitions.containsKey(partition)) {
return;
}
LOGGER.warn("promoting partition {}", partition);
@@ -132,12 +134,12 @@
localResourceRepository.cleanup(partition);
final IRecoveryManager recoveryManager = appCtx.getTransactionSubsystem().getRecoveryManager();
recoveryManager.replayReplicaPartitionLogs(Stream.of(partition).collect(Collectors.toSet()), true);
- partitions.add(partition);
+ partitions.put(partition, new Object());
}
@Override
public synchronized void release(int partition) throws HyracksDataException {
- if (!partitions.contains(partition)) {
+ if (!partitions.containsKey(partition)) {
return;
}
closePartitionResources(partition);
@@ -149,8 +151,12 @@
}
@Override
- public Object getReplicaSyncLock() {
- return replicaSyncLock;
+ public synchronized Object getPartitionSyncLock(int partition) {
+ Object syncLock = partitions.get(partition);
+ if (syncLock == null) {
+ throw new IllegalStateException("partition " + partition + " is not active on this node");
+ }
+ return syncLock;
}
@Override
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
index b3546cc..f0a4a7c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.app.replication.message;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -73,7 +74,8 @@
}
NcLocalCounters localCounter = success ? NcLocalCounters.collect(getCcId(),
(NodeControllerService) appCtx.getServiceContext().getControllerService()) : null;
- Set<Integer> nodeActivePartitions = appCtx.getReplicaManager().getPartitions();
+ // wrap the returned partitions in a hash set to make it serializable
+ Set<Integer> nodeActivePartitions = new HashSet<>(appCtx.getReplicaManager().getPartitions());
NCLifecycleTaskReportMessage result =
new NCLifecycleTaskReportMessage(nodeId, success, localCounter, nodeActivePartitions);
result.setException(exception);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
index 88d3113..a4d56ce 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
@@ -80,12 +80,14 @@
void release(int partition) throws HyracksDataException;
/**
- * A lock that can be used to ensure a single replica is being synchronized at a time
+ * A lock that can be used to ensure a single partition replica is being synchronized at a time
* by this {@link IReplicaManager}
*
+ * @param partition partition
+ *
* @return the synchronization lock
*/
- Object getReplicaSyncLock();
+ Object getPartitionSyncLock(int partition);
/**
* Gets the partition replicas matching {@code id}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
index 3b10700..c49bb7b 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
@@ -24,6 +24,8 @@
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.exceptions.ReplicationException;
@@ -57,6 +59,7 @@
private ByteBuffer reusbaleBuf;
private PartitionReplicaStatus status = DISCONNECTED;
private ISocketChannel sc;
+ private Future<?> syncFuture;
public PartitionReplica(ReplicaIdentifier id, INcApplicationContext appCtx) {
this.id = id;
@@ -87,7 +90,8 @@
return;
}
setStatus(CATCHING_UP);
- appCtx.getThreadExecutor().execute(() -> {
+ ExecutorService threadExecutor = (ExecutorService) appCtx.getThreadExecutor();
+ syncFuture = threadExecutor.submit(() -> {
try {
new ReplicaSynchronizer(appCtx, this).sync(register, deltaRecovery);
setStatus(IN_SYNC);
@@ -100,6 +104,13 @@
});
}
+ public synchronized void abort() {
+ if (syncFuture != null) {
+ syncFuture.cancel(true);
+ }
+ syncFuture = null;
+ }
+
public synchronized ISocketChannel getChannel() {
try {
if (!NetworkingUtil.isHealthy(sc)) {
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
index 05e2e75..2434686 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
@@ -44,7 +44,8 @@
}
public void sync(boolean register, boolean deltaRecovery) throws IOException {
- synchronized (appCtx.getReplicaManager().getReplicaSyncLock()) {
+ Object partitionLock = appCtx.getReplicaManager().getPartitionSyncLock(replica.getIdentifier().getPartition());
+ synchronized (partitionLock) {
final ICheckpointManager checkpointManager = appCtx.getTransactionSubsystem().getCheckpointManager();
try {
// suspend checkpointing datasets to prevent async IO operations while sync'ing replicas
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
index 6582670..f09248f 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
@@ -46,7 +46,7 @@
private static final long NO_SECURED_LSN = -1L;
private final long datasetCheckpointIntervalNanos;
private final Map<TxnId, Long> securedLSNs;
- private boolean suspended = false;
+ private int suspendCount = 0;
public CheckpointManager(ITransactionSubsystem txnSubsystem, CheckpointProperties checkpointProperties) {
super(txnSubsystem, checkpointProperties);
@@ -84,7 +84,7 @@
}
final long minFirstLSN = txnSubsystem.getRecoveryManager().getMinFirstLSN();
boolean checkpointSucceeded = minFirstLSN >= checkpointTargetLSN;
- if (!checkpointSucceeded && !suspended) {
+ if (!checkpointSucceeded && !isSuspended()) {
// Flush datasets with indexes behind target checkpoint LSN
final IDatasetLifecycleManager dlcm = txnSubsystem.getApplicationContext().getDatasetLifecycleManager();
dlcm.asyncFlushMatchingIndexes(newLaggingDatasetPredicate(checkpointTargetLSN));
@@ -109,21 +109,25 @@
@Override
public synchronized void checkpointIdleDatasets() throws HyracksDataException {
- if (suspended) {
+ if (isSuspended()) {
return;
}
final IDatasetLifecycleManager dlcm = txnSubsystem.getApplicationContext().getDatasetLifecycleManager();
dlcm.asyncFlushMatchingIndexes(newIdleDatasetPredicate());
}
+ private synchronized boolean isSuspended() {
+ return suspendCount != 0;
+ }
+
@Override
public synchronized void suspend() {
- suspended = true;
+ suspendCount++;
}
@Override
public synchronized void resume() {
- suspended = false;
+ suspendCount--;
}
private synchronized long getMinSecuredLSN() {