[ASTERIXDB-2490][NET] Support Encrypted IPC Connections
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- Add CC/NC options to support SSL connections.
- Add APIS to support secured connections.
- Support encrypted connections for CC/NC IPCs.
- Add keys/certificates for testing.
- Add SqlppExecutionTest with SSL connections enabled.
- Sort imports.
Change-Id: I7007a9be25287a94c5936d440355cfedb8e032b9
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3052
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml
index 4ed61dc..f697741 100644
--- a/asterixdb/asterix-app/pom.xml
+++ b/asterixdb/asterix-app/pom.xml
@@ -187,6 +187,7 @@
<exclude>src/test/resources/**/only*.xml</exclude>
<exclude>src/main/resources/sdk/**</exclude>
<exclude>src/main/resources/dashboard/**</exclude>
+ <exclude>src/test/resources/security/**</exclude>
</excludes>
</configuration>
</execution>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ServletUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ServletUtil.java
index 17e4c16..2308ea3 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ServletUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ServletUtil.java
@@ -36,8 +36,10 @@
synchronized (ctx) {
resultSet = (IResultSet) ctx.get(RESULTSET_ATTR);
if (resultSet == null) {
- resultSet =
- new ResultSet(hcc, appCtx.getCompilerProperties().getFrameSize(), ResultReader.NUM_READERS);
+ resultSet = new ResultSet(hcc,
+ appCtx.getServiceContext().getControllerService().getNetworkSecurityManager()
+ .getSocketChannelFactory(),
+ appCtx.getCompilerProperties().getFrameSize(), ResultReader.NUM_READERS);
ctx.put(RESULTSET_ATTR, resultSet);
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 07d540b..8924512 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -478,7 +478,8 @@
// TODO(mblow): multicc
CcId primaryCcId = ncSrv.getPrimaryCcId();
ClusterControllerInfo ccInfo = ncSrv.getNodeParameters(primaryCcId).getClusterControllerInfo();
- hcc = new HyracksConnection(ccInfo.getClientNetAddress(), ccInfo.getClientNetPort());
+ hcc = new HyracksConnection(ccInfo.getClientNetAddress(), ccInfo.getClientNetPort(),
+ ncSrv.getNetworkSecurityManager().getSocketChannelFactory());
} catch (Exception e) {
throw HyracksDataException.create(e);
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 482f67f..99500ce 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -140,7 +140,8 @@
String strIP = ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetAddress();
int port = ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetPort();
- hcc = new HyracksConnection(strIP, port);
+ hcc = new HyracksConnection(strIP, port,
+ ccServiceCtx.getControllerService().getNetworkSecurityManager().getSocketChannelFactory());
MetadataBuiltinFunctions.init();
ILibraryManager libraryManager = new ExternalLibraryManager();
ReplicationProperties repProp =
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index f510be5..6cc4677 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -71,8 +71,8 @@
public static final int DEFAULT_HYRACKS_CC_CLIENT_PORT = 1098;
public static final int DEFAULT_HYRACKS_CC_CLUSTER_PORT = 1099;
- public static final String DEFAULT_CONF_FILE =
- joinPath(getProjectPath().toString(), "src", "test", "resources", "cc.conf");
+ public static final String RESOURCES_PATH = joinPath(getProjectPath().toString(), "src", "test", "resources");
+ public static final String DEFAULT_CONF_FILE = joinPath(RESOURCES_PATH, "cc-ssl.conf");
private static final String DEFAULT_STORAGE_PATH = joinPath("target", "io", "dir");
private static String storagePath = DEFAULT_STORAGE_PATH;
private static final long RESULT_TTL = TimeUnit.MINUTES.toMillis(5);
@@ -126,6 +126,8 @@
ccApplication.registerConfig(configManager);
final CCConfig ccConfig = createCCConfig(configManager);
configManager.processConfig();
+ ccConfig.setKeyStorePath(joinPath(RESOURCES_PATH, ccConfig.getKeyStorePath()));
+ ccConfig.setTrustStorePath(joinPath(RESOURCES_PATH, ccConfig.getTrustStorePath()));
cc = new ClusterControllerService(ccConfig, ccApplication);
nodeNames = ccConfig.getConfigManager().getNodeNames();
@@ -146,8 +148,8 @@
}
ncApplication.registerConfig(ncConfigManager);
opts.forEach(opt -> ncConfigManager.set(nodeId, opt.getLeft(), opt.getRight()));
- nodeControllers.add(
- new NodeControllerService(fixupIODevices(createNCConfig(nodeId, ncConfigManager)), ncApplication));
+ nodeControllers
+ .add(new NodeControllerService(fixupPaths(createNCConfig(nodeId, ncConfigManager)), ncApplication));
}
opts.forEach(opt -> configManager.set(opt.getLeft(), opt.getRight()));
@@ -176,7 +178,8 @@
}
// Wait until cluster becomes active
((ICcApplicationContext) cc.getApplicationContext()).getClusterStateManager().waitForState(ClusterState.ACTIVE);
- hcc = new HyracksConnection(cc.getConfig().getClientListenAddress(), cc.getConfig().getClientListenPort());
+ hcc = new HyracksConnection(cc.getConfig().getClientListenAddress(), cc.getConfig().getClientListenPort(),
+ cc.getNetworkSecurityManager().getSocketChannelFactory());
this.ncs = nodeControllers.toArray(new NodeControllerService[nodeControllers.size()]);
setTestPersistedResourceRegistry();
}
@@ -243,7 +246,7 @@
return (INCApplication) Class.forName(ncAppClass).newInstance();
}
- private NCConfig fixupIODevices(NCConfig ncConfig) throws IOException, AsterixException, CmdLineException {
+ private NCConfig fixupPaths(NCConfig ncConfig) throws IOException, AsterixException, CmdLineException {
// we have to first process the config
ncConfig.getConfigManager().processConfig();
@@ -258,6 +261,10 @@
nodeStores[i] = joinPath(getDefaultStoragePath(), ncConfig.getNodeId(), nodeStores[i]);
}
ncConfig.getConfigManager().set(ncConfig.getNodeId(), NCConfig.Option.IODEVICES, nodeStores);
+ final String keyStorePath = joinPath(RESOURCES_PATH, ncConfig.getKeyStorePath());
+ final String trustStorePath = joinPath(RESOURCES_PATH, ncConfig.getTrustStorePath());
+ ncConfig.getConfigManager().set(ncConfig.getNodeId(), NCConfig.Option.KEY_STORE_PATH, keyStorePath);
+ ncConfig.getConfigManager().set(ncConfig.getNodeId(), NCConfig.Option.TRUST_STORE_PATH, trustStorePath);
return ncConfig;
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/SslAsterixHyracksIntegrationUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/SslAsterixHyracksIntegrationUtil.java
new file mode 100644
index 0000000..2d6813e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/SslAsterixHyracksIntegrationUtil.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.api.common;
+
+import static org.apache.asterix.api.common.AsterixHyracksIntegrationUtil.LoggerHolder.LOGGER;
+import static org.apache.hyracks.util.file.FileUtil.joinPath;
+
+public class SslAsterixHyracksIntegrationUtil extends AsterixHyracksIntegrationUtil {
+
+ public static final String SSL_CONF_FILE = joinPath(RESOURCES_PATH, "cc-ssl.conf");
+
+ public static void main(String[] args) {
+ final AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil();
+ try {
+ integrationUtil.run(Boolean.getBoolean("cleanup.start"), Boolean.getBoolean("cleanup.shutdown"),
+ System.getProperty("external.lib", ""), System.getProperty("conf.path", SSL_CONF_FILE));
+ } catch (Exception e) {
+ LOGGER.fatal("Unexpected exception", e);
+ System.exit(1);
+ }
+ }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SslSqlppExecutionTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SslSqlppExecutionTest.java
new file mode 100644
index 0000000..ca2bd7b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SslSqlppExecutionTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.runtime;
+
+import java.util.Collection;
+
+import org.apache.asterix.api.common.SslAsterixHyracksIntegrationUtil;
+import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Runs the SQL++ runtime tests with the storage parallelism with ssl enabled.
+ */
+@RunWith(Parameterized.class)
+public class SslSqlppExecutionTest {
+ protected static final String TEST_CONFIG_FILE_NAME = SslAsterixHyracksIntegrationUtil.SSL_CONF_FILE;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, new TestExecutor());
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ LangExecutionUtil.tearDown();
+ }
+
+ @Parameters(name = "SslSqlppExecutionTest {index}: {0}")
+ public static Collection<Object[]> tests() throws Exception {
+ return LangExecutionUtil.tests("only_sqlpp.xml", "testsuite_sqlpp.xml");
+ }
+
+ protected TestCaseContext tcCtx;
+
+ public SslSqlppExecutionTest(TestCaseContext tcCtx) {
+ this.tcCtx = tcCtx;
+ }
+
+ @Test
+ public void test() throws Exception {
+ LangExecutionUtil.test(tcCtx);
+ }
+}
diff --git a/asterixdb/asterix-app/src/test/resources/cc-ssl.conf b/asterixdb/asterix-app/src/test/resources/cc-ssl.conf
new file mode 100644
index 0000000..ea00513
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/cc-ssl.conf
@@ -0,0 +1,69 @@
+; Licensed to the Apache Software Foundation (ASF) under one
+; or more contributor license agreements. See the NOTICE file
+; distributed with this work for additional information
+; regarding copyright ownership. The ASF licenses this file
+; to you under the Apache License, Version 2.0 (the
+; "License"); you may not use this file except in compliance
+; with the License. You may obtain a copy of the License at
+;
+; http://www.apache.org/licenses/LICENSE-2.0
+;
+; Unless required by applicable law or agreed to in writing,
+; software distributed under the License is distributed on an
+; "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+; KIND, either express or implied. See the License for the
+; specific language governing permissions and limitations
+; under the License.
+
+[nc/asterix_nc1]
+txn.log.dir=target/tmp/asterix_nc1/txnlog
+core.dump.dir=target/tmp/asterix_nc1/coredump
+iodevices=target/tmp/asterix_nc1/iodevice1,../asterix-server/target/tmp/asterix_nc1/iodevice2
+nc.api.port=19004
+key.store.path=security/nc1/asterix_nc1.jks
+key.store.password=asterixdb
+trust.store.path=security/root/root.truststore
+replication.listen.port=2001
+#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5006
+
+[nc/asterix_nc2]
+ncservice.port=9091
+txn.log.dir=target/tmp/asterix_nc2/txnlog
+core.dump.dir=target/tmp/asterix_nc2/coredump
+iodevices=target/tmp/asterix_nc2/iodevice1,../asterix-server/target/tmp/asterix_nc2/iodevice2
+nc.api.port=19005
+key.store.path=security/nc2/asterix_nc2.jks
+key.store.password=asterixdb
+trust.store.path=security/root/root.truststore
+replication.listen.port=2002
+#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5007
+
+[nc]
+address=127.0.0.1
+command=asterixnc
+app.class=org.apache.asterix.hyracks.bootstrap.NCApplication
+jvm.args=-Xmx4096m -Dnode.Resolver="org.apache.asterix.external.util.IdentitiyResolverFactory"
+storage.buffercache.pagesize=32KB
+storage.buffercache.size=48MB
+storage.memorycomponent.globalbudget=512MB
+
+[cc]
+address = 127.0.0.1
+app.class=org.apache.asterix.hyracks.bootstrap.CCApplication
+heartbeat.period=2000
+heartbeat.max.misses=25
+key.store.path=security/cc/cc.jks
+key.store.password=asterixdb
+trust.store.path=security/root/root.truststore
+
+[common]
+log.dir = logs/
+log.level = INFO
+compiler.framesize=32KB
+compiler.sortmemory=320KB
+compiler.groupmemory=160KB
+compiler.joinmemory=256KB
+compiler.textsearchmemory=160KB
+messaging.frame.size=4096
+messaging.frame.count=512
+ssl.enabled=true
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
index 050a799..5faf4d8 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
@@ -34,6 +34,7 @@
"replication\.log\.buffer\.pagesize" : 131072,
"replication\.strategy" : "none",
"replication\.timeout" : 30,
+ "ssl\.enabled" : false,
"storage.max.active.writable.datasets" : 8,
"txn\.commitprofiler\.enabled" : false,
"txn\.commitprofiler\.reportinterval" : 5,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
index c56062a..e30c879 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
@@ -34,6 +34,7 @@
"replication\.log\.buffer\.pagesize" : 131072,
"replication\.strategy" : "none",
"replication\.timeout" : 30,
+ "ssl\.enabled" : false,
"storage.max.active.writable.datasets" : 8,
"txn\.commitprofiler\.enabled" : false,
"txn\.commitprofiler\.reportinterval" : 5,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
index 3a3796d..ce5add1 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
@@ -34,6 +34,7 @@
"replication\.log\.buffer\.pagesize" : 131072,
"replication\.strategy" : "none",
"replication\.timeout" : 30,
+ "ssl\.enabled" : false,
"storage.max.active.writable.datasets" : 8,
"txn\.commitprofiler\.enabled" : false,
"txn\.commitprofiler\.reportinterval" : 5,
diff --git a/asterixdb/asterix-app/src/test/resources/security/cc/cc.crt b/asterixdb/asterix-app/src/test/resources/security/cc/cc.crt
new file mode 100644
index 0000000..94740b1
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/security/cc/cc.crt
@@ -0,0 +1,23 @@
+-----BEGIN CERTIFICATE-----
+MIIDvzCCAacCCQDz/BMGga4kNTANBgkqhkiG9w0BAQsFADApMQswCQYDVQQGEwJY
+WDEaMBgGA1UEAwwRQXN0ZXJpeERCIFRlc3QgQ0EwHhcNMTgxMTIxMTMyNjM2WhcN
+MjgxMTE4MTMyNjM2WjAaMQswCQYDVQQGEwJYWDELMAkGA1UEAwwCQ0MwggEiMA0G
+CSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCv74U89DyyS2Zp1hmYDcCTf1+PQF7r
+QBd4+JjfCcSAAZfsZIhTqn0cVMCselqG/6+Z3CdBDGdunBLpkD1AqnYBD2JKT+vO
+13T9MRsArr/ItLwU9K5NaXBAYtspDF7sYvFQH7VgRKzp9Dk9D7/+VzJqBnsuU62h
+oqJ8nkYCdfuSMYhuT+UbANAzY4DF9sBwKmKXenodORQH9hO9iaJRefPLzmd+zI0/
+rxefgFkozEn68zBE36ez0+f2PBPiOsi9kLsK7pqd5scfIB5WIXHp4zDR3eafOe7v
+fvDWCPcAgOp7OXT2ehyUYsjMb2UCpdhf1eB410wmxRPdxBzele0ge7QhAgMBAAEw
+DQYJKoZIhvcNAQELBQADggIBALco+j2eAufgFMR0mUhOMTfdVKvA0p1YHqWM+O4V
+EWox4EHpLHp1/ae1ZQ2/M/b5MQQmZjhFp48sBVFMaQXgYzcPsPj+YfIqpe6H0KbA
+6seqztJ0dMFJY7iQtNKOMrhs04ykL4xogBzY4Dnj5rQxLuC8weDyAJGQCjvLDGDP
+GbgHdthuQB5IXNsdsUPha0CyBrqzVrTmi3fQ0SH4gSbJFK944Gtxhfi4IVxh+bR0
+MvvUoc5bgPjFn1QfGDHqXey1pon+Rt7rN2NlegmmafvfFfitxBeF0AzEzg0wGDuG
+zaEYs9Qc9emHdGYqZ9aJxcNTylbDvf6QSV5+AOZdntvPoXK3/lMBxgnG9ez9kE9t
+UoytM7TyXj6f5djhixhObc4P2grnxfigUDBKqqttTaMqQP1V/Rx4mjlqoePWQqlI
+Lv+/JZd7CId8J2QEtmaErsHPzpUdXGwClBrrqBnjH3sggP1IDWm0Q3E957vPDNK+
+6xqcFOIBv4GwqTikFkMmf0qACRS6+OaWNKS7lt55a0QL/g8gWlYRJIMtYxm2J562
+6ijC7L9sF705UjIjCnScxyE9h8QS/yl/AFF+iNhE/32AeAYDjyyqRgPZh2BQNGNk
+q/HruzEWmMphHjP8eypiePQeTWUPHpV4caHWoG7/Qy85+hpEccP2bh4QiT1YiKkR
+TeK/
+-----END CERTIFICATE-----
diff --git a/asterixdb/asterix-app/src/test/resources/security/cc/cc.jks b/asterixdb/asterix-app/src/test/resources/security/cc/cc.jks
new file mode 100644
index 0000000..242d615
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/security/cc/cc.jks
Binary files differ
diff --git a/asterixdb/asterix-app/src/test/resources/security/cc/cc.key b/asterixdb/asterix-app/src/test/resources/security/cc/cc.key
new file mode 100644
index 0000000..59df93e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/security/cc/cc.key
@@ -0,0 +1,27 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIIEowIBAAKCAQEAr++FPPQ8sktmadYZmA3Ak39fj0Be60AXePiY3wnEgAGX7GSI
+U6p9HFTArHpahv+vmdwnQQxnbpwS6ZA9QKp2AQ9iSk/rztd0/TEbAK6/yLS8FPSu
+TWlwQGLbKQxe7GLxUB+1YESs6fQ5PQ+//lcyagZ7LlOtoaKifJ5GAnX7kjGIbk/l
+GwDQM2OAxfbAcCpil3p6HTkUB/YTvYmiUXnzy85nfsyNP68Xn4BZKMxJ+vMwRN+n
+s9Pn9jwT4jrIvZC7Cu6anebHHyAeViFx6eMw0d3mnznu737w1gj3AIDqezl09noc
+lGLIzG9lAqXYX9XgeNdMJsUT3cQc3pXtIHu0IQIDAQABAoIBAEhxHnKHuopLg6Lm
+lmlGmFnjY4yPm8XQARo9emy0D+YJQe4DQyL4G0XUG/Wo96eIllyLCtq86cXgm+Ty
+EFaVGbu8AuPIXrcvfitW0eCJE8znpl4WlcKoPKE2Uzlmr5sz7lrog31dKbm3Zu4e
+kxZ9/vMrMgNUTzUzyyX24arXu7to3ETxWwUcblIDdYIqJz9U/OfOyXI+Wcekg0jx
+63ElDFo85A0ObTUMVPnsNKQoGcLXTlx9KXg8SPwC50A507IC3Z8jiPodIoa7aZT4
+yce1JZMH1mBL7dBvKPHujDtutXC6B2GOeakBqVDcq5wDqn98kjVSWQzUDOEIF223
+NThHdDUCgYEA6dzSi5mznsAaXaw9/O6ZqTVbp9iUMyM0s+obEZ6mcAs4n1mgTRM5
+7hhmOk/Hb12qf0RSyi1NAvo06CLq5sCF8Sy80vqrsj9KUHX02pIIOQVhZyx7Lkfz
+30t0Rwt7SXWT7c2k7eJa7dUQFPM0hbqVlOQbTEvYSicbEbUi9KU33AsCgYEAwJb1
+I6KHg3zP/u5Z/LTlWmw+5MbJBdXF5AT6sLwUpV7g87zB02dO8JrVLzV9ILZp6X9p
+YDOkvNpLT84jm90pTSof+0ltmhatvRl2Dxb55GCxpfQkOzXar+5nvM/VoJJxy7Ib
+fwPjiXhatQpRNI/DVEMr5tFmOJzlA7UH+FzwYAMCgYEAqXas0yAjusu1z1865Af6
+LVVO/4e/jHbcf+sKhnASZ2qaM059DJz8A36GxYZ+HEMhg2t9GqhM//VTVUvIMEIe
+TS9//NpMhLO8JCX1slTTxI4TkPH4qQbWv14r+jdltUuQUGgONZsrDOCx8Jxz2Nvl
+/Kh254imSMWhpek+VU1L9xUCgYBwt6mQCq8PmPxPc4c9bU6TJqmE0oeTH7PrqbJj
+wCDBTQ+R+BblOSCtl6FQORkcDUddvxGYmYFAeu77LWYP8lga7p27QBWiysUy2PUN
+DeCB9sninEqcUP/GWl1i161rhmqN0pdfNpJ0wfks3nX6sm2bIplORZ5zYfhzu/+H
+MDYLqwKBgE0QZjZuiAO4678iztIRvC184h51F4eCyhL/XhUsYKPirORxQBwBTsDo
+xoixutYqoZmM5hp3+rkhQF3PByPsackpSZlEJdd3X36uW2uJTetWXqACkgzEgrQf
+IHES0nVmwgYM425fFN9w9H/+rO7hNO9PcpjBZPAdD22TcqJW8oOs
+-----END RSA PRIVATE KEY-----
diff --git a/asterixdb/asterix-app/src/test/resources/security/cc/cc.p12 b/asterixdb/asterix-app/src/test/resources/security/cc/cc.p12
new file mode 100644
index 0000000..855170f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/security/cc/cc.p12
Binary files differ
diff --git a/asterixdb/asterix-app/src/test/resources/security/nc1/asterix_nc1.crt b/asterixdb/asterix-app/src/test/resources/security/nc1/asterix_nc1.crt
new file mode 100644
index 0000000..8fbff0a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/security/nc1/asterix_nc1.crt
@@ -0,0 +1,23 @@
+-----BEGIN CERTIFICATE-----
+MIIDyjCCAbICCQDz/BMGga4kNjANBgkqhkiG9w0BAQsFADApMQswCQYDVQQGEwJY
+WDEaMBgGA1UEAwwRQXN0ZXJpeERCIFRlc3QgQ0EwHhcNMTgxMTIxMTMyODIwWhcN
+MjgxMTE4MTMyODIwWjAlMQswCQYDVQQGEwJYWDEWMBQGA1UEAwwNYXN0ZXJpeGRi
+X25jMTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBALM0qmtXFlDUcwly
+VT3MjWy1SokSxxjqPK2Y/zWmeupzCXZbP53N3kryAcO5sB/+aMgaYnKWCYNl6dlC
+GwQSzzwkI7hesyHlU7EiUeG920w9XJ7EdVnE/+kwtRevN6UOondzz0NlusLaHeJE
+Vj7kD7pP4LHRMUhSN2vANWQBjzs+wGcEn5li+i8goCI96KFH7j7M1GoDmFHIC0IQ
+9v9QHYQ/rUu2kbeqlyAIY7ZWURGIQgL5IiKLAa4TVHiSdDj911kZMiAgpQouotSs
+5zPMjj0KuusoC0zsCLaj8yJKZ8gscKTW0Ny48CQoR1j15FVwl7PNh2uLLNdc/ZVv
+DSUJgjMCAwEAATANBgkqhkiG9w0BAQsFAAOCAgEAozI3Up3/8oBVlKsxOWG5b98l
+isVcjO2rvwWZNy3ZI8bGma2TymBAeMQMxemXWUaWkeajJt0MzWD7tDaDZJyz1kbV
+E3dNDEdPT1I+xReQNadRiTdN3qyN49PG4eexBoYaYzvlrxmYZH632QwVOwYiyjRM
+qbzZP5k4fOLEwkPrR38ghJOFuI49QxVbC4+QJ7EgjuC78ZmB39QuKZIXiYMGojRl
+ItF95Y/dNTZYzXZnJTbeGylycyO4mkd9b6bRtsN79toate4tzUpm15l5B3ZkD0dX
+Lc4sKPBGAGqwrHojC6ipyyJpJIQfP/VoWUs+FdezA45+AHMkXZqiPhYBOqA66y1X
+z8NJDi87rcefDTwfFvqyyuodlOBPrFcNSowE/VZw51HpyAxkmYeObrJVk087Iz8i
+UrKMgrQ+w0hyA7IW6ma7ezC8607MMMTOB+FngX5CDcOjzAGPLpi/bgAho7CenmFG
+GYKKyWYW4FTnsiNH7Fi/WYKliKEBTmDBVt8crFHwDamLfMyEKvQExoCCgAu+MSs1
++Yd0JFQ0OOELMjyo2UYt7A6exYSZ57TnqmGRM2QMwRtvwafOzqPL2xwtUZxdgphu
+XxijoZo5eA/DIENSLA0l2B+WaBh+MkdySuxqi2CBAxgldTQyUgsiyu7ctsYIMdMa
+nVIrPLaI1domgvyoXSA=
+-----END CERTIFICATE-----
diff --git a/asterixdb/asterix-app/src/test/resources/security/nc1/asterix_nc1.jks b/asterixdb/asterix-app/src/test/resources/security/nc1/asterix_nc1.jks
new file mode 100644
index 0000000..d6d3844
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/security/nc1/asterix_nc1.jks
Binary files differ
diff --git a/asterixdb/asterix-app/src/test/resources/security/nc1/asterix_nc1.key b/asterixdb/asterix-app/src/test/resources/security/nc1/asterix_nc1.key
new file mode 100644
index 0000000..3e2eabb
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/security/nc1/asterix_nc1.key
@@ -0,0 +1,27 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIIEogIBAAKCAQEAszSqa1cWUNRzCXJVPcyNbLVKiRLHGOo8rZj/NaZ66nMJdls/
+nc3eSvIBw7mwH/5oyBpicpYJg2Xp2UIbBBLPPCQjuF6zIeVTsSJR4b3bTD1cnsR1
+WcT/6TC1F683pQ6id3PPQ2W6wtod4kRWPuQPuk/gsdExSFI3a8A1ZAGPOz7AZwSf
+mWL6LyCgIj3ooUfuPszUagOYUcgLQhD2/1AdhD+tS7aRt6qXIAhjtlZREYhCAvki
+IosBrhNUeJJ0OP3XWRkyICClCi6i1KznM8yOPQq66ygLTOwItqPzIkpnyCxwpNbQ
+3LjwJChHWPXkVXCXs82Ha4ss11z9lW8NJQmCMwIDAQABAoIBAC+3NDFEjPKUvtSj
+FsNPtdBeLSx2TYD6zZFDjaCRZWULoFddUIEKEchOy917kcPTD79IFzJ/dKUB+9QX
+X+4ju/49eS1cOcIqt7AQfVDoJn4UUJcNpFl0tNc4Wt+ljeFrFiNMOMGvUp0TSqW2
+oGg6fV2UazWth7vD+HG+SfkI2URirPMyeASNbjzyo3anzqQxYGomAybWQK4gtvfq
+/X+o1wEihvv1n59F5bUgEf++itrRSfaXjv1w8d80qdLXGIIgDqeNDqrOjsK/kU10
+hDbWAkswNW0lJE1hmKL8nt3H3Wp9ADz5+mvdqP+TLr/YJuoCMQ0y0h/o6D7/0nY6
+JfayH7ECgYEA2BpUjBZ1E0TI1AW3j8aG1kpwYfcB94Fzy7oKSX+uID0pQhhvh24G
+RPMurnXiRaUOZTA6w4zOd/+m7T0sLDli0UDYqVGQp3NdiyJXU2zJ/kqt3NJznOp1
+paKVIY9tZM9Itf20driRTp5IKjU2aPGSBuT17EriqqIUFEzzEqMmducCgYEA1Ep3
+ZqYWf5IpFJgPnU6XmLWuuruiTnK1/wfWAnXi+EpDZY9IKUm1JfkCGBMsF34FEvWF
+CxhdCwOfLWiDMRCz+0YOvVRH2PPSpZqZn7MGiT1K0FhWSLuDiBGaqO44a6fX+2ii
+fjQdd18GfLApILHe7nkrHsyUyglFuJOPE6PkTNUCgYBFjJPRUhjzzptjwUNGfno+
+1U49+SUk5wDBfGp7JSCBN63jm8GpMHvMDQflFgNwrqJnZpJDBTod3KV4jMt+oClx
+dxFDzQBlI/fjI3Y/Xy/TK22xN/oFcl/SovSkDGkEnMGl8LT30IbTapWHIAEW3UhF
+98I9/gvdJrSXo/xG70Md2QKBgHHfZ+DF4neXnTWQw39r8uFVQ16i21MdMQaV23QC
+bDcnDrPhgaG0CdlOkpL7ZcFMNciPrkffT7livfWLhCPDg+ebErj4BnoXf7yZCyKg
+0za2i/TqDY6CFvHcD4viDJ2isLYI0HDF37ByZnZnwAfroVtl331r27vr8Vwquqqc
+VQ/pAoGAOhQJcyb+mC1N/XzEacFCloVmQ0yNPp7SrT+OWyv+WlPXeF605ck9/48R
+cZXcdJ3/ZXUng1z1FGUw+UKffebp5qje8DJ0G6aNoED8LEwAEgGr1AnkXf6nz4hI
+fyBk+IE8Hyy8g2ap3IyCdyCs6D5OUsmwr+TwmQvukLeRLXOAzT0=
+-----END RSA PRIVATE KEY-----
diff --git a/asterixdb/asterix-app/src/test/resources/security/nc1/asterix_nc1.p12 b/asterixdb/asterix-app/src/test/resources/security/nc1/asterix_nc1.p12
new file mode 100644
index 0000000..315da67
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/security/nc1/asterix_nc1.p12
Binary files differ
diff --git a/asterixdb/asterix-app/src/test/resources/security/nc2/asterix_nc2.crt b/asterixdb/asterix-app/src/test/resources/security/nc2/asterix_nc2.crt
new file mode 100644
index 0000000..486d2a0
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/security/nc2/asterix_nc2.crt
@@ -0,0 +1,23 @@
+-----BEGIN CERTIFICATE-----
+MIIDyjCCAbICCQDz/BMGga4kNzANBgkqhkiG9w0BAQsFADApMQswCQYDVQQGEwJY
+WDEaMBgGA1UEAwwRQXN0ZXJpeERCIFRlc3QgQ0EwHhcNMTgxMTIxMTMyODQ4WhcN
+MjgxMTE4MTMyODQ4WjAlMQswCQYDVQQGEwJYWDEWMBQGA1UEAwwNYXN0ZXJpeGRi
+X25jMjCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAMVL+K/tP23E6Lk6
+uZH38yzKSEdAxDgnxAMVqTiqgEz8lz1YvFsY43rlBLu0+NSbNTRlUXhoxfFRVKN5
+e/iLVPFWL9VViLZP2FvTY0z6pkdwjKHtLA1uCMtUyIZwjB9yv7k0uVmvqm/zmFpm
+riL4TZA3/cNGZZw6MewkT0LeqI5vHUWfsNmDqYRyDucFOUVNunneZ6TZPMaspqOg
+XpwdN4UkbC2TUnWjMxC/XB+ZhkOAIYNYIvXzsL15tRV/3Rba5lJZHg7KaxM3rPo3
+8mmtA/y4rtlqouYf8X2nSbol3+WX3/U7xXtc2YuB5WUlXtUyixO8E8jVadpTUot4
+fU7CNOsCAwEAATANBgkqhkiG9w0BAQsFAAOCAgEAcIbcRE813t3Deku4K1867YBG
+sQ+sSshLy2VRWIykI7S9KRQwP2VuixvuUw/U7Ey39fDzj1howMhulFq8jXOQKp1z
+Fk2dmA5cDlAxEyD42k8QgvM8G9Kz6dmv4eV3cnUqv6f1VA3wV/598y3hhPuLHCXx
+vAaoGBCgdokDAARbirBU4UBjBO/Frjo63L6CoR3Avy3Cm+Hk556cTQlfWTMOKuOR
+wUINusnLc2x5q0tN7CSkZKzjyfotTSle2cfupnTLlVKC3/ln1Thga6gCkht39FtE
+XVWj1A2qNQlud5RTkmTLkedLhkRXvMSdD/V2pxo5gFaYTNRBHEGUhpXyOSnImLoh
+Uc7LFfalHC2cvhqNJNd4VPA7PNjNO6dvd/oNVclIFlrtiEMbTXnsLR8Tl7DU/L1/
+uxnYsdJomMTyFpCxMTaqDuYmsRKwcqaR0LQKvEmIWAzomjg+o5vpnhsThwpLBemA
+cboOFXV7mZiV4wLXzTUMnYHlb8syoNtliVbG+fkePVzZRkjD5wHW6OQbIJwE8/lB
+oY/XRsga9NT0O6KVfaLLJjeDRNLXBh+d7SjVPERIiut1/WnRVADSn/5g45YTWifM
+7UTw68DtCUA9HdXuNZ3d1B2zyPncNd2utJA6PIqVVDPGi0F0w4h1blSY/x5aOTjo
+VZ5xVqVAqZ2+K5Ljjvs=
+-----END CERTIFICATE-----
diff --git a/asterixdb/asterix-app/src/test/resources/security/nc2/asterix_nc2.jks b/asterixdb/asterix-app/src/test/resources/security/nc2/asterix_nc2.jks
new file mode 100644
index 0000000..90c5591
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/security/nc2/asterix_nc2.jks
Binary files differ
diff --git a/asterixdb/asterix-app/src/test/resources/security/nc2/asterix_nc2.key b/asterixdb/asterix-app/src/test/resources/security/nc2/asterix_nc2.key
new file mode 100644
index 0000000..f4116f5
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/security/nc2/asterix_nc2.key
@@ -0,0 +1,27 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIIEpAIBAAKCAQEAxUv4r+0/bcTouTq5kffzLMpIR0DEOCfEAxWpOKqATPyXPVi8
+WxjjeuUEu7T41Js1NGVReGjF8VFUo3l7+ItU8VYv1VWItk/YW9NjTPqmR3CMoe0s
+DW4Iy1TIhnCMH3K/uTS5Wa+qb/OYWmauIvhNkDf9w0ZlnDox7CRPQt6ojm8dRZ+w
+2YOphHIO5wU5RU26ed5npNk8xqymo6BenB03hSRsLZNSdaMzEL9cH5mGQ4Ahg1gi
+9fOwvXm1FX/dFtrmUlkeDsprEzes+jfyaa0D/Liu2Wqi5h/xfadJuiXf5Zff9TvF
+e1zZi4HlZSVe1TKLE7wTyNVp2lNSi3h9TsI06wIDAQABAoIBAFwZX8i4JlDWh5DY
+EgJKzbCN6PmiCTbxkTWjafAy31uQ2gTgJGUeFCqtN+1ryHBu14JS/ZoIxsYkoi8B
+qdZXLFrQUdnzaLM6SJRs9EDeDLryliOMtHC8ecx5EnZ3mWGgzlDlhNSSBSzneKpS
+nl1ircpx6Lq8ZPhtzhoexQVBLUv3TqZnEiwvWNFkcw7GI2bQiBIAtmZqV8xVSmWw
+TSbGtVNNeknPqztvWunabPBWMb9k1lhtjmHb0NWrJ1361koS5aGcqi74ApYvCOB8
+voWyF6DE6GJIrx06+TROFLTsg+geBo8oIgKq8SqupQF2FhkLLrIPfblp5peaQpV0
+Lt8pT8kCgYEA6a0VxwSYcvQRzOZbMIgy9E+Zw5NuaLfMJFwSimruY2k8NWcZi55x
+VFJrWsU1h0S7yp2uqij92qgYWLjGR8rSUTmMP5f3Tnvc/RL4wreqSwjBuvsecoNU
+ZVymvP374qW9DSVMX5aYC4XDNQUQsHFP62YxFym6w+qvChn3ZuUuze0CgYEA2CUs
+JV4MTMZOGHzItaD7BM2Npc63/LqceLcTbrr4a2A9wRyIJOMJ88DrW5P8ZoKIJIuK
+TY08o84RpcomjnPp25vwoHpFR27y3aExlp3/bmERz/flADIcGg4UctYT385HCCgA
+dXWrDtw7sRGn4paVjIexdt6NiAqpupwBxE2s8zcCgYEAzS4exh6B4cXvb1QBVA7z
+dtQCNtlYg/iG9pIl8YZNBdscc+OwaYjZB+pKu2wYQUsX/aQQ/vZ5WCprHlQ4PkeX
+/pwiSqCcFTzrYQfsh8UPcU0iFpVzOaeZTltZSO9W4b8XzdgnRHON0+hC7GYjLlqA
+iziy+By7ElYaFiuQsbehLk0CgYEAk1PrHmFXovE8hCZyLJNY5nIxzcX01Rhh1Up8
+vWpGL/J+xWVsSzBSNUrXYimhzkHFCJvwnmHZ5pFsqAP9efX6fk1xnAEbvdAbTQQ1
+p8N0O0mA+a+v8Q026G+WcpwHGfMhqaaSgX2+JjfpnA070Q/xScOoLC0QbZ1PCbqS
+3bpQW8sCgYAVe43KNGUIqVHlkukfZg9IO93ONErDQJMPWs1y84ZYPM32xqFjJJBX
+1OxH/csV09oXMdWkgL4eTCVEa1iueK4ImFWS3Orsw1+qaMZx7i3z4k9JvTywozqM
+vXOuXQO3ojvLSsJ0NQFyu1nMTAqUD0VkbJYb2ldfNWXQitDCcdeUOw==
+-----END RSA PRIVATE KEY-----
diff --git a/asterixdb/asterix-app/src/test/resources/security/nc2/asterix_nc2.p12 b/asterixdb/asterix-app/src/test/resources/security/nc2/asterix_nc2.p12
new file mode 100644
index 0000000..c93b7c9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/security/nc2/asterix_nc2.p12
Binary files differ
diff --git a/asterixdb/asterix-app/src/test/resources/security/root/root.truststore b/asterixdb/asterix-app/src/test/resources/security/root/root.truststore
new file mode 100644
index 0000000..f4eade3
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/security/root/root.truststore
Binary files differ
diff --git a/asterixdb/asterix-app/src/test/resources/security/root/rootCA.crt b/asterixdb/asterix-app/src/test/resources/security/root/rootCA.crt
new file mode 100644
index 0000000..5c61e8c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/security/root/rootCA.crt
@@ -0,0 +1,28 @@
+-----BEGIN CERTIFICATE-----
+MIIEzjCCArYCCQD5nGD8YSCVjzANBgkqhkiG9w0BAQsFADApMQswCQYDVQQGEwJY
+WDEaMBgGA1UEAwwRQXN0ZXJpeERCIFRlc3QgQ0EwHhcNMTgxMTIxMTMyMjQ5WhcN
+MjEwOTEwMTMyMjQ5WjApMQswCQYDVQQGEwJYWDEaMBgGA1UEAwwRQXN0ZXJpeERC
+IFRlc3QgQ0EwggIiMA0GCSqGSIb3DQEBAQUAA4ICDwAwggIKAoICAQDz+hjWCXiK
+B8UAU8PCa1nkk6dpuDpy47tFguurFvjUQl3MlJfVYc1m0pDuY7lpAonFs3jpoiKR
+d6hZVpceNvZ/DTygtkIyMEEsn6oE+FRtUCfkJDSxp4dR8/O81AVCcMDCPHB0wH0N
+DeUE1vaJOP/mabPSWqQv85g5B/gAYyrb1imaZ1WcQBNcXfsaLmq7w+KXTsikH0GB
+KqIRXpwRn4OG8SlZkpuxsnh/sAjiZejXFS4lOV5w2Hx0bXDOErebxTBHkdtoTkHG
+2lCHn43w+bKXqGfxg+fGuZ+qnypdsVKjuf0OmS6Yb67X1jKp+KQe9qlFj+gs4PXk
+HwJqhJdYYaUbSJk+nxJ0bHpVrLJnZ5jvYYew1kIw5ngedWzfmnjii21jKM1RnDSC
+CI1ZqBJ3+FR2pXnC6/9vROOxyPzP0DOYNxDgTqCse6io13+SWoiuyqyNSXjV9lAf
+9dn3CEooWa7vUZiQ0FPHfR55yMAgHbl5EYDsWjqemTeYFH1p1Nh+hneOpEhsIJ55
+SW7308cAowe8jOYkS17+12g8leTRijacs2XS2mNrYTjtQV9Js1WztLeB3c2cCqR8
+Xqtnd8Twv9zpYEZW4BHj1e4QegREVMSuauNtw/IZILdAawpX4+ZSB0hG1euX3VGv
+3LLFGMCDqR+R4hH9l3XOBwus2IGtTAe/JwIDAQABMA0GCSqGSIb3DQEBCwUAA4IC
+AQBASS6G4DdGi3WpHk3pgn0UpOAG/qc7IxG+jHfmdGIRw8TwucicSy3bWnH5EhOJ
+j8rAdYy2318lRxOdGNfC5JFCOu9dQBm5mp6B45Z/AJdf5kKDiMBffOc00jL2UvkC
+X5XPX0yNchXs1xcn3HF5BOrrSBZQ+fMIh2MZ5Z5EJ/CVna8DbaMoKTtVcA5CuGlO
+6ME/b9qH/zHkdbBK8/uQSNRz6XD63F15BPoDB76QzmGMxDMrZedo0v943LjwCquL
+k9xn0bou0XKLYSkMqHD+xpXDhX+4sKGXlepV+ojFXijhrskaCgzU4AU3gQxm9Mur
+RLXn4JXdRBmA7u8lr63kshNqn0xEu7CSIpUOA82jhOBO3sA7KHoL2Qd7hUxdme+P
+Dwsx6qWYw272gSZwHcooo/6lDrJ14Q8KnrZ+9mAmARU8xqXLvAAWDdaOmJRjWKH8
+ObQXtHxfKo0ohOaWelOb+J59uDj0H6wZM5u/HEUz0f2p6qIj0ktdHNoOvVD/rnAy
+W8L8nAJs3VbZ78CCRWVKpDLh4on13vYFtjZ0kIMgZi7f0lwgc0HS1UAOfQB2G6fW
+YEpGobMI8t7hRKbiyrIGlmI2ZMF4lHAubWC2FZPwdDTBg7mUiJUyMl1tNLZPJKxG
+W1WD45yqMRNceS5A0s4MsNgZyqVm5tSpxUQtf/WLthAc5g==
+-----END CERTIFICATE-----
diff --git a/asterixdb/asterix-app/src/test/resources/security/root/rootCA.key b/asterixdb/asterix-app/src/test/resources/security/root/rootCA.key
new file mode 100644
index 0000000..b0793d9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/security/root/rootCA.key
@@ -0,0 +1,51 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIIJKQIBAAKCAgEA8/oY1gl4igfFAFPDwmtZ5JOnabg6cuO7RYLrqxb41EJdzJSX
+1WHNZtKQ7mO5aQKJxbN46aIikXeoWVaXHjb2fw08oLZCMjBBLJ+qBPhUbVAn5CQ0
+saeHUfPzvNQFQnDAwjxwdMB9DQ3lBNb2iTj/5mmz0lqkL/OYOQf4AGMq29YpmmdV
+nEATXF37Gi5qu8Pil07IpB9BgSqiEV6cEZ+DhvEpWZKbsbJ4f7AI4mXo1xUuJTle
+cNh8dG1wzhK3m8UwR5HbaE5BxtpQh5+N8Pmyl6hn8YPnxrmfqp8qXbFSo7n9Dpku
+mG+u19YyqfikHvapRY/oLOD15B8CaoSXWGGlG0iZPp8SdGx6VayyZ2eY72GHsNZC
+MOZ4HnVs35p44ottYyjNUZw0ggiNWagSd/hUdqV5wuv/b0Tjscj8z9AzmDcQ4E6g
+rHuoqNd/klqIrsqsjUl41fZQH/XZ9whKKFmu71GYkNBTx30eecjAIB25eRGA7Fo6
+npk3mBR9adTYfoZ3jqRIbCCeeUlu99PHAKMHvIzmJEte/tdoPJXk0Yo2nLNl0tpj
+a2E47UFfSbNVs7S3gd3NnAqkfF6rZ3fE8L/c6WBGVuAR49XuEHoERFTErmrjbcPy
+GSC3QGsKV+PmUgdIRtXrl91Rr9yyxRjAg6kfkeIR/Zd1zgcLrNiBrUwHvycCAwEA
+AQKCAgA34r31qU23MHhrzsQ0sKpytW/Pw6d/0bKABbE+C5EL7ffWeOMeubx3JRyV
+vrol17L/WlEEE2Oftq0VacRL44MrLzkIBze6j1segaRMZyXPtGCu3axEVX+lwh++
+zUI3KeS9mWKZG86JQvmG1ka6wBe+C9BpCuI5Ka9gwPaKAC09Fh2JhMoTdIn6ynxO
+Tvub7w7kYeOL1K0IjV96YxWU49kY+/zgy7hmR99aQFSIp5diTTO4yx9a8fvwH3GL
+grYUNGMOwdZkWiivCCsSJfY5kLjcDHNi2zgoSTTNvI/Tzl6DqoLQ6T3HJS4Dusj4
+LkBJ5252xLQGM7Gfg4rmI2EXmaZb2xLg4gHbpIedtCU0rg2QzTUnKVKI/r6KGmJ4
+C3S69vcsrPdRUO0y2RmO55WzLYlDs1/vIwAXpZ9MG8mQqnVhep2z0GXkQMvzvgrC
+tbeLWBoT7LG4sxrGNZtPX5K/DCsT73AhHqO862l8mRuFSYdaRDEb49lIWr87XyPQ
+ub1BfzvPU+71rp4Ihjjf3HO1lGrCZ8OuFyosKyaM9IYODflvA1vvGiB2BsaCVouu
+ufw/BVmhJJHfSmRDRElncG6rIx6asru/PEwymKI9otzrk3Utup+BSvaTyWQc6Jml
+gDX+2iLIFXakMSj/SwL1qQGG6TYBqGYWSnS+J9e0HjID5Aa2YQKCAQEA+iPti4q4
+dUiBkKScckz5YY/u++UpkE5YPM+mckJ1PU2IfQSp8wAjhXrrTeyr6uexr+4NjfoK
+E4MmVQ4yWJF2iGI00OlMOLevZ8JUHeLyKy7HNUzRt3sByMWtMEI10U/Di3x/wMSF
+6vX/NutU+vw7MdvEwZaNBPbLS8nk8KEAzXTWaD0aPMZo328eLIFbH9xkNtmTM9FE
+/3gO0dPfZR0TA40GGVk5VcGEr4vdt4InZnZDgjcrFxmWySlMhKl0IQIoW2eKJNdv
++2AIqaePerd2ul3Oc6rpnekEHTQ1a8BrAXL+hBFow2kB0GLh+vVSCK1LFi5PXQ3n
+hvml+hUrDzmtLQKCAQEA+bE1KZWNm98Ml+CfUKmiu9vaKZjPp1fwfzQ0E5lt/hM9
+tmzNckA974W1nUIO2Fkr4Vx2fm9aC4mUMxogbyPfPmtNmG0mcKPROb2junXVdU/d
+1mlzsJ6GmjFUgQZD/xfATAJF4MBgWV1XzjJoajbvgXRzKK9R+Iguj0fQgx3+KGDx
+8kSXKqcrsc2K2IdM1aHIBkYJtfAH7H3r9P27++iLcmfd6IrFziJ3JicYqxSsaAcn
+D1fkonIs/iJXFNYfWBa/56JDFB5MwQd6QUccHlGup6Ii67xTu2xhVMAicAoWfwSm
+7P0DG5gLoot+OzT+Av9KTW2yLNkfPiVU3wBbY9iaIwKCAQEAp3YUGv6E3Sfsbcx2
+XGNB9Vnp8cOKnuyEUDnoQchSOvdEMAZGTMPEBCG1lFalBb+ViqWBd5J02nlL2VeG
+xxqjU00D9PSrLbFzgbBsphGAdP59KSbDo+V63VHRz2QUKYwP6rsvv/sReKq18Kt+
+GgVxD8EEqWHECRW8JoIEfkAbjHDy9Zgqj5N5NFRsy/jR67Odd8cTsHYijjFvInSI
+s2XSi+cGtUOxicLjtK6bgbZl8EeujGeotm9QLl4ytwHHGC4cnIzlxCJi+tRhGEtw
+WASpPL1+cJt0iV4tfeus8/U+7hpxYDNTgrczGrKIfX1tLRfvE+tvAGpFnWxW/OTd
+DfVcBQKCAQARXKYSryh6lybcBK1vQWxkXaQs0khWHYJGhg288yU3zoSzA5vBfwAP
+gJ+hQK+hkYnAkNpYbku3k6hvaTiqIbdrMrDhkW1j1bA9sJcK5xihoJ0PRKPbuxom
+7Jkwo4vk1/TyFBJhz16juB7b4ptqD0nWGY/MW2x6uJVsNxUjmNlMW3lXXDeaqqca
+JO2rorTQAgDTltkkAn26jwLyNRZ2LLRNjzVZ7xRUAgeA6qUHeFeIoD6yDW2Dcib4
+wNCHTWA+ks5jP+AkLPsvTOLOWTB2Vz2qwFZnR+AWnzGhX/7FBZ6M1Hj675jguVDN
+y59KZKPo93FmMuN5xNssShI9s1undK87AoIBAQDq/9Br28B3vZtGiunPpOG5xoTo
+z4uS1yWM3gvk9IJt+4JuOeMVw/IDaVe7on6xl/ZMODvzqnm/wkJfsftqQSXWecEA
+KbwiLAc0fzOKAKerB7jSWI+D4ZcOoPoUu6jeWNNthtuR965iOnSoejuOdCmKjxdb
+mIybOlE8r04YC+hhjl575KoRLL6eWyOfXsU9SU17eZs1CgGSMorVm5v7PIH7P4XP
+nFGutzWLDQAkb9smBViAJfJOf3PgjSwCGKiSWx9uoFsewGMqN4UAOR0j+HsqgTwP
+GuDAYNnajYQhEugq1sPWCtKXN3a9GKaSiRLLF+cTmlls4oUSRCnpnMRXRy6O
+-----END RSA PRIVATE KEY-----
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/network/INetworkSecurityConfig.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/network/INetworkSecurityConfig.java
new file mode 100644
index 0000000..772ee9f
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/network/INetworkSecurityConfig.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.network;
+
+import java.io.File;
+import java.security.KeyStore;
+
+public interface INetworkSecurityConfig {
+
+ /**
+ * Indicates if SSL is enabled
+ *
+ * @return true if ssl is enabled. Otherwise false.
+ */
+ boolean isSslEnabled();
+
+ /**
+ * Gets the key store to be used for secured connections
+ *
+ * @return the key store to be used
+ */
+ KeyStore getKeyStore();
+
+ /**
+ * Gets a key store file to be used if {@link INetworkSecurityConfig#getKeyStore()} returns null.
+ *
+ * @return the key store file
+ */
+ File getKeyStoreFile();
+
+ /**
+ * Gets the password for the key store file.
+ *
+ * @return the password to the key store file
+ */
+ String getKeyStorePassword();
+
+ /**
+ * Gets a trust store file to be used for validating certificates of secured connections.
+ *
+ * @return the trust store file
+ */
+ File getTrustStoreFile();
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/network/INetworkSecurityManager.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/network/INetworkSecurityManager.java
new file mode 100644
index 0000000..9dc6960
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/network/INetworkSecurityManager.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.network;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+
+public interface INetworkSecurityManager {
+
+ /**
+ * Creates a new ssl context based on the current configuration of this {@link INetworkSecurityManager}
+ *
+ * @return a new ssl context
+ */
+ SSLContext newSSLContext();
+
+ /**
+ * Creates a new ssl engine based on the current configuration of this {@link INetworkSecurityManager}
+ *
+ * @return a new ssl engine
+ */
+ SSLEngine newSSLEngine();
+
+ /**
+ * Sets the configuration to be used for this {@link INetworkSecurityManager}
+ *
+ * @param config
+ */
+ void setConfiguration(INetworkSecurityConfig config);
+
+ /**
+ * Gets the socket channel factory
+ *
+ * @return the socket channel factory
+ */
+ ISocketChannelFactory getSocketChannelFactory();
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/network/ISocketChannel.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/network/ISocketChannel.java
new file mode 100644
index 0000000..70ef1d2
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/network/ISocketChannel.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.network;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+
+public interface ISocketChannel extends Closeable {
+
+ /**
+ * Indicates whether this {@link ISocketChannel} requires a client/server handshake before
+ * exchanging application data
+ *
+ * @return true if the socket requires handshake, otherwise false.
+ */
+ boolean requiresHandshake();
+
+ /**
+ * Performs the handshake operations.
+ *
+ * @return true, if the handshake is successful. Otherwise false.
+ */
+ boolean handshake();
+
+ /**
+ * Indicates if this {@link ISocketChannel} has data that is ready for reading.
+ *
+ * @return true, if the socket has data ready for reading. Otherwise false.
+ */
+ boolean isPendingRead();
+
+ /**
+ * Attempts to read data into {@code dst} buffer. The position of the byte buffer
+ * is incremented by the number of read bytes.
+ *
+ * @param dst
+ * @return The number of bytes transferred into the buffer.
+ * @throws IOException
+ */
+ int read(ByteBuffer dst) throws IOException;
+
+ /**
+ * Attempts to write data from the {@code src} buffer. The position of the byte buffer
+ * is incremented by the number of written bytes. A write operation may not fully write
+ * the number of consumed bytes from the {@code src} buffer. The caller may check if any data
+ * is still pending writing using {@link ISocketChannel#isPendingWrite()}. An attempt can be
+ * made to complete the write operation using {@link ISocketChannel#completeWrite()}
+ *
+ * @param src
+ * @return The number of bytes consumed from the buffer.
+ * @throws IOException
+ */
+ int write(ByteBuffer src) throws IOException;
+
+ /**
+ * Indicates if this {@link ISocketChannel} has data pending write completion.
+ *
+ * @return true, if the socket has data pending write. Otherwise false.
+ */
+ boolean isPendingWrite();
+
+ /**
+ * Attempts to write any data pending write.
+ *
+ * @return true, if all data pending write has been written. Otherwise false.
+ * @throws IOException
+ */
+ boolean completeWrite() throws IOException;
+
+ /**
+ * Gets the network socket channel behind this {@link ISocketChannel}
+ *
+ * @return the socket channel
+ */
+ SocketChannel getSocketChannel();
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/network/ISocketChannelFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/network/ISocketChannelFactory.java
new file mode 100644
index 0000000..e65641c
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/network/ISocketChannelFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.network;
+
+import java.nio.channels.SocketChannel;
+
+public interface ISocketChannelFactory {
+
+ /**
+ * Creates a socket channel to be used for server
+ *
+ * @param socketChannel
+ * @return a server socket channel
+ */
+ ISocketChannel createServerChannel(SocketChannel socketChannel);
+
+ /**
+ * Creates a socket channel to be used for a client
+ *
+ * @param socketChannel
+ * @return a client socket channel
+ */
+ ISocketChannel createClientChannel(SocketChannel socketChannel);
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/service/IControllerService.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/service/IControllerService.java
index d39a3b1..018f9fe 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/service/IControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/service/IControllerService.java
@@ -22,6 +22,7 @@
import java.util.concurrent.ExecutorService;
import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.network.INetworkSecurityManager;
public interface IControllerService {
void start() throws Exception;
@@ -35,4 +36,11 @@
Timer getTimer();
Object getApplicationContext();
+
+ /**
+ * Gets the network security manager
+ *
+ * @return the network security manager
+ */
+ INetworkSecurityManager getNetworkSecurityManager();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectory.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectory.java
index 6f8c4d0..d7f79fd 100644
--- a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectory.java
+++ b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectory.java
@@ -18,15 +18,18 @@
*/
package org.apache.hyracks.client.result;
+import java.io.IOException;
import java.net.InetSocketAddress;
-import org.apache.hyracks.api.result.ResultJobRecord.Status;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.network.ISocketChannelFactory;
import org.apache.hyracks.api.result.IResultDirectory;
import org.apache.hyracks.api.result.ResultDirectoryRecord;
+import org.apache.hyracks.api.result.ResultJobRecord.Status;
import org.apache.hyracks.api.result.ResultSetId;
-import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.ipc.api.IIPCHandle;
import org.apache.hyracks.ipc.api.RPCInterface;
+import org.apache.hyracks.ipc.exceptions.IPCException;
import org.apache.hyracks.ipc.impl.IPCSystem;
import org.apache.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
@@ -35,9 +38,11 @@
private final IPCSystem ipc;
private final IResultDirectory remoteResultDirectory;
- public ResultDirectory(String resultHost, int resultPort) throws Exception {
+ public ResultDirectory(String resultHost, int resultPort, ISocketChannelFactory socketChannelFactory)
+ throws IOException, IPCException {
RPCInterface rpci = new RPCInterface();
- ipc = new IPCSystem(new InetSocketAddress(0), rpci, new JavaSerializationBasedPayloadSerializerDeserializer());
+ ipc = new IPCSystem(new InetSocketAddress(0), socketChannelFactory, rpci,
+ new JavaSerializationBasedPayloadSerializerDeserializer());
ipc.start();
IIPCHandle ddsIpchandle = ipc.getReconnectingHandle(new InetSocketAddress(resultHost, resultPort));
this.remoteResultDirectory = new ResultDirectoryRemoteProxy(ddsIpchandle, rpci);
diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSet.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSet.java
index ef93cce..a72573c 100644
--- a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSet.java
+++ b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSet.java
@@ -21,13 +21,14 @@
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.comm.NetworkAddress;
import org.apache.hyracks.api.context.IHyracksCommonContext;
-import org.apache.hyracks.api.result.IResultSet;
-import org.apache.hyracks.api.result.IResultDirectory;
-import org.apache.hyracks.api.result.IResultSetReader;
-import org.apache.hyracks.api.result.ResultSetId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.network.ISocketChannelFactory;
+import org.apache.hyracks.api.result.IResultDirectory;
+import org.apache.hyracks.api.result.IResultSet;
+import org.apache.hyracks.api.result.IResultSetReader;
+import org.apache.hyracks.api.result.ResultSetId;
import org.apache.hyracks.client.net.ClientNetworkManager;
import org.apache.hyracks.control.nc.resources.memory.FrameManager;
@@ -38,9 +39,10 @@
private final IHyracksCommonContext resultClientCtx;
- public ResultSet(IHyracksClientConnection hcc, int frameSize, int nReaders) throws Exception {
+ public ResultSet(IHyracksClientConnection hcc, ISocketChannelFactory socketChannelFactory, int frameSize,
+ int nReaders) throws Exception {
NetworkAddress ddsAddress = hcc.getResultDirectoryAddress();
- resultDirectory = new ResultDirectory(ddsAddress.getAddress(), ddsAddress.getPort());
+ resultDirectory = new ResultDirectory(ddsAddress.getAddress(), ddsAddress.getPort(), socketChannelFactory);
netManager = new ClientNetworkManager(nReaders);
netManager.start();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index e751589..c2e7b22 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -49,6 +49,8 @@
import org.apache.hyracks.api.job.JobIdFactory;
import org.apache.hyracks.api.job.JobParameterByteStore;
import org.apache.hyracks.api.job.resource.IJobCapacityController;
+import org.apache.hyracks.api.network.INetworkSecurityConfig;
+import org.apache.hyracks.api.network.INetworkSecurityManager;
import org.apache.hyracks.api.service.IControllerService;
import org.apache.hyracks.api.topology.ClusterTopology;
import org.apache.hyracks.api.topology.TopologyDefinitionParser;
@@ -80,6 +82,8 @@
import org.apache.hyracks.ipc.api.IIPCI;
import org.apache.hyracks.ipc.impl.IPCSystem;
import org.apache.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
+import org.apache.hyracks.ipc.security.NetworkSecurityConfig;
+import org.apache.hyracks.ipc.security.NetworkSecurityManager;
import org.apache.hyracks.util.ExitUtil;
import org.apache.hyracks.util.MaintainedThreadNameExecutorService;
import org.apache.logging.log4j.Level;
@@ -146,6 +150,8 @@
private final CcId ccId;
+ private final INetworkSecurityManager networkSecurityManager;
+
static {
ExitUtil.init();
}
@@ -164,6 +170,9 @@
File jobLogFolder = new File(ccConfig.getRootDir(), "logs/jobs");
jobLog = new LogFile(jobLogFolder);
+ final INetworkSecurityConfig securityConfig = getNetworkSecurityConfig();
+ networkSecurityManager = new NetworkSecurityManager(securityConfig);
+
// WorkQueue is in charge of heartbeat as well as other events.
workQueue = new WorkQueue("ClusterController", Thread.MAX_PRIORITY);
this.timer = new Timer(true);
@@ -204,12 +213,13 @@
LOGGER.log(Level.INFO, "Starting ClusterControllerService: " + this);
serverCtx = new ServerContext(ServerContext.ServerType.CLUSTER_CONTROLLER, new File(ccConfig.getRootDir()));
IIPCI ccIPCI = new ClusterControllerIPCI(this);
- clusterIPC = new IPCSystem(new InetSocketAddress(ccConfig.getClusterListenPort()), ccIPCI,
- new CCNCFunctions.SerializerDeserializer());
+ clusterIPC = new IPCSystem(new InetSocketAddress(ccConfig.getClusterListenPort()),
+ networkSecurityManager.getSocketChannelFactory(), ccIPCI, new CCNCFunctions.SerializerDeserializer());
IIPCI ciIPCI = new ClientInterfaceIPCI(this, jobIdFactory);
clientIPC =
new IPCSystem(new InetSocketAddress(ccConfig.getClientListenAddress(), ccConfig.getClientListenPort()),
- ciIPCI, new JavaSerializationBasedPayloadSerializerDeserializer());
+ networkSecurityManager.getSocketChannelFactory(), ciIPCI,
+ new JavaSerializationBasedPayloadSerializerDeserializer());
webServer = new WebServer(this, ccConfig.getConsoleListenPort());
clusterIPC.start();
clientIPC.start();
@@ -538,4 +548,14 @@
public Object getApplicationContext() {
return application.getApplicationContext();
}
+
+ @Override
+ public INetworkSecurityManager getNetworkSecurityManager() {
+ return networkSecurityManager;
+ }
+
+ protected INetworkSecurityConfig getNetworkSecurityConfig() {
+ return NetworkSecurityConfig.of(ccConfig.isSslEnabled(), ccConfig.getKeyStorePath(),
+ ccConfig.getKeyStorePassword(), ccConfig.getTrustStorePath());
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
index 5417513..a8edbd1 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
@@ -74,7 +74,10 @@
JOB_MANAGER_CLASS(STRING, "org.apache.hyracks.control.cc.job.JobManager"),
ENFORCE_FRAME_WRITER_PROTOCOL(BOOLEAN, false),
CORES_MULTIPLIER(POSITIVE_INTEGER, 3),
- CONTROLLER_ID(SHORT, (short) 0x0000);
+ CONTROLLER_ID(SHORT, (short) 0x0000),
+ KEY_STORE_PATH(STRING),
+ TRUST_STORE_PATH(STRING),
+ KEY_STORE_PASSWORD(STRING);
private final IOptionType parser;
private Object defaultValue;
@@ -188,6 +191,12 @@
+ "execution level";
case CONTROLLER_ID:
return "The 16-bit (0-65535) id of this Cluster Controller";
+ case KEY_STORE_PATH:
+ return "A fully-qualified path to a key store file that will be used for secured connections";
+ case TRUST_STORE_PATH:
+ return "A fully-qualified path to a trust store file that will be used for secured connections";
+ case KEY_STORE_PASSWORD:
+ return "The password to the provided key store";
default:
throw new IllegalStateException("NYI: " + this);
}
@@ -212,6 +221,7 @@
public CCConfig(ConfigManager configManager) {
super(configManager);
configManager.register(Option.class);
+ configManager.register(ControllerConfig.Option.class);
configManager.registerArgsListener(appArgs::addAll);
}
@@ -434,4 +444,24 @@
public CcId getCcId() {
return CcId.valueOf(getAppConfig().getShort(Option.CONTROLLER_ID));
}
+
+ public String getKeyStorePath() {
+ return getAppConfig().getString(Option.KEY_STORE_PATH);
+ }
+
+ public String getKeyStorePassword() {
+ return getAppConfig().getString(Option.KEY_STORE_PASSWORD);
+ }
+
+ public void setKeyStorePath(String keyStorePath) {
+ configManager.set(Option.KEY_STORE_PATH, keyStorePath);
+ }
+
+ public String getTrustStorePath() {
+ return getAppConfig().getString(Option.TRUST_STORE_PATH);
+ }
+
+ public void setTrustStorePath(String trustStorePath) {
+ configManager.set(Option.TRUST_STORE_PATH, trustStorePath);
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/ControllerConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/ControllerConfig.java
index a800ac4..dc229e0 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/ControllerConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/ControllerConfig.java
@@ -18,6 +18,8 @@
*/
package org.apache.hyracks.control.common.controllers;
+import static org.apache.hyracks.control.common.config.OptionTypes.BOOLEAN;
+
import java.io.Serializable;
import java.net.URL;
import java.util.function.Function;
@@ -46,7 +48,8 @@
OptionTypes.STRING,
(Function<IApplicationConfig, String>) appConfig -> FileUtil
.joinPath(appConfig.getString(ControllerConfig.Option.DEFAULT_DIR), "logs"),
- "The directory where logs for this node are written");
+ "The directory where logs for this node are written"),
+ SSL_ENABLED(BOOLEAN, false, "A flag indicating if cluster communications should use secured connections");
private final IOptionType type;
private final String description;
@@ -122,4 +125,8 @@
public String getLogDir() {
return configManager.getAppConfig().getString(ControllerConfig.Option.LOG_DIR);
}
+
+ public boolean isSslEnabled() {
+ return getAppConfig().getBoolean(Option.SSL_ENABLED);
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
index 1d94dda..d41350f 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
@@ -18,6 +18,7 @@
*/
package org.apache.hyracks.control.common.controllers;
+import static org.apache.hyracks.control.common.config.OptionTypes.BOOLEAN;
import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER;
import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER_BYTE_UNIT;
import static org.apache.hyracks.control.common.config.OptionTypes.LONG;
@@ -85,7 +86,10 @@
NCSERVICE_PID(INTEGER, -1),
COMMAND(STRING, "hyracksnc"),
JVM_ARGS(STRING, (String) null),
- TRACE_CATEGORIES(STRING_ARRAY, new String[0]);
+ TRACE_CATEGORIES(STRING_ARRAY, new String[0]),
+ KEY_STORE_PATH(STRING, (String) null),
+ TRUST_STORE_PATH(STRING, (String) null),
+ KEY_STORE_PASSWORD(STRING, (String) null);
private final IOptionType parser;
private final String defaultValueDescription;
@@ -208,6 +212,12 @@
return "JVM args to pass to the NCDriver";
case TRACE_CATEGORIES:
return "Categories for tracing";
+ case KEY_STORE_PATH:
+ return "A fully-qualified path to a key store file that will be used for secured connections";
+ case TRUST_STORE_PATH:
+ return "A fully-qualified path to a trust store file that will be used for secured connections";
+ case KEY_STORE_PASSWORD:
+ return "The password to the provided key store";
default:
throw new IllegalStateException("NYI: " + this);
}
@@ -253,6 +263,7 @@
super(configManager);
this.appConfig = nodeId == null ? configManager.getAppConfig() : configManager.getNodeEffectiveConfig(nodeId);
configManager.register(Option.class);
+ configManager.register(ControllerConfig.Option.class);
setNodeId(nodeId);
this.nodeId = nodeId;
configManager.registerArgsListener(appArgs::addAll);
@@ -537,4 +548,24 @@
public void setVirtualNC() {
configManager.set(nodeId, Option.NCSERVICE_PORT, NCSERVICE_PORT_DISABLED);
}
+
+ public String getKeyStorePath() {
+ return appConfig.getString(Option.KEY_STORE_PATH);
+ }
+
+ public String getKeyStorePassword() {
+ return appConfig.getString(Option.KEY_STORE_PASSWORD);
+ }
+
+ public void setKeyStorePath(String keyStorePath) {
+ configManager.set(Option.KEY_STORE_PATH, keyStorePath);
+ }
+
+ public String getTrustStorePath() {
+ return appConfig.getString(Option.TRUST_STORE_PATH);
+ }
+
+ public void setTrustStorePath(String keyStorePath) {
+ configManager.set(CCConfig.Option.TRUST_STORE_PATH, keyStorePath);
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index d7da5a4..a92fcb6 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -56,6 +56,8 @@
import org.apache.hyracks.api.job.JobParameterByteStore;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
+import org.apache.hyracks.api.network.INetworkSecurityConfig;
+import org.apache.hyracks.api.network.INetworkSecurityManager;
import org.apache.hyracks.api.result.IResultPartitionManager;
import org.apache.hyracks.api.service.IControllerService;
import org.apache.hyracks.api.util.CleanupUtils;
@@ -88,6 +90,8 @@
import org.apache.hyracks.ipc.api.IIPCHandle;
import org.apache.hyracks.ipc.exceptions.IPCException;
import org.apache.hyracks.ipc.impl.IPCSystem;
+import org.apache.hyracks.ipc.security.NetworkSecurityConfig;
+import org.apache.hyracks.ipc.security.NetworkSecurityManager;
import org.apache.hyracks.net.protocols.muxdemux.FullFrameChannelInterfaceFactory;
import org.apache.hyracks.util.ExitUtil;
import org.apache.hyracks.util.MaintainedThreadNameExecutorService;
@@ -159,6 +163,8 @@
private final MemoryManager memoryManager;
+ private final INetworkSecurityManager networkSecurityManager;
+
private StackTraceElement[] shutdownCallStack;
private MessagingNetworkManager messagingNetManager;
@@ -193,6 +199,8 @@
if (application == null) {
throw new IllegalArgumentException("INCApplication cannot be null");
}
+ final INetworkSecurityConfig securityConfig = getNetworkSecurityConfig();
+ networkSecurityManager = new NetworkSecurityManager(securityConfig);
this.application = application;
id = ncConfig.getNodeId();
if (id == null) {
@@ -278,7 +286,8 @@
public void start() throws Exception {
LOGGER.log(Level.INFO, "Starting NodeControllerService");
ipc = new IPCSystem(new InetSocketAddress(ncConfig.getClusterListenAddress(), ncConfig.getClusterListenPort()),
- new NodeControllerIPCI(this), new CCNCFunctions.SerializerDeserializer());
+ networkSecurityManager.getSocketChannelFactory(), new NodeControllerIPCI(this),
+ new CCNCFunctions.SerializerDeserializer());
ipc.start();
partitionManager = new PartitionManager(this);
netManager = new NetworkManager(ncConfig.getDataListenAddress(), ncConfig.getDataListenPort(), partitionManager,
@@ -718,4 +727,14 @@
public INCApplication getApplication() {
return application;
}
+
+ @Override
+ public INetworkSecurityManager getNetworkSecurityManager() {
+ return networkSecurityManager;
+ }
+
+ protected INetworkSecurityConfig getNetworkSecurityConfig() {
+ return NetworkSecurityConfig.of(ncConfig.isSslEnabled(), ncConfig.getKeyStorePath(),
+ ncConfig.getKeyStorePassword(), ncConfig.getTrustStorePath());
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java
index a4a00ce..fd985db 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java
@@ -53,6 +53,7 @@
import org.apache.hyracks.dataflow.common.comm.io.ResultFrameTupleAccessor;
import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
import org.apache.hyracks.ipc.impl.HyracksConnection;
+import org.apache.hyracks.ipc.sockets.PlainSocketChannelFactory;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.AfterClass;
@@ -154,7 +155,7 @@
IFrameTupleAccessor frameTupleAccessor = new ResultFrameTupleAccessor();
- IResultSet resultSet = new ResultSet(hcc, spec.getFrameSize(), nReaders);
+ IResultSet resultSet = new ResultSet(hcc, PlainSocketChannelFactory.INSTANCE, spec.getFrameSize(), nReaders);
IResultSetReader reader = resultSet.createReader(jobId, resultSetId);
List<String> resultRecords = new ArrayList<>();
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
index 4bee7ee..be22b9c 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
@@ -48,6 +48,7 @@
import org.apache.hyracks.dataflow.common.comm.io.ResultFrameTupleAccessor;
import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
import org.apache.hyracks.ipc.impl.HyracksConnection;
+import org.apache.hyracks.ipc.sockets.PlainSocketChannelFactory;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.AfterClass;
@@ -160,7 +161,8 @@
IFrameTupleAccessor frameTupleAccessor = new ResultFrameTupleAccessor();
if (!spec.getResultSetIds().isEmpty()) {
- IResultSet resultSet = new ResultSet(hcc, spec.getFrameSize(), nReaders);
+ IResultSet resultSet =
+ new ResultSet(hcc, PlainSocketChannelFactory.INSTANCE, spec.getFrameSize(), nReaders);
IResultSetReader reader = resultSet.createReader(jobId, spec.getResultSetIds().get(0));
ObjectMapper om = new ObjectMapper();
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs/utils/HyracksUtils.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs/utils/HyracksUtils.java
index 17cd793..e06e09a 100644
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs/utils/HyracksUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs/utils/HyracksUtils.java
@@ -21,7 +21,6 @@
import java.util.EnumSet;
-import org.apache.hyracks.ipc.impl.HyracksConnection;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.api.job.JobId;
@@ -30,6 +29,7 @@
import org.apache.hyracks.control.common.controllers.CCConfig;
import org.apache.hyracks.control.common.controllers.NCConfig;
import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.ipc.impl.HyracksConnection;
public class HyracksUtils {
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs2/dataflow/DataflowTest.java b/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs2/dataflow/DataflowTest.java
index 04fdc85..e547ac0 100644
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs2/dataflow/DataflowTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/src/test/java/org/apache/hyracks/hdfs2/dataflow/DataflowTest.java
@@ -31,7 +31,6 @@
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hyracks.ipc.impl.HyracksConnection;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -53,6 +52,7 @@
import org.apache.hyracks.hdfs.lib.TextTupleWriterFactory;
import org.apache.hyracks.hdfs.utils.HyracksUtils;
import org.apache.hyracks.hdfs2.scheduler.Scheduler;
+import org.apache.hyracks.ipc.impl.HyracksConnection;
import org.junit.Assert;
/**
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java
index e6c28fa..2c7e82e 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java
@@ -52,10 +52,12 @@
import org.apache.hyracks.api.job.JobInfo;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.network.ISocketChannelFactory;
import org.apache.hyracks.api.topology.ClusterTopology;
import org.apache.hyracks.api.util.InvokeUtil;
import org.apache.hyracks.api.util.JavaSerializationUtils;
import org.apache.hyracks.ipc.api.RPCInterface;
+import org.apache.hyracks.ipc.sockets.PlainSocketChannelFactory;
import org.apache.hyracks.util.ExitUtil;
import org.apache.hyracks.util.InterruptibleAction;
import org.apache.logging.log4j.Level;
@@ -102,11 +104,12 @@
* host name.
* @throws Exception
*/
- public HyracksConnection(String ccHost, int ccPort) throws Exception {
+ public HyracksConnection(String ccHost, int ccPort, ISocketChannelFactory socketChannelFactory) throws Exception {
this.ccHost = ccHost;
this.ccPort = ccPort;
RPCInterface rpci = new RPCInterface();
- ipc = new IPCSystem(new InetSocketAddress(0), rpci, new JavaSerializationBasedPayloadSerializerDeserializer());
+ ipc = new IPCSystem(new InetSocketAddress(0), socketChannelFactory, rpci,
+ new JavaSerializationBasedPayloadSerializerDeserializer());
ipc.start();
hci = new HyracksClientInterfaceRemoteProxy(ipc.getReconnectingHandle(new InetSocketAddress(ccHost, ccPort)),
rpci);
@@ -115,6 +118,10 @@
uninterruptibleExecutor.execute(new UninterrubtileHandlerWatcher());
}
+ public HyracksConnection(String ccHost, int ccPort) throws Exception {
+ this(ccHost, ccPort, PlainSocketChannelFactory.INSTANCE);
+ }
+
@Override
public JobStatus getJobStatus(JobId jobId) throws Exception {
return hci.getJobStatus(jobId);
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
index 9ef506e..205ecfe 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
@@ -36,8 +36,10 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.CompletableFuture;
+import org.apache.hyracks.api.network.ISocketChannel;
+import org.apache.hyracks.api.network.ISocketChannelFactory;
import org.apache.hyracks.util.ExitUtil;
import org.apache.hyracks.util.NetworkUtil;
import org.apache.logging.log4j.Level;
@@ -71,8 +73,12 @@
private volatile boolean stopped;
- IPCConnectionManager(IPCSystem system, InetSocketAddress socketAddress) throws IOException {
+ private final ISocketChannelFactory socketChannelFactory;
+
+ IPCConnectionManager(IPCSystem system, InetSocketAddress socketAddress, ISocketChannelFactory socketChannelFactory)
+ throws IOException {
this.system = system;
+ this.socketChannelFactory = socketChannelFactory;
this.serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().setReuseAddress(true);
serverSocketChannel.configureBlocking(false);
@@ -209,7 +215,8 @@
SelectionKey key = i.next();
i.remove();
final SelectableChannel sc = key.channel();
- if (key.isReadable()) {
+ // do not attempt to read until handle is set (e.g. after handshake is completed)
+ if (key.isReadable() && key.attachment() != null) {
read(key);
} else if (key.isWritable()) {
write(key);
@@ -229,8 +236,13 @@
try {
connected = channel.finishConnect();
if (connected) {
- connectableKey.interestOps(SelectionKey.OP_READ);
- connectionEstablished(handle);
+ SelectionKey channelKey = channel.register(selector, SelectionKey.OP_READ);
+ final ISocketChannel clientChannel = socketChannelFactory.createClientChannel(channel);
+ if (clientChannel.requiresHandshake()) {
+ asyncHandshake(clientChannel, handle, channelKey);
+ } else {
+ connectionEstablished(handle, channelKey, clientChannel);
+ }
}
} catch (IOException e) {
LOGGER.warn("Exception finishing connect", e);
@@ -248,11 +260,13 @@
try {
channel = serverSocketChannel.accept();
register(channel);
+ final ISocketChannel serverChannel = socketChannelFactory.createServerChannel(channel);
channelKey = channel.register(selector, SelectionKey.OP_READ);
- IPCHandle handle = new IPCHandle(system, null);
- handle.setKey(channelKey);
- channelKey.attach(handle);
- handle.setState(HandleState.CONNECT_RECEIVED);
+ if (serverChannel.requiresHandshake()) {
+ asyncHandshake(serverChannel, null, channelKey);
+ } else {
+ connectionReceived(serverChannel, channelKey);
+ }
} catch (IOException e) {
LOGGER.error("Failed to accept channel ", e);
close(channelKey, channel);
@@ -268,12 +282,17 @@
register(channel);
if (channel.connect(handle.getRemoteAddress())) {
channelKey = channel.register(selector, SelectionKey.OP_READ);
- connectionEstablished(handle);
+ final ISocketChannel clientChannel = socketChannelFactory.createClientChannel(channel);
+ if (clientChannel.requiresHandshake()) {
+ asyncHandshake(clientChannel, handle, channelKey);
+ } else {
+ connectionEstablished(handle, channelKey, clientChannel);
+ }
} else {
channelKey = channel.register(selector, SelectionKey.OP_CONNECT);
+ handle.setKey(channelKey);
+ channelKey.attach(handle);
}
- handle.setKey(channelKey);
- channelKey.attach(handle);
} catch (IOException e) {
LOGGER.error("Failed to accept channel ", e);
close(channelKey, channel);
@@ -283,10 +302,13 @@
workingPendingConnections.clear();
}
- private void connectionEstablished(IPCHandle handle) {
+ private void connectionEstablished(IPCHandle handle, SelectionKey channelKey, ISocketChannel channel) {
+ handle.setSocketChannel(channel);
handle.setState(HandleState.CONNECT_SENT);
+ handle.setKey(channelKey);
registerHandle(handle);
IPCConnectionManager.this.write(createInitialReqMessage(handle));
+ channelKey.attach(handle);
}
private void sendPendingMessages() {
@@ -367,7 +389,7 @@
IPCHandle handle = (IPCHandle) readableKey.attachment();
ByteBuffer readBuffer = handle.getInBuffer();
try {
- int len = channel.read(readBuffer);
+ int len = handle.getSocketChannel().read(readBuffer);
if (len < 0) {
close(readableKey, channel);
return;
@@ -386,15 +408,16 @@
private void write(SelectionKey writableKey) {
SocketChannel channel = (SocketChannel) writableKey.channel();
IPCHandle handle = (IPCHandle) writableKey.attachment();
+ final ISocketChannel socketChannel = handle.getSocketChannel();
ByteBuffer writeBuffer = handle.getOutBuffer();
try {
- int len = channel.write(writeBuffer);
+ int len = socketChannel.write(writeBuffer);
if (len < 0) {
close(writableKey, channel);
return;
}
system.getPerformanceCounters().addMessageBytesSent(len);
- if (!writeBuffer.hasRemaining()) {
+ if (!writeBuffer.hasRemaining() && !socketChannel.isPendingWrite()) {
writableKey.interestOps(writableKey.interestOps() & ~SelectionKey.OP_WRITE);
}
if (handle.full()) {
@@ -445,5 +468,31 @@
target.addAll(source);
source.clear();
}
+
+ private void asyncHandshake(ISocketChannel socketChannel, IPCHandle handle, SelectionKey channelKey) {
+ CompletableFuture.supplyAsync(socketChannel::handshake).exceptionally(ex -> false).thenAccept(
+ handshakeSuccess -> handleHandshakeCompletion(handshakeSuccess, socketChannel, handle, channelKey));
+ }
+
+ private void handleHandshakeCompletion(Boolean handshakeSuccess, ISocketChannel socketChannel, IPCHandle handle,
+ SelectionKey channelKey) {
+ if (handshakeSuccess) {
+ if (handle == null) {
+ connectionReceived(socketChannel, channelKey);
+ } else {
+ connectionEstablished(handle, channelKey, socketChannel);
+ }
+ } else {
+ close(channelKey, socketChannel.getSocketChannel());
+ }
+ }
+
+ private void connectionReceived(ISocketChannel channel, SelectionKey channelKey) {
+ final IPCHandle handle = new IPCHandle(system, null);
+ handle.setState(HandleState.CONNECT_RECEIVED);
+ handle.setSocketChannel(channel);
+ handle.setKey(channelKey);
+ channelKey.attach(handle);
+ }
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java
index 09c7c97..5d92960 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java
@@ -22,6 +22,7 @@
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
+import org.apache.hyracks.api.network.ISocketChannel;
import org.apache.hyracks.ipc.api.IIPCHandle;
import org.apache.hyracks.ipc.exceptions.IPCException;
@@ -44,6 +45,8 @@
private boolean full;
+ private ISocketChannel socketChannel;
+
IPCHandle(IPCSystem system, InetSocketAddress remoteAddress) {
this.system = system;
this.remoteAddress = remoteAddress;
@@ -100,6 +103,14 @@
this.key = key;
}
+ public ISocketChannel getSocketChannel() {
+ return socketChannel;
+ }
+
+ public void setSocketChannel(ISocketChannel socketChannel) {
+ this.socketChannel = socketChannel;
+ }
+
public synchronized boolean isConnected() {
return state == HandleState.CONNECTED;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
index b7dcf05..8d90ba3 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
@@ -22,6 +22,7 @@
import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hyracks.api.network.ISocketChannelFactory;
import org.apache.hyracks.ipc.api.IIPCEventListener;
import org.apache.hyracks.ipc.api.IIPCHandle;
import org.apache.hyracks.ipc.api.IIPCI;
@@ -45,9 +46,9 @@
private final IPCPerformanceCounters perfCounters;
- public IPCSystem(InetSocketAddress socketAddress, IIPCI ipci, IPayloadSerializerDeserializer serde)
- throws IOException {
- cMgr = new IPCConnectionManager(this, socketAddress);
+ public IPCSystem(InetSocketAddress socketAddress, ISocketChannelFactory socketChannelFactory, IIPCI ipci,
+ IPayloadSerializerDeserializer serde) throws IOException {
+ cMgr = new IPCConnectionManager(this, socketAddress, socketChannelFactory);
this.ipci = ipci;
this.serde = serde;
midFactory = new AtomicLong();
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/security/NetworkSecurityConfig.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/security/NetworkSecurityConfig.java
new file mode 100644
index 0000000..7f02830
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/security/NetworkSecurityConfig.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.ipc.security;
+
+import java.io.File;
+import java.security.KeyStore;
+
+import org.apache.hyracks.api.network.INetworkSecurityConfig;
+
+public class NetworkSecurityConfig implements INetworkSecurityConfig {
+
+ private final boolean sslEnabled;
+ private final File keyStoreFile;
+ private final File trustStoreFile;
+ private final String keyStorePassword;
+ private final KeyStore keyStore;
+
+ private NetworkSecurityConfig(boolean sslEnabled, String keyStoreFile, String keyStorePassword,
+ String trustStoreFile, KeyStore keyStore) {
+ this.sslEnabled = sslEnabled;
+ this.keyStoreFile = keyStoreFile != null ? new File(keyStoreFile) : null;
+ this.keyStorePassword = keyStorePassword;
+ this.trustStoreFile = trustStoreFile != null ? new File(trustStoreFile) : null;
+ this.keyStore = keyStore;
+ }
+
+ public static NetworkSecurityConfig of(boolean sslEnabled, String keyStoreFile, String keyStorePassword,
+ String trustStoreFile) {
+ return new NetworkSecurityConfig(sslEnabled, keyStoreFile, keyStorePassword, trustStoreFile, null);
+ }
+
+ public static NetworkSecurityConfig of(boolean sslEnabled, KeyStore keyStore, String keyStorePassword,
+ String trustStoreFile) {
+ return new NetworkSecurityConfig(sslEnabled, null, keyStorePassword, trustStoreFile, keyStore);
+ }
+
+ public boolean isSslEnabled() {
+ return sslEnabled;
+ }
+
+ public File getKeyStoreFile() {
+ return keyStoreFile;
+ }
+
+ public String getKeyStorePassword() {
+ return keyStorePassword;
+ }
+
+ public KeyStore getKeyStore() {
+ return keyStore;
+ }
+
+ public File getTrustStoreFile() {
+ return trustStoreFile;
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/security/NetworkSecurityManager.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/security/NetworkSecurityManager.java
new file mode 100644
index 0000000..ed25f41
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/security/NetworkSecurityManager.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.ipc.security;
+
+import java.io.FileInputStream;
+import java.security.KeyStore;
+import java.security.SecureRandom;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.TrustManagerFactory;
+
+import org.apache.hyracks.api.network.INetworkSecurityConfig;
+import org.apache.hyracks.api.network.INetworkSecurityManager;
+import org.apache.hyracks.api.network.ISocketChannelFactory;
+import org.apache.hyracks.ipc.sockets.PlainSocketChannelFactory;
+import org.apache.hyracks.ipc.sockets.SslSocketChannelFactory;
+
+public class NetworkSecurityManager implements INetworkSecurityManager {
+
+ private volatile INetworkSecurityConfig config;
+ private final ISocketChannelFactory sslSocketFactory;
+ private static final String TSL_VERSION = "TLSv1.2";
+
+ public NetworkSecurityManager(INetworkSecurityConfig config) {
+ this.config = config;
+ sslSocketFactory = new SslSocketChannelFactory(this);
+ }
+
+ @Override
+ public SSLContext newSSLContext() {
+ try {
+ final char[] password = getKeyStorePassword();
+ KeyStore engineKeyStore = config.getKeyStore();
+ if (engineKeyStore == null) {
+ engineKeyStore = loadKeyStoreFromFile(password);
+ }
+ final String defaultAlgorithm = KeyManagerFactory.getDefaultAlgorithm();
+ KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(defaultAlgorithm);
+ TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(defaultAlgorithm);
+ keyManagerFactory.init(engineKeyStore, "".toCharArray());
+ final KeyStore trustStore = loadTrustStoreFromFile(password);
+ trustManagerFactory.init(trustStore);
+ SSLContext ctx = SSLContext.getInstance(TSL_VERSION);
+ ctx.init(keyManagerFactory.getKeyManagers(), trustManagerFactory.getTrustManagers(), new SecureRandom());
+ return ctx;
+ } catch (Exception ex) {
+ throw new IllegalStateException("Failed to create SSLEngine", ex);
+ }
+ }
+
+ @Override
+ public SSLEngine newSSLEngine() {
+ try {
+ SSLContext ctx = newSSLContext();
+ return ctx.createSSLEngine();
+ } catch (Exception ex) {
+ throw new IllegalStateException("Failed to create SSLEngine", ex);
+ }
+ }
+
+ public ISocketChannelFactory getSocketChannelFactory() {
+ if (config.isSslEnabled()) {
+ return sslSocketFactory;
+ }
+ return PlainSocketChannelFactory.INSTANCE;
+ }
+
+ @Override
+ public void setConfiguration(INetworkSecurityConfig config) {
+ this.config = config;
+ }
+
+ private KeyStore loadKeyStoreFromFile(char[] password) {
+ try {
+ final KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
+ ks.load(new FileInputStream(config.getKeyStoreFile()), password);
+ return ks;
+ } catch (Exception e) {
+ throw new IllegalStateException("failed to load key store", e);
+ }
+ }
+
+ private KeyStore loadTrustStoreFromFile(char[] password) {
+ try {
+ final KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
+ ks.load(new FileInputStream(config.getTrustStoreFile()), password);
+ return ks;
+ } catch (Exception e) {
+ throw new IllegalStateException("failed to load trust store", e);
+ }
+ }
+
+ private char[] getKeyStorePassword() {
+ final String pass = config.getKeyStorePassword();
+ return pass == null || pass.isEmpty() ? null : pass.toCharArray();
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/PlainSocketChannel.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/PlainSocketChannel.java
new file mode 100644
index 0000000..cae04a1
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/PlainSocketChannel.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.ipc.sockets;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+
+import org.apache.hyracks.api.network.ISocketChannel;
+
+public class PlainSocketChannel implements ISocketChannel {
+
+ private final SocketChannel socketChannel;
+
+ public PlainSocketChannel(SocketChannel socketChannel) {
+ this.socketChannel = socketChannel;
+ }
+
+ @Override
+ public boolean requiresHandshake() {
+ return false;
+ }
+
+ @Override
+ public boolean handshake() {
+ return true;
+ }
+
+ @Override
+ public int read(ByteBuffer dst) throws IOException {
+ return socketChannel.read(dst);
+ }
+
+ @Override
+ public int write(ByteBuffer src) throws IOException {
+ return socketChannel.write(src);
+ }
+
+ @Override
+ public void close() throws IOException {
+ socketChannel.close();
+ }
+
+ @Override
+ public SocketChannel getSocketChannel() {
+ return socketChannel;
+ }
+
+ @Override
+ public boolean isPendingRead() {
+ return false;
+ }
+
+ @Override
+ public boolean isPendingWrite() {
+ return false;
+ }
+
+ @Override
+ public boolean completeWrite() {
+ return true;
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/PlainSocketChannelFactory.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/PlainSocketChannelFactory.java
new file mode 100644
index 0000000..e9b310f
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/PlainSocketChannelFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.ipc.sockets;
+
+import java.nio.channels.SocketChannel;
+
+import org.apache.hyracks.api.network.ISocketChannel;
+import org.apache.hyracks.api.network.ISocketChannelFactory;
+
+public class PlainSocketChannelFactory implements ISocketChannelFactory {
+
+ public static final PlainSocketChannelFactory INSTANCE = new PlainSocketChannelFactory();
+
+ @Override
+ public ISocketChannel createServerChannel(SocketChannel socketChannel) {
+ return new PlainSocketChannel(socketChannel);
+ }
+
+ @Override
+ public ISocketChannel createClientChannel(SocketChannel socketChannel) {
+ return new PlainSocketChannel(socketChannel);
+ }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslHandshake.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslHandshake.java
new file mode 100644
index 0000000..af32a70
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslHandshake.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.ipc.sockets;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLException;
+
+import org.apache.hyracks.util.NetworkUtil;
+
+public class SslHandshake {
+
+ private final ExecutorService executor = Executors.newSingleThreadExecutor();
+ private final ByteBuffer handshakeOutData;
+ private final SocketChannel socketChannel;
+ private final SSLEngine engine;
+ private SSLEngineResult.HandshakeStatus handshakeStatus;
+ private ByteBuffer handshakeInData;
+ private ByteBuffer outEncryptedData;
+ private ByteBuffer inEncryptedData;
+
+ public SslHandshake(SslSocketChannel sslSocketChannel) {
+ socketChannel = sslSocketChannel.getSocketChannel();
+ engine = sslSocketChannel.getSslEngine();
+ final int pocketBufferSize = engine.getSession().getPacketBufferSize();
+ inEncryptedData = ByteBuffer.allocate(pocketBufferSize);
+ outEncryptedData = ByteBuffer.allocate(pocketBufferSize);
+ // increase app buffer size to reduce possibility of overflow
+ final int appBufferSize = engine.getSession().getApplicationBufferSize() + 50;
+ handshakeOutData = ByteBuffer.allocate(appBufferSize);
+ handshakeInData = ByteBuffer.allocate(appBufferSize);
+ }
+
+ public boolean handshake() throws IOException {
+ handshakeStatus = engine.getHandshakeStatus();
+ while (handshakeStatus != SSLEngineResult.HandshakeStatus.FINISHED
+ && handshakeStatus != SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING) {
+ switch (handshakeStatus) {
+ case NEED_UNWRAP:
+ if (!unwrap()) {
+ return false;
+ }
+ break;
+ case NEED_WRAP:
+ wrap();
+ break;
+ case NEED_TASK:
+ Runnable task;
+ while ((task = engine.getDelegatedTask()) != null) {
+ executor.execute(task);
+ }
+ handshakeStatus = engine.getHandshakeStatus();
+ break;
+ default:
+ throw new IllegalStateException("Invalid SSL handshake status: " + handshakeStatus);
+ }
+ }
+ return true;
+ }
+
+ private void wrap() throws IOException {
+ outEncryptedData.clear();
+ SSLEngineResult result;
+ try {
+ result = engine.wrap(handshakeOutData, outEncryptedData);
+ handshakeStatus = result.getHandshakeStatus();
+ } catch (SSLException sslException) {
+ engine.closeOutbound();
+ handshakeStatus = engine.getHandshakeStatus();
+ throw sslException;
+ }
+ switch (result.getStatus()) {
+ case OK:
+ outEncryptedData.flip();
+ while (outEncryptedData.hasRemaining()) {
+ socketChannel.write(outEncryptedData);
+ }
+ break;
+ case BUFFER_OVERFLOW:
+ outEncryptedData = NetworkUtil.enlargeSslPacketBuffer(engine, outEncryptedData);
+ break;
+ case CLOSED:
+ outEncryptedData.flip();
+ while (outEncryptedData.hasRemaining()) {
+ socketChannel.write(outEncryptedData);
+ }
+ inEncryptedData.clear();
+ handshakeStatus = engine.getHandshakeStatus();
+ break;
+ case BUFFER_UNDERFLOW:
+ default:
+ throw new IllegalStateException("Invalid SSL status " + result.getStatus());
+ }
+ }
+
+ private boolean unwrap() throws IOException {
+ final int read = socketChannel.read(inEncryptedData);
+ if (read < 0) {
+ if (engine.isInboundDone() && engine.isOutboundDone()) {
+ return false;
+ }
+ engine.closeInbound();
+ // close output to put engine in WRAP status to attempt graceful ssl session end
+ engine.closeOutbound();
+ return false;
+ }
+ inEncryptedData.flip();
+ SSLEngineResult result;
+ try {
+ result = engine.unwrap(inEncryptedData, handshakeInData);
+ inEncryptedData.compact();
+ handshakeStatus = result.getHandshakeStatus();
+ } catch (SSLException sslException) {
+ engine.closeOutbound();
+ handshakeStatus = engine.getHandshakeStatus();
+ throw sslException;
+ }
+ switch (result.getStatus()) {
+ case OK:
+ break;
+ case BUFFER_OVERFLOW:
+ handshakeInData = NetworkUtil.enlargeSslApplicationBuffer(engine, handshakeInData);
+ break;
+ case BUFFER_UNDERFLOW:
+ inEncryptedData = handleBufferUnderflow(engine, inEncryptedData);
+ break;
+ case CLOSED:
+ if (engine.isOutboundDone()) {
+ return false;
+ } else {
+ engine.closeOutbound();
+ handshakeStatus = engine.getHandshakeStatus();
+ break;
+ }
+ default:
+ throw new IllegalStateException("Invalid SSL status " + result.getStatus());
+ }
+ return true;
+ }
+
+ private ByteBuffer handleBufferUnderflow(SSLEngine engine, ByteBuffer buffer) {
+ if (buffer.capacity() >= engine.getSession().getPacketBufferSize()) {
+ return buffer;
+ } else {
+ final ByteBuffer replaceBuffer = NetworkUtil.enlargeSslPacketBuffer(engine, buffer);
+ buffer.flip();
+ replaceBuffer.put(buffer);
+ return replaceBuffer;
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannel.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannel.java
new file mode 100644
index 0000000..73475b0
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannel.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.ipc.sockets;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLSession;
+
+import org.apache.hyracks.api.network.ISocketChannel;
+import org.apache.hyracks.util.NetworkUtil;
+import org.apache.hyracks.util.StorageUtil;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class SslSocketChannel implements ISocketChannel {
+
+ private static final Logger LOGGER = LogManager.getLogger();
+ private static final int DEFAULT_APP_BUFFER_SIZE =
+ StorageUtil.getIntSizeInBytes(1, StorageUtil.StorageUnit.MEGABYTE);
+ private final SocketChannel socketChannel;
+ private final SSLEngine engine;
+ private ByteBuffer outEncryptedData;
+ private ByteBuffer inAppData;
+ private ByteBuffer inEncryptedData;
+ private boolean partialRecord = false;
+ private boolean cachedData = false;
+ private boolean pendingWrite = false;
+
+ public SslSocketChannel(SocketChannel socketChannel, SSLEngine engine) {
+ this.socketChannel = socketChannel;
+ this.engine = engine;
+ inAppData = ByteBuffer.allocate(DEFAULT_APP_BUFFER_SIZE);
+ inAppData.limit(0);
+ final SSLSession sslSession = engine.getSession();
+ inEncryptedData = ByteBuffer.allocate(sslSession.getPacketBufferSize());
+ outEncryptedData = ByteBuffer.allocate(sslSession.getPacketBufferSize());
+ outEncryptedData.limit(0);
+ }
+
+ @Override
+ public synchronized boolean handshake() {
+ try {
+ LOGGER.debug("starting SSL handshake {}", this);
+ engine.beginHandshake();
+ final SslHandshake sslHandshake = new SslHandshake(this);
+ final boolean success = sslHandshake.handshake();
+ if (success) {
+ LOGGER.debug("SSL handshake successful {}", this);
+ }
+ return success;
+ } catch (Exception e) {
+ LOGGER.error("handshake failed {}", this, e);
+ throw new IllegalStateException(e);
+ }
+ }
+
+ @Override
+ public boolean requiresHandshake() {
+ return true;
+ }
+
+ @Override
+ public synchronized int read(ByteBuffer buffer) throws IOException {
+ int transfereeBytes = 0;
+ if (cachedData) {
+ transfereeBytes += transferTo(inAppData, buffer);
+ }
+ if (buffer.hasRemaining()) {
+ if (!partialRecord) {
+ inEncryptedData.clear();
+ }
+ final int bytesRead = socketChannel.read(inEncryptedData);
+ if (bytesRead > 0) {
+ partialRecord = false;
+ inEncryptedData.flip();
+ inAppData.clear();
+ if (decrypt() > 0) {
+ inAppData.flip();
+ transfereeBytes += transferTo(inAppData, buffer);
+ } else {
+ inAppData.limit(0);
+ }
+ } else if (bytesRead < 0) {
+ handleEndOfStreamQuietly();
+ return -1;
+ }
+ }
+ cachedData = inAppData.hasRemaining();
+ return transfereeBytes;
+ }
+
+ private int decrypt() throws IOException {
+ int decryptedBytes = 0;
+ while (inEncryptedData.hasRemaining() && !partialRecord) {
+ SSLEngineResult result = engine.unwrap(inEncryptedData, inAppData);
+ switch (result.getStatus()) {
+ case OK:
+ decryptedBytes += result.bytesProduced();
+ partialRecord = false;
+ break;
+ case BUFFER_OVERFLOW:
+ inAppData = NetworkUtil.enlargeSslApplicationBuffer(engine, inAppData);
+ break;
+ case BUFFER_UNDERFLOW:
+ handleReadUnderflow();
+ break;
+ case CLOSED:
+ close();
+ return -1;
+ default:
+ throw new IllegalStateException("Invalid SSL result status: " + result.getStatus());
+ }
+ }
+ return decryptedBytes;
+ }
+
+ public synchronized int write(ByteBuffer src) throws IOException {
+ if (pendingWrite && !completeWrite()) {
+ return 0;
+ }
+ int encryptedBytes = 0;
+ while (src.hasRemaining()) {
+ // chunk src to encrypted ssl records of pocket size
+ outEncryptedData.clear();
+ final SSLEngineResult result = engine.wrap(src, outEncryptedData);
+ switch (result.getStatus()) {
+ case OK:
+ outEncryptedData.flip();
+ encryptedBytes += result.bytesConsumed();
+ while (outEncryptedData.hasRemaining()) {
+ final int written = socketChannel.write(outEncryptedData);
+ if (written == 0) {
+ pendingWrite = true;
+ return encryptedBytes;
+ }
+ }
+ break;
+ case BUFFER_OVERFLOW:
+ outEncryptedData = NetworkUtil.enlargeSslPacketBuffer(engine, outEncryptedData);
+ break;
+ case CLOSED:
+ close();
+ return -1;
+ case BUFFER_UNDERFLOW:
+ default:
+ throw new IllegalStateException("Invalid SSL result status: " + result.getStatus());
+ }
+ }
+ pendingWrite = false;
+ return encryptedBytes;
+ }
+
+ @Override
+ public synchronized boolean completeWrite() throws IOException {
+ while (outEncryptedData.hasRemaining()) {
+ final int written = socketChannel.write(outEncryptedData);
+ if (written == 0) {
+ return false;
+ }
+ }
+ pendingWrite = false;
+ return true;
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ engine.closeOutbound();
+ new SslHandshake(this).handshake();
+ socketChannel.close();
+ }
+
+ @Override
+ public SocketChannel getSocketChannel() {
+ return socketChannel;
+ }
+
+ @Override
+ public synchronized boolean isPendingRead() {
+ return cachedData;
+ }
+
+ @Override
+ public synchronized boolean isPendingWrite() {
+ return pendingWrite;
+ }
+
+ public SSLEngine getSslEngine() {
+ return engine;
+ }
+
+ @Override
+ public String toString() {
+ return getConnectionInfo();
+ }
+
+ private void handleReadUnderflow() {
+ if (engine.getSession().getPacketBufferSize() > inEncryptedData.capacity()) {
+ inEncryptedData = NetworkUtil.enlargeSslPacketBuffer(engine, inEncryptedData);
+ } else {
+ inEncryptedData.compact();
+ }
+ partialRecord = true;
+ }
+
+ private void handleEndOfStreamQuietly() {
+ try {
+ engine.closeInbound();
+ close();
+ } catch (Exception e) {
+ LOGGER.warn("failed to close socket gracefully", e);
+ }
+ }
+
+ private String getConnectionInfo() {
+ try {
+ return getSocketChannel().getLocalAddress() + " -> " + getSocketChannel().getRemoteAddress();
+ } catch (IOException e) {
+ LOGGER.warn("failed to get connection info", e);
+ return "";
+ }
+ }
+
+ private static int transferTo(ByteBuffer src, ByteBuffer dst) {
+ final int maxTransfer = Math.min(dst.remaining(), src.remaining());
+ if (maxTransfer > 0) {
+ dst.put(src.array(), src.arrayOffset() + src.position(), maxTransfer);
+ src.position(src.position() + maxTransfer);
+ }
+ return maxTransfer;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannelFactory.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannelFactory.java
new file mode 100644
index 0000000..2a2fb62
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannelFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.ipc.sockets;
+
+import java.nio.channels.SocketChannel;
+
+import javax.net.ssl.SSLEngine;
+
+import org.apache.hyracks.api.network.INetworkSecurityManager;
+import org.apache.hyracks.api.network.ISocketChannel;
+import org.apache.hyracks.api.network.ISocketChannelFactory;
+
+public class SslSocketChannelFactory implements ISocketChannelFactory {
+
+ private final INetworkSecurityManager networkSecurityManager;
+
+ public SslSocketChannelFactory(INetworkSecurityManager networkSecurityManager) {
+ this.networkSecurityManager = networkSecurityManager;
+ }
+
+ @Override
+ public ISocketChannel createServerChannel(SocketChannel socketChannel) {
+ final SSLEngine sslEngine = networkSecurityManager.newSSLEngine();
+ sslEngine.setUseClientMode(false);
+ return new SslSocketChannel(socketChannel, sslEngine);
+ }
+
+ @Override
+ public ISocketChannel createClientChannel(SocketChannel socketChannel) {
+ final SSLEngine sslEngine = networkSecurityManager.newSSLEngine();
+ sslEngine.setUseClientMode(true);
+ return new SslSocketChannel(socketChannel, sslEngine);
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/test/java/org/apache/hyracks/ipc/tests/IPCTest.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/test/java/org/apache/hyracks/ipc/tests/IPCTest.java
index 70a0e18..00bd761 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/test/java/org/apache/hyracks/ipc/tests/IPCTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/test/java/org/apache/hyracks/ipc/tests/IPCTest.java
@@ -29,6 +29,7 @@
import org.apache.hyracks.ipc.exceptions.IPCException;
import org.apache.hyracks.ipc.impl.IPCSystem;
import org.apache.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
+import org.apache.hyracks.ipc.sockets.PlainSocketChannelFactory;
import org.junit.Assert;
import org.junit.Test;
@@ -83,12 +84,12 @@
});
}
};
- return new IPCSystem(new InetSocketAddress("127.0.0.1", 0), ipci,
+ return new IPCSystem(new InetSocketAddress("127.0.0.1", 0), PlainSocketChannelFactory.INSTANCE, ipci,
new JavaSerializationBasedPayloadSerializerDeserializer());
}
private IPCSystem createClientIPCSystem(RPCInterface rpci) throws IOException {
- return new IPCSystem(new InetSocketAddress("127.0.0.1", 0), rpci,
+ return new IPCSystem(new InetSocketAddress("127.0.0.1", 0), PlainSocketChannelFactory.INSTANCE, rpci,
new JavaSerializationBasedPayloadSerializerDeserializer());
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java
index c6b76fc..d10be8e 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java
@@ -24,10 +24,13 @@
import java.net.StandardSocketOptions;
import java.net.URI;
import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;
+import javax.net.ssl.SSLEngine;
+
import org.apache.http.HttpHost;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.conn.util.InetAddressUtils;
@@ -107,4 +110,24 @@
return hostname.length() > 0 && hostname.charAt(0) == '[' ? hostname.substring(1, hostname.length() - 1)
: hostname;
}
+
+ public static ByteBuffer enlargeSslPacketBuffer(SSLEngine engine, ByteBuffer buffer) {
+ return enlargeSslBuffer(buffer, engine.getSession().getPacketBufferSize());
+ }
+
+ public static ByteBuffer enlargeSslApplicationBuffer(SSLEngine engine, ByteBuffer buffer) {
+ return enlargeSslBuffer(buffer, engine.getSession().getApplicationBufferSize());
+ }
+
+ public static ByteBuffer enlargeSslBuffer(ByteBuffer src, int sessionProposedCapacity) {
+ final ByteBuffer enlargedBuffer;
+ if (sessionProposedCapacity > src.capacity()) {
+ enlargedBuffer = ByteBuffer.allocate(sessionProposedCapacity);
+ } else {
+ enlargedBuffer = ByteBuffer.allocate(src.capacity() * 2);
+ }
+ src.flip();
+ enlargedBuffer.put(src);
+ return enlargedBuffer;
+ }
}