diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index d89004b..3e72b7d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -78,8 +78,8 @@
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepositoryFactory;
 import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.client.ClusterControllerInfo;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.client.impl.ClusterControllerInfo;
 import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IIOManager;
@@ -435,11 +435,18 @@
         if (metadataNodeStub == null) {
             final INetworkSecurityManager networkSecurityManager =
                     ncServiceContext.getControllerService().getNetworkSecurityManager();
-            final RMIServerFactory serverSocketFactory = new RMIServerFactory(networkSecurityManager);
-            final RMIClientFactory clientSocketFactory =
-                    new RMIClientFactory(networkSecurityManager.getConfiguration().isSslEnabled());
-            metadataNodeStub = (IMetadataNode) UnicastRemoteObject.exportObject(MetadataNode.INSTANCE,
-                    getMetadataProperties().getMetadataPort(), clientSocketFactory, serverSocketFactory);
+
+            // clients need to have the client factory on their classpath- to enable older clients, only use
+            // our client socket factory when SSL is enabled
+            if (networkSecurityManager.getConfiguration().isSslEnabled()) {
+                final RMIServerFactory serverSocketFactory = new RMIServerFactory(networkSecurityManager);
+                final RMIClientFactory clientSocketFactory = new RMIClientFactory(true);
+                metadataNodeStub = (IMetadataNode) UnicastRemoteObject.exportObject(MetadataNode.INSTANCE,
+                        getMetadataProperties().getMetadataPort(), clientSocketFactory, serverSocketFactory);
+            } else {
+                metadataNodeStub = (IMetadataNode) UnicastRemoteObject.exportObject(MetadataNode.INSTANCE,
+                        getMetadataProperties().getMetadataPort());
+            }
         }
     }
 
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index 4fa86ae..97316d2 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -64,7 +64,6 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IFileDeviceResolver;
 import org.apache.hyracks.api.job.resource.NodeCapacity;
-import org.apache.hyracks.api.messages.IMessageBroker;
 import org.apache.hyracks.control.common.controllers.NCConfig;
 import org.apache.hyracks.control.nc.BaseNCApplication;
 import org.apache.hyracks.control.nc.NodeControllerService;
@@ -132,10 +131,10 @@
         }
         runtimeContext.initialize(getRecoveryManagerFactory(), runtimeContext.getNodeProperties().isInitialRun());
         MessagingProperties messagingProperties = runtimeContext.getMessagingProperties();
-        IMessageBroker messageBroker = new NCMessageBroker(controllerService, messagingProperties);
+        NCMessageBroker messageBroker = new NCMessageBroker(controllerService, messagingProperties);
         this.ncServiceCtx.setMessageBroker(messageBroker);
         MessagingChannelInterfaceFactory interfaceFactory =
