[ASTERIXDB-2490][NET] Support Encrypted Replication Connections
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- Use SocketChannelFactory in replication connections
to support both unencrypted and encrypted sockets.
- Add SSL replication test cases.
- Make SslSocketChannel close idempotent.
- Ensure FlushDatasetOperatorDescriptor waits for all
on-going dataset IO.
Change-Id: I9657624a5d54d4966357651efb671f3d8f0cb304
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3092
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SslReplicationExecutionTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SslReplicationExecutionTest.java
new file mode 100644
index 0000000..14aac1f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SslReplicationExecutionTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.runtime;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.hyracks.control.common.controllers.NCConfig;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.test.support.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class SslReplicationExecutionTest {
+ protected static final String TEST_CONFIG_FILE_NAME = "src/test/resources/cc-rep-ssl.conf";
+ private static final TestExecutor testExecutor = new TestExecutor();
+ private static boolean configured = false;
+
+ @BeforeClass
+ public static void setUp() {
+ LangExecutionUtil.setCheckStorageDistribution(false);
+ }
+
+ @Before
+ public void before() throws Exception {
+ TestUtils.redirectLoggingToConsole();
+ LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, testExecutor);
+ if (!configured) {
+ final NodeControllerService[] ncs = ExecutionTestUtil.integrationUtil.ncs;
+ Map<String, InetSocketAddress> ncEndPoints = new HashMap<>();
+ Map<String, InetSocketAddress> replicationAddress = new HashMap<>();
+ final String ip = InetAddress.getLoopbackAddress().getHostAddress();
+ for (NodeControllerService nc : ncs) {
+ final String nodeId = nc.getId();
+ final INcApplicationContext appCtx = (INcApplicationContext) nc.getApplicationContext();
+ int apiPort = appCtx.getExternalProperties().getNcApiPort();
+ int replicationPort =
+ (int) appCtx.getServiceContext().getAppConfig().get(NCConfig.Option.REPLICATION_LISTEN_PORT);
+ ncEndPoints.put(nodeId, InetSocketAddress.createUnresolved(ip, apiPort));
+ replicationAddress.put(nodeId, InetSocketAddress.createUnresolved(ip, replicationPort));
+ }
+ testExecutor.setNcEndPoints(ncEndPoints);
+ testExecutor.setNcReplicationAddress(replicationAddress);
+ configured = true;
+ }
+ }
+
+ @After
+ public void after() throws Exception {
+ LangExecutionUtil.tearDown();
+ }
+
+ @Parameters(name = "ReplicationExecutionTest {index}: {0}")
+ public static Collection<Object[]> tests() throws Exception {
+ return LangExecutionUtil.tests("replication.xml", "replication.xml");
+ }
+
+ protected TestCaseContext tcCtx;
+
+ public SslReplicationExecutionTest(TestCaseContext tcCtx) {
+ this.tcCtx = tcCtx;
+ }
+
+ @Test
+ public void test() throws Exception {
+ LangExecutionUtil.test(tcCtx);
+ }
+}
diff --git a/asterixdb/asterix-app/src/test/resources/cc-rep-ssl.conf b/asterixdb/asterix-app/src/test/resources/cc-rep-ssl.conf
new file mode 100644
index 0000000..db4ca20
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/cc-rep-ssl.conf
@@ -0,0 +1,61 @@
+; Licensed to the Apache Software Foundation (ASF) under one
+; or more contributor license agreements. See the NOTICE file
+; distributed with this work for additional information
+; regarding copyright ownership. The ASF licenses this file
+; to you under the Apache License, Version 2.0 (the
+; "License"); you may not use this file except in compliance
+; with the License. You may obtain a copy of the License at
+;
+; http://www.apache.org/licenses/LICENSE-2.0
+;
+; Unless required by applicable law or agreed to in writing,
+; software distributed under the License is distributed on an
+; "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+; KIND, either express or implied. See the License for the
+; specific language governing permissions and limitations
+; under the License.
+
+[nc/asterix_nc1]
+txn.log.dir=../asterix-server/target/tmp/asterix_nc1/txnlog
+core.dump.dir=../asterix-server/target/tmp/asterix_nc1/coredump
+iodevices=asterix_nc1/iodevice1,asterix_nc1/iodevice2
+#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5006
+replication.listen.port=2001
+nc.api.port=19004
+key.store.path=security/nc1/asterix_nc1.jks
+key.store.password=asterixdb
+trust.store.path=security/root/root.truststore
+
+[nc/asterix_nc2]
+ncservice.port=9091
+txn.log.dir=../asterix-server/target/tmp/asterix_nc2/txnlog
+core.dump.dir=../asterix-server/target/tmp/asterix_nc2/coredump
+iodevices=asterix_nc2/iodevice1,asterix_nc2/iodevice2
+#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5007
+replication.listen.port=2002
+nc.api.port=19005
+key.store.path=security/nc2/asterix_nc2.jks
+key.store.password=asterixdb
+trust.store.path=security/root/root.truststore
+
+[nc]
+address=127.0.0.1
+command=asterixnc
+app.class=org.apache.asterix.hyracks.bootstrap.NCApplication
+jvm.args=-Xmx4096m -Dnode.Resolver="org.apache.asterix.external.util.IdentitiyResolverFactory"
+storage.memorycomponent.globalbudget = 1073741824
+
+[cc]
+address = 127.0.0.1
+app.class=org.apache.asterix.hyracks.bootstrap.CCApplication
+heartbeat.period=2000
+heartbeat.max.misses=25
+key.store.path=security/cc/cc.jks
+key.store.password=asterixdb
+trust.store.path=security/root/root.truststore
+
+[common]
+log.level = INFO
+replication.enabled=true
+replication.strategy=all
+ssl.enabled=true
\ No newline at end of file
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationWorker.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationWorker.java
index c8abe8f..c7b2561 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationWorker.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationWorker.java
@@ -19,14 +19,15 @@
package org.apache.asterix.replication.api;
import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
+
+import org.apache.hyracks.api.network.ISocketChannel;
public interface IReplicationWorker extends Runnable {
/**
* @return The replication socket channel.
*/
- SocketChannel getChannel();
+ ISocketChannel getChannel();
/**
* Gets a reusable buffer that can be used to send data
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
index 8847e7e..0e85665 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
@@ -24,7 +24,6 @@
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.exceptions.ReplicationException;
@@ -32,6 +31,7 @@
import org.apache.asterix.common.storage.ReplicaIdentifier;
import org.apache.asterix.replication.messaging.ReplicationProtocol;
import org.apache.asterix.replication.sync.ReplicaSynchronizer;
+import org.apache.hyracks.api.network.ISocketChannel;
import org.apache.hyracks.util.NetworkUtil;
import org.apache.hyracks.util.StorageUtil;
import org.apache.hyracks.util.annotations.ThreadSafe;
@@ -53,7 +53,7 @@
private final ReplicaIdentifier id;
private ByteBuffer reusbaleBuf;
private PartitionReplicaStatus status = DISCONNECTED;
- private SocketChannel sc;
+ private ISocketChannel sc;
public PartitionReplica(ReplicaIdentifier id, INcApplicationContext appCtx) {
this.id = id;
@@ -93,13 +93,10 @@
});
}
- public synchronized SocketChannel getChannel() {
+ public synchronized ISocketChannel getChannel() {
try {
- if (sc == null || !sc.isOpen() || !sc.isConnected()) {
- sc = SocketChannel.open();
- NetworkUtil.configure(sc);
- sc.configureBlocking(true);
- sc.connect(id.getLocation());
+ if (sc == null || !sc.getSocketChannel().isOpen() || !sc.getSocketChannel().isConnected()) {
+ sc = ReplicationProtocol.establishReplicaConnection(appCtx, id.getLocation());
}
return sc;
} catch (IOException e) {
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
index 8ccfced..eda37b5 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
@@ -20,17 +20,17 @@
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.nio.channels.SocketChannel;
import java.util.HashSet;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.exceptions.ReplicationException;
import org.apache.asterix.common.replication.IPartitionReplica;
import org.apache.asterix.common.replication.IReplicationDestination;
import org.apache.asterix.replication.messaging.ReplicationProtocol;
-import org.apache.hyracks.util.NetworkUtil;
+import org.apache.hyracks.api.network.ISocketChannel;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -39,7 +39,7 @@
private static final Logger LOGGER = LogManager.getLogger();
private final Set<IPartitionReplica> replicas = new HashSet<>();
private final InetSocketAddress location;
- private SocketChannel logRepChannel;
+ private ISocketChannel logRepChannel;
private ReplicationDestination(InetSocketAddress location) {
this.location = location;
@@ -75,13 +75,11 @@
&& replica.getStatus() == IPartitionReplica.PartitionReplicaStatus.IN_SYNC).findAny();
}
- public synchronized SocketChannel getLogReplicationChannel() {
+ public synchronized ISocketChannel getLogReplicationChannel(INcApplicationContext appCtx) {
try {
- if (logRepChannel == null || !logRepChannel.isOpen() || !logRepChannel.isConnected()) {
- logRepChannel = SocketChannel.open();
- NetworkUtil.configure(logRepChannel);
- logRepChannel.configureBlocking(true);
- logRepChannel.connect(location);
+ if (logRepChannel == null || !logRepChannel.getSocketChannel().isOpen()
+ || !logRepChannel.getSocketChannel().isConnected()) {
+ logRepChannel = ReplicationProtocol.establishReplicaConnection(appCtx, location);
}
return logRepChannel;
} catch (IOException e) {
@@ -91,7 +89,7 @@
private synchronized void closeLogReplicationChannel() {
try {
- if (logRepChannel != null && logRepChannel.isOpen()) {
+ if (logRepChannel != null && logRepChannel.getSocketChannel().isOpen()) {
ReplicationProtocol.sendGoodbye(logRepChannel);
logRepChannel.close();
logRepChannel = null;
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java
index 366abce..440f8ef 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java
@@ -65,7 +65,7 @@
case LogType.JOB_COMMIT:
case LogType.ABORT:
// send ACK to requester
- logRecord.getReplicationWorker().getChannel().socket().getOutputStream()
+ logRecord.getReplicationWorker().getChannel().getSocketChannel().socket().getOutputStream()
.write((nodeId + ReplicationProtocol.LOG_REPLICATION_ACK + logRecord.getTxnId()
+ System.lineSeparator()).getBytes());
break;
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java
index 6c8e372..0bcffc6 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java
@@ -23,7 +23,6 @@
import java.io.InputStreamReader;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
-import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -46,6 +45,7 @@
import org.apache.asterix.replication.logging.TxnLogReplicator;
import org.apache.asterix.replication.messaging.ReplicateLogsTask;
import org.apache.asterix.replication.messaging.ReplicationProtocol;
+import org.apache.hyracks.api.network.ISocketChannel;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -55,17 +55,17 @@
private final LinkedBlockingQueue<ReplicationLogBuffer> emptyLogBuffersQ;
private final LinkedBlockingQueue<ReplicationLogBuffer> pendingFlushLogBuffersQ;
private final ByteBuffer txnLogsBatchSizeBuffer = ByteBuffer.allocate(Integer.BYTES);
- private final Map<ReplicationDestination, SocketChannel> destinations = new HashMap<>();
+ private final Map<ReplicationDestination, ISocketChannel> destinations = new HashMap<>();
private final IReplicationManager replicationManager;
private final Executor executor;
private final TxnAckTracker ackTracker = new TxnAckTracker();
- private final Set<SocketChannel> failedSockets = new HashSet<>();
+ private final Set<ISocketChannel> failedSockets = new HashSet<>();
private final Object transferLock = new Object();
private final INcApplicationContext appCtx;
private final int logPageSize;
private final int logBatchSize;
private ReplicationLogBuffer currentTxnLogBuffer;
- private SocketChannel[] destSockets;
+ private ISocketChannel[] destSockets;
public LogReplicationManager(INcApplicationContext appCtx, IReplicationManager replicationManager) {
this.appCtx = appCtx;
@@ -100,11 +100,11 @@
return;
}
LOGGER.info(() -> "register " + dest);
- SocketChannel socketChannel = dest.getLogReplicationChannel();
+ ISocketChannel socketChannel = dest.getLogReplicationChannel(appCtx);
handshake(dest, socketChannel);
destinations.put(dest, socketChannel);
failedSockets.remove(socketChannel);
- destSockets = destinations.values().toArray(new SocketChannel[destinations.size()]);
+ destSockets = destinations.values().toArray(new ISocketChannel[0]);
}
}
}
@@ -117,9 +117,9 @@
}
LOGGER.info(() -> "unregister " + dest);
ackTracker.unregister(dest);
- SocketChannel destSocket = destinations.remove(dest);
+ ISocketChannel destSocket = destinations.remove(dest);
failedSockets.remove(destSocket);
- destSockets = destinations.values().toArray(new SocketChannel[destinations.size()]);
+ destSockets = destinations.values().toArray(new ISocketChannel[0]);
endReplication(destSocket);
}
}
@@ -143,7 +143,7 @@
buffer.mark();
synchronized (transferLock) {
if (destSockets != null) {
- for (SocketChannel replicaSocket : destSockets) {
+ for (ISocketChannel replicaSocket : destSockets) {
try {
// send batch size then the batch itself
NetworkingUtil.transferBufferToChannel(replicaSocket, txnLogsBatchSizeBuffer);
@@ -192,15 +192,15 @@
pendingFlushLogBuffersQ.add(currentTxnLogBuffer);
}
- private void handshake(ReplicationDestination dest, SocketChannel socketChannel) {
+ private void handshake(ReplicationDestination dest, ISocketChannel socketChannel) {
final String nodeId = appCtx.getServiceContext().getNodeId();
final ReplicateLogsTask task = new ReplicateLogsTask(nodeId);
ReplicationProtocol.sendTo(socketChannel, task, null);
executor.execute(new TxnAckListener(dest, socketChannel));
}
- private void endReplication(SocketChannel socketChannel) {
- if (socketChannel.isConnected()) {
+ private void endReplication(ISocketChannel socketChannel) {
+ if (socketChannel.getSocketChannel().isConnected()) {
// end log replication (by sending a dummy log with a single byte)
final ByteBuffer endLogRepBuffer = ReplicationProtocol.getEndLogReplicationBuffer();
try {
@@ -211,7 +211,7 @@
}
}
- private synchronized void handleFailure(SocketChannel replicaSocket, IOException e) {
+ private synchronized void handleFailure(ISocketChannel replicaSocket, IOException e) {
if (failedSockets.contains(replicaSocket)) {
return;
}
@@ -224,9 +224,9 @@
private class TxnAckListener implements Runnable {
private final ReplicationDestination dest;
- private final SocketChannel replicaSocket;
+ private final ISocketChannel replicaSocket;
- TxnAckListener(ReplicationDestination dest, SocketChannel replicaSocket) {
+ TxnAckListener(ReplicationDestination dest, ISocketChannel replicaSocket) {
this.dest = dest;
this.replicaSocket = replicaSocket;
}
@@ -235,8 +235,8 @@
public void run() {
Thread.currentThread().setName("TxnAckListener (" + dest + ")");
LOGGER.info("Started listening on socket: {}", dest);
- try (BufferedReader incomingResponse =
- new BufferedReader(new InputStreamReader(replicaSocket.socket().getInputStream()))) {
+ try (BufferedReader incomingResponse = new BufferedReader(
+ new InputStreamReader(replicaSocket.getSocketChannel().socket().getInputStream()))) {
while (true) {
final String response = incomingResponse.readLine();
if (response == null) {
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
index c93920f..30ad72c 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
@@ -33,6 +33,7 @@
import java.util.Enumeration;
import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.network.ISocketChannel;
public class NetworkingUtil {
@@ -40,7 +41,7 @@
throw new AssertionError("This util class should not be initialized.");
}
- public static void readBytes(SocketChannel socketChannel, ByteBuffer byteBuffer, int length) throws IOException {
+ public static void readBytes(ISocketChannel socketChannel, ByteBuffer byteBuffer, int length) throws IOException {
byteBuffer.clear();
byteBuffer.limit(length);
@@ -53,7 +54,7 @@
byteBuffer.flip();
}
- public static void sendFile(FileChannel fileChannel, SocketChannel socketChannel) throws IOException {
+ public static void sendFile(FileChannel fileChannel, ISocketChannel socketChannel) throws IOException {
long pos = 0;
long fileSize = fileChannel.size();
long remainingBytes = fileSize;
@@ -63,11 +64,10 @@
pos += transferredBytes;
remainingBytes -= transferredBytes;
}
-
- socketChannel.socket().getOutputStream().flush();
+ socketChannel.getSocketChannel().socket().getOutputStream().flush();
}
- public static void downloadFile(FileChannel fileChannel, SocketChannel socketChannel) throws IOException {
+ public static void downloadFile(FileChannel fileChannel, ISocketChannel socketChannel) throws IOException {
long pos = 0;
long fileSize = fileChannel.size();
long count = fileSize;
@@ -97,7 +97,7 @@
return hostName;
}
- public static void transferBufferToChannel(SocketChannel socketChannel, ByteBuffer requestBuffer)
+ public static void transferBufferToChannel(ISocketChannel socketChannel, ByteBuffer requestBuffer)
throws IOException {
while (requestBuffer.hasRemaining()) {
socketChannel.write(requestBuffer);
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
index 1f6efa8..3dc094e 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
@@ -35,6 +35,10 @@
import org.apache.asterix.replication.messaging.ReplicationProtocol;
import org.apache.asterix.replication.messaging.ReplicationProtocol.ReplicationRequestType;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.network.INetworkSecurityManager;
+import org.apache.hyracks.api.network.ISocketChannel;
+import org.apache.hyracks.api.network.ISocketChannelFactory;
+import org.apache.hyracks.util.NetworkUtil;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -70,9 +74,7 @@
LOGGER.log(Level.INFO, "opened Replication Channel @ IP Address: " + nodeIP + ":" + dataPort);
while (serverSocketChannel.isOpen()) {
SocketChannel socketChannel = serverSocketChannel.accept();
- socketChannel.configureBlocking(true);
- //start a new thread to handle the request
- appCtx.getThreadExecutor().execute(new ReplicationWorker(socketChannel));
+ connectionAccepted(socketChannel);
}
} catch (AsynchronousCloseException e) {
LOGGER.debug("Replication channel closed", e);
@@ -93,12 +95,27 @@
}
}
+ private void connectionAccepted(SocketChannel socketChannel) {
+ try {
+ NetworkUtil.configure(socketChannel);
+ socketChannel.configureBlocking(false);
+ final INetworkSecurityManager networkSecurityManager =
+ appCtx.getServiceContext().getControllerService().getNetworkSecurityManager();
+ final ISocketChannelFactory socketChannelFactory = networkSecurityManager.getSocketChannelFactory();
+ final ISocketChannel serverChannel = socketChannelFactory.createServerChannel(socketChannel);
+ //start a new thread to handle the request
+ appCtx.getThreadExecutor().execute(new ReplicationWorker(serverChannel));
+ } catch (Exception e) {
+ LOGGER.error("failed to process accepted connection", e);
+ }
+ }
+
private class ReplicationWorker implements IReplicationWorker {
- private final SocketChannel socketChannel;
+ private final ISocketChannel socketChannel;
private final ByteBuffer inBuffer;
private final ByteBuffer outBuffer;
- public ReplicationWorker(SocketChannel socketChannel) {
+ public ReplicationWorker(ISocketChannel socketChannel) {
this.socketChannel = socketChannel;
inBuffer = ByteBuffer.allocate(ReplicationProtocol.INITIAL_BUFFER_SIZE);
outBuffer = ByteBuffer.allocate(ReplicationProtocol.INITIAL_BUFFER_SIZE);
@@ -108,6 +125,10 @@
public void run() {
Thread.currentThread().setName("Replication Worker");
try {
+ if (socketChannel.requiresHandshake() && !socketChannel.handshake()) {
+ return;
+ }
+ socketChannel.getSocketChannel().configureBlocking(true);
ReplicationRequestType requestType = ReplicationProtocol.getRequestType(socketChannel, inBuffer);
while (requestType != ReplicationRequestType.GOODBYE) {
handle(requestType);
@@ -116,18 +137,12 @@
} catch (Exception e) {
LOGGER.warn("Unexpected error during replication.", e);
} finally {
- if (socketChannel.isOpen()) {
- try {
- socketChannel.close();
- } catch (IOException e) {
- LOGGER.warn("Failed to close replication socket.", e);
- }
- }
+ NetworkUtil.closeQuietly(socketChannel);
}
}
@Override
- public SocketChannel getChannel() {
+ public ISocketChannel getChannel() {
return socketChannel;
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateLogsTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateLogsTask.java
index b71f4b8..e38a33d 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateLogsTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateLogsTask.java
@@ -34,6 +34,7 @@
import org.apache.asterix.replication.logging.RemoteLogsProcessor;
import org.apache.asterix.replication.management.ReplicationChannel;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.network.ISocketChannel;
/**
* A task to replicate transaction logs from master replica
@@ -53,7 +54,7 @@
final RemoteLogsProcessor logsProcessor = replicationChannel.getRemoteLogsProcessor();
final ILogManager logManager = appCtx.getTransactionSubsystem().getLogManager();
final RemoteLogRecord reusableLog = new RemoteLogRecord();
- final SocketChannel channel = worker.getChannel();
+ final ISocketChannel channel = worker.getChannel();
ByteBuffer logsBuffer = ByteBuffer.allocate(logManager.getLogPageSize());
try {
while (true) {
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java
index 41e7d9e..c702f1b 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java
@@ -22,17 +22,22 @@
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;
+import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.exceptions.ReplicationException;
import org.apache.asterix.replication.api.IReplicationMessage;
import org.apache.asterix.replication.api.PartitionReplica;
import org.apache.asterix.replication.management.NetworkingUtil;
+import org.apache.hyracks.api.network.ISocketChannel;
+import org.apache.hyracks.api.network.ISocketChannelFactory;
import org.apache.hyracks.data.std.util.ExtendedByteArrayOutputStream;
+import org.apache.hyracks.util.NetworkUtil;
import org.apache.hyracks.util.StorageUtil;
public class ReplicationProtocol {
@@ -65,7 +70,7 @@
Stream.of(ReplicationRequestType.values()).forEach(type -> TYPES.put(type.ordinal(), type));
}
- public static ByteBuffer readRequest(SocketChannel socketChannel, ByteBuffer dataBuffer) throws IOException {
+ public static ByteBuffer readRequest(ISocketChannel socketChannel, ByteBuffer dataBuffer) throws IOException {
// read request size
NetworkingUtil.readBytes(socketChannel, dataBuffer, Integer.BYTES);
final int requestSize = dataBuffer.getInt();
@@ -75,7 +80,7 @@
return buf;
}
- public static ReplicationRequestType getRequestType(SocketChannel socketChannel, ByteBuffer byteBuffer)
+ public static ReplicationRequestType getRequestType(ISocketChannel socketChannel, ByteBuffer byteBuffer)
throws IOException {
// read replication request type
NetworkingUtil.readBytes(socketChannel, byteBuffer, REPLICATION_REQUEST_TYPE_SIZE);
@@ -93,12 +98,12 @@
return Integer.parseInt(msg.substring(msg.indexOf(LOG_REPLICATION_ACK) + 1));
}
- public static void sendGoodbye(SocketChannel socketChannel) throws IOException {
+ public static void sendGoodbye(ISocketChannel socketChannel) throws IOException {
ByteBuffer goodbyeBuffer = ReplicationProtocol.getGoodbyeBuffer();
NetworkingUtil.transferBufferToChannel(socketChannel, goodbyeBuffer);
}
- public static void sendAck(SocketChannel socketChannel, ByteBuffer buf) {
+ public static void sendAck(ISocketChannel socketChannel, ByteBuffer buf) {
try {
buf.clear();
buf.putInt(ReplicationRequestType.ACK.ordinal());
@@ -110,7 +115,7 @@
}
public static void waitForAck(PartitionReplica replica) throws IOException {
- final SocketChannel channel = replica.getChannel();
+ final ISocketChannel channel = replica.getChannel();
final ByteBuffer buf = replica.getReusableBuffer();
ReplicationRequestType responseFunction = ReplicationProtocol.getRequestType(channel, buf);
if (responseFunction != ReplicationRequestType.ACK) {
@@ -119,12 +124,12 @@
}
public static void sendTo(PartitionReplica replica, IReplicationMessage task) {
- final SocketChannel channel = replica.getChannel();
+ final ISocketChannel channel = replica.getChannel();
final ByteBuffer buf = replica.getReusableBuffer();
sendTo(channel, task, buf);
}
- public static void sendTo(SocketChannel channel, IReplicationMessage task, ByteBuffer buf) {
+ public static void sendTo(ISocketChannel channel, IReplicationMessage task, ByteBuffer buf) {
ExtendedByteArrayOutputStream outputStream = new ExtendedByteArrayOutputStream();
try (DataOutputStream oos = new DataOutputStream(outputStream)) {
task.serialize(oos);
@@ -135,18 +140,18 @@
requestBuffer.put(outputStream.getByteArray(), 0, outputStream.getLength());
requestBuffer.flip();
NetworkingUtil.transferBufferToChannel(channel, requestBuffer);
- channel.socket().getOutputStream().flush();
+ channel.getSocketChannel().socket().getOutputStream().flush();
} catch (IOException e) {
throw new ReplicationException(e);
}
}
- public static IReplicationMessage read(SocketChannel socketChannel, ByteBuffer buffer) throws IOException {
+ public static IReplicationMessage read(ISocketChannel socketChannel, ByteBuffer buffer) throws IOException {
final ReplicationRequestType type = getRequestType(socketChannel, buffer);
return readMessage(type, socketChannel, buffer);
}
- public static IReplicationMessage readMessage(ReplicationRequestType type, SocketChannel socketChannel,
+ public static IReplicationMessage readMessage(ReplicationRequestType type, ISocketChannel socketChannel,
ByteBuffer buffer) {
try {
final ByteBuffer requestBuf = ReplicationProtocol.readRequest(socketChannel, buffer);
@@ -191,6 +196,24 @@
return endLogRepBuffer;
}
+ public static ISocketChannel establishReplicaConnection(INcApplicationContext appCtx, InetSocketAddress location)
+ throws IOException {
+ final SocketChannel socketChannel = SocketChannel.open();
+ NetworkUtil.configure(socketChannel);
+ socketChannel.connect(location);
+ // perform handshake in a non-blocking mode
+ socketChannel.configureBlocking(false);
+ final ISocketChannelFactory socketChannelFactory =
+ appCtx.getServiceContext().getControllerService().getNetworkSecurityManager().getSocketChannelFactory();
+ final ISocketChannel clientChannel = socketChannelFactory.createClientChannel(socketChannel);
+ if (clientChannel.requiresHandshake() && !clientChannel.handshake()) {
+ throw new IllegalStateException("handshake failure");
+ }
+ // switch to blocking mode after handshake success
+ socketChannel.configureBlocking(true);
+ return clientChannel;
+ }
+
private static ByteBuffer ensureSize(ByteBuffer buffer, int size) {
if (buffer == null || buffer.capacity() < size) {
return ByteBuffer.allocate(size);
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/FileSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/FileSynchronizer.java
index cc0f7b4..d795d4e 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/FileSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/FileSynchronizer.java
@@ -32,6 +32,7 @@
import org.apache.asterix.replication.api.PartitionReplica;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.api.network.ISocketChannel;
public class FileSynchronizer {
@@ -50,7 +51,7 @@
public void replicate(String file, boolean metadata) {
try {
final IIOManager ioManager = appCtx.getIoManager();
- final SocketChannel channel = replica.getChannel();
+ final ISocketChannel channel = replica.getChannel();
final FileReference filePath = ioManager.resolve(file);
ReplicateFileTask task = new ReplicateFileTask(file, filePath.getFile().length(), metadata);
ReplicationProtocol.sendTo(replica, task);
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
index 583f33d..3cd1a07 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
@@ -34,6 +34,7 @@
import org.apache.asterix.replication.messaging.PartitionResourcesListTask;
import org.apache.asterix.replication.messaging.ReplicationProtocol;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.hyracks.api.network.ISocketChannel;
/**
* Ensures that the files between master and a replica are synchronized
@@ -69,7 +70,7 @@
private Set<String> getReplicaFiles(int partition) throws IOException {
final PartitionResourcesListTask replicaFilesRequest = new PartitionResourcesListTask(partition);
- final SocketChannel channel = replica.getChannel();
+ final ISocketChannel channel = replica.getChannel();
final ByteBuffer reusableBuffer = replica.getReusableBuffer();
ReplicationProtocol.sendTo(replica, replicaFilesRequest);
final PartitionResourcesListResponse response =
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
index 7e42d14..a404ba8 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
@@ -90,6 +90,7 @@
datasetLifeCycleManager.flushDataset(datasetId.getId(), false);
}
}
+ datasetInfo.waitForIO();
} catch (ACIDException e) {
throw HyracksDataException.create(e);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/network/ISocketChannel.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/network/ISocketChannel.java
index 70ef1d2..5cfa442 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/network/ISocketChannel.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/network/ISocketChannel.java
@@ -21,9 +21,11 @@
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SocketChannel;
+import java.nio.channels.WritableByteChannel;
-public interface ISocketChannel extends Closeable {
+public interface ISocketChannel extends WritableByteChannel, ReadableByteChannel, Closeable {
/**
* Indicates whether this {@link ISocketChannel} requires a client/server handshake before
@@ -91,4 +93,9 @@
* @return the socket channel
*/
SocketChannel getSocketChannel();
+
+ @Override
+ default boolean isOpen() {
+ return getSocketChannel().isOpen();
+ }
}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannel.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannel.java
index 73475b0..abb8b15 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannel.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannel.java
@@ -184,9 +184,11 @@
@Override
public synchronized void close() throws IOException {
- engine.closeOutbound();
- new SslHandshake(this).handshake();
- socketChannel.close();
+ if (socketChannel.isOpen()) {
+ engine.closeOutbound();
+ new SslHandshake(this).handshake();
+ socketChannel.close();
+ }
}
@Override