[NO ISSUE][NET] SSL Socket Fixes
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- On SSL socket handshake failure, deliver any remaining data to requester.
- Add replica synchronization debug logs.
Change-Id: Ie1f6a4df1ab0cc7c6feb352607a45194f96b3c8b
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/15963
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
index e1f99f4..27da909 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
@@ -93,6 +93,7 @@
ExecutorService threadExecutor = (ExecutorService) appCtx.getThreadExecutor();
syncFuture = threadExecutor.submit(() -> {
try {
+ Thread.currentThread().setName("Replica " + id.toString() + " Synchronizer");
new ReplicaSynchronizer(appCtx, this).sync(register, deltaRecovery);
setStatus(IN_SYNC);
} catch (Exception e) {
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
index 736b54e..8162e0a 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
@@ -52,7 +52,7 @@
while (byteBuffer.remaining() > 0 && socketChannel.read(byteBuffer) > 0);
if (byteBuffer.remaining() > 0) {
- throw new EOFException();
+ throw new EOFException("could not read all data from source; remaining bytes: " + byteBuffer.remaining());
}
byteBuffer.flip();
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
index 19de02a..ba770cd 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
@@ -25,6 +25,7 @@
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.config.ReplicationProperties;
@@ -52,6 +53,7 @@
private ServerSocketChannel serverSocketChannel = null;
private final INcApplicationContext appCtx;
private final RemoteLogsProcessor logsProcessor;
+ private final AtomicInteger replicationWorkerCounter = new AtomicInteger(0);
public ReplicationChannel(INcApplicationContext appCtx) {
this.appCtx = appCtx;
@@ -123,16 +125,22 @@
@Override
public void run() {
- Thread.currentThread().setName("Replication Worker");
+ Thread.currentThread().setName("Replication Worker-" + replicationWorkerCounter.incrementAndGet() + "("
+ + getRemoteAddress() + ")");
try {
if (socketChannel.requiresHandshake() && !socketChannel.handshake()) {
+ LOGGER.warn("failed to complete handshake");
return;
}
socketChannel.getSocketChannel().configureBlocking(true);
+ LOGGER.debug("reading replication worker initial request");
ReplicationRequestType requestType = ReplicationProtocol.getRequestType(socketChannel, inBuffer);
+ LOGGER.debug("got request type: {}", requestType);
while (requestType != ReplicationRequestType.GOODBYE) {
handle(requestType);
+ LOGGER.debug("handled request type: {}", requestType);
requestType = ReplicationProtocol.getRequestType(socketChannel, inBuffer);
+ LOGGER.debug("got request type: {}", requestType);
}
} catch (Exception e) {
LOGGER.warn("Unexpected error during replication.", e);
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
index d9b3b0c..82ec601 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
@@ -33,12 +33,15 @@
import org.apache.asterix.replication.api.IReplicationWorker;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
/**
* A task to get the list of the files in a partition on a replica
*/
public class PartitionResourcesListTask implements IReplicaTask {
+ private static final Logger LOGGER = LogManager.getLogger();
private final int partition;
public PartitionResourcesListTask(int partition) {
@@ -47,20 +50,26 @@
@Override
public void perform(INcApplicationContext appCtx, IReplicationWorker worker) throws HyracksDataException {
+ LOGGER.debug("processing {}", this);
final PersistentLocalResourceRepository localResourceRepository =
(PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
localResourceRepository.cleanup(partition);
+ LOGGER.debug("cleaned up partition {}", partition);
final IReplicationStrategy replicationStrategy = appCtx.getReplicationManager().getReplicationStrategy();
// .metadata file -> resource id
Map<String, Long> partitionReplicatedResources =
localResourceRepository.getPartitionReplicatedResources(partition, replicationStrategy);
+ LOGGER.debug("got partition {} resources", partition);
// all data files in partitions + .metadata files
final List<String> partitionFiles =
localResourceRepository.getPartitionReplicatedFiles(partition, replicationStrategy).stream()
.map(StoragePathUtil::getFileRelativePath).collect(Collectors.toList());
+ LOGGER.debug("got partition {} files ({})", partition, partitionFiles.size());
final PartitionResourcesListResponse response = new PartitionResourcesListResponse(partition,
partitionReplicatedResources, partitionFiles, appCtx.getReplicaManager().isPartitionOrigin(partition));
+ LOGGER.debug("sending partition {} files list to requester", partition);
ReplicationProtocol.sendTo(worker.getChannel(), response, worker.getReusableBuffer());
+ LOGGER.debug("sent partition {} files list to requester", partition);
}
@Override
@@ -78,6 +87,11 @@
}
}
+ @Override
+ public String toString() {
+ return "PartitionResourcesListTask{partition=" + partition + "}";
+ }
+
public static PartitionResourcesListTask create(DataInput input) throws HyracksDataException {
try {
int partition = input.readInt();
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
index 44c9404..809b7a6 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
@@ -70,7 +70,9 @@
if (!deltaRecovery) {
deletePartitionFromReplica(partition);
}
+ LOGGER.debug("getting replica files");
PartitionResourcesListResponse replicaResourceResponse = getReplicaFiles(partition);
+ LOGGER.debug("got replica files");
Map<ResourceReference, Long> resourceReferenceLongMap = getValidReplicaResources(
replicaResourceResponse.getPartitionReplicatedResources(), replicaResourceResponse.isOrigin());
// clean up files for invalid resources (deleted or recreated while the replica was down)
@@ -79,9 +81,11 @@
final PersistentLocalResourceRepository localResourceRepository =
(PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
final IReplicationStrategy replicationStrategy = appCtx.getReplicationManager().getReplicationStrategy();
+ LOGGER.debug("clean up replica invalid files");
final Set<String> masterFiles =
localResourceRepository.getPartitionReplicatedFiles(partition, replicationStrategy).stream()
.map(StoragePathUtil::getFileRelativePath).collect(Collectors.toSet());
+ LOGGER.debug("got master partition files");
// exclude from the replica files the list of invalid deleted files
final Set<String> replicaFiles = new HashSet<>(replicaResourceResponse.getFiles());
replicaFiles.removeAll(deletedReplicaFiles);
@@ -127,6 +131,7 @@
}
private void deleteInvalidFiles(List<String> files) {
+ LOGGER.debug("deleting replica invalid files");
final FileSynchronizer sync = new FileSynchronizer(appCtx, replica);
// sort files to ensure index metadata files starting with "." are deleted last
files.sort(String::compareTo);
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
index 0d0ef19..7130e07 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
@@ -28,6 +28,8 @@
import org.apache.asterix.replication.messaging.ReplicationProtocol;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
/**
* Performs the steps required to ensure any newly added replica
@@ -35,6 +37,7 @@
*/
public class ReplicaSynchronizer {
+ private static final Logger LOGGER = LogManager.getLogger();
private final INcApplicationContext appCtx;
private final PartitionReplica replica;
@@ -44,16 +47,23 @@
}
public void sync(boolean register, boolean deltaRecovery) throws IOException {
+ LOGGER.debug("starting replica sync process for replica {}", replica);
Object partitionLock = appCtx.getReplicaManager().getPartitionSyncLock(replica.getIdentifier().getPartition());
synchronized (partitionLock) {
+ LOGGER.debug("acquired partition replica lock");
final ICheckpointManager checkpointManager = appCtx.getTransactionSubsystem().getCheckpointManager();
try {
// suspend checkpointing datasets to prevent async IO operations while sync'ing replicas
checkpointManager.suspend();
+ LOGGER.debug("starting replica files sync");
syncFiles(deltaRecovery);
+ LOGGER.debug("completed replica files sync");
checkpointReplicaIndexes();
+ LOGGER.debug("replica indexes checkpoint completed");
if (register) {
+ LOGGER.debug("registering replica");
appCtx.getReplicationManager().register(replica);
+ LOGGER.debug("replica registered");
}
} finally {
checkpointManager.resume();
@@ -68,6 +78,7 @@
appCtx.getDatasetLifecycleManager().flushDataset(replStrategy,
p -> p == replica.getIdentifier().getPartition());
waitForReplicatedDatasetsIO();
+ LOGGER.debug("flushed partition datasets");
fileSync.sync();
}
@@ -77,6 +88,7 @@
appCtx.getReplicaManager().isPartitionOrigin(partition) ? appCtx.getServiceContext().getNodeId() : null;
CheckpointPartitionIndexesTask task =
new CheckpointPartitionIndexesTask(partition, getPartitionMaxComponentId(partition), masterNode);
+ LOGGER.debug("asking replica to checkpoint indexes");
ReplicationProtocol.sendTo(replica, task);
ReplicationProtocol.waitForAck(replica);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannel.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannel.java
index ce8496f..fc379fb 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannel.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannel.java
@@ -82,9 +82,9 @@
@Override
public synchronized int read(ByteBuffer buffer) throws IOException {
- int transfereeBytes = 0;
+ int transferredBytes = 0;
if (cachedData) {
- transfereeBytes += transferTo(inAppData, buffer);
+ transferredBytes += transferTo(inAppData, buffer);
}
if (buffer.hasRemaining()) {
if (!partialRecord) {
@@ -97,17 +97,18 @@
inAppData.clear();
if (decrypt() > 0) {
inAppData.flip();
- transfereeBytes += transferTo(inAppData, buffer);
+ transferredBytes += transferTo(inAppData, buffer);
} else {
inAppData.limit(0);
}
} else if (bytesRead < 0) {
+ LOGGER.debug("received EOF; transferredBytes Bytes: {}", transferredBytes);
handleEndOfStreamQuietly();
return -1;
}
}
cachedData = inAppData.hasRemaining();
- return transfereeBytes;
+ return transferredBytes;
}
private int decrypt() throws IOException {
@@ -192,6 +193,9 @@
engine.closeOutbound();
try {
new SslHandshake(this).handshake();
+ } catch (Exception e) {
+ // ignore exceptions on best effort graceful close handshake
+ LOGGER.debug("ssl socket close handshake failed", e);
} finally {
socketChannel.close();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java
index 4f0c3a8..b2cd435 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java
@@ -57,7 +57,8 @@
try {
closeable.close();
} catch (IOException e) {
- LOGGER.warn("Failed to close", e);
+ // ignore since we are closing quietly
+ LOGGER.trace("failed to close", e);
}
}
}