Merge "Merge commit '3072d1' into master"
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index a702381..8ac13be 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -469,7 +469,8 @@
             // our client socket factory when SSL is enabled
             if (networkSecurityManager.getConfiguration().isSslEnabled()) {
                 final RMIServerFactory serverSocketFactory = new RMIServerFactory(networkSecurityManager);
-                final RMIClientFactory clientSocketFactory = new RMIClientFactory(true);
+                final RMIClientFactory clientSocketFactory =
+                        new RMIClientFactory(networkSecurityManager.getConfiguration());
                 metadataNodeStub = (IMetadataNode) UnicastRemoteObject.exportObject(MetadataNode.INSTANCE,
                         getMetadataProperties().getMetadataPort(), clientSocketFactory, serverSocketFactory);
             } else {
diff --git a/asterixdb/asterix-metadata/pom.xml b/asterixdb/asterix-metadata/pom.xml
index d28e40b..1e2bf53 100644
--- a/asterixdb/asterix-metadata/pom.xml
+++ b/asterixdb/asterix-metadata/pom.xml
@@ -178,5 +178,9 @@
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-hdfs-client</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-ipc</artifactId>
+    </dependency>
   </dependencies>
 </project>
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/RMIClientFactory.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/RMIClientFactory.java
index ac2ecd0..515e763 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/RMIClientFactory.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/RMIClientFactory.java
@@ -21,23 +21,105 @@
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.net.InetAddress;
 import java.net.Socket;
 import java.rmi.server.RMIClientSocketFactory;
 
 import javax.net.SocketFactory;
 import javax.net.ssl.SSLSocketFactory;
 
+import org.apache.hyracks.api.network.INetworkSecurityConfig;
+import org.apache.hyracks.ipc.security.NetworkSecurityManager;
+
+/**
+ * An {@link RMIClientSocketFactory} driven by an {@link INetworkSecurityConfig}: sockets come from an SSL
+ * socket factory built from the config when SSL is enabled, and from the default socket factory otherwise.
+ * The socket factory itself is transient and is created on the first call to {@link #createSocket}.
+ * <p>
+ * NOTE(review): equals()/hashCode() are not overridden although the {@link RMIClientSocketFactory}
+ * documentation recommends it for functionally equivalent factories; confirm whether that is needed here.
+ */
 public class RMIClientFactory implements RMIClientSocketFactory, Serializable {
+
     private static final long serialVersionUID = -3874278041718817394L;
+    private final INetworkSecurityConfig config;
+    // never serialized; rebuilt from config on demand
+    private transient SocketFactory socketFactory;
 
-    private final boolean sslEnabled;
-
-    public RMIClientFactory(boolean sslEnabled) {
-        this.sslEnabled = sslEnabled;
+    /**
+     * @param config
+     *            the security configuration that selects, and is used to build, the socket factory
+     */
+    public RMIClientFactory(INetworkSecurityConfig config) {
+        this.config = config;
     }
 
+    /**
+     * Opens a socket to the given host and port using the lazily created socket factory.
+     */
+    @Override
     public Socket createSocket(String host, int port) throws IOException {
-        final SocketFactory factory = sslEnabled ? SSLSocketFactory.getDefault() : SocketFactory.getDefault();
-        return factory.createSocket(host, port);
+        return getSocketFactory().createSocket(host, port);
+    }
+
+    /**
+     * Returns the socket factory, creating it from the config on first use. Synchronized so that concurrent
+     * callers create it at most once.
+     */
+    private synchronized SocketFactory getSocketFactory() {
+        if (socketFactory == null) {
+            socketFactory = config.isSslEnabled() ? new RMITrustedClientSSLSocketFactory(config)
+                    : SocketFactory.getDefault();
+        }
+        return socketFactory;
+    }
+
+    /**
+     * An {@link SSLSocketFactory} that forwards every call to the socket factory of the SSL context returned
+     * by {@link NetworkSecurityManager#newSSLContext(INetworkSecurityConfig)}.
+     */
+    private static class RMITrustedClientSSLSocketFactory extends SSLSocketFactory {
+
+        private final SSLSocketFactory factory;
+
+        public RMITrustedClientSSLSocketFactory(INetworkSecurityConfig config) {
+            this.factory = NetworkSecurityManager.newSSLContext(config).getSocketFactory();
+        }
+
+        @Override
+        public Socket createSocket(InetAddress host, int port) throws IOException {
+            return factory.createSocket(host, port);
+        }
+
+        @Override
+        public Socket createSocket(String host, int port) throws IOException {
+            return factory.createSocket(host, port);
+        }
+
+        @Override
+        public Socket createSocket(String host, int port, InetAddress localHost, int localPort) throws IOException {
+            return factory.createSocket(host, port, localHost, localPort);
+        }
+
+        @Override
+        public Socket createSocket(InetAddress address, int port, InetAddress localAddress, int localPort)
+                throws IOException {
+            return factory.createSocket(address, port, localAddress, localPort);
+        }
+
+        @Override
+        public Socket createSocket(Socket socket, String host, int port, boolean autoClose) throws IOException {
+            return factory.createSocket(socket, host, port, autoClose);
+        }
+
+        @Override
+        public String[] getDefaultCipherSuites() {
+            return factory.getDefaultCipherSuites();
+        }
+
+        @Override
+        public String[] getSupportedCipherSuites() {
+            return factory.getSupportedCipherSuites();
+        }
     }
 }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java
index d6af749..2104fdf 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java
@@ -49,7 +49,7 @@
         if (networkSecurityManager.getConfiguration().isSslEnabled()) {
             final RMIServerFactory serverSocketFactory = new RMIServerFactory(networkSecurityManager);
             final RMIClientFactory clientSocketFactory =
-                    new RMIClientFactory(networkSecurityManager.getConfiguration().isSslEnabled());
+                    new RMIClientFactory(networkSecurityManager.getConfiguration());
             stub = (IAsterixStateProxy) UnicastRemoteObject.exportObject(cc, metadataCallbackPort, clientSocketFactory,
                     serverSocketFactory);
         } else {
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/network/INetworkSecurityConfig.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/network/INetworkSecurityConfig.java
index 772ee9f..9c65eea 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/network/INetworkSecurityConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/network/INetworkSecurityConfig.java
@@ -19,9 +19,10 @@
 package org.apache.hyracks.api.network;
 
 import java.io.File;
+import java.io.Serializable;
 import java.security.KeyStore;
 
-public interface INetworkSecurityConfig {
+public interface INetworkSecurityConfig extends Serializable {
 
     /**
      * Indicates if SSL is enabled
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/security/NetworkSecurityConfig.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/security/NetworkSecurityConfig.java
index 7f02830..25ea787 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/security/NetworkSecurityConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/security/NetworkSecurityConfig.java
@@ -25,11 +25,12 @@
 
 public class NetworkSecurityConfig implements INetworkSecurityConfig {
 
+    private static final long serialVersionUID = -1914030130038989199L;
     private final boolean sslEnabled;
     private final File keyStoreFile;
     private final File trustStoreFile;
     private final String keyStorePassword;
-    private final KeyStore keyStore;
+    private final transient KeyStore keyStore;
 
     private NetworkSecurityConfig(boolean sslEnabled, String keyStoreFile, String keyStorePassword,
             String trustStoreFile, KeyStore keyStore) {
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/security/NetworkSecurityManager.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/security/NetworkSecurityManager.java
index 0c8d429..b7c0d0f 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/security/NetworkSecurityManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/security/NetworkSecurityManager.java
@@ -37,37 +37,16 @@
 
     private volatile INetworkSecurityConfig config;
     private final ISocketChannelFactory sslSocketFactory;
-    private static final String TSL_VERSION = "TLSv1.2";
+    public static final String TSL_VERSION = "TLSv1.2";
 
     public NetworkSecurityManager(INetworkSecurityConfig config) {
         this.config = config;
-        if (config.isSslEnabled()) {
-            System.setProperty("javax.net.ssl.trustStore", config.getTrustStoreFile().getAbsolutePath());
-            System.setProperty("javax.net.ssl.trustStorePassword", config.getKeyStorePassword());
-        }
         sslSocketFactory = new SslSocketChannelFactory(this);
     }
 
     @Override
     public SSLContext newSSLContext() {
-        try {
-            final char[] password = getKeyStorePassword();
-            KeyStore engineKeyStore = config.getKeyStore();
-            if (engineKeyStore == null) {
-                engineKeyStore = loadKeyStoreFromFile(password);
-            }
-            final String defaultAlgorithm = KeyManagerFactory.getDefaultAlgorithm();
-            KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(defaultAlgorithm);
-            TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(defaultAlgorithm);
-            keyManagerFactory.init(engineKeyStore, password);
-            final KeyStore trustStore = loadTrustStoreFromFile(password);
-            trustManagerFactory.init(trustStore);
-            SSLContext ctx = SSLContext.getInstance(TSL_VERSION);
-            ctx.init(keyManagerFactory.getKeyManagers(), trustManagerFactory.getTrustManagers(), new SecureRandom());
-            return ctx;
-        } catch (Exception ex) {
-            throw new IllegalStateException("Failed to create SSLEngine", ex);
-        }
+        return newSSLContext(config);
     }
 
     @Override
@@ -97,7 +76,28 @@
         this.config = config;
     }
 
-    private KeyStore loadKeyStoreFromFile(char[] password) {
+    /**
+     * Builds an SSL context for {@code config}; any failure is wrapped in an {@link IllegalStateException}.
+     */
+    public static SSLContext newSSLContext(INetworkSecurityConfig config) {
+        try {
+            final char[] password = getKeyStorePassword(config);
+            final KeyStore configured = config.getKeyStore();
+            final KeyStore engineKeyStore = configured != null ? configured : loadKeyStoreFromFile(password, config);
+            final KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+            kmf.init(engineKeyStore, password);
+            // each factory type has its own default algorithm, so do not reuse the KeyManagerFactory one here
+            final TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+            tmf.init(loadTrustStoreFromFile(password, config));
+            final SSLContext ctx = SSLContext.getInstance(TSL_VERSION);
+            ctx.init(kmf.getKeyManagers(), tmf.getTrustManagers(), new SecureRandom());
+            return ctx;
+        } catch (Exception ex) {
+            throw new IllegalStateException("Failed to create SSLContext", ex);
+        }
+    }
+
+    private static KeyStore loadKeyStoreFromFile(char[] password, INetworkSecurityConfig config) {
         try {
             final KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
             ks.load(new FileInputStream(config.getKeyStoreFile()), password);
@@ -107,7 +107,7 @@
         }
     }
 
-    private KeyStore loadTrustStoreFromFile(char[] password) {
+    private static KeyStore loadTrustStoreFromFile(char[] password, INetworkSecurityConfig config) {
         try {
             final KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
             ks.load(new FileInputStream(config.getTrustStoreFile()), password);
@@ -117,7 +117,7 @@
         }
     }
 
-    private char[] getKeyStorePassword() {
+    private static char[] getKeyStorePassword(INetworkSecurityConfig config) {
         final String pass = config.getKeyStorePassword();
         return pass == null || pass.isEmpty() ? null : pass.toCharArray();
     }