-                new MessagingChannelInterfaceFactory((NCMessageBroker) messageBroker, messagingProperties);
+                new MessagingChannelInterfaceFactory(messageBroker, messagingProperties);
         this.ncServiceCtx.setMessagingChannelInterfaceFactory(interfaceFactory);
         final Checkpoint latestCheckpoint = runtimeContext.getTransactionSubsystem().getCheckpointManager().getLatest();
         if (latestCheckpoint != null && latestCheckpoint.getStorageVersion() != StorageConstants.VERSION) {
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java
index 4c971e2..d6af749 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/AsterixStateProxy.java
@@ -43,11 +43,18 @@
 
     public static IAsterixStateProxy registerRemoteObject(INetworkSecurityManager networkSecurityManager,
             int metadataCallbackPort) throws RemoteException {
-        final RMIServerFactory serverSocketFactory = new RMIServerFactory(networkSecurityManager);
-        final RMIClientFactory clientSocketFactory =
-                new RMIClientFactory(networkSecurityManager.getConfiguration().isSslEnabled());
-        final IAsterixStateProxy stub = (IAsterixStateProxy) UnicastRemoteObject.exportObject(cc, metadataCallbackPort,
-                clientSocketFactory, serverSocketFactory);
+        IAsterixStateProxy stub;
+        // clients need to have the client factory on their classpath- to enable older clients, only use
+        // our client socket factory when SSL is enabled
+        if (networkSecurityManager.getConfiguration().isSslEnabled()) {
+            final RMIServerFactory serverSocketFactory = new RMIServerFactory(networkSecurityManager);
+            final RMIClientFactory clientSocketFactory =
+                    new RMIClientFactory(networkSecurityManager.getConfiguration().isSslEnabled());
+            stub = (IAsterixStateProxy) UnicastRemoteObject.exportObject(cc, metadataCallbackPort, clientSocketFactory,
+                    serverSocketFactory);
+        } else {
+            stub = (IAsterixStateProxy) UnicastRemoteObject.exportObject(cc, metadataCallbackPort);
+        }
         LOGGER.info("Asterix Distributed State Proxy Bound");
         return stub;
     }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SqlSumAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SqlSumAggregateDescriptor.java
index 6e73a40..779fba2 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SqlSumAggregateDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/SqlSumAggregateDescriptor.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.runtime.aggregates.std;
 
 import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptor;
 import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
 import org.apache.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
@@ -31,7 +32,17 @@
 public class SqlSumAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
 
     private static final long serialVersionUID = 1L;
-    public static final IFunctionDescriptorFactory FACTORY = SqlSumAggregateDescriptor::new;
+
+    // this must remain an anonymous inner class due to the evaluator factory below being an anonymous inner
+    // serializable class, to not break binary compatibility
+    // this can be reverted once serialization compatibility code is in place to write the correct class
+    @SuppressWarnings({ "Anonymous2MethodRef", "Convert2Lambda" })
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        @Override
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new SqlSumAggregateDescriptor();
+        }
+    };
 
     @Override
     public FunctionIdentifier getIdentifier() {
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/ClusterControllerInfo.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/ClusterControllerInfo.java
similarity index 97%
rename from hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/ClusterControllerInfo.java
rename to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/ClusterControllerInfo.java
index 7d0dd61..c0445a7 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/impl/ClusterControllerInfo.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/ClusterControllerInfo.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.api.client.impl;
+package org.apache.hyracks.api.client;
 
 import java.io.Serializable;
 
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksClientInterfaceFunctions.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
similarity index 99%
rename from hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksClientInterfaceFunctions.java
rename to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
index a61c96d..72bdc3e 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksClientInterfaceFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.ipc.impl;
+package org.apache.hyracks.api.client;
 
 import java.io.Serializable;
 import java.net.URL;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
index e92db5e..4cc47d2 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
@@ -23,7 +23,6 @@
 import java.util.List;
 import java.util.Map;
 
-import org.apache.hyracks.api.client.impl.ClusterControllerInfo;
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.deployment.DeploymentId;
 import org.apache.hyracks.api.job.DeployedJobSpecId;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IJavaSerializationProvider.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IJavaSerializationProvider.java
new file mode 100644
index 0000000..deaa966
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IJavaSerializationProvider.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.comm;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+
+public interface IJavaSerializationProvider {
+    default ObjectOutputStream newObjectOutputStream(OutputStream out) throws IOException {
+        return new ObjectOutputStream(out);
+    }
+
+    default ObjectInputStream newObjectInputStream(InputStream in) throws IOException {
+        return new ObjectInputStream(in);
+    }
+
+    default void readObject(ObjectInputStream in, Object object) throws IOException, ClassNotFoundException {
+        in.defaultReadObject();
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IOption.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IOption.java
index 5f11214..c66b4fa 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IOption.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IOption.java
@@ -65,7 +65,7 @@
     }
 
     default String ini() {
-        return name().toLowerCase().replace("_", ".");
+        return toIni(name());
     }
 
     default String camelCase() {
@@ -75,4 +75,12 @@
     default String toIniString() {
         return "[" + section().sectionName() + "] " + ini();
     }
+
+    static String toIni(String name) {
+        return name.toLowerCase().replace("_", ".");
+    }
+
+    default SerializedOption toSerializable() {
+        return new SerializedOption(this);
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/SerializedOption.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/SerializedOption.java
new file mode 100644
index 0000000..8263af6
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/SerializedOption.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.config;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+public final class SerializedOption implements Serializable, Comparable {
+    private static final long serialVersionUID = 1L;
+    private final String sectionName;
+    private final String optionName;
+
+    SerializedOption(IOption option) {
+        this.sectionName = option.section().name();
+        this.optionName = option.name();
+    }
+
+    public String optionName() {
+        return optionName;
+    }
+
+    public Section section() {
+        return Section.valueOf(sectionName);
+    }
+
+    @Override
+    public int compareTo(Object o) {
+        if (!(o instanceof SerializedOption)) {
+            return -1;
+        }
+        SerializedOption that = (SerializedOption) o;
+        int sectionComp = sectionName.compareTo(that.sectionName);
+        return sectionComp != 0 ? sectionComp : optionName.compareTo(that.optionName);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        SerializedOption that = (SerializedOption) o;
+        return Objects.equals(sectionName, that.sectionName) && Objects.equals(optionName, that.optionName);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(sectionName, optionName);
+    }
+
+    @Override
+    public String toString() {
+        return "[" + sectionName + "] " + optionName;
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/ICCContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/ICCContext.java
index c3da155..83e0482 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/ICCContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/ICCContext.java
@@ -22,7 +22,7 @@
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.hyracks.api.client.impl.ClusterControllerInfo;
+import org.apache.hyracks.api.client.ClusterControllerInfo;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.topology.ClusterTopology;
 
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/JavaSerializationUtils.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/JavaSerializationUtils.java
index a92e700..b38d343 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/JavaSerializationUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/JavaSerializationUtils.java
@@ -29,20 +29,25 @@
 import java.lang.reflect.Modifier;
 import java.lang.reflect.Proxy;
 
+import org.apache.hyracks.api.comm.IJavaSerializationProvider;
+
 public class JavaSerializationUtils {
+    private static IJavaSerializationProvider serProvider = new IJavaSerializationProvider() {
+    };
+
     public static byte[] serialize(Serializable jobSpec) throws IOException {
         if (jobSpec instanceof byte[]) {
             return (byte[]) jobSpec;
         }
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        ObjectOutputStream oos = new ObjectOutputStream(baos);
+        ObjectOutputStream oos = serProvider.newObjectOutputStream(baos);
         oos.writeObject(jobSpec);
         return baos.toByteArray();
     }
 
     public static byte[] serialize(Serializable jobSpec, ClassLoader classLoader) throws IOException {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        ObjectOutputStream oos = new ObjectOutputStream(baos);
+        ObjectOutputStream oos = serProvider.newObjectOutputStream(baos);
         ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
         try {
             Thread.currentThread().setContextClassLoader(classLoader);
@@ -57,7 +62,7 @@
         if (bytes == null) {
             return null;
         }
-        ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
+        ObjectInputStream ois = serProvider.newObjectInputStream(new ByteArrayInputStream(bytes));
         return ois.readObject();
     }
 
@@ -78,6 +83,18 @@
         return Class.forName(className);
     }
 
+    public static void setSerializationProvider(IJavaSerializationProvider serProvider) {
+        JavaSerializationUtils.serProvider = serProvider;
+    }
+
+    public static IJavaSerializationProvider getSerializationProvider() {
+        return serProvider;
+    }
+
+    public static void readObject(ObjectInputStream in, Object object) throws IOException, ClassNotFoundException {
+        serProvider.readObject(in, object);
+    }
+
     private static class ClassLoaderObjectInputStream extends ObjectInputStream {
         private ClassLoader classLoader;
 
diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectoryRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectoryRemoteProxy.java
index e802ef9..6260dd6 100644
--- a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectoryRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultDirectoryRemoteProxy.java
@@ -25,7 +25,7 @@
 import org.apache.hyracks.api.result.ResultSetId;
 import org.apache.hyracks.ipc.api.IIPCHandle;
 import org.apache.hyracks.ipc.api.RPCInterface;
-import org.apache.hyracks.ipc.impl.HyracksClientInterfaceFunctions;
+import org.apache.hyracks.api.client.HyracksClientInterfaceFunctions;
 
 //TODO(madhusudancs): Should this implementation be moved to org.apache.hyracks.client?
 public class ResultDirectoryRemoteProxy implements IResultDirectory {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
index f2ea988..deb6785 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
@@ -44,7 +44,7 @@
 import org.apache.hyracks.ipc.api.IIPCHandle;
 import org.apache.hyracks.ipc.api.IIPCI;
 import org.apache.hyracks.ipc.exceptions.IPCException;
-import org.apache.hyracks.ipc.impl.HyracksClientInterfaceFunctions;
+import org.apache.hyracks.api.client.HyracksClientInterfaceFunctions;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index c2e7b22..419dff6 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -36,7 +36,7 @@
 import java.util.concurrent.ExecutorService;
 
 import org.apache.hyracks.api.application.ICCApplication;
-import org.apache.hyracks.api.client.impl.ClusterControllerInfo;
+import org.apache.hyracks.api.client.ClusterControllerInfo;
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.config.IApplicationConfig;
 import org.apache.hyracks.api.context.ICCContext;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
index 4f76ced..3e72942 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
@@ -46,6 +46,7 @@
 import org.apache.hyracks.control.cc.job.JobRun;
 import org.apache.hyracks.control.cc.scheduler.IResourceManager;
 import org.apache.hyracks.control.common.controllers.CCConfig;
+import org.apache.hyracks.control.common.controllers.NCConfig;
 import org.apache.hyracks.ipc.exceptions.IPCException;
 import org.apache.hyracks.util.annotations.Idempotent;
 import org.apache.hyracks.util.annotations.NotThreadSafe;
@@ -218,7 +219,7 @@
     }
 
     private InetAddress getIpAddress(NodeControllerState ncState) throws HyracksException {
-        String ipAddress = ncState.getNCConfig().getDataPublicAddress();
+        String ipAddress = (String) ncState.getConfig().get(NCConfig.Option.DATA_PUBLIC_ADDRESS.toSerializable());
         try {
             return InetAddress.getByName(ipAddress);
         } catch (UnknownHostException e) {
@@ -237,8 +238,8 @@
                 state.getNodeController().shutdown(false);
                 LOGGER.warn("Request to shutdown failed node {} succeeded. false positive heartbeat miss indication",
                         nodeId);
-            } catch (Exception ignore) {
-                LOGGER.debug(() -> "Ignoring failure on ensuring node " + nodeId + " has failed", ignore);
+            } catch (Exception ex) {
+                LOGGER.debug(() -> "Ignoring failure on ensuring node " + nodeId + " has failed", ex);
             }
         });
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeDetailsJSONWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeDetailsJSONWork.java
index aa7dca1..30115ff 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeDetailsJSONWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeDetailsJSONWork.java
@@ -37,7 +37,6 @@
 import org.apache.hyracks.control.cc.cluster.INodeManager;
 import org.apache.hyracks.control.common.config.ConfigUtils;
 import org.apache.hyracks.control.common.controllers.CCConfig;
-import org.apache.hyracks.control.common.controllers.NCConfig;
 import org.apache.hyracks.control.common.work.IPCResponder;
 import org.apache.hyracks.control.common.work.SynchronizableWork;
 import org.apache.hyracks.util.MXHelper;
@@ -89,10 +88,7 @@
             if (ncs != null) {
                 detail = ncs.toDetailedJSON(includeStats, includeConfig);
                 if (includeConfig) {
-                    final NCConfig ncConfig = ncs.getNCConfig();
-                    ConfigUtils.addConfigToJSON(detail, ncConfig.getConfigManager().getNodeEffectiveConfig(nodeId),
-                            NC_SECTIONS);
-                    detail.putPOJO("app.args", ncConfig.getAppArgs());
+                    ConfigUtils.addConfigToJSON(detail, ncs.getConfig(), ccConfig.getConfigManager(), NC_SECTIONS);
                 }
             }
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
index fe33bc9..0ea9239 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
@@ -21,11 +21,11 @@
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.hyracks.api.config.IApplicationConfig;
 import org.apache.hyracks.api.config.IOption;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.NodeControllerState;
 import org.apache.hyracks.control.cc.cluster.INodeManager;
+import org.apache.hyracks.control.common.config.ConfigManager;
 import org.apache.hyracks.control.common.controllers.NodeParameters;
 import org.apache.hyracks.control.common.controllers.NodeRegistration;
 import org.apache.hyracks.control.common.ipc.NodeControllerRemoteProxy;
@@ -62,11 +62,16 @@
         try {
             NodeControllerState state = new NodeControllerState(nc, reg);
             nodeManager.addNode(id, state);
-            IApplicationConfig cfg = state.getNCConfig().getConfigManager().getNodeEffectiveConfig(id);
             final Map<IOption, Object> ncConfiguration = new HashMap<>();
-            for (IOption option : cfg.getOptions()) {
-                ncConfiguration.put(option, cfg.get(option));
-            }
+            ConfigManager configManager = ccs.getConfig().getConfigManager();
+            state.getConfig().forEach((key, value) -> {
+                IOption option = configManager.lookupOption(key);
+                if (option == null) {
+                    LOGGER.info("discarding unknown option {}", key);
+                } else {
+                    ncConfiguration.put(option, value);
+                }
+            });
             LOGGER.info("registered node: {}", id);
             nc.sendRegistrationResult(params, null);
             ccs.getContext().notifyNodeJoin(id, ncConfiguration);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
index 9d755a0..d92727c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
@@ -22,6 +22,7 @@
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.util.Collections;
 import java.util.concurrent.Executors;
 
 import org.apache.hyracks.api.comm.NetworkAddress;
@@ -174,9 +175,8 @@
         when(ncState.getDataPort()).thenReturn(dataAddr);
         when(ncState.getResultPort()).thenReturn(resultAddr);
         when(ncState.getMessagingPort()).thenReturn(msgAddr);
-        NCConfig ncConfig = new NCConfig(nodeId);
-        ncConfig.setDataPublicAddress(ipAddr);
-        when(ncState.getNCConfig()).thenReturn(ncConfig);
+        when(ncState.getConfig())
+                .thenReturn(Collections.singletonMap(NCConfig.Option.DATA_PUBLIC_ADDRESS.toSerializable(), ipAddr));
         Mockito.when(ncState.getNodeController()).thenReturn(ncProxy);
         return ncState;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/NodeControllerData.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/NodeControllerData.java
index 1926da6..c37acab 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/NodeControllerData.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/NodeControllerData.java
@@ -21,6 +21,7 @@
 import static org.apache.hyracks.control.common.utils.ConfigurationUtil.toPathElements;
 import static org.apache.hyracks.util.JSONUtil.put;
 
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashSet;
 import java.util.List;
@@ -28,9 +29,9 @@
 import java.util.Set;
 
 import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.config.SerializedOption;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.resource.NodeCapacity;
-import org.apache.hyracks.control.common.controllers.NCConfig;
 import org.apache.hyracks.control.common.controllers.NodeRegistration;
 import org.apache.hyracks.control.common.heartbeat.HeartbeatData;
 import org.apache.hyracks.control.common.heartbeat.HeartbeatSchema;
@@ -42,7 +43,9 @@
 
     private static final int RRD_SIZE = 720;
 
-    private final NCConfig ncConfig;
+    private final String nodeId;
+
+    private final Map<SerializedOption, Object> config;
 
     private final NetworkAddress dataPort;
 
@@ -145,7 +148,9 @@
     private NodeCapacity capacity;
 
     public NodeControllerData(NodeRegistration reg) {
-        ncConfig = reg.getNCConfig();
+        nodeId = reg.getNodeId();
+        config = Collections.unmodifiableMap(reg.getConfig());
+
         dataPort = reg.getDataPort();
         resultPort = reg.getResultPort();
         messagingPort = reg.getMessagingPort();
@@ -252,8 +257,8 @@
         return System.nanoTime() - lastHeartbeatNanoTime;
     }
 
-    public NCConfig getNCConfig() {
-        return ncConfig;
+    public Map<SerializedOption, Object> getConfig() {
+        return config;
     }
 
     public Set<JobId> getActiveJobIds() {
@@ -279,7 +284,7 @@
     public synchronized ObjectNode toSummaryJSON() {
         ObjectMapper om = new ObjectMapper();
         ObjectNode o = om.createObjectNode();
-        put(o, "node-id", ncConfig.getNodeId());
+        put(o, "node-id", nodeId);
         put(o, "heap-used", heapUsedSize[(rrdPtr + RRD_SIZE - 1) % RRD_SIZE]);
         put(o, "system-load-average", systemLoadAverage[(rrdPtr + RRD_SIZE - 1) % RRD_SIZE]);
 
@@ -290,7 +295,7 @@
         ObjectMapper om = new ObjectMapper();
         ObjectNode o = om.createObjectNode();
 
-        put(o, "node-id", ncConfig.getNodeId());
+        put(o, "node-id", nodeId);
 
         if (includeConfig) {
             put(o, "os-name", osName);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java
index 1dae48c..44fa0bc 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java
@@ -52,6 +52,7 @@
 import org.apache.hyracks.api.config.IConfigurator;
 import org.apache.hyracks.api.config.IOption;
 import org.apache.hyracks.api.config.Section;
+import org.apache.hyracks.api.config.SerializedOption;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.control.common.application.ConfigManagerApplicationConfig;
 import org.apache.logging.log4j.Level;
@@ -210,6 +211,10 @@
         this.versionString = versionString;
     }
 
+    public IOption lookupOption(SerializedOption option) {
+        return lookupOption(option.section().sectionName(), IOption.toIni(option.optionName()));
+    }
+
     public IOption lookupOption(String section, String key) {
         Map<String, IOption> map = getSectionOptionMap(Section.parseSectionName(section));
         return map == null ? null : map.get(key);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigUtils.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigUtils.java
index 4fa9b56..1cc739c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigUtils.java
@@ -34,6 +34,8 @@
 
 import org.apache.hyracks.api.config.IApplicationConfig;
 import org.apache.hyracks.api.config.IOption;
+import org.apache.hyracks.api.config.Section;
+import org.apache.hyracks.api.config.SerializedOption;
 import org.apache.hyracks.control.common.controllers.ControllerConfig;
 import org.ini4j.Ini;
 import org.kohsuke.args4j.CmdLineException;
@@ -150,23 +152,21 @@
         return value;
     }
 
-    public static String getString(Ini ini, org.apache.hyracks.api.config.Section section, IOption option,
-            String defaultValue) {
+    public static String getString(Ini ini, Section section, IOption option, String defaultValue) {
         return getString(ini, section.sectionName(), option.ini(), defaultValue);
     }
 
-    public static void addConfigToJSON(ObjectNode o, IApplicationConfig cfg,
-            org.apache.hyracks.api.config.Section... sections) {
+    public static void addConfigToJSON(ObjectNode o, IApplicationConfig cfg, Section... sections) {
         ArrayNode configArray = o.putArray("config");
-        for (org.apache.hyracks.api.config.Section section : cfg.getSections(Arrays.asList(sections)::contains)) {
+        for (Section section : cfg.getSections(Arrays.asList(sections)::contains)) {
             ObjectNode sectionNode = configArray.addObject();
             Map<String, Object> sectionConfig = getSectionOptionsForJSON(cfg, section, option -> true);
             sectionNode.put("section", section.sectionName()).putPOJO("properties", sectionConfig);
         }
     }
 
-    public static Map<String, Object> getSectionOptionsForJSON(IApplicationConfig cfg,
-            org.apache.hyracks.api.config.Section section, Predicate<IOption> selector) {
+    public static Map<String, Object> getSectionOptionsForJSON(IApplicationConfig cfg, Section section,
+            Predicate<IOption> selector) {
         Map<String, Object> sectionConfig = new TreeMap<>();
         for (IOption option : cfg.getOptions(section)) {
             if (selector.test(option)) {
@@ -175,4 +175,28 @@
         }
         return sectionConfig;
     }
+
+    public static void addConfigToJSON(ObjectNode o, Map<SerializedOption, Object> config, ConfigManager configManager,
+            Section... sections) {
+        ArrayNode configArray = o.putArray("config");
+        for (Section section : sections) {
+            ObjectNode sectionNode = configArray.addObject();
+            Map<String, Object> sectionConfig = new TreeMap<>();
+            config.entrySet().stream().filter(e -> e.getKey().section().equals(section)).forEach(entry -> sectionConfig
+                    .put(IOption.toIni(entry.getKey().optionName()), fixupValueForJSON(entry, configManager)));
+            if (!sectionConfig.isEmpty()) {
+                sectionNode.put("section", section.sectionName()).putPOJO("properties", sectionConfig);
+            }
+        }
+    }
+
+    private static Object fixupValueForJSON(Map.Entry<SerializedOption, Object> entry, ConfigManager configManager) {
+        IOption option = configManager.lookupOption(entry.getKey());
+        if (option != null) {
+            // use the type system for the option to serialize this
+            return option.type().serializeToJSON(entry.getValue());
+        }
+        // not much we can do, let default JSON serialization do its thing
+        return entry.getValue();
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
index d41350f..acfa394 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
@@ -18,7 +18,6 @@
  */
 package org.apache.hyracks.control.common.controllers;
 
-import static org.apache.hyracks.control.common.config.OptionTypes.BOOLEAN;
 import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER;
 import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER_BYTE_UNIT;
 import static org.apache.hyracks.control.common.config.OptionTypes.LONG;
@@ -260,10 +259,18 @@
     }
 
     public NCConfig(String nodeId, ConfigManager configManager) {
+        this(nodeId, configManager, true);
+    }
+
+    public NCConfig(String nodeId, ConfigManager configManager, boolean selfRegister) {
         super(configManager);
         this.appConfig = nodeId == null ? configManager.getAppConfig() : configManager.getNodeEffectiveConfig(nodeId);
-        configManager.register(Option.class);
-        configManager.register(ControllerConfig.Option.class);
+        if (selfRegister) {
+            configManager.register(Option.class);
+            configManager.register(ControllerConfig.Option.class);
+        } else {
+            configManager.register(Option.NODE_ID);
+        }
         setNodeId(nodeId);
         this.nodeId = nodeId;
         configManager.registerArgsListener(appArgs::addAll);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeParameters.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeParameters.java
index d9165e1..e78a423 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeParameters.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeParameters.java
@@ -20,7 +20,7 @@
 
 import java.io.Serializable;
 
-import org.apache.hyracks.api.client.impl.ClusterControllerInfo;
+import org.apache.hyracks.api.client.ClusterControllerInfo;
 
 public class NodeParameters implements Serializable {
     private static final long serialVersionUID = 1L;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
index b4d835d..f76d9b8 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
@@ -21,13 +21,20 @@
 import static org.apache.hyracks.util.MXHelper.osMXBean;
 import static org.apache.hyracks.util.MXHelper.runtimeMXBean;
 
+import java.io.IOException;
+import java.io.ObjectInputStream;
 import java.io.Serializable;
 import java.net.InetSocketAddress;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.config.IApplicationConfig;
+import org.apache.hyracks.api.config.IOption;
+import org.apache.hyracks.api.config.SerializedOption;
 import org.apache.hyracks.api.job.resource.NodeCapacity;
+import org.apache.hyracks.api.util.JavaSerializationUtils;
 import org.apache.hyracks.control.common.heartbeat.HeartbeatSchema;
 import org.apache.hyracks.util.MXHelper;
 import org.apache.hyracks.util.PidHelper;
@@ -39,6 +46,8 @@
 
     private final String nodeId;
 
+    @Deprecated // required for binary backward-compatibility when registering with a 0.9.4 CC
+    @SuppressWarnings("unused")
     private final NCConfig ncConfig;
 
     private final NetworkAddress dataPort;
@@ -77,11 +86,12 @@
 
     private final NodeCapacity capacity;
 
+    private final HashMap<SerializedOption, Object> config;
+
     public NodeRegistration(InetSocketAddress ncAddress, String nodeId, NCConfig ncConfig, NetworkAddress dataPort,
             NetworkAddress resultPort, HeartbeatSchema hbSchema, NetworkAddress messagingPort, NodeCapacity capacity) {
         this.ncAddress = ncAddress;
         this.nodeId = nodeId;
-        this.ncConfig = ncConfig;
         this.dataPort = dataPort;
         this.resultPort = resultPort;
         this.hbSchema = hbSchema;
@@ -100,6 +110,12 @@
         this.inputArguments = runtimeMXBean.getInputArguments();
         this.systemProperties = runtimeMXBean.getSystemProperties();
         this.pid = PidHelper.getPid();
+        IApplicationConfig cfg = ncConfig.getConfigManager().getNodeEffectiveConfig(nodeId);
+        this.config = new HashMap<>();
+        for (IOption option : cfg.getOptions()) {
+            config.put(option.toSerializable(), cfg.get(option));
+        }
+        this.ncConfig = null;
     }
 
     public InetSocketAddress getNodeControllerAddress() {
@@ -114,8 +130,8 @@
         return capacity;
     }
 
-    public NCConfig getNCConfig() {
-        return ncConfig;
+    public Map<SerializedOption, Object> getConfig() {
+        return config;
     }
 
     public NetworkAddress getDataPort() {
@@ -185,4 +201,9 @@
     public int getPid() {
         return pid;
     }
-}
+
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+        JavaSerializationUtils.readObject(in, this);
+    }
+
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksClientInterfaceRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksClientInterfaceRemoteProxy.java
index 3fac7da..4f27ec1 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksClientInterfaceRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksClientInterfaceRemoteProxy.java
@@ -24,9 +24,10 @@
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.hyracks.api.client.HyracksClientInterfaceFunctions;
 import org.apache.hyracks.api.client.IHyracksClientInterface;
 import org.apache.hyracks.api.client.NodeControllerInfo;
-import org.apache.hyracks.api.client.impl.ClusterControllerInfo;
+import org.apache.hyracks.api.client.ClusterControllerInfo;
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.deployment.DeploymentId;
 import org.apache.hyracks.api.job.DeployedJobSpecId;
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java
index 2c7e82e..faae14b 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java
@@ -40,7 +40,7 @@
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.client.IHyracksClientInterface;
 import org.apache.hyracks.api.client.NodeControllerInfo;
-import org.apache.hyracks.api.client.impl.ClusterControllerInfo;
+import org.apache.hyracks.api.client.ClusterControllerInfo;
 import org.apache.hyracks.api.client.impl.JobSpecificationActivityClusterGraphGeneratorFactory;
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.deployment.DeploymentId;
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/JavaSerializationBasedPayloadSerializerDeserializer.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/JavaSerializationBasedPayloadSerializerDeserializer.java
index c4263d2..439f230 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/JavaSerializationBasedPayloadSerializerDeserializer.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/JavaSerializationBasedPayloadSerializerDeserializer.java
@@ -18,8 +18,11 @@
  */
 package org.apache.hyracks.ipc.impl;
 
+import static org.apache.hyracks.api.util.JavaSerializationUtils.getSerializationProvider;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.OutputStream;
@@ -28,6 +31,7 @@
 import org.apache.hyracks.ipc.api.IPayloadSerializerDeserializer;
 
 public class JavaSerializationBasedPayloadSerializerDeserializer implements IPayloadSerializerDeserializer {
+
     @Override
     public Object deserializeObject(ByteBuffer buffer, int length) throws Exception {
         return deserialize(buffer, length);
@@ -48,21 +52,23 @@
         return serialize(exception);
     }
 
-    public static void serialize(OutputStream out, Object object) throws Exception {
-        ObjectOutputStream oos = new ObjectOutputStream(out);
-        oos.writeObject(object);
-        oos.flush();
+    public static void serialize(OutputStream out, Object object) throws IOException {
+        try (ObjectOutputStream oos = getSerializationProvider().newObjectOutputStream(out)) {
+            oos.writeObject(object);
+            oos.flush();
+        }
     }
 
     private Object deserialize(ByteBuffer buffer, int length) throws Exception {
-        ObjectInputStream ois =
-                new ObjectInputStream(new ByteArrayInputStream(buffer.array(), buffer.position(), length));
-        Object object = ois.readObject();
-        ois.close();
+        Object object;
+        try (ObjectInputStream ois = getSerializationProvider()
+                .newObjectInputStream(new ByteArrayInputStream(buffer.array(), buffer.position(), length))) {
+            object = ois.readObject();
+        }
         return object;
     }
 
-    private byte[] serialize(Object object) throws Exception {
+    private byte[] serialize(Object object) throws IOException {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         serialize(baos, object);
         baos.close();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java
index 7d5beff..cdd3ea2 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.storage.am.lsm.btree.dataflow;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
@@ -42,6 +43,7 @@
 import org.apache.hyracks.storage.am.lsm.common.dataflow.LsmResource;
 import org.apache.hyracks.storage.common.IStorageManager;
 import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory;
+import org.apache.hyracks.util.CompatibilityUtil;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -133,4 +135,15 @@
         json.putPOJO("btreeFields", btreeFields);
         json.putPOJO("compressorDecompressorFactory", compressorDecompressorFactory.toJson(registry));
     }
+
+    private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
+        in.defaultReadObject();
+
+        // compat w/ 0.3.4
+        if (compressorDecompressorFactory == null) {
+            CompatibilityUtil.writeField(this, "compressorDecompressorFactory",
+                    NoOpCompressorDecompressorFactory.INSTANCE);
+        }
+    }
+
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResourceFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResourceFactory.java
index ea41c3d..740b3d7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResourceFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResourceFactory.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.storage.am.lsm.btree.dataflow;
 
+import java.io.IOException;
 import java.util.Map;
 
 import org.apache.hyracks.api.compression.ICompressorDecompressorFactory;
@@ -33,6 +34,8 @@
 import org.apache.hyracks.storage.am.lsm.common.dataflow.LsmResource;
 import org.apache.hyracks.storage.am.lsm.common.dataflow.LsmResourceFactory;
 import org.apache.hyracks.storage.common.IStorageManager;
+import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory;
+import org.apache.hyracks.util.CompatibilityUtil;
 
 public class LSMBTreeLocalResourceFactory extends LsmResourceFactory {
 
@@ -69,4 +72,14 @@
                 filterTypeTraits, filterCmpFactories, btreeFields, filterFields, opTrackerProvider, ioOpCallbackFactory,
                 metadataPageManagerFactory, vbcProvider, ioSchedulerProvider, durable, compressorDecompressorFactory);
     }
+
+    private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
+        in.defaultReadObject();
+
+        // compat w/ 0.3.4
+        if (compressorDecompressorFactory == null) {
+            CompatibilityUtil.writeField(this, "compressorDecompressorFactory",
+                    NoOpCompressorDecompressorFactory.INSTANCE);
+        }
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/NoOpCompressorDecompressorFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/NoOpCompressorDecompressorFactory.java
index 690f4a2..90a4b56 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/NoOpCompressorDecompressorFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/NoOpCompressorDecompressorFactory.java
@@ -18,6 +18,8 @@
  */
 package org.apache.hyracks.storage.common.compression;
 
+import java.io.ObjectStreamException;
+
 import org.apache.hyracks.api.compression.ICompressorDecompressor;
 import org.apache.hyracks.api.compression.ICompressorDecompressorFactory;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -26,7 +28,7 @@
 
 import com.fasterxml.jackson.databind.JsonNode;
 
-public class NoOpCompressorDecompressorFactory implements ICompressorDecompressorFactory {
+public final class NoOpCompressorDecompressorFactory implements ICompressorDecompressorFactory {
     private static final long serialVersionUID = 1L;
     public static final ICompressorDecompressorFactory INSTANCE = new NoOpCompressorDecompressorFactory();
 
@@ -44,4 +46,8 @@
     public static IJsonSerializable fromJson(IPersistedResourceRegistry registry, JsonNode json) {
         return INSTANCE;
     }
+
+    private Object readResolve() throws ObjectStreamException {
+        return INSTANCE;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/CompatibilityUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/CompatibilityUtil.java
new file mode 100644
index 0000000..392955a
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/CompatibilityUtil.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.util;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class CompatibilityUtil {
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    private CompatibilityUtil() {
+    }
+
+    public static Object readField(Object obj, String fieldName) throws IOException {
+        Class<?> objClass = obj.getClass();
+        LOGGER.debug("reading field '{}' on object of type {}", fieldName, objClass);
+        try {
+            Field f = objClass.getDeclaredField(fieldName);
+            f.setAccessible(true);
+            return f.get(obj);
+        } catch (NoSuchFieldException | IllegalAccessException e) {
+            LOGGER.warn("exception reading field '{}' on object of type {}", fieldName, objClass, e);
+            throw new IOException(e);
+        }
+    }
+
+    public static void writeField(Object obj, String fieldName, Object newValue) throws IOException {
+        Class<?> objClass = obj.getClass();
+        LOGGER.debug("updating field '{}' on object of type {} to {}", fieldName, objClass, newValue);
+        try {
+            Field f = objClass.getDeclaredField(fieldName);
+            f.setAccessible(true);
+            f.set(obj, newValue);
+        } catch (NoSuchFieldException | IllegalAccessException e) {
+            LOGGER.warn("exception updating field '{}' object of type {} to {}", fieldName, objClass, newValue, e);
+            throw new IOException(e);
+        }
+    }
+}
