[NO ISSUE][REPL] Ignore LSNs of Partially Replicated Indexes
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- When determining low watermark, ignore LSN of replicated
indexes with no checkpoints.
- Guard logs in case of unexpected min LSN read failures.
- Ensure only one replica is synchronized at a time to prevent
possible merge operations from deleting files being synchronized
to another replica concurrently.
- Ensure index metadata files are replicated first to allow
replicas to find any existing files in case of re-synchronization.
- Ensure replication channel is closed on replication failures.
Change-Id: I9ca08da29bdd8fc4406f2df7e6eb32601caf9388
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2534
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 4b14a9c..d4e652d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -50,6 +50,7 @@
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
import org.apache.asterix.common.storage.DatasetResourceReference;
+import org.apache.asterix.common.storage.IIndexCheckpointManager;
import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.asterix.common.transactions.Checkpoint;
import org.apache.asterix.common.transactions.ICheckpointManager;
@@ -93,6 +94,7 @@
public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
public static final boolean IS_DEBUG_MODE = false;
+ private static final long SMALLEST_POSSIBLE_LSN = 0;
private static final Logger LOGGER = org.apache.logging.log4j.LogManager.getLogger();
private final ITransactionSubsystem txnSubsystem;
private final LogManager logMgr;
@@ -499,8 +501,17 @@
return dsResource.getPartition() == partition;
}).values().stream().map(DatasetResourceReference::of).collect(Collectors.toList());
for (DatasetResourceReference indexRef : partitionResources) {
- long remoteIndexMaxLSN = idxCheckpointMgrProvider.get(indexRef).getLowWatermark();
- minRemoteLSN = Math.min(minRemoteLSN, remoteIndexMaxLSN);
+ try {
+ final IIndexCheckpointManager idxCheckpointMgr = idxCheckpointMgrProvider.get(indexRef);
+ // an index with no checkpoints is only partially replicated: its LSN must not be
+ // taken into account when computing the low watermark
+ if (idxCheckpointMgr.getCheckpointCount() > 0) {
+ // reuse the manager resolved above instead of asking the provider a second time
+ final long remoteIndexLowWatermark = idxCheckpointMgr.getLowWatermark();
+ minRemoteLSN = Math.min(minRemoteLSN, remoteIndexLowWatermark);
+ }
+ } catch (Exception e) {
+ LOGGER.warn("Failed to get min LSN of resource {}", indexRef, e);
+ // ensure no logs will be deleted in case of unexpected failures
+ return SMALLEST_POSSIBLE_LSN;
+ }
}
}
return minRemoteLSN;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
index c821c56..5c5ce93 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
@@ -57,6 +57,7 @@
* current replicas
*/
private final Map<ReplicaIdentifier, PartitionReplica> replicas = new HashMap<>();
+ private final Object replicaSyncLock = new Object();
public ReplicaManager(INcApplicationContext appCtx, Set<Integer> partitions) {
this.appCtx = appCtx;
@@ -126,6 +127,11 @@
partitions.remove(partition);
}
+ @Override
+ public Object getReplicaSyncLock() { // hands out the same lock instance to every caller of this manager
+ return replicaSyncLock;
+ }
+
private void closePartitionResources(int partition) throws HyracksDataException {
final PersistentLocalResourceRepository resourceRepository =
(PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
index b2deb1e..1b8ec53 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
@@ -71,4 +71,12 @@
* @throws HyracksDataException
*/
void release(int partition) throws HyracksDataException;
+
+ /**
+ * A lock that can be used to ensure a single replica is being synchronized at a time
+ * by this {@link IReplicaManager}. Callers are expected to hold the monitor of the
+ * returned object for the whole duration of a replica synchronization.
+ *
+ * @return the synchronization lock
+ */
+ Object getReplicaSyncLock();
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
index bfac451..5c324b1 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
@@ -109,13 +109,12 @@
+ /**
+ * Closes the channel to this replica, if there is one. A good bye message is sent on a
+ * best-effort basis first; the channel is then closed and the reference cleared no
+ * matter how the good bye attempt ended.
+ */
public synchronized void close() {
try {
- if (sc != null && sc.isOpen()) {
- ReplicationProtocol.sendGoodbye(sc);
- sc.close();
- sc = null;
+ if (sc != null) {
+ // IO failures are logged inside sendGoodBye() and do not propagate
+ sendGoodBye();
}
- } catch (IOException e) {
- LOGGER.warn("Failed to close channel", e);
+ } finally {
+ // close in finally so the channel is not leaked if the good bye attempt
+ // fails with an unchecked exception
+ if (sc != null) {
+ NetworkUtil.closeQuietly(sc);
+ sc = null;
+ }
}
}
@@ -166,4 +165,12 @@
LOGGER.info(() -> "Replica " + this + " status changing: " + this.status + " -> " + status);
this.status = status;
}
+
+ private void sendGoodBye() { // best-effort good bye over the current channel (sc)
+ try {
+ ReplicationProtocol.sendGoodbye(sc);
+ } catch (IOException e) { // only logged, so close() can still go on to close the channel
+ LOGGER.warn("Failed to send good bye to {}", this, e);
+ }
+ }
}
\ No newline at end of file
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
index fae6ed6..0d97a7a 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
@@ -18,9 +18,12 @@
*/
package org.apache.asterix.replication.sync;
+import static org.apache.asterix.common.utils.StorageConstants.METADATA_FILE_NAME;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
+import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -40,6 +43,13 @@
*/
public class ReplicaFilesSynchronizer {
+ /**
+ * Orders index metadata files before all other files. Two files of the same kind
+ * (both metadata or both non-metadata) are ordered by their natural string order.
+ */
+ private static final Comparator<String> REPLICATED_FILES_COMPARATOR = (file, anotherFile) -> {
+ final boolean fileIsMetadata = file.endsWith(METADATA_FILE_NAME);
+ final boolean anotherFileIsMetadata = anotherFile.endsWith(METADATA_FILE_NAME);
+ if (fileIsMetadata != anotherFileIsMetadata) {
+ // exactly one of the two is a metadata file: it goes first whichever argument it is,
+ // so compare(a, b) and compare(b, a) always have opposite signs as Comparator requires
+ return fileIsMetadata ? -1 : 1;
+ }
+ return file.compareTo(anotherFile);
+ };
+
private final PartitionReplica replica;
private final INcApplicationContext appCtx;
@@ -79,6 +89,8 @@
private void replicateMissingFiles(List<String> files) {
final FileSynchronizer sync = new FileSynchronizer(appCtx, replica);
+ // sort the caller's list in place so that index metadata files are replicated first
+ files.sort(REPLICATED_FILES_COMPARATOR);
files.forEach(sync::replicate);
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
index 9f397d2..ef85977 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
@@ -22,9 +22,9 @@
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.replication.IReplicationStrategy;
-import org.apache.asterix.replication.messaging.ReplicationProtocol;
-import org.apache.asterix.replication.messaging.CheckpointPartitionIndexesTask;
import org.apache.asterix.replication.api.PartitionReplica;
+import org.apache.asterix.replication.messaging.CheckpointPartitionIndexesTask;
+import org.apache.asterix.replication.messaging.ReplicationProtocol;
/**
* Performs the steps required to ensure any newly added replica
@@ -41,9 +41,12 @@
}
public void sync() throws IOException {
- syncFiles();
- checkpointReplicaIndexes();
- appCtx.getReplicationManager().register(replica);
+ final Object syncLock = appCtx.getReplicaManager().getReplicaSyncLock();
+ synchronized (syncLock) { // one replica at a time, so merges cannot delete files still being sent to another
+ syncFiles();
+ checkpointReplicaIndexes();
+ appCtx.getReplicationManager().register(replica);
+ }
}
private void syncFiles() throws IOException {