[NO ISSUE][REP] Updated Replicated Indexes Checkpoint Predicate

- user model changes: no
- storage format changes: no
- interface changes: no

Details:

- Remove master partitions filter when selecting replicated
  indexes to checkpoint since the checkpoint will be limited to
  a single partition.
- Ensure any data decrypted before the close is received on SSL
  sockets is delivered to the reader.
- Take a thread dump when an LSN not received from master
  within the timeout to help in diagnosing any synchronization
  issues.

Change-Id: Ie6f11cc10714ec758b824c49c8c1b31ce7794ca5
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/15943
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java
index 004b640..58025fc 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java
@@ -22,7 +22,6 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
@@ -86,11 +85,9 @@
 
     private void checkpointReplicaIndexes(RemoteLogRecord remoteLogMapping, int datasetId, int resourcePartition)
             throws HyracksDataException {
-        final Set<Integer> masterPartitions = appCtx.getReplicaManager().getPartitions();
         final Predicate<LocalResource> replicaIndexesPredicate = lr -> {
             DatasetLocalResource dls = (DatasetLocalResource) lr.getResource();
-            return dls.getDatasetId() == datasetId && dls.getPartition() == resourcePartition
-                    && !masterPartitions.contains(dls.getPartition());
+            return dls.getDatasetId() == datasetId && dls.getPartition() == resourcePartition;
         };
         final Map<Long, LocalResource> resources =
                 localResourceRep.getResources(replicaIndexesPredicate, Collections.singleton(resourcePartition));
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
index 172bd59..fa77378 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
@@ -37,6 +37,7 @@
 import org.apache.asterix.replication.sync.IndexSynchronizer;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
+import org.apache.hyracks.util.ThreadDumpUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -95,6 +96,7 @@
                 if (replicationTimeOut <= 0) {
                     LOGGER.warn("{} seconds passed without receiving flush lsn {} from master for component {}",
                             appCtx.getReplicationProperties().getReplicationTimeOut(), masterLsn, file);
+                    LOGGER.debug("thead dump on receiving flush lsn timeout {}", ThreadDumpUtil::takeDumpString);
                     throw new ReplicationException(new TimeoutException("couldn't receive flush lsn from master"));
                 }
                 final long startTime = System.nanoTime();
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannel.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannel.java
index f9bf5c7..ce8496f 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannel.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannel.java
@@ -127,7 +127,7 @@
                     break;
                 case CLOSED:
                     close();
-                    return -1;
+                    return decryptedBytes;
                 default:
                     throw new IllegalStateException("Invalid SSL result status: " + result.getStatus());
             }