[NO ISSUE][REP] Updated Replicated Indexes Checkpoint Predicate
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Remove master partitions filter when selecting replicated
indexes to checkpoint since the checkpoint will be limited to
a single partition.
- Ensure any data decrypted before the close is received on SSL
sockets is delivered to the reader.
- Take a thread dump when an LSN not received from master
within the timeout to help in diagnosing any synchronization
issues.
Change-Id: Ie6f11cc10714ec758b824c49c8c1b31ce7794ca5
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/15943
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java
index 004b640..58025fc 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java
@@ -22,7 +22,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -86,11 +85,9 @@
private void checkpointReplicaIndexes(RemoteLogRecord remoteLogMapping, int datasetId, int resourcePartition)
throws HyracksDataException {
- final Set<Integer> masterPartitions = appCtx.getReplicaManager().getPartitions();
final Predicate<LocalResource> replicaIndexesPredicate = lr -> {
DatasetLocalResource dls = (DatasetLocalResource) lr.getResource();
- return dls.getDatasetId() == datasetId && dls.getPartition() == resourcePartition
- && !masterPartitions.contains(dls.getPartition());
+ return dls.getDatasetId() == datasetId && dls.getPartition() == resourcePartition;
};
final Map<Long, LocalResource> resources =
localResourceRep.getResources(replicaIndexesPredicate, Collections.singleton(resourcePartition));
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
index 172bd59..fa77378 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
@@ -37,6 +37,7 @@
import org.apache.asterix.replication.sync.IndexSynchronizer;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
+import org.apache.hyracks.util.ThreadDumpUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -95,6 +96,7 @@
if (replicationTimeOut <= 0) {
LOGGER.warn("{} seconds passed without receiving flush lsn {} from master for component {}",
appCtx.getReplicationProperties().getReplicationTimeOut(), masterLsn, file);
+ LOGGER.debug("thead dump on receiving flush lsn timeout {}", ThreadDumpUtil::takeDumpString);
throw new ReplicationException(new TimeoutException("couldn't receive flush lsn from master"));
}
final long startTime = System.nanoTime();
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannel.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannel.java
index f9bf5c7..ce8496f 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannel.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannel.java
@@ -127,7 +127,7 @@
break;
case CLOSED:
close();
- return -1;
+ return decryptedBytes;
default:
throw new IllegalStateException("Invalid SSL result status: " + result.getStatus());
}