diff --git a/hyracks-fullstack/hyracks/hyracks-api/pom.xml b/hyracks-fullstack/hyracks/hyracks-api/pom.xml
index 7a4eb58..eee03f3 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-api/pom.xml
@@ -105,5 +105,9 @@
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-handler</artifactId>
+    </dependency>
   </dependencies>
 </project>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/network/INetworkSecurityConfig.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/network/INetworkSecurityConfig.java
index 7fc0335..e581c5a 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/network/INetworkSecurityConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/network/INetworkSecurityConfig.java
@@ -24,6 +24,8 @@
 import java.security.KeyStore;
 import java.util.Optional;
 
+import io.netty.handler.ssl.ClientAuth;
+
 public interface INetworkSecurityConfig extends Serializable {
 
     /**
@@ -34,25 +36,51 @@
     boolean isSslEnabled();
 
     /**
-     * Gets the key store to be used for secured connections
-     *
-     * @return the key store to be used
+     * Indicates how to handle client authentication when ssl is enabled
      */
-    KeyStore getKeyStore();
+    ClientAuth getClientAuth();
 
     /**
-     * Gets a key store file to be used if {@link INetworkSecurityConfig#getKeyStore()} returns null.
+     * Gets the key store to be used for secured connections
+     *
+     * @return the key store to be used, if present
+     */
+    Optional<KeyStore> getKeyStore();
+
+    /**
+     * Gets a key store file to be used if {@link INetworkSecurityConfig#getKeyStore()} returns empty.
      *
      * @return the key store file
      */
     File getKeyStoreFile();
 
     /**
-     * Gets the password for the key store file.
+     * Gets a password to be used to unlock or check integrity of the key store.
      *
-     * @return the password to the key store file
+     * @return the key store password, or {@link Optional#empty()}
      */
-    String getKeyStorePassword();
+    Optional<char[]> getKeyStorePassword();
+
+    /**
+     * Gets the client key store to be used for client auth, if applicable.
+     *
+     * @return the client key store to be used for client auth, or {@link Optional#empty()}
+     */
+    Optional<KeyStore> getClientKeyStore();
+
+    /**
+     * Gets a client key store file to be used if {@link INetworkSecurityConfig#getClientKeyStore()} returns empty.
+     *
+     * @return the client key store file
+     */
+    File getClientKeyStoreFile();
+
+    /**
+     * Gets a password to be used to unlock or check integrity of the client key store.
+     *
+     * @return the client key store password, or {@link Optional#empty()}
+     */
+    Optional<char[]> getClientKeyStorePassword();
 
     /**
      * Gets the trust store to be used for validating certificates of secured connections
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/network/INetworkSecurityManager.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/network/INetworkSecurityManager.java
index eb52436..8462f91 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/network/INetworkSecurityManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/network/INetworkSecurityManager.java
@@ -28,14 +28,16 @@
      *
+     * @param clientMode {@code true} for a client-side context, {@code false} for a server-side one
      * @return a new ssl context
      */
-    SSLContext newSSLContext();
+    SSLContext newSSLContext(boolean clientMode);
 
     /**
      * Creates a new ssl engine based on the current configuration of this {@link INetworkSecurityManager}
      *
+     * @param clientMode {@code true} for a client-side engine, {@code false} for a server-side one
      * @return a new ssl engine
      */
-    SSLEngine newSSLEngine();
+    SSLEngine newSSLEngine(boolean clientMode);
 
     /**
      * Sets the configuration to be used for this {@link INetworkSecurityManager}
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/pom.xml b/hyracks-fullstack/hyracks/hyracks-ipc/pom.xml
index 939f487..8157554 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/pom.xml
@@ -67,5 +67,9 @@
       <groupId>org.apache.httpcomponents</groupId>
       <artifactId>httpclient</artifactId>
     </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-handler</artifactId>
+    </dependency>
   </dependencies>
 </project>
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/security/NetworkSecurityConfig.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/security/NetworkSecurityConfig.java
index bfcd623..a8bd087 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/security/NetworkSecurityConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/security/NetworkSecurityConfig.java
@@ -31,6 +31,8 @@
 
 import org.apache.hyracks.api.network.INetworkSecurityConfig;
 
+import io.netty.handler.ssl.ClientAuth;
+
 public class NetworkSecurityConfig implements INetworkSecurityConfig {
 
     private static final long serialVersionUID = 2L;
@@ -68,18 +70,43 @@
     }
 
     @Override
+    public ClientAuth getClientAuth() {
+        // client authentication is never requested by this configuration
+        return ClientAuth.NONE;
+    }
+
+    @Override
     public File getKeyStoreFile() {
         return keyStoreFile;
     }
 
     @Override
-    public String getKeyStorePassword() {
-        return keyStorePassword;
+    public Optional<char[]> getKeyStorePassword() {
+        // empty when no password is set; otherwise a fresh char[] copy is produced on each call
+        return keyStorePassword != null ? Optional.of(keyStorePassword.toCharArray()) : Optional.empty();
     }
 
     @Override
-    public KeyStore getKeyStore() {
-        return keyStore;
+    public Optional<KeyStore> getKeyStore() {
+        return Optional.ofNullable(keyStore);
+    }
+
+    @Override
+    public Optional<KeyStore> getClientKeyStore() {
+        // this configuration does not carry a client key store
+        return Optional.empty();
+    }
+
+    @Override
+    public File getClientKeyStoreFile() {
+        // NOTE(review): NetworkSecurityManager.newSSLContext(config, true) falls back to this file when
+        // getClientKeyStore() is empty; a null file looks like it will make that load fail -- confirm intended
+        return null;
+    }
+
+    @Override
+    public Optional<char[]> getClientKeyStorePassword() {
+        return Optional.empty();
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/security/NetworkSecurityManager.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/security/NetworkSecurityManager.java
index db524ca..d8f5cff 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/security/NetworkSecurityManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/security/NetworkSecurityManager.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.ipc.security;
 
+import java.io.File;
 import java.io.FileInputStream;
 import java.security.KeyStore;
 import java.security.SecureRandom;
@@ -37,7 +38,7 @@
 
     private volatile INetworkSecurityConfig config;
     protected final ISocketChannelFactory sslSocketFactory;
-    public static final String TSL_VERSION = "TLSv1.2";
+    public static final String TLS_VERSION = "TLSv1.2";
 
     public NetworkSecurityManager(INetworkSecurityConfig config) {
         this.config = config;
@@ -45,15 +46,16 @@
     }
 
     @Override
-    public SSLContext newSSLContext() {
-        return newSSLContext(config);
+    public SSLContext newSSLContext(boolean clientMode) {
+        return newSSLContext(config, clientMode);
     }
 
     @Override
-    public SSLEngine newSSLEngine() {
+    public SSLEngine newSSLEngine(boolean clientMode) {
         try {
-            SSLContext ctx = newSSLContext();
-            return ctx.createSSLEngine();
+            SSLEngine sslEngine = newSSLContext(clientMode).createSSLEngine();
+            sslEngine.setUseClientMode(clientMode);
+            return sslEngine;
         } catch (Exception ex) {
             throw new IllegalStateException("Failed to create SSLEngine", ex);
         }
@@ -76,12 +78,22 @@
         this.config = config;
     }
 
-    public static SSLContext newSSLContext(INetworkSecurityConfig config) {
+    public static SSLContext newSSLContext(INetworkSecurityConfig config, boolean clientMode) {
         try {
-            final char[] password = getKeyStorePassword(config);
-            KeyStore engineKeyStore = config.getKeyStore();
-            if (engineKeyStore == null) {
-                engineKeyStore = loadKeyStoreFromFile(password, config);
+            KeyStore engineKeyStore;
+            final char[] password;
+            if (clientMode) {
+                password = config.getClientKeyStorePassword().orElse(null);
+                engineKeyStore = config.getClientKeyStore().orElse(null);
+                if (engineKeyStore == null) {
+                    engineKeyStore = loadKeyStoreFromFile(password, config.getClientKeyStoreFile());
+                }
+            } else {
+                password = config.getKeyStorePassword().orElse(null);
+                engineKeyStore = config.getKeyStore().orElse(null);
+                if (engineKeyStore == null) {
+                    engineKeyStore = loadKeyStoreFromFile(password, config.getKeyStoreFile());
+                }
             }
             final String defaultAlgorithm = KeyManagerFactory.getDefaultAlgorithm();
             KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(defaultAlgorithm);
@@ -89,10 +101,10 @@
             keyManagerFactory.init(engineKeyStore, password);
             KeyStore trustStore = config.getTrustStore();
             if (trustStore == null) {
-                trustStore = loadTrustStoreFromFile(password, config);
+                trustStore = loadKeyStoreFromFile(password, config.getTrustStoreFile());
             }
             trustManagerFactory.init(trustStore);
-            SSLContext ctx = SSLContext.getInstance(TSL_VERSION);
+            SSLContext ctx = SSLContext.getInstance(TLS_VERSION);
             ctx.init(keyManagerFactory.getKeyManagers(), trustManagerFactory.getTrustManagers(), new SecureRandom());
             return ctx;
         } catch (Exception ex) {
@@ -100,28 +112,23 @@
         }
     }
 
-    private static KeyStore loadKeyStoreFromFile(char[] password, INetworkSecurityConfig config) {
-        try {
+    /**
+     * Loads a key store of the default {@link KeyStore} type from the given file.
+     *
+     * @param password the password used to unlock or integrity-check the store, may be null
+     * @param keystoreFile the file to read the key store from
+     * @return the loaded key store
+     * @throws IllegalStateException if the file cannot be opened or the store cannot be loaded
+     */
+    private static KeyStore loadKeyStoreFromFile(char[] password, File keystoreFile) {
+        // try-with-resources guarantees the file handle is released whether or not loading succeeds
+        try (FileInputStream in = new FileInputStream(keystoreFile)) {
             final KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
-            ks.load(new FileInputStream(config.getKeyStoreFile()), password);
+            ks.load(in, password);
             return ks;
         } catch (Exception e) {
             throw new IllegalStateException("failed to load key store", e);
         }
     }
 
-    private static KeyStore loadTrustStoreFromFile(char[] password, INetworkSecurityConfig config) {
-        try {
-            final KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
-            ks.load(new FileInputStream(config.getTrustStoreFile()), password);
-            return ks;
-        } catch (Exception e) {
-            throw new IllegalStateException("failed to load trust store", e);
-        }
-    }
-
-    private static char[] getKeyStorePassword(INetworkSecurityConfig config) {
-        final String pass = config.getKeyStorePassword();
-        return pass == null || pass.isEmpty() ? null : pass.toCharArray();
-    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannelFactory.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannelFactory.java
index 2a2fb62..0e4b9da 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannelFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/sockets/SslSocketChannelFactory.java
@@ -36,15 +36,15 @@
 
     @Override
     public ISocketChannel createServerChannel(SocketChannel socketChannel) {
-        final SSLEngine sslEngine = networkSecurityManager.newSSLEngine();
-        sslEngine.setUseClientMode(false);
+        // clientMode == false: the engine is created for the server side of the connection
+        final SSLEngine sslEngine = networkSecurityManager.newSSLEngine(false);
         return new SslSocketChannel(socketChannel, sslEngine);
     }
 
     @Override
     public ISocketChannel createClientChannel(SocketChannel socketChannel) {
-        final SSLEngine sslEngine = networkSecurityManager.newSSLEngine();
-        sslEngine.setUseClientMode(true);
+        // clientMode == true: the engine is created for the client side of the connection
+        final SSLEngine sslEngine = networkSecurityManager.newSSLEngine(true);
         return new SslSocketChannel(socketChannel, sslEngine);
     }
 }
