[NO ISSUE][REP] Add API to perform non-delta recovery for a replica
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
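- Add an option to perform non-delta recovery for a replica: when delta recovery is disabled, the replica's storage partition is first deleted through a new DELETE_PARTITION replication request, and the regular file sync then runs.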
Change-Id: Ib1837e8f1aefdd9e085ccfd62f1c6e6d4eb969e8
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/13223
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
index e265d03..3b10700 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
@@ -79,17 +79,17 @@
}
+ /**
+ * Delegates to {@code sync(register, deltaRecovery)} with both flags set to {@code true}.
+ */
public synchronized void sync() {
- sync(true);
+ sync(true, true);
}
- public synchronized void sync(boolean register) {
+ public synchronized void sync(boolean register, boolean deltaRecovery) {
if (status == IN_SYNC || status == CATCHING_UP) {
return;
}
setStatus(CATCHING_UP);
appCtx.getThreadExecutor().execute(() -> {
try {
- new ReplicaSynchronizer(appCtx, this).sync(register);
+ new ReplicaSynchronizer(appCtx, this).sync(register, deltaRecovery);
setStatus(IN_SYNC);
} catch (Exception e) {
LOGGER.error(() -> "Failed to sync replica " + this, e);
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeletePartitionTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeletePartitionTask.java
new file mode 100644
index 0000000..90139df
--- /dev/null
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeletePartitionTask.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.messaging;
+
+import java.io.DataInput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.exceptions.ReplicationException;
+import org.apache.asterix.replication.api.IReplicaTask;
+import org.apache.asterix.replication.api.IReplicationWorker;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Replica task that deletes a storage partition through the local resource repository of the
+ * application context it is performed against, and then acks the requester.
+ * <p>
+ * The serialized form is a single int: the id of the partition to delete.
+ */
+public class DeletePartitionTask implements IReplicaTask {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final int partitionId;
+
+    /**
+     * @param partitionId id of the storage partition to delete
+     */
+    public DeletePartitionTask(int partitionId) {
+        this.partitionId = partitionId;
+    }
+
+    /**
+     * Deletes the partition, then sends an ack over the worker's channel.
+     *
+     * @param appCtx context whose local resource repository performs the deletion
+     * @param worker worker supplying the channel and reusable buffer used for the ack
+     * @throws ReplicationException wrapping any exception raised while deleting or acking
+     */
+    @Override
+    public void perform(INcApplicationContext appCtx, IReplicationWorker worker) {
+        try {
+            final PersistentLocalResourceRepository repo =
+                    (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
+            LOGGER.warn("deleting storage partition {}", partitionId);
+            repo.deletePartition(partitionId);
+            // the ack is sent only after deletePartition has returned
+            ReplicationProtocol.sendAck(worker.getChannel(), worker.getReusableBuffer());
+        } catch (Exception e) {
+            throw new ReplicationException(e);
+        }
+    }
+
+    /**
+     * @return {@code DELETE_PARTITION}, the request type identifying this task
+     */
+    @Override
+    public ReplicationProtocol.ReplicationRequestType getMessageType() {
+        return ReplicationProtocol.ReplicationRequestType.DELETE_PARTITION;
+    }
+
+    /**
+     * Writes the partition id to {@code out} as a single int.
+     *
+     * @param out stream receiving the serialized task
+     * @throws HyracksDataException wrapping any {@link IOException} raised by the write
+     */
+    @Override
+    public void serialize(OutputStream out) throws HyracksDataException {
+        try {
+            new DataOutputStream(out).writeInt(partitionId);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    /**
+     * Reads a task back from the form written by {@link #serialize(OutputStream)}.
+     *
+     * @param input source positioned at the serialized partition id
+     * @return a task targeting the partition id read from {@code input}
+     * @throws IOException if the int cannot be read
+     */
+    public static DeletePartitionTask create(DataInput input) throws IOException {
+        final int id = input.readInt();
+        return new DeletePartitionTask(id);
+    }
+}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListResponse.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListResponse.java
index a9921c6..1a5ba88 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListResponse.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListResponse.java
@@ -33,7 +33,7 @@
public class PartitionResourcesListResponse implements IReplicationMessage {
private final int partition;
- private Map<String, Long> partitionReplicatedResources;
+ private final Map<String, Long> partitionReplicatedResources;
private final List<String> files;
private final boolean owner;
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java
index ed2c93f..5b0c64e 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java
@@ -61,7 +61,8 @@
LSM_COMPONENT_MASK,
MARK_COMPONENT_VALID,
DROP_INDEX,
- REPLICATE_LOGS
+ REPLICATE_LOGS,
+ DELETE_PARTITION
}
private static final Map<Integer, ReplicationRequestType> TYPES = new HashMap<>();
@@ -177,6 +178,8 @@
return MarkComponentValidTask.create(dis);
case REPLICATE_LOGS:
return ReplicateLogsTask.create(dis);
+ case DELETE_PARTITION:
+ return DeletePartitionTask.create(dis);
default:
throw new IllegalStateException("Unrecognized replication message");
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
index faf3f54..735318d 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
@@ -35,6 +35,7 @@
import org.apache.asterix.common.storage.ResourceReference;
import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.asterix.replication.api.PartitionReplica;
+import org.apache.asterix.replication.messaging.DeletePartitionTask;
import org.apache.asterix.replication.messaging.PartitionResourcesListResponse;
import org.apache.asterix.replication.messaging.PartitionResourcesListTask;
import org.apache.asterix.replication.messaging.ReplicationProtocol;
@@ -55,14 +56,19 @@
private static final Logger LOGGER = LogManager.getLogger();
private final PartitionReplica replica;
private final INcApplicationContext appCtx;
+ private final boolean deltaRecovery;
+ /**
+ * @param appCtx the NC application context
+ * @param replica the partition replica whose files are synchronized
+ * @param deltaRecovery when {@code false}, {@code sync()} first asks the replica to delete the
+ * partition before listing and comparing its files
+ */
- public ReplicaFilesSynchronizer(INcApplicationContext appCtx, PartitionReplica replica) {
+ public ReplicaFilesSynchronizer(INcApplicationContext appCtx, PartitionReplica replica, boolean deltaRecovery) {
this.appCtx = appCtx;
this.replica = replica;
+ this.deltaRecovery = deltaRecovery;
}
public void sync() throws IOException {
final int partition = replica.getIdentifier().getPartition();
+ if (!deltaRecovery) {
+ deletePartitionFromReplica(partition);
+ }
PartitionResourcesListResponse replicaResourceResponse = getReplicaFiles(partition);
Map<ResourceReference, Long> resourceReferenceLongMap = getValidReplicaResources(
replicaResourceResponse.getPartitionReplicatedResources(), replicaResourceResponse.isOwner());
@@ -82,6 +88,12 @@
deleteReplicaExtraFiles(replicaFiles, masterFiles);
}
+    /**
+     * Sends a {@link DeletePartitionTask} for the given partition to the replica and waits for its ack.
+     *
+     * @param partitionId id of the partition the replica is asked to delete
+     * @throws IOException propagated from sending the request or waiting for the ack
+     */
+    private void deletePartitionFromReplica(int partitionId) throws IOException {
+        ReplicationProtocol.sendTo(replica, new DeletePartitionTask(partitionId));
+        ReplicationProtocol.waitForAck(replica);
+    }
+
private void deleteReplicaExtraFiles(Set<String> replicaFiles, Set<String> masterFiles) {
final List<String> replicaInvalidFiles =
replicaFiles.stream().filter(file -> !masterFiles.contains(file)).collect(Collectors.toList());
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
index 6030245..05e2e75 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
@@ -43,13 +43,13 @@
this.replica = replica;
}
- public void sync(boolean register) throws IOException {
+ public void sync(boolean register, boolean deltaRecovery) throws IOException {
synchronized (appCtx.getReplicaManager().getReplicaSyncLock()) {
final ICheckpointManager checkpointManager = appCtx.getTransactionSubsystem().getCheckpointManager();
try {
// suspend checkpointing datasets to prevent async IO operations while sync'ing replicas
checkpointManager.suspend();
- syncFiles();
+ syncFiles(deltaRecovery);
checkpointReplicaIndexes();
if (register) {
appCtx.getReplicationManager().register(replica);
@@ -60,8 +60,8 @@
}
}
- private void syncFiles() throws IOException {
- final ReplicaFilesSynchronizer fileSync = new ReplicaFilesSynchronizer(appCtx, replica);
+ private void syncFiles(boolean deltaRecovery) throws IOException {
+ final ReplicaFilesSynchronizer fileSync = new ReplicaFilesSynchronizer(appCtx, replica, deltaRecovery);
// flush replicated dataset to generate disk component for any remaining in-memory components
final IReplicationStrategy replStrategy = appCtx.getReplicationManager().getReplicationStrategy();
appCtx.getDatasetLifecycleManager().flushDataset(replStrategy);
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index ee5b16e..02f5772 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -695,4 +695,16 @@
FileReference resolve = ioManager.resolve(path.toString());
return resolve.getFile().toPath();
}
+
+ public void deletePartition(int partitionId) {
+ List<File> onDiskPartitions = getOnDiskPartitions();
+ for (File onDiskPartition : onDiskPartitions) {
+ int partitionNum = StoragePathUtil.getPartitionNumFromRelativePath(onDiskPartition.getAbsolutePath());
+ if (partitionNum == partitionId) {
+ LOGGER.warn("deleting partition {}", partitionNum);
+ FileUtils.deleteQuietly(onDiskPartition);
+ return;
+ }
+ }
+ }
}