diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/pom.xml b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/pom.xml
index 9dc3c1a..ba086ad 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/pom.xml
@@ -63,7 +63,7 @@
           <licenseFamilies combine.children="append">
             <licenseFamily implementation="org.apache.rat.license.MITLicenseFamily"/>
           </licenseFamilies>
-          <excludes>
+          <excludes combine.children="append">
             <!-- See hyracks-fullstack-license/src/main/licenses/templates/source_licenses.ftl -->
             <exclude>src/main/resources/static/javascript/flot/jquery.flot.resize.min.js</exclude>
             <exclude>src/main/resources/static/javascript/jsplumb/jquery.jsPlumb-1.3.5-all-min.js</exclude>
@@ -104,7 +104,6 @@
     <dependency>
       <groupId>args4j</groupId>
       <artifactId>args4j</artifactId>
-      <version>2.0.12</version>
     </dependency>
     <dependency>
       <groupId>org.apache.hyracks</groupId>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCApplicationEntryPoint.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCApplicationEntryPoint.java
new file mode 100644
index 0000000..07008df
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCApplicationEntryPoint.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.cc;
+
+import java.util.Arrays;
+
+import org.apache.hyracks.api.application.ICCApplicationContext;
+import org.apache.hyracks.api.application.ICCApplicationEntryPoint;
+import org.apache.hyracks.api.config.IConfigManager;
+import org.apache.hyracks.api.config.Section;
+import org.apache.hyracks.api.job.resource.DefaultJobCapacityController;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
+import org.apache.hyracks.control.common.controllers.CCConfig;
+import org.apache.hyracks.control.common.controllers.ControllerConfig;
+import org.apache.hyracks.control.common.controllers.NCConfig;
+
+/**
+ * Default cluster controller application entry point: it accepts no application arguments, does nothing on
+ * stop and startup-completed, and supplies the default job capacity controller.
+ */
+public class CCApplicationEntryPoint implements ICCApplicationEntryPoint {
+    /** Shared default instance. */
+    public static final ICCApplicationEntryPoint INSTANCE = new CCApplicationEntryPoint();
+
+    protected CCApplicationEntryPoint() {
+    }
+
+    /**
+     * Rejects application arguments, as this entry point does not define any.
+     *
+     * @param ccAppCtx the application context; not used by this implementation
+     * @param args application arguments; {@code null} is treated like an empty array
+     * @throws IllegalArgumentException if at least one argument is supplied
+     */
+    @Override
+    public void start(ICCApplicationContext ccAppCtx, String[] args) throws Exception {
+        // null-check first so that a caller passing null for "no arguments" does not trigger an NPE
+        if (args != null && args.length > 0) {
+            throw new IllegalArgumentException("Unrecognized argument(s): " + Arrays.toString(args));
+        }
+    }
+
+    /** No-op: this entry point holds nothing that needs to be shut down. */
+    @Override
+    public void stop() throws Exception {
+        // no-op
+    }
+
+    /** No-op: this entry point takes no action once startup has completed. */
+    @Override
+    public void startupCompleted() throws Exception {
+        // no-op
+    }
+
+    /** @return {@link DefaultJobCapacityController#INSTANCE} */
+    @Override
+    public IJobCapacityController getJobCapacityController() {
+        return DefaultJobCapacityController.INSTANCE;
+    }
+
+    /**
+     * Populates {@code configManager} for a cluster controller: passes the config file and config file URL
+     * options to {@code addIniParamOptions}, adds the CC and COMMON sections as command line sections,
+     * installs this entry point's usage filter and registers the controller, CC and NC option enums.
+     *
+     * @param configManager the config manager to populate
+     */
+    @Override
+    public void registerConfig(IConfigManager configManager) {
+        configManager.addIniParamOptions(ControllerConfig.Option.CONFIG_FILE, ControllerConfig.Option.CONFIG_FILE_URL);
+        configManager.addCmdLineSections(Section.CC, Section.COMMON);
+        configManager.setUsageFilter(getUsageFilter());
+        configManager.register(ControllerConfig.Option.class, CCConfig.Option.class, NCConfig.Option.class);
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCDriver.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCDriver.java
index dff3107..754deac 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCDriver.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCDriver.java
@@ -18,32 +18,64 @@
  */
 package org.apache.hyracks.control.cc;
 
-import org.kohsuke.args4j.CmdLineParser;
+import static org.apache.hyracks.control.common.controllers.CCConfig.Option.APP_CLASS;
 
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.api.application.ICCApplicationEntryPoint;
+import org.apache.hyracks.control.common.config.ConfigManager;
+import org.apache.hyracks.control.common.config.ConfigUtils;
 import org.apache.hyracks.control.common.controllers.CCConfig;
+import org.kohsuke.args4j.CmdLineException;
 
+/**
+ * Command line entry point of the cluster controller (CC) process.
+ */
 public class CCDriver {
-    public static void main(String args []) throws Exception {
-        try {
-            CCConfig ccConfig = new CCConfig();
-            CmdLineParser cp = new CmdLineParser(ccConfig);
-            try {
-                cp.parseArgument(args);
-            } catch (Exception e) {
-                System.err.println(e.getMessage());
-                cp.printUsage(System.err);
-                return;
-            }
-            ccConfig.loadConfigAndApplyDefaults();
+    private static final Logger LOGGER = Logger.getLogger(CCDriver.class.getName());
 
-            ClusterControllerService ccService = new ClusterControllerService(ccConfig);
+    private CCDriver() {
+    }
+
+    /**
+     * Starts a cluster controller: builds the configuration from {@code args}, lets the application entry
+     * point register its options, starts the {@link ClusterControllerService} and then sleeps forever.
+     * Exits the JVM with status 2 on a command line parsing error and with status 1 on any other exception.
+     *
+     * @param args command line arguments
+     */
+    public static void main(String[] args) throws Exception {
+        try {
+            final ConfigManager configManager = new ConfigManager(args);
+            ICCApplicationEntryPoint appEntryPoint = getAppEntryPoint(args);
+            appEntryPoint.registerConfig(configManager);
+            CCConfig ccConfig = new CCConfig(configManager);
+            ClusterControllerService ccService = new ClusterControllerService(ccConfig, appEntryPoint);
             ccService.start();
             while (true) {
                 Thread.sleep(100000);
             }
+        } catch (CmdLineException e) {
+            LOGGER.log(Level.FINE, "Exception parsing command line: " + Arrays.toString(args), e);
+            System.exit(2);
         } catch (Exception e) {
-            e.printStackTrace();
+            LOGGER.log(Level.SEVERE, "Exiting CCDriver due to exception", e);
             System.exit(1);
         }
     }
+
+    /**
+     * Resolves the application entry point: instantiates the class named by the {@code APP_CLASS} option
+     * if one is given in {@code args}, otherwise falls back to {@link CCApplicationEntryPoint#INSTANCE}.
+     */
+    private static ICCApplicationEntryPoint getAppEntryPoint(String[] args)
+            throws ClassNotFoundException, InstantiationException, IllegalAccessException, IOException {
+        // determine app class so that we can use the correct implementation of the configuration...
+        String appClassName = ConfigUtils.getOptionValue(args, APP_CLASS);
+        return appClassName != null ? (ICCApplicationEntryPoint) (Class.forName(appClassName)).newInstance()
+                : CCApplicationEntryPoint.INSTANCE;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index c47284c..21b9dcf 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -20,6 +20,7 @@
 
 import java.io.File;
 import java.io.FileReader;
+import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.net.InetAddress;
@@ -32,20 +33,22 @@
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.TreeMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hyracks.api.application.ICCApplicationEntryPoint;
 import org.apache.hyracks.api.client.ClusterControllerInfo;
 import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.config.IApplicationConfig;
 import org.apache.hyracks.api.context.ICCContext;
 import org.apache.hyracks.api.deployment.DeploymentId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.api.job.resource.DefaultJobCapacityController;
 import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.api.topology.ClusterTopology;
@@ -66,9 +69,10 @@
 import org.apache.hyracks.control.cc.work.RemoveDeadNodesWork;
 import org.apache.hyracks.control.cc.work.ShutdownNCServiceWork;
 import org.apache.hyracks.control.cc.work.TriggerNCWork;
+import org.apache.hyracks.control.common.config.ConfigManager;
 import org.apache.hyracks.control.common.context.ServerContext;
 import org.apache.hyracks.control.common.controllers.CCConfig;
-import org.apache.hyracks.control.common.controllers.IniUtils;
+import org.apache.hyracks.control.common.controllers.NCConfig;
 import org.apache.hyracks.control.common.deployment.DeploymentRun;
 import org.apache.hyracks.control.common.ipc.CCNCFunctions;
 import org.apache.hyracks.control.common.logs.LogFile;
@@ -77,7 +81,6 @@
 import org.apache.hyracks.ipc.api.IIPCI;
 import org.apache.hyracks.ipc.impl.IPCSystem;
 import org.apache.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
-import org.ini4j.Ini;
 import org.xml.sax.InputSource;
 
 public class ClusterControllerService implements IControllerService {
@@ -85,6 +88,8 @@
 
     private final CCConfig ccConfig;
 
+    private final ConfigManager configManager;
+
     private IPCSystem clusterIPC;
 
     private IPCSystem clientIPC;
@@ -127,11 +132,22 @@
 
     private ShutdownRun shutdownCallback;
 
-    private ICCApplicationEntryPoint aep;
+    private final ICCApplicationEntryPoint aep;
 
-    public ClusterControllerService(final CCConfig ccConfig) throws Exception {
-        this.ccConfig = ccConfig;
-        File jobLogFolder = new File(ccConfig.ccRoot, "logs/jobs");
+    public ClusterControllerService(final CCConfig config) throws Exception {
+        this(config, getApplicationEntryPoint(config));
+    }
+
+    public ClusterControllerService(final CCConfig config,
+                                    final ICCApplicationEntryPoint aep) throws Exception {
+        this.ccConfig = config;
+        this.configManager = ccConfig.getConfigManager();
+        if (aep == null) {
+            throw new IllegalArgumentException("ICCApplicationEntryPoint cannot be null");
+        }
+        this.aep = aep;
+        configManager.processConfig();
+        File jobLogFolder = new File(ccConfig.getRootDir(), "logs/jobs");
         jobLog = new LogFile(jobLogFolder);
 
         // WorkQueue is in charge of heartbeat as well as other events.
@@ -140,7 +156,8 @@
         final ClusterTopology topology = computeClusterTopology(ccConfig);
         ccContext = new ClusterControllerContext(topology);
         sweeper = new DeadNodeSweeper();
-        datasetDirectoryService = new DatasetDirectoryService(ccConfig.resultTTL, ccConfig.resultSweepThreshold);
+        datasetDirectoryService = new DatasetDirectoryService(ccConfig.getResultTTL(),
+                ccConfig.getResultSweepThreshold());
 
         deploymentRunMap = new HashMap<>();
         stateDumpRunMap = new HashMap<>();
@@ -151,10 +168,10 @@
     }
 
     private static ClusterTopology computeClusterTopology(CCConfig ccConfig) throws Exception {
-        if (ccConfig.clusterTopologyDefinition == null) {
+        if (ccConfig.getClusterTopology() == null) {
             return null;
         }
-        FileReader fr = new FileReader(ccConfig.clusterTopologyDefinition);
+        FileReader fr = new FileReader(ccConfig.getClusterTopology());
         InputSource in = new InputSource(fr);
         try {
             return TopologyDefinitionParser.parse(in);
@@ -166,20 +183,21 @@
     @Override
     public void start() throws Exception {
         LOGGER.log(Level.INFO, "Starting ClusterControllerService: " + this);
-        serverCtx = new ServerContext(ServerContext.ServerType.CLUSTER_CONTROLLER, new File(ccConfig.ccRoot));
+        serverCtx = new ServerContext(ServerContext.ServerType.CLUSTER_CONTROLLER, new File(ccConfig.getRootDir()));
         IIPCI ccIPCI = new ClusterControllerIPCI(this);
-        clusterIPC = new IPCSystem(new InetSocketAddress(ccConfig.clusterNetPort), ccIPCI,
+        clusterIPC = new IPCSystem(new InetSocketAddress(ccConfig.getClusterListenPort()), ccIPCI,
                 new CCNCFunctions.SerializerDeserializer());
         IIPCI ciIPCI = new ClientInterfaceIPCI(this);
-        clientIPC = new IPCSystem(new InetSocketAddress(ccConfig.clientNetIpAddress, ccConfig.clientNetPort), ciIPCI,
-                new JavaSerializationBasedPayloadSerializerDeserializer());
-        webServer = new WebServer(this, ccConfig.httpPort);
+        clientIPC = new IPCSystem(
+                new InetSocketAddress(ccConfig.getClientListenAddress(), ccConfig.getClientListenPort()),
+                ciIPCI, new JavaSerializationBasedPayloadSerializerDeserializer());
+        webServer = new WebServer(this, ccConfig.getConsoleListenPort());
         clusterIPC.start();
         clientIPC.start();
         webServer.start();
-        info = new ClusterControllerInfo(ccConfig.clientNetIpAddress, ccConfig.clientNetPort,
+        info = new ClusterControllerInfo(ccConfig.getClientListenAddress(), ccConfig.getClientListenPort(),
                 webServer.getListeningPort());
-        timer.schedule(sweeper, 0, ccConfig.heartbeatPeriod);
+        timer.schedule(sweeper, 0, ccConfig.getHeartbeatPeriod());
         jobLog.open();
         startApplication();
 
@@ -194,84 +212,62 @@
         appCtx = new CCApplicationContext(this, serverCtx, ccContext, ccConfig.getAppConfig());
         appCtx.addJobLifecycleListener(datasetDirectoryService);
         executor = Executors.newCachedThreadPool(appCtx.getThreadFactory());
-        String className = ccConfig.appCCMainClass;
-
-        IJobCapacityController jobCapacityController = DefaultJobCapacityController.INSTANCE;
-        if (className != null) {
-            Class<?> c = Class.forName(className);
-            aep = (ICCApplicationEntryPoint) c.newInstance();
-            String[] args = ccConfig.appArgs == null ? null
-                    : ccConfig.appArgs.toArray(new String[ccConfig.appArgs.size()]);
-            aep.start(appCtx, args);
-            jobCapacityController = aep.getJobCapacityController();
-        }
+        aep.start(appCtx, ccConfig.getAppArgsArray());
+        IJobCapacityController jobCapacityController = aep.getJobCapacityController();
 
         // Job manager is in charge of job lifecycle management.
         try {
             Constructor<?> jobManagerConstructor = this.getClass().getClassLoader()
-                    .loadClass(ccConfig.jobManagerClassName)
+                    .loadClass(ccConfig.getJobManagerClass())
                     .getConstructor(CCConfig.class, ClusterControllerService.class, IJobCapacityController.class);
             jobManager = (IJobManager) jobManagerConstructor.newInstance(ccConfig, this, jobCapacityController);
         } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException
                 | InvocationTargetException e) {
             if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.log(Level.WARNING, "class " + ccConfig.jobManagerClassName + " could not be used: ", e);
+                LOGGER.log(Level.WARNING, "class " + ccConfig.getJobManagerClass() + " could not be used: ", e);
             }
             // Falls back to the default implementation if the user-provided class name is not valid.
             jobManager = new JobManager(ccConfig, this, jobCapacityController);
         }
     }
 
-    private void connectNCs() throws Exception {
-        Ini ini = ccConfig.getIni();
-        if (ini == null || Boolean.parseBoolean(ini.get("cc", "virtual.cluster"))) {
-            return;
-        }
-        for (String section : ini.keySet()) {
-            if (!section.startsWith("nc/")) {
-                continue;
+    private Map<String, Pair<String, Integer>> getNCServices() throws IOException {
+        Map<String, Pair<String, Integer>> ncMap = new TreeMap<>();
+        for (String ncId : configManager.getNodeNames()) {
+            IApplicationConfig ncConfig = configManager.getNodeEffectiveConfig(ncId);
+            if (!ncConfig.getBoolean(NCConfig.Option.VIRTUAL_NC)) {
+                ncMap.put(ncId, Pair.of(ncConfig.getString(NCConfig.Option.NCSERVICE_ADDRESS),
+                        ncConfig.getInt(NCConfig.Option.NCSERVICE_PORT)));
             }
-            String ncid = section.substring(3);
-            String address = IniUtils.getString(ini, section, "address", null);
-            int port = IniUtils.getInt(ini, section, "port", 9090);
-            if (address == null) {
-                address = InetAddress.getLoopbackAddress().getHostAddress();
-            }
-            workQueue.schedule(new TriggerNCWork(this, address, port, ncid));
         }
+        return ncMap;
+    }
+
+    private void connectNCs() throws IOException {
+        getNCServices().entrySet().forEach(ncService -> {
+            final TriggerNCWork triggerWork = new TriggerNCWork(ClusterControllerService.this,
+                    ncService.getValue().getLeft(), ncService.getValue().getRight(), ncService.getKey());
+            workQueue.schedule(triggerWork);
+        });
     }
 
     private void terminateNCServices() throws Exception {
-        Ini ini = ccConfig.getIni();
-        if (ini == null || Boolean.parseBoolean(ini.get("cc", "virtual.cluster"))) {
-            return;
-        }
         List<ShutdownNCServiceWork> shutdownNCServiceWorks = new ArrayList<>();
-        for (String section : ini.keySet()) {
-            if (!section.startsWith("nc/")) {
-                continue;
-            }
-            String ncid = section.substring(3);
-            String address = IniUtils.getString(ini, section, "address", null);
-            int port = IniUtils.getInt(ini, section, "port", 9090);
-            if (address == null) {
-                address = InetAddress.getLoopbackAddress().getHostAddress();
-            }
-            ShutdownNCServiceWork shutdownWork = new ShutdownNCServiceWork(address, port, ncid);
+        getNCServices().entrySet().forEach(ncService -> {
+            ShutdownNCServiceWork shutdownWork = new ShutdownNCServiceWork(ncService.getValue().getLeft(),
+                    ncService.getValue().getRight(), ncService.getKey());
             workQueue.schedule(shutdownWork);
             shutdownNCServiceWorks.add(shutdownWork);
-        }
+        });
         for (ShutdownNCServiceWork shutdownWork : shutdownNCServiceWorks) {
             shutdownWork.sync();
         }
     }
 
     private void notifyApplication() throws Exception {
-        if (aep != null) {
-            // Sometimes, there is no application entry point. Check hyracks-client project
-            aep.startupCompleted();
-        }
+        aep.startupCompleted();
     }
+
     public void stop(boolean terminateNCService) throws Exception {
         if (terminateNCService) {
             terminateNCServices();
@@ -294,9 +290,7 @@
     }
 
     private void stopApplication() throws Exception {
-        if (aep != null) {
-            aep.stop();
-        }
+        aep.stop();
     }
 
     public ServerContext getServerContext() {
@@ -360,7 +354,7 @@
     }
 
     public NetworkAddress getDatasetDirectoryServiceInfo() {
-        return new NetworkAddress(ccConfig.clientNetIpAddress, ccConfig.clientNetPort);
+        return new NetworkAddress(ccConfig.getClientListenAddress(), ccConfig.getClientListenPort());
     }
 
     private final class ClusterControllerContext implements ICCContext {
@@ -390,6 +384,7 @@
         public ClusterTopology getClusterTopology() {
             return topology;
         }
+
     }
 
     private class DeadNodeSweeper extends TimerTask {
@@ -458,4 +453,17 @@
     public ThreadDumpRun removeThreadDumpRun(String requestKey) {
         return threadDumpRunMap.remove(requestKey);
     }
+
+    /**
+     * Picks the application entry point for the given configuration: an instance of the configured
+     * application class when one is set, otherwise the default {@link CCApplicationEntryPoint#INSTANCE}.
+     */
+    private static ICCApplicationEntryPoint getApplicationEntryPoint(CCConfig ccConfig)
+            throws ClassNotFoundException, IllegalAccessException, InstantiationException {
+        final String appClassName = ccConfig.getAppClass();
+        if (appClassName == null) {
+            return CCApplicationEntryPoint.INSTANCE;
+        }
+        return (ICCApplicationEntryPoint) Class.forName(appClassName).newInstance();
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
index 955b7f2..8400a59 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
@@ -282,7 +282,7 @@
     public synchronized ObjectNode toSummaryJSON()  {
         ObjectMapper om = new ObjectMapper();
         ObjectNode o = om.createObjectNode();
-        o.put("node-id", ncConfig.nodeId);
+        o.put("node-id", ncConfig.getNodeId());
         o.put("heap-used", heapUsedSize[(rrdPtr + RRD_SIZE - 1) % RRD_SIZE]);
         o.put("system-load-average", systemLoadAverage[(rrdPtr + RRD_SIZE - 1) % RRD_SIZE]);
 
@@ -293,7 +293,7 @@
         ObjectMapper om = new ObjectMapper();
         ObjectNode o = om.createObjectNode();
 
-        o.put("node-id", ncConfig.nodeId);
+        o.put("node-id", ncConfig.getNodeId());
 
         if (includeConfig) {
             o.put("os-name", osName);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
index 77b9b17..a8b03bc 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
@@ -27,9 +27,10 @@
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.hyracks.api.application.IApplicationConfig;
 import org.apache.hyracks.api.application.ICCApplicationContext;
 import org.apache.hyracks.api.application.IClusterLifecycleListener;
+import org.apache.hyracks.api.config.IApplicationConfig;
+import org.apache.hyracks.api.config.IOption;
 import org.apache.hyracks.api.context.ICCContext;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.IJobLifecycleListener;
@@ -105,7 +106,7 @@
         clusterLifecycleListeners.add(clusterLifecycleListener);
     }
 
-    public void notifyNodeJoin(String nodeId, Map<String, String> ncConfiguration) throws HyracksException {
+    public void notifyNodeJoin(String nodeId, Map<IOption, Object> ncConfiguration) throws HyracksException {
         for (IClusterLifecycleListener l : clusterLifecycleListeners) {
             l.notifyNodeJoin(nodeId, ncConfiguration);
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
index 354019c..d6d8bc4 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
@@ -137,7 +137,7 @@
             Map.Entry<String, NodeControllerState> entry = nodeIterator.next();
             String nodeId = entry.getKey();
             NodeControllerState state = entry.getValue();
-            if (state.incrementLastHeartbeatDuration() >= ccConfig.maxHeartbeatLapsePeriods) {
+            if (state.incrementLastHeartbeatDuration() >= ccConfig.getHeartbeatMaxMisses()) {
                 deadNodes.add(nodeId);
                 affectedJobIds.addAll(state.getActiveJobIds());
                 // Removes the node from node map.
@@ -172,10 +172,7 @@
 
     // Retrieves the IP address for a given node.
     private InetAddress getIpAddress(NodeControllerState ncState) throws HyracksException {
-        String ipAddress = ncState.getNCConfig().dataIPAddress;
-        if (ncState.getNCConfig().dataPublicIPAddress != null) {
-            ipAddress = ncState.getNCConfig().dataPublicIPAddress;
-        }
+        String ipAddress = ncState.getNCConfig().getDataPublicAddress();
         try {
             return InetAddress.getByName(ipAddress);
         } catch (UnknownHostException e) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index 031303b..b35de3d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -68,13 +68,13 @@
         this.ccs = ccs;
         this.jobCapacityController = jobCapacityController;
         try {
-            Constructor<?> jobQueueConstructor = this.getClass().getClassLoader().loadClass(ccConfig.jobQueueClassName)
+            Constructor<?> jobQueueConstructor = this.getClass().getClassLoader().loadClass(ccConfig.getJobQueueClass())
                     .getConstructor(IJobManager.class, IJobCapacityController.class);
             jobQueue = (IJobQueue) jobQueueConstructor.newInstance(this, this.jobCapacityController);
         } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException
                 | InvocationTargetException e) {
             if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.log(Level.WARNING, "class " + ccConfig.jobQueueClassName + " could not be used: ", e);
+                LOGGER.log(Level.WARNING, "class " + ccConfig.getJobQueueClass() + " could not be used: ", e);
             }
             // Falls back to the default implementation if the user-provided class name is not valid.
             jobQueue = new FIFOJobQueue(this, jobCapacityController);
@@ -85,13 +85,13 @@
 
             @Override
             protected boolean removeEldestEntry(Map.Entry<JobId, JobRun> eldest) {
-                return size() > ccConfig.jobHistorySize;
+                return size() > ccConfig.getJobHistorySize();
             }
         };
         runMapHistory = new LinkedHashMap<JobId, List<Exception>>() {
             private static final long serialVersionUID = 1L;
             /** history size + 1 is for the case when history size = 0 */
-            private int allowedSize = 100 * (ccConfig.jobHistorySize + 1);
+            private int allowedSize = 100 * (ccConfig.getJobHistorySize() + 1);
 
             @Override
             protected boolean removeEldestEntry(Map.Entry<JobId, List<Exception>> eldest) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeDetailsJSONWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeDetailsJSONWork.java
index 0577002..3dec959 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeDetailsJSONWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeDetailsJSONWork.java
@@ -27,28 +27,26 @@
 import java.lang.management.OperatingSystemMXBean;
 import java.lang.management.RuntimeMXBean;
 import java.lang.management.ThreadMXBean;
-import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Date;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.hyracks.control.cc.NodeControllerState;
-import org.apache.hyracks.control.cc.cluster.INodeManager;
-import org.apache.hyracks.control.common.controllers.CCConfig;
-import org.apache.hyracks.control.common.utils.PidHelper;
-import org.apache.hyracks.control.common.work.IPCResponder;
-import org.apache.hyracks.control.common.work.SynchronizableWork;
-import org.kohsuke.args4j.Option;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.hyracks.api.config.Section;
+import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.cluster.INodeManager;
+import org.apache.hyracks.control.common.config.ConfigUtils;
+import org.apache.hyracks.control.common.controllers.CCConfig;
+import org.apache.hyracks.control.common.controllers.NCConfig;
+import org.apache.hyracks.control.common.utils.PidHelper;
+import org.apache.hyracks.control.common.work.IPCResponder;
+import org.apache.hyracks.control.common.work.SynchronizableWork;
 
 public class GetNodeDetailsJSONWork extends SynchronizableWork {
-    private static final Logger LOGGER = Logger.getLogger(GetNodeDetailsJSONWork.class.getName());
+    private static final Section [] CC_SECTIONS = { Section.CC, Section.COMMON };
+    private static final Section [] NC_SECTIONS = { Section.NC, Section.COMMON };
+
     private final INodeManager nodeManager;
     private final CCConfig ccConfig;
     private final String nodeId;
@@ -59,7 +57,7 @@
     private ObjectMapper om = new ObjectMapper();
 
     public GetNodeDetailsJSONWork(INodeManager nodeManager, CCConfig ccConfig, String nodeId, boolean includeStats,
-                                  boolean includeConfig, IPCResponder<String> callback) {
+            boolean includeConfig, IPCResponder<String> callback) {
         this.nodeManager = nodeManager;
         this.ccConfig = ccConfig;
         this.nodeId = nodeId;
@@ -69,7 +67,7 @@
     }
 
     public GetNodeDetailsJSONWork(INodeManager nodeManager, CCConfig ccConfig, String nodeId, boolean includeStats,
-                                  boolean includeConfig) {
+            boolean includeConfig) {
         this(nodeManager, ccConfig, nodeId, includeStats, includeConfig, null);
     }
 
@@ -79,14 +77,18 @@
             // null nodeId is a request for CC
             detail = getCCDetails();
             if (includeConfig) {
-                addIni(detail, ccConfig);
+                ConfigUtils.addConfigToJSON(detail, ccConfig.getAppConfig(), CC_SECTIONS);
+                detail.putPOJO("app.args", ccConfig.getAppArgs());
             }
         } else {
             NodeControllerState ncs = nodeManager.getNodeControllerState(nodeId);
             if (ncs != null) {
                 detail = ncs.toDetailedJSON(includeStats, includeConfig);
                 if (includeConfig) {
-                    addIni(detail, ncs.getNCConfig());
+                    final NCConfig ncConfig = ncs.getNCConfig();
+                    ConfigUtils.addConfigToJSON(detail, ncConfig.getConfigManager().getNodeEffectiveConfig(nodeId),
+                            NC_SECTIONS);
+                    detail.putPOJO("app.args", ncConfig.getAppArgs());
                 }
             }
         }
@@ -96,7 +98,7 @@
         }
     }
 
-    private ObjectNode getCCDetails()  {
+    private ObjectNode getCCDetails() {
         ObjectNode o = om.createObjectNode();
         MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
         List<GarbageCollectorMXBean> gcMXBeans = ManagementFactory.getGarbageCollectorMXBeans();
@@ -151,33 +153,6 @@
         return o;
     }
 
-    private static void addIni(ObjectNode o, Object configBean)  {
-        Map<String, Object> iniMap = new HashMap<>();
-        for (Field f : configBean.getClass().getFields()) {
-            Option option = f.getAnnotation(Option.class);
-            if (option == null) {
-                continue;
-            }
-            final String optionName = option.name();
-            Object value = null;
-            try {
-                value = f.get(configBean);
-            } catch (IllegalAccessException e) {
-                LOGGER.log(Level.WARNING, "Unable to access ini option " + optionName, e);
-            }
-            if (value != null) {
-                if ("--".equals(optionName)) {
-                    iniMap.put("app_args", value);
-                } else {
-                    iniMap.put(optionName.substring(1).replace('-', '_'),
-                            "-iodevices".equals(optionName)
-                            ? String.valueOf(value).split(",")
-                            : value);
-                }
-            }
-        }
-        o.putPOJO("ini", iniMap);
-    }
 
     public ObjectNode getDetail() {
         return detail;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
index dc93515..e97950e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
@@ -23,6 +23,8 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.hyracks.api.config.IApplicationConfig;
+import org.apache.hyracks.api.config.IOption;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.NodeControllerState;
 import org.apache.hyracks.control.cc.cluster.INodeManager;
@@ -50,19 +52,22 @@
         String id = reg.getNodeId();
         IIPCHandle ncIPCHandle = ccs.getClusterIPC().getHandle(reg.getNodeControllerAddress());
         CCNCFunctions.NodeRegistrationResult result;
-        Map<String, String> ncConfiguration = new HashMap<>();
+        Map<IOption, Object> ncConfiguration = new HashMap<>();
         try {
             INodeController nodeController = new NodeControllerRemoteProxy(ncIPCHandle);
             NodeControllerState state = new NodeControllerState(nodeController, reg);
             INodeManager nodeManager = ccs.getNodeManager();
             nodeManager.addNode(id, state);
-            state.getNCConfig().toMap(ncConfiguration);
+            IApplicationConfig cfg = state.getNCConfig().getConfigManager().getNodeEffectiveConfig(id);
+            for (IOption option : cfg.getOptions()) {
+                ncConfiguration.put(option, cfg.get(option));
+            }
             LOGGER.log(Level.INFO, "Registered INodeController: id = " + id);
             NodeParameters params = new NodeParameters();
             params.setClusterControllerInfo(ccs.getClusterControllerInfo());
             params.setDistributedState(ccs.getApplicationContext().getDistributedState());
-            params.setHeartbeatPeriod(ccs.getCCConfig().heartbeatPeriod);
-            params.setProfileDumpPeriod(ccs.getCCConfig().profileDumpPeriod);
+            params.setHeartbeatPeriod(ccs.getCCConfig().getHeartbeatPeriod());
+            params.setProfileDumpPeriod(ccs.getCCConfig().getProfileDumpPeriod());
             result = new CCNCFunctions.NodeRegistrationResult(params, null);
         } catch (Exception e) {
             result = new CCNCFunctions.NodeRegistrationResult(null, e);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TriggerNCWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TriggerNCWork.java
index a7bca25..ab526e8 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TriggerNCWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TriggerNCWork.java
@@ -27,7 +27,9 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.hyracks.api.config.Section;
 import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.common.controllers.NCConfig;
 import org.apache.hyracks.control.common.controllers.ServiceConstants.ServiceCommand;
 import org.apache.hyracks.control.common.work.AbstractWork;
 import org.ini4j.Ini;
@@ -79,14 +81,18 @@
 
     /**
      * Given an Ini object, serialize it to String with some enhancements.
-     * @param ccini
+     * @param ccini the ini file to decorate and forward to NC
      */
-    String serializeIni(Ini ccini) throws IOException {
+    private String serializeIni(Ini ccini) throws IOException {
         StringWriter iniString = new StringWriter();
-        ccini.store(iniString);
+        ccini.get(Section.NC.sectionName()).putIfAbsent(NCConfig.Option.CLUSTER_ADDRESS.ini(),
+                ccs.getCCConfig().getClusterPublicAddress());
+        ccini.get(Section.NC.sectionName()).putIfAbsent(NCConfig.Option.CLUSTER_PORT.ini(),
+                String.valueOf(ccs.getCCConfig().getClusterPublicPort()));
         // Finally insert *this* NC's name into localnc section - this is a fixed
         // entry point so that NCs can determine where all their config is.
-        iniString.append("\n[localnc]\nid=").append(ncId).append("\n");
+        ccini.put(Section.LOCALNC.sectionName(), NCConfig.Option.NODE_ID.ini(), ncId);
+        ccini.store(iniString);
         if (LOGGER.isLoggable(Level.FINE)) {
             LOGGER.fine("Returning Ini file:\n" + iniString.toString());
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
index c742a4a..dde3bad 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
@@ -46,8 +46,8 @@
     public void testNormal() throws HyracksException {
         IResourceManager resourceManager = new ResourceManager();
         INodeManager nodeManager = new NodeManager(makeCCConfig(), resourceManager);
-        NodeControllerState ncState1 = mockNodeControllerState(false);
-        NodeControllerState ncState2 = mockNodeControllerState(false);
+        NodeControllerState ncState1 = mockNodeControllerState(NODE1, false);
+        NodeControllerState ncState2 = mockNodeControllerState(NODE2, false);
 
         // Verifies states after adding nodes.
         nodeManager.addNode(NODE1, ncState1);
@@ -71,7 +71,7 @@
     public void testException() throws HyracksException {
         IResourceManager resourceManager = new ResourceManager();
         INodeManager nodeManager = new NodeManager(makeCCConfig(), resourceManager);
-        NodeControllerState ncState1 = mockNodeControllerState(true);
+        NodeControllerState ncState1 = mockNodeControllerState(NODE1, true);
 
         boolean invalidNetworkAddress = false;
         // Verifies states after a failure during adding nodes.
@@ -106,11 +106,11 @@
 
     private CCConfig makeCCConfig() {
         CCConfig ccConfig = new CCConfig();
-        ccConfig.maxHeartbeatLapsePeriods = 0;
+        ccConfig.setHeartbeatMaxMisses(0);
         return ccConfig;
     }
 
-    private NodeControllerState mockNodeControllerState(boolean invalidIpAddr) {
+    private NodeControllerState mockNodeControllerState(String nodeId, boolean invalidIpAddr) {
         NodeControllerState ncState = mock(NodeControllerState.class);
         String ipAddr = invalidIpAddr ? "255.255.255:255" : "127.0.0.2";
         NetworkAddress dataAddr = new NetworkAddress(ipAddr, 1001);
@@ -120,8 +120,8 @@
         when(ncState.getDataPort()).thenReturn(dataAddr);
         when(ncState.getDatasetPort()).thenReturn(resultAddr);
         when(ncState.getMessagingPort()).thenReturn(msgAddr);
-        NCConfig ncConfig = new NCConfig();
-        ncConfig.dataIPAddress = ipAddr;
+        NCConfig ncConfig = new NCConfig(nodeId);
+        ncConfig.setDataPublicAddress(ipAddr);
         when(ncState.getNCConfig()).thenReturn(ncConfig);
         return ncState;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
index 3bb08bd..97b05e7 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
@@ -27,6 +27,7 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -48,14 +49,23 @@
 import org.apache.hyracks.control.common.controllers.CCConfig;
 import org.apache.hyracks.control.common.logs.LogFile;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
+import org.kohsuke.args4j.CmdLineException;
 import org.mockito.Mockito;
 
 public class JobManagerTest {
 
+    private CCConfig ccConfig;
+
+    @Before
+    public void setup() throws IOException, CmdLineException {
+        ccConfig = new CCConfig();
+        ccConfig.getConfigManager().processConfig();
+    }
+
     @Test
-    public void test() throws HyracksException {
-        CCConfig ccConfig = new CCConfig();
+    public void test() throws IOException, CmdLineException {
         IJobCapacityController jobCapacityController = mock(IJobCapacityController.class);
         IJobManager jobManager = spy(new JobManager(ccConfig, mockClusterControllerService(), jobCapacityController));
 
@@ -114,7 +124,7 @@
         }
         Assert.assertTrue(jobManager.getRunningJobs().size() == 4096);
         Assert.assertTrue(jobManager.getPendingJobs().isEmpty());
-        Assert.assertTrue(jobManager.getArchivedJobs().size() == ccConfig.jobHistorySize);
+        Assert.assertTrue(jobManager.getArchivedJobs().size() == ccConfig.getJobHistorySize());
 
         // Completes deferred jobs.
         for (JobRun run : deferredRuns) {
@@ -123,14 +133,13 @@
         }
         Assert.assertTrue(jobManager.getRunningJobs().isEmpty());
         Assert.assertTrue(jobManager.getPendingJobs().isEmpty());
-        Assert.assertTrue(jobManager.getArchivedJobs().size() == ccConfig.jobHistorySize);
+        Assert.assertTrue(jobManager.getArchivedJobs().size() == ccConfig.getJobHistorySize());
         verify(jobManager, times(8192)).prepareComplete(any(), any(), any());
         verify(jobManager, times(8192)).finalComplete(any());
     }
 
     @Test
     public void testExceedMax() throws HyracksException {
-        CCConfig ccConfig = new CCConfig();
         IJobCapacityController jobCapacityController = mock(IJobCapacityController.class);
         IJobManager jobManager = spy(new JobManager(ccConfig, mockClusterControllerService(), jobCapacityController));
         boolean rejected = false;
@@ -154,7 +163,6 @@
 
     @Test
     public void testAdmitThenReject() throws HyracksException {
-        CCConfig ccConfig = new CCConfig();
         IJobCapacityController jobCapacityController = mock(IJobCapacityController.class);
         IJobManager jobManager = spy(new JobManager(ccConfig, mockClusterControllerService(), jobCapacityController));
 
@@ -185,7 +193,6 @@
 
     @Test
     public void testNullJob() throws HyracksException {
-        CCConfig ccConfig = new CCConfig();
         IJobCapacityController jobCapacityController = mock(IJobCapacityController.class);
         IJobManager jobManager = new JobManager(ccConfig, mockClusterControllerService(), jobCapacityController);
         boolean invalidParameter = false;
@@ -249,7 +256,7 @@
         }
 
         Assert.assertTrue(jobManager.getPendingJobs().isEmpty());
-        Assert.assertTrue(jobManager.getArchivedJobs().size() == ccConfig.jobHistorySize);
+        Assert.assertTrue(jobManager.getArchivedJobs().size() == ccConfig.getJobHistorySize());
         verify(jobManager, times(0)).prepareComplete(any(), any(), any());
         verify(jobManager, times(0)).finalComplete(any());
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/pom.xml b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/pom.xml
index 08783cc..bd6960a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/pom.xml
@@ -48,7 +48,6 @@
     <dependency>
       <groupId>args4j</groupId>
       <artifactId>args4j</artifactId>
-      <version>2.0.12</version>
     </dependency>
     <dependency>
       <groupId>org.apache.hyracks</groupId>
@@ -67,5 +66,14 @@
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-util</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-collections4</artifactId>
+    </dependency>
   </dependencies>
 </project>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ApplicationContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ApplicationContext.java
index 06bcda3..42bc636 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ApplicationContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ApplicationContext.java
@@ -21,7 +21,7 @@
 import java.io.Serializable;
 import java.util.concurrent.ThreadFactory;
 
-import org.apache.hyracks.api.application.IApplicationConfig;
+import org.apache.hyracks.api.config.IApplicationConfig;
 import org.apache.hyracks.api.application.IApplicationContext;
 import org.apache.hyracks.api.job.IJobSerializerDeserializerContainer;
 import org.apache.hyracks.api.job.JobSerializerDeserializerContainer;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ConfigManagerApplicationConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ConfigManagerApplicationConfig.java
new file mode 100644
index 0000000..92e90e7
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ConfigManagerApplicationConfig.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.common.application;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Predicate;
+
+import org.apache.hyracks.api.config.IApplicationConfig;
+import org.apache.hyracks.api.config.IOption;
+import org.apache.hyracks.api.config.Section;
+import org.apache.hyracks.control.common.config.ConfigManager;
+
+/**
+ * An implementation of IApplicationConfig which is backed by the Config Manager.
+ * Every query is answered by delegating to the wrapped {@link ConfigManager}.
+ */
+public class ConfigManagerApplicationConfig implements IApplicationConfig, Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final ConfigManager configManager;
+
+    public ConfigManagerApplicationConfig(ConfigManager configManager) {
+        this.configManager = configManager;
+    }
+
+    @Override
+    public String getString(String section, String key) {
+        return (String) get(section, key);
+    }
+
+    @Override
+    public int getInt(String section, String key) {
+        return (int) get(section, key);
+    }
+
+    /**
+     * Returns the value of the given option as a long.
+     * A value held as an Integer is widened instead of failing the cast to Long.
+     */
+    @Override
+    public long getLong(String section, String key) {
+        final Object value = get(section, key);
+        if (value instanceof Integer) {
+            // a direct (long) cast of an Object only accepts Long; widening an int is lossless
+            return ((Integer) value).longValue();
+        }
+        return (long) value;
+    }
+
+    @Override
+    public Set<String> getSectionNames() {
+        return configManager.getSectionNames();
+    }
+
+    @Override
+    public Set<Section> getSections() {
+        return configManager.getSections();
+    }
+
+    @Override
+    public Set<Section> getSections(Predicate<Section> predicate) {
+        return configManager.getSections(predicate);
+    }
+
+    @Override
+    public Set<String> getKeys(String section) {
+        return configManager.getOptionNames(section);
+    }
+
+    /** Looks the option up by section and key, then delegates to {@code get(IOption)}. */
+    private Object get(String section, String key) {
+        return get(configManager.lookupOption(section, key));
+    }
+
+    @Override
+    public Object getStatic(IOption option) {
+        return configManager.get(option);
+    }
+
+    @Override
+    public List<String> getNCNames() {
+        return configManager.getNodeNames();
+    }
+
+    @Override
+    public IOption lookupOption(String sectionName, String propertyName) {
+        return configManager.lookupOption(sectionName, propertyName);
+    }
+
+    @Override
+    public Set<IOption> getOptions() {
+        return configManager.getOptions();
+    }
+
+    @Override
+    public Set<IOption> getOptions(Section section) {
+        return configManager.getOptions(section);
+    }
+
+    @Override
+    public IApplicationConfig getNCEffectiveConfig(String nodeId) {
+        return configManager.getNodeEffectiveConfig(nodeId);
+    }
+
+    public ConfigManager getConfigManager() {
+        return configManager;
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/IniApplicationConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/IniApplicationConfig.java
deleted file mode 100644
index 53db11c..0000000
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/IniApplicationConfig.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.control.common.application;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hyracks.api.application.IApplicationConfig;
-import org.apache.hyracks.control.common.controllers.IniUtils;
-import org.ini4j.Ini;
-import org.ini4j.Profile.Section;
-
-/**
- * An implementation of IApplicationConfig which is backed by Ini4j.
- */
-public class IniApplicationConfig implements IApplicationConfig {
-    private final Ini ini;
-
-    public IniApplicationConfig(Ini ini) {
-        if (ini != null) {
-            this.ini = ini;
-        } else {
-            this.ini = new Ini();
-        }
-    }
-
-    @Override
-    public String getString(String section, String key) {
-        return IniUtils.getString(ini, section, key, null);
-    }
-
-    @Override
-    public String getString(String section, String key, String defaultValue) {
-        return IniUtils.getString(ini, section, key, defaultValue);
-    }
-
-    @Override
-    public String[] getStringArray(String section, String key) {
-        return IniUtils.getStringArray(ini, section, key);
-    }
-
-    @Override
-    public int getInt(String section, String key) {
-        return IniUtils.getInt(ini, section, key, 0);
-    }
-
-    @Override
-    public int getInt(String section, String key, int defaultValue) {
-        return IniUtils.getInt(ini, section, key, defaultValue);
-    }
-
-    @Override
-    public long getLong(String section, String key) {
-        return IniUtils.getLong(ini, section, key, 0);
-    }
-
-    @Override
-    public long getLong(String section, String key, long defaultValue) {
-        return IniUtils.getLong(ini, section, key, defaultValue);
-    }
-
-    @Override
-    public Set<String> getSections() {
-        return ini.keySet();
-    }
-
-    @Override
-    public Set<String> getKeys(String section) {
-        return ini.get(section).keySet();
-    }
-
-    @Override
-    public List<Set<Map.Entry<String, String>>> getMultiSections(String section) {
-        List<Set<Map.Entry<String, String>>> list = new ArrayList<>();
-        List<Section> secs = getMulti(section);
-        if (secs != null) {
-            for (Section sec : secs) {
-                list.add(sec.entrySet());
-            }
-        }
-        return list;
-    }
-
-    private List<Section> getMulti(String section) {
-        return ini.getAll(section);
-    }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/Args4jArgument.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/Args4jArgument.java
new file mode 100644
index 0000000..de9b543
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/Args4jArgument.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.common.config;
+
+import java.lang.annotation.Annotation;
+
+import org.kohsuke.args4j.Argument;
+import org.kohsuke.args4j.spi.OptionHandler;
+import org.kohsuke.args4j.spi.StringOptionHandler;
+
+/**
+ * Hand-written implementation of the args4j {@link Argument} annotation interface with fixed
+ * attribute values: not required, not hidden, index 0, multi-valued, empty usage and meta
+ * variable, and {@link StringOptionHandler} as the handler.
+ */
+@SuppressWarnings("ClassExplicitlyAnnotation")
+public class Args4jArgument implements Argument {
+    @Override
+    public String usage() {
+        return "";
+    }
+
+    @Override
+    public String metaVar() {
+        return "";
+    }
+
+    @Override
+    public boolean required() {
+        return false;
+    }
+
+    @Override
+    public boolean hidden() {
+        return false;
+    }
+
+    @Override
+    public Class<? extends OptionHandler> handler() {
+        return StringOptionHandler.class;
+    }
+
+    @Override
+    public int index() {
+        return 0;
+    }
+
+    @Override
+    public boolean multiValued() {
+        return true;
+    }
+
+    @Override
+    public Class<? extends Annotation> annotationType() {
+        return Argument.class;
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/Args4jOption.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/Args4jOption.java
new file mode 100644
index 0000000..c904d0b
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/Args4jOption.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.common.config;
+
+import java.lang.annotation.Annotation;
+
+import org.apache.hyracks.api.config.IOption;
+import org.kohsuke.args4j.Option;
+import org.kohsuke.args4j.spi.ExplicitBooleanOptionHandler;
+import org.kohsuke.args4j.spi.IntOptionHandler;
+import org.kohsuke.args4j.spi.OptionHandler;
+import org.kohsuke.args4j.spi.StringOptionHandler;
+
+/**
+ * Hand-written implementation of the args4j {@link Option} annotation interface whose name,
+ * usage text and hidden flag come from an {@link IOption} and its {@link ConfigManager}.
+ */
+@SuppressWarnings("ClassExplicitlyAnnotation")
+class Args4jOption implements Option {
+    private final IOption option;
+    private final ConfigManager configManager;
+    private final Class<?> targetType;
+
+    Args4jOption(IOption option, ConfigManager configManager, Class<?> targetType) {
+        this.option = option;
+        this.targetType = targetType;
+        this.configManager = configManager;
+    }
+
+    @Override
+    public String name() {
+        return option.cmdline();
+    }
+
+    @Override
+    public String[] aliases() {
+        return new String[0];
+    }
+
+    @Override
+    public String usage() {
+        return configManager.getUsage(option);
+    }
+
+    @Override
+    public String metaVar() {
+        return "";
+    }
+
+    @Override
+    public boolean required() {
+        return false;
+    }
+
+    @Override
+    public boolean help() {
+        return false;
+    }
+
+    @Override
+    public boolean hidden() {
+        return option.hidden();
+    }
+
+    /**
+     * Selects the args4j handler from the target type: Boolean and Integer get their dedicated
+     * handlers, every other type falls back to {@link StringOptionHandler}.
+     */
+    @Override
+    public Class<? extends OptionHandler> handler() {
+        if (targetType.equals(Boolean.class)) {
+            return ExplicitBooleanOptionHandler.class;
+        } else if (targetType.equals(Integer.class)) {
+            return IntOptionHandler.class;
+        } else {
+            return StringOptionHandler.class;
+        }
+    }
+
+    @Override
+    public String[] depends() {
+        return new String[0];
+    }
+
+    @Override
+    public String[] forbids() {
+        return new String[0];
+    }
+
+    @Override
+    public Class<? extends Annotation> annotationType() {
+        return Option.class;
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/Args4jSetter.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/Args4jSetter.java
new file mode 100644
index 0000000..1367ef0
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/Args4jSetter.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.common.config;
+
+import java.lang.reflect.AnnotatedElement;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+import org.apache.hyracks.api.config.IOption;
+import org.kohsuke.args4j.CmdLineException;
+import org.kohsuke.args4j.spi.FieldSetter;
+import org.kohsuke.args4j.spi.Setter;
+
+class Args4jSetter implements Setter { // forwards each value parsed by args4j to a consumer
+    private final IOption option; // null when built through the Consumer-based constructor
+    private final BiConsumer<IOption, Object> consumer;
+    private final boolean multiValued;
+    private final Class type;
+
+    Args4jSetter(IOption option, BiConsumer<IOption, Object> consumer, boolean multiValued) {
+        this.option = option;
+        this.consumer = consumer;
+        this.multiValued = multiValued;
+        this.type = option.type().targetType();
+    }
+
+    Args4jSetter(Consumer<Object> consumer, boolean multiValued, Class type) {
+        this.option = null;
+        this.consumer = (o, value) -> consumer.accept(value); // option-less form: the IOption argument is ignored
+        this.multiValued = multiValued;
+        this.type = type;
+    }
+
+    @Override
+    public void addValue(Object value) throws CmdLineException {
+        consumer.accept(option, value);
+    }
+
+    @Override
+    public Class getType() {
+        return type;
+    }
+
+    @Override
+    public boolean isMultiValued() {
+        return multiValued;
+    }
+
+    @Override
+    public FieldSetter asFieldSetter() {
+        return null; // this setter is not backed by a field
+    }
+
+    @Override
+    public AnnotatedElement asAnnotatedElement() {
+        return null; // this setter is not backed by an annotated element
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java
new file mode 100644
index 0000000..bfe759e
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java
@@ -0,0 +1,549 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.common.config;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.commons.collections4.map.CompositeMap;
+import org.apache.commons.collections4.multimap.ArrayListValuedHashMap;
+import org.apache.hyracks.api.config.IApplicationConfig;
+import org.apache.hyracks.api.config.IConfigManager;
+import org.apache.hyracks.api.config.IConfigurator;
+import org.apache.hyracks.api.config.IOption;
+import org.apache.hyracks.api.config.Section;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.control.common.application.ConfigManagerApplicationConfig;
+import org.ini4j.Ini;
+import org.ini4j.Profile;
+import org.kohsuke.args4j.CmdLineException;
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
+import org.kohsuke.args4j.OptionHandlerFilter;
+
+public class ConfigManager implements IConfigManager, Serializable {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOGGER = Logger.getLogger(ConfigManager.class.getName());
+
+    private HashSet<IOption> registeredOptions = new HashSet<>();
+    private HashMap<IOption, Object> definedMap = new HashMap<>();
+    private HashMap<IOption, Object> defaultMap = new HashMap<>();
+    private CompositeMap<IOption, Object> configurationMap = new CompositeMap<>(definedMap, defaultMap,
+            new NoOpMapMutator());
+    private EnumMap<Section, Map<String, IOption>> sectionMap = new EnumMap<>(Section.class);
+    private TreeMap<String, Map<IOption, Object>> nodeSpecificMap = new TreeMap<>();
+    private transient ArrayListValuedHashMap<IOption, IConfigSetter> optionSetters = new ArrayListValuedHashMap<>();
+    private final String[] args;
+    private ConfigManagerApplicationConfig appConfig = new ConfigManagerApplicationConfig(this);
+    private Set<String> allSections = new HashSet<>();
+    private transient Collection<Consumer<List<String>>> argListeners = new ArrayList<>();
+    private transient Collection<IOption> iniPointerOptions = new ArrayList<>();
+    private transient Collection<Section> cmdLineSections = new ArrayList<>(); // sections offered on the command line
+    private transient OptionHandlerFilter usageFilter;
+    private transient SortedMap<Integer, List<IConfigurator>> configurators = new TreeMap<>();
+    private boolean configured;
+
+    public ConfigManager() {
+        this(null);
+    }
+
+    public ConfigManager(String[] args) {
+        this.args = args;
+        for (Section section : Section.values()) {
+            allSections.add(section.sectionName());
+        }
+        addConfigurator(PARSE_INI_POINTERS_METRIC, this::extractIniPointersFromCommandLine);
+        addConfigurator(PARSE_INI_METRIC, this::parseIni);
+        addConfigurator(PARSE_COMMAND_LINE_METRIC, this::processCommandLine);
+        addConfigurator(APPLY_DEFAULTS_METRIC, this::applyDefaults);
+    }
+
+    @Override
+    public void addConfigurator(int metric, IConfigurator configurator) {
+        configurators.computeIfAbsent(metric, metric1 -> new ArrayList<>()).add(configurator);
+    }
+
+    @Override
+    public void addIniParamOptions(IOption... options) {
+        Stream.of(options).forEach(iniPointerOptions::add);
+    }
+
+    @Override
+    public void addCmdLineSections(Section... sections) {
+        Stream.of(sections).forEach(cmdLineSections::add);
+    }
+
+    @Override
+    public void setUsageFilter(OptionHandlerFilter usageFilter) {
+        this.usageFilter = usageFilter;
+    }
+
+    @Override
+    public void register(IOption... options) {
+        for (IOption option : options) {
+            if (option.section() == Section.VIRTUAL || registeredOptions.contains(option)) {
+                continue;
+            }
+            if (configured) {
+                throw new IllegalStateException("configuration already processed");
+            }
+            LOGGER.fine("registering option: " + option.toIniString());
+            Map<String, IOption> optionMap = sectionMap.computeIfAbsent(option.section(), section -> new HashMap<>());
+            IOption prev = optionMap.put(option.ini(), option);
+            if (prev != null) {
+                if (prev != option) {
+                    throw new IllegalStateException("An option cannot be defined multiple times: "
+                            + option.toIniString() + ": " + Arrays.asList(option.getClass(), prev.getClass()));
+                }
+            } else {
+                registeredOptions.add(option);
+                optionSetters.put(option, (node, value, isDefault) -> correctedMap(node, isDefault).put(option, value));
+                if (LOGGER.isLoggable(Level.FINE)) {
+                    optionSetters.put(option, (node, value, isDefault) -> LOGGER
+                            .fine((isDefault ? "defaulting" : "setting ") + option.toIniString() + " to " + value));
+                }
+            }
+        }
+    }
+
+    private Map<IOption, Object> correctedMap(String node, boolean isDefault) {
+        return node == null ? (isDefault ? defaultMap : definedMap)
+                : nodeSpecificMap.computeIfAbsent(node, this::createNodeSpecificMap);
+    }
+
+    public void registerVirtualNode(String nodeId) {
+        LOGGER.fine("registerVirtualNode: " + nodeId);
+        nodeSpecificMap.computeIfAbsent(nodeId, this::createNodeSpecificMap);
+    }
+
+    private Map<IOption, Object> createNodeSpecificMap(String nodeId) {
+        LOGGER.fine("createNodeSpecificMap: " + nodeId);
+        return new HashMap<>();
+    }
+
+    @Override
+    @SafeVarargs
+    public final void register(final Class<? extends IOption>... optionClasses) {
+        for (Class<? extends IOption> optionClass : optionClasses) {
+            register(optionClass.getEnumConstants());
+        }
+    }
+
+    public IOption lookupOption(String section, String key) {
+        // getSectionOptionMap() substitutes an empty map for a missing section, so a miss simply yields null
+        return getSectionOptionMap(Section.parseSectionName(section)).get(key);
+    }
+
+    public void processConfig() throws CmdLineException, IOException {
+        if (configured) {
+            return; // the configurators have already completed once
+        }
+        for (List<IConfigurator> sameMetric : configurators.values()) { // SortedMap: ascending metric order
+            for (IConfigurator step : sameMetric) {
+                step.run();
+            }
+        }
+        configured = true;
+    }
+
+    private void processCommandLine() throws CmdLineException {
+        List<String> appArgs = processCommandLine(cmdLineSections, usageFilter, this::cmdLineSet);
+        // now propagate the app args to the listeners...
+        argListeners.forEach(l -> l.accept(appArgs));
+    }
+
+    // Scans the command line and keeps only the INI pointer options, so that parseIni() can find the
+    // INI file. Values are run through the option type's parser (exactly as cmdLineSet does), so a
+    // pointer whose type parses to a URL is stored as a URL instead of the raw command-line string.
+    private void extractIniPointersFromCommandLine() throws CmdLineException {
+        Map<IOption, Object> cmdLineOptions = new HashMap<>();
+        processCommandLine(cmdLineSections, usageFilter, cmdLineOptions::put);
+        iniPointerOptions.stream().filter(cmdLineOptions::containsKey).forEach(
+                option -> set(option, option.type().parse(String.valueOf(cmdLineOptions.get(option)))));
+    }
+
+    private void cmdLineSet(IOption option, Object value) {
+        invokeSetters(option, option.type().parse(String.valueOf(value)), null);
+    }
+
+    private void invokeSetters(IOption option, Object value, String nodeId) {
+        optionSetters.get(option).forEach(setter -> setter.set(nodeId, value, false));
+    }
+
+    @SuppressWarnings({ "squid:S106", "squid:S1147" }) // use of System.err, System.exit()
+    private List<String> processCommandLine(Collection<Section> sections, OptionHandlerFilter usageFilter,
+            BiConsumer<IOption, Object> setAction)
+            throws CmdLineException {
+        final Args4jBean bean = new Args4jBean();
+        CmdLineParser cmdLineParser = new CmdLineParser(bean);
+        final List<String> appArgs = new ArrayList<>();
+        List<IOption> commandLineOptions = new ArrayList<>();
+        for (Map.Entry<Section, Map<String, IOption>> sectionMapEntry : sectionMap.entrySet()) {
+            if (!sections.contains(sectionMapEntry.getKey())) {
+                continue;
+            }
+            for (IOption option : sectionMapEntry.getValue().values()) {
+                if (option.section() != Section.VIRTUAL) {
+                    commandLineOptions.add(option);
+                }
+            }
+        }
+        commandLineOptions.sort(Comparator.comparing(IOption::cmdline));
+
+        commandLineOptions.forEach(option -> cmdLineParser.addOption(new Args4jSetter(option, setAction, false),
+                new Args4jOption(option, this, option.type().targetType())));
+
+        if (!argListeners.isEmpty()) {
+            cmdLineParser.addArgument(new Args4jSetter(o -> appArgs.add(String.valueOf(o)), true, String.class),
+                    new Args4jArgument());
+        }
+        LOGGER.fine("parsing cmdline: " + Arrays.toString(args));
+        try {
+            if (args == null || args.length == 0) {
+                LOGGER.info("no command line args supplied");
+                return appArgs;
+            }
+            cmdLineParser.parseArgument(args);
+            if (bean.help) {
+                ConfigUtils.printUsage(cmdLineParser, usageFilter, System.err);
+                System.exit(0);
+            }
+        } catch (CmdLineException e) {
+            if (bean.help) {
+                ConfigUtils.printUsage(cmdLineParser, usageFilter, System.err);
+                System.exit(0);
+            } else {
+                ConfigUtils.printUsage(e, usageFilter, System.err);
+                throw e;
+            }
+        }
+        return appArgs;
+    }
+
+    private void parseIni() throws IOException { // loads the INI named by the pointer options and applies it
+        Ini ini = null;
+        for (IOption option : iniPointerOptions) { // if several pointers are set, the last non-null one wins
+            Object pointer = get(option);
+            if (pointer instanceof String) {
+                ini = ConfigUtils.loadINIFile((String)pointer);
+            } else if (pointer instanceof URL) {
+                ini = ConfigUtils.loadINIFile((URL)pointer);
+            } else if (pointer != null) {
+                throw new IllegalArgumentException("config file pointer options must be of type String (for file) or " +
+                        "URL, instead of " + option.type().targetType());
+            }
+        }
+        if (ini == null) {
+            LOGGER.info("no INI file specified; skipping parsing");
+            return;
+        }
+        LOGGER.info("parsing INI file: " + ini);
+        for (Profile.Section section : ini.values()) {
+            allSections.add(section.getName());
+            final Section rootSection = Section
+                    .parseSectionName(section.getParent() == null ? section.getName() : section.getParent().getName());
+            String node; // null = not node-specific
+            if (rootSection == Section.EXTENSION) {
+                parseExtensionIniSection(section);
+                continue;
+            } else if (rootSection == Section.NC) { // name differing from simple name => simple name is the node id
+                node = section.getName().equals(section.getSimpleName()) ? null : section.getSimpleName();
+            } else if (Section.parseSectionName(section.getName()) != null) {
+                node = null;
+            } else {
+                throw new HyracksException("Unknown section in ini: " + section.getName());
+            }
+            Map<String, IOption> optionMap = getSectionOptionMap(rootSection);
+            for (Map.Entry<String, String> iniOption : section.entrySet()) {
+                String name = iniOption.getKey();
+                final IOption option = optionMap == null ? null : optionMap.get(name);
+                if (option == null) {
+                    handleUnknownOption(section, name); // throws on both of its branches
+                    return;
+                }
+                final String value = iniOption.getValue();
+                LOGGER.fine("setting " + option.toIniString() + " to " + value);
+                final Object parsed = option.type().parse(value);
+                invokeSetters(option, parsed, node);
+            }
+        }
+    }
+
+    private void parseExtensionIniSection(Profile.Section section) {
+        // TODO(mblow): parse extensions
+    }
+
+    // Always throws. When the key is registered under one or more sections, the error names those
+    // sections (section mismatch); when no registered option carries this ini key, the key is
+    // reported as unknown.
+    private void handleUnknownOption(Profile.Section section, String name) throws HyracksException {
+        final Set<String> owningSections = registeredOptions.stream()
+                .filter(candidate -> candidate.ini().equals(name))
+                .map(candidate -> candidate.section().sectionName())
+                .collect(Collectors.toCollection(HashSet::new));
+        if (owningSections.isEmpty()) {
+            throw new HyracksException("Unknown option in ini: [" + section.getName() + "] " + name);
+        }
+        throw new HyracksException(
+                "Section mismatch for [" + section.getName() + "] " + name + ", expected section(s) " + owningSections);
+    }
+
+    private void applyDefaults() {
+        LOGGER.fine("applying defaults");
+        for (Map.Entry<Section, Map<String, IOption>> entry : sectionMap.entrySet()) {
+            if (entry.getKey() == Section.NC) {
+                entry.getValue().values().forEach(option -> getNodeNames()
+                        .forEach(node -> getOrDefault(getNodeEffectiveMap(node), option, node)));
+                for (Map.Entry<String, Map<IOption, Object>> nodeMap : nodeSpecificMap.entrySet()) {
+                    entry.getValue().values()
+                            .forEach(option -> getOrDefault(
+                                    new CompositeMap<>(nodeMap.getValue(), definedMap, new NoOpMapMutator()), option,
+                                    nodeMap.getKey()));
+                }
+                // also push the defaults to the shared map, if the CC requests NC properties, they should receive the
+                // defaults -- TODO (mblow): seems lame, should log warning on access
+            }
+            entry.getValue().values().forEach(option -> getOrDefault(configurationMap, option, null));
+        }
+    }
+
+    private Object getOrDefault(Map<IOption, Object> map, IOption option, String nodeId) {
+        if (map.containsKey(option)) {
+            return map.get(option);
+        } else {
+            Object value = resolveDefault(option, new ConfigManagerApplicationConfig(this) {
+                @Override
+                public Object getStatic(IOption option) {
+                    return getOrDefault(map, option, nodeId);
+                }
+            });
+            if (value != null && optionSetters != null) {
+                optionSetters.get(option).forEach(setter -> setter.set(nodeId, value, true));
+            }
+            return value;
+        }
+    }
+
+    public Object resolveDefault(IOption option, IApplicationConfig applicationConfig) {
+        final Object value = option.defaultValue();
+        if (value instanceof IOption) {
+            return applicationConfig.get((IOption) value);
+        } else if (value instanceof Supplier) {
+            //noinspection unchecked
+            return ((Supplier<?>) value).get();
+        } else if (value instanceof Function) {
+            //noinspection unchecked
+            return ((Function<IApplicationConfig, ?>) value).apply(applicationConfig);
+        } else {
+            return value;
+        }
+    }
+
+    @Override
+    public Set<Section> getSections(Predicate<Section> predicate) {
+        return Arrays.stream(Section.values()).filter(predicate).collect(Collectors.toSet());
+    }
+
+    @Override
+    public Set<Section> getSections() {
+        return getSections(section -> true);
+    }
+
+    public Set<String> getSectionNames() {
+        return Collections.unmodifiableSet(allSections);
+    }
+
+    // Returns the ini keys of the options registered under the named section.
+    public Set<String> getOptionNames(String sectionName) {
+        final Map<String, IOption> registered = getSectionOptionMap(Section.parseSectionName(sectionName));
+        return registered.values().stream()
+                .map(IOption::ini)
+                // gathered into a fresh HashSet, independent of the internal registry
+                .collect(Collectors.toCollection(HashSet::new));
+    }
+
+    @Override
+    public Set<IOption> getOptions(Section section) {
+        return new HashSet<>(getSectionOptionMap(section).values()); // detached copy of the section's options
+    }
+
+    private Map<String, IOption> getSectionOptionMap(Section section) {
+        final Map<String, IOption> map = sectionMap.get(section);
+        return map != null ? map : Collections.emptyMap();
+    }
+
+    public List<String> getNodeNames() { // unmodifiable snapshot of the node ids present in nodeSpecificMap
+        return Collections.unmodifiableList(new ArrayList<>(nodeSpecificMap.keySet()));
+    }
+
+    public IApplicationConfig getNodeEffectiveConfig(String nodeId) {
+        final Map<IOption, Object> nodeMap = nodeSpecificMap.computeIfAbsent(nodeId, this::createNodeSpecificMap);
+        Map<IOption, Object> nodeEffectiveMap = getNodeEffectiveMap(nodeId);
+        return new ConfigManagerApplicationConfig(this) {
+            @Override
+            public Object getStatic(IOption option) {
+                if (!nodeEffectiveMap.containsKey(option)) {
+                    // we need to calculate the default the the context of the node specific map...
+                    nodeMap.put(option, getOrDefault(nodeEffectiveMap, option, nodeId));
+                }
+                return nodeEffectiveMap.get(option);
+            }
+        };
+    }
+
+    private CompositeMap<IOption, Object> getNodeEffectiveMap(String nodeId) {
+        return new CompositeMap<>(nodeSpecificMap.get(nodeId), definedMap, new NoOpMapMutator());
+    }
+
+    public Ini toIni(boolean includeDefaults) {
+        Ini ini = new Ini();
+        for (Map.Entry<IOption, Object> entry : (includeDefaults ? configurationMap : definedMap).entrySet()) {
+            if (entry.getValue() != null) {
+                final IOption option = entry.getKey();
+                ini.add(option.section().sectionName(), option.ini(), option.type().serializeToIni(entry.getValue()));
+            }
+        }
+        for (Map.Entry<String, Map<IOption, Object>> nodeMapEntry : nodeSpecificMap.entrySet()) {
+            String section = Section.NC.sectionName() + "/" + nodeMapEntry.getKey();
+            for (Map.Entry<IOption, Object> entry : nodeMapEntry.getValue().entrySet()) {
+                if (entry.getValue() != null) {
+                    final IOption option = entry.getKey();
+                    ini.add(section, option.ini(), option.type().serializeToIni(entry.getValue()));
+                }
+            }
+        }
+        return ini;
+    }
+
+    public void set(IOption option, Object value) {
+        set(null, option, value);
+    }
+
+    public void set(String nodeId, IOption option, Object value) {
+        invokeSetters(option, value, nodeId);
+    }
+
+    public Object get(IOption option) {
+        if (!registeredOptions.contains(option)) {
+            throw new IllegalStateException("Option not registered with ConfigManager: " + option.toIniString() + "("
+                    + option.getClass() + "." + option + ")");
+        } else if (option.section() == Section.NC) {
+            LOGGER.warning("NC option " + option.toIniString() + " being accessed outside of NC-scoped configuration.");
+        }
+        return getOrDefault(configurationMap, option, null);
+    }
+
+    public Set<IOption> getOptions() {
+        return Collections.unmodifiableSet(registeredOptions);
+    }
+
+    @Override
+    public IApplicationConfig getAppConfig() {
+        return appConfig;
+    }
+
+    public void registerArgsListener(Consumer<List<String>> argListener) {
+        argListeners.add(argListener);
+    }
+
+    String getUsage(IOption option) {
+        final String description = option.description();
+        StringBuilder usage = new StringBuilder();
+        if (description != null && !"".equals(description)) {
+            usage.append(description).append(" ");
+        } else {
+            LOGGER.warning("missing description for option: "
+                    + option.getClass().getName().substring(option.getClass().getName().lastIndexOf(".") + 1) + "."
+                    + option.name());
+        }
+        usage.append("(default: ");
+        usage.append(defaultTextForUsage(option, IOption::cmdline));
+        usage.append(")");
+        return usage.toString();
+    }
+
+    public String defaultTextForUsage(IOption option, Function<IOption, String> optionPrinter) {
+        StringBuilder buf = new StringBuilder();
+        String override = option.usageDefaultOverride(appConfig, optionPrinter);
+        if (override != null) {
+            buf.append(override);
+        } else {
+            final Object value = option.defaultValue();
+            if (value instanceof IOption) {
+                buf.append("same as ").append(optionPrinter.apply((IOption) value));
+            } else if (value instanceof Function) {
+                // TODO(mblow): defer usage calculation to enable evaluation of function
+                buf.append("<function>");
+            } else {
+                buf.append(option.type().serializeToString(resolveDefault(option, appConfig)));
+            }
+            // TODO(mblow): defer usage calculation to enable inclusion of evaluated actual default
+        }
+        return buf.toString();
+    }
+
+    private static class NoOpMapMutator implements CompositeMap.MapMutator<IOption, Object> {
+        @Override
+        public Object put(CompositeMap<IOption, Object> compositeMap, Map<IOption, Object>[] maps, IOption iOption,
+                Object o) {
+            throw new UnsupportedOperationException("mutations are not allowed");
+        }
+
+        @Override
+        public void putAll(CompositeMap<IOption, Object> compositeMap, Map<IOption, Object>[] maps,
+                Map<? extends IOption, ?> map) {
+            throw new UnsupportedOperationException("mutations are not allowed");
+        }
+
+        @Override
+        public void resolveCollision(CompositeMap<IOption, Object> compositeMap, Map<IOption, Object> map,
+                Map<IOption, Object> map1, Collection<IOption> collection) {
+            // no-op
+        }
+    }
+
+    private static class Args4jBean {
+        @Option(name = "-help", help = true)
+        boolean help;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigUtils.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigUtils.java
new file mode 100644
index 0000000..adf1774
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigUtils.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.common.config;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintStream;
+import java.lang.reflect.Field;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.function.Predicate;
+
+import org.apache.hyracks.api.config.IApplicationConfig;
+import org.apache.hyracks.api.config.IOption;
+import org.apache.hyracks.control.common.controllers.ControllerConfig;
+import org.ini4j.Ini;
+import org.kohsuke.args4j.CmdLineException;
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.OptionHandlerFilter;
+
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+/**
+ * Some utility functions for reading Ini4j objects with default values.
+ * For all getXxx() methods: if the 'section' contains a slash, and the 'key'
+ * is not found in that section, we will search for the key in the section named
+ * by stripping the leaf of the section name (final slash and anything following).
+ * eg. getInt(ini, "nc/red", "dir", null) will first look for the key "dir" in
+ * the section "nc/red", but if it is not found, will look in the section "nc".
+ */
+public class ConfigUtils {
+    private static final int USAGE_WIDTH = 120;
+
+    private ConfigUtils() {
+    }
+
+    // Reads key from section; while nothing is found and the section name still contains a '/',
+    // the part from the last '/' onwards is dropped and the lookup is retried on the shorter name.
+    // Yields defaultValue when no section along that chain supplies a value.
+    private static <T> T getIniValue(Ini ini, String section, String key, T defaultValue, Class<T> clazz) {
+        String current = section;
+        T found = ini.get(current, key, clazz);
+        int slash = current.lastIndexOf('/');
+        while (found == null && slash > -1) {
+            current = current.substring(0, slash);
+            found = ini.get(current, key, clazz);
+            slash = current.lastIndexOf('/');
+        }
+        return found != null ? found : defaultValue;
+    }
+
+    public static String getString(Ini ini, String section, String key, String defaultValue) {
+        return getIniValue(ini, section, key, defaultValue, String.class);
+    }
+
+    public static int getInt(Ini ini, String section, String key, int defaultValue) {
+        return getIniValue(ini, section, key, defaultValue, Integer.class);
+    }
+
+    public static long getLong(Ini ini, String section, String key, long defaultValue) {
+        return getIniValue(ini, section, key, defaultValue, Long.class);
+    }
+
+    public static Ini loadINIFile(String configFile) throws IOException {
+        Ini ini = new Ini();
+        File conffile = new File(configFile);
+        if (!conffile.exists()) {
+            throw new FileNotFoundException(configFile);
+        }
+        ini.load(conffile);
+        return ini;
+    }
+
+    public static Ini loadINIFile(URL configURL) throws IOException {
+        Ini ini = new Ini();
+        ini.load(configURL);
+        return ini;
+    }
+
+    public static Field[] getFields(final Class beanClass, Predicate<Field> predicate) { // fields accepted by predicate
+        List<Field> fields = new ArrayList<>(); // the walk ends at a class whose loader is null or has no parent
+        for (Class clazz = beanClass; clazz != null && clazz.getClassLoader() != null
+                && clazz.getClassLoader().getParent() != null; clazz = clazz.getSuperclass()) {
+            for (Field f : clazz.getDeclaredFields()) { // declared fields of this class only, any visibility
+                if (predicate.test(f)) {
+                    fields.add(f);
+                }
+            }
+        }
+        return fields.toArray(new Field[fields.size()]);
+    }
+
+    public static void printUsage(CmdLineParser parser, OptionHandlerFilter filter, PrintStream out) {
+        parser.getProperties().withUsageWidth(USAGE_WIDTH);
+        parser.printUsage(new OutputStreamWriter(out), null, filter);
+    }
+
+    public static void printUsage(CmdLineException e, OptionHandlerFilter filter, PrintStream out) {
+        out.println("ERROR: " + e.getMessage());
+        printUsage(e.getParser(), filter, out);
+    }
+
+    private static String getOptionValue(String[] args, String optionName) {
+        // the final element has no successor to serve as its value, so the scan stops one short of the end
+        int pos = 0;
+        while (pos < args.length - 1 && !args[pos].equals(optionName)) {
+            pos++;
+        }
+        return pos < args.length - 1 ? args[pos + 1] : null;
+    }
+
+    public static String getOptionValue(String[] args, IOption option) throws IOException {
+        String value = getOptionValue(args, option.cmdline());
+        if (value == null) {
+            Ini iniFile = null;
+            String configFile = getOptionValue(args, ControllerConfig.Option.CONFIG_FILE.cmdline());
+            String configFileUrl = getOptionValue(args, ControllerConfig.Option.CONFIG_FILE_URL.cmdline());
+            if (configFile != null) {
+                iniFile = loadINIFile(configFile);
+            } else if (configFileUrl != null) {
+                iniFile = loadINIFile(new URL(configFileUrl));
+            }
+            if (iniFile != null) {
+                value = iniFile.get(option.section().sectionName(), option.ini());
+            }
+        }
+        return value;
+    }
+
+    public static String getString(Ini ini, org.apache.hyracks.api.config.Section section,
+                                   IOption option, String defaultValue) {
+        return getString(ini, section.sectionName(), option.ini(), defaultValue);
+    }
+
+    public static void addConfigToJSON(ObjectNode o, IApplicationConfig cfg,
+                                       org.apache.hyracks.api.config.Section... sections) {
+        ArrayNode configArray = o.putArray("config");
+        for (org.apache.hyracks.api.config.Section section : cfg.getSections(Arrays.asList(sections)::contains)) {
+            ObjectNode sectionNode = configArray.addObject();
+            Map<String, Object> sectionConfig = getSectionOptionsForJSON(cfg, section, option -> true);
+            sectionNode.put("section", section.sectionName())
+                    .putPOJO("properties", sectionConfig);
+        }
+    }
+
+    public static Map<String, Object> getSectionOptionsForJSON(IApplicationConfig cfg,
+                                                                org.apache.hyracks.api.config.Section section,
+                                                                Predicate<IOption> selector) {
+        Map<String, Object> sectionConfig = new TreeMap<>();
+        for (IOption option : cfg.getOptions(section)) {
+            if (selector.test(option)) {
+                sectionConfig.put(option.ini(), option.type().serializeToJSON(cfg.get(option)));
+            }
+        }
+        return sectionConfig;
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/IConfigSetter.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/IConfigSetter.java
new file mode 100644
index 0000000..2234cca
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/IConfigSetter.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.common.config;
+
+@FunctionalInterface
+public interface IConfigSetter {
+    void set(String nodeId, Object value, boolean isDefault) throws SetException;
+
+    class SetException extends RuntimeException {
+        public SetException(Exception e) {
+            super(e);
+        }
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java
new file mode 100644
index 0000000..fc26b5e
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.common.config;
+
+import java.net.MalformedURLException;
+import java.util.logging.Level;
+
+import org.apache.hyracks.api.config.IOptionType;
+import org.apache.hyracks.util.StorageUtil;
+
+public class OptionTypes {
+
+    public static final IOptionType<Integer> INTEGER_BYTE_UNIT = new IOptionType<Integer>() {
+        @Override
+        public Integer parse(String s) {
+            final long byteCount = StorageUtil.getByteValue(s); // parsed as long, narrowed only if it fits
+            if (byteCount < Integer.MIN_VALUE || byteCount > Integer.MAX_VALUE) {
+                throw new IllegalArgumentException(
+                        "The given value: " + byteCount + " is not within the int range.");
+            }
+            return (int) byteCount;
+        }
+
+        @Override
+        public Class<Integer> targetType() {
+            return Integer.class;
+        }
+    };
+
+    public static final IOptionType<Long> LONG_BYTE_UNIT = new IOptionType<Long>() {
+        @Override
+        public Long parse(String s) {
+            return StorageUtil.getByteValue(s);
+        }
+
+        @Override
+        public Class<Long> targetType() {
+            return Long.class;
+        }
+    };
+
+    public static final IOptionType<Integer> INTEGER = new IOptionType<Integer>() {
+        @Override
+        public Integer parse(String s) {
+            return Integer.parseInt(s);
+        }
+
+        @Override
+        public Class<Integer> targetType() {
+            return Integer.class;
+        }
+    };
+
+    public static final IOptionType<Double> DOUBLE = new IOptionType<Double>() {
+        @Override
+        public Double parse(String s) {
+            return Double.parseDouble(s);
+        }
+
+        @Override
+        public Class<Double> targetType() {
+            return Double.class;
+        }
+    };
+
+    public static final IOptionType<String> STRING = new IOptionType<String>() {
+        @Override
+        public String parse(String s) {
+            return s;
+        }
+
+        @Override
+        public Class<String> targetType() {
+            return String.class;
+        }
+    };
+
+    public static final IOptionType<Long> LONG = new IOptionType<Long>() {
+        @Override
+        public Long parse(String s) {
+            return Long.parseLong(s);
+        }
+
+        @Override
+        public Class<Long> targetType() {
+            return Long.class;
+        }
+    };
+
+    public static final IOptionType<Boolean> BOOLEAN = new IOptionType<Boolean>() {
+        @Override
+        public Boolean parse(String s) {
+            return Boolean.parseBoolean(s);
+        }
+
+        @Override
+        public Class<Boolean> targetType() {
+            return Boolean.class;
+        }
+    };
+
+    public static final IOptionType<Level> LEVEL = new IOptionType<Level>() {
+        @Override
+        public Level parse(String s) {
+            return Level.parse(s);
+        }
+
+        @Override
+        public Class<Level> targetType() {
+            return Level.class;
+        }
+
+        @Override
+        public Object serializeToJSON(Object value) {
+            return ((Level)value).getName();
+        }
+
+        @Override
+        public String serializeToIni(Object value) {
+            return ((Level)value).getName();
+        }
+    };
+
+    public static final IOptionType<String []> STRING_ARRAY = new IOptionType<String []>() {
+        @Override
+        public String [] parse(String s) {
+            return s.trim().split("\\s*,\\s*"); // trim first: the regex only strips whitespace next to commas
+        }
+
+        @Override
+        public Class<String []> targetType() {
+            return String [].class;
+        }
+
+        @Override
+        public String serializeToIni(Object value) {
+            return String.join(",", (String [])value); // inverse of parse: comma-joined, no padding
+        }
+    };
+
+    public static final IOptionType<java.net.URL> URL = new IOptionType<java.net.URL>() {
+        @Override
+        public java.net.URL parse(String s) {
+            try {
+                return new java.net.URL(s);
+            } catch (MalformedURLException e) {
+                throw new IllegalArgumentException(e);
+            }
+        }
+
+        @Override
+        public Class<java.net.URL> targetType() {
+            return java.net.URL.class;
+        }
+    };
+
+
+    private OptionTypes() {
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/context/ServerContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/context/ServerContext.java
index d4dc054..ff037f0 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/context/ServerContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/context/ServerContext.java
@@ -29,7 +29,7 @@
     private final ServerType type;
     private final File baseDir;
 
-    public ServerContext(ServerType type, File baseDir) throws Exception {
+    public ServerContext(ServerType type, File baseDir) {
         this.type = type;
         this.baseDir = baseDir;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
index b636a096..fbde58c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
@@ -18,156 +18,160 @@
  */
 package org.apache.hyracks.control.common.controllers;
 
+import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER;
+import static org.apache.hyracks.control.common.config.OptionTypes.LONG;
+import static org.apache.hyracks.control.common.config.OptionTypes.STRING;
+
 import java.io.File;
-import java.io.IOException;
 import java.net.InetAddress;
-import java.net.URL;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.function.Supplier;
 
-import org.apache.hyracks.api.application.IApplicationConfig;
-import org.apache.hyracks.control.common.application.IniApplicationConfig;
+import org.apache.hyracks.api.config.IOption;
+import org.apache.hyracks.api.config.IOptionType;
+import org.apache.hyracks.api.config.Section;
+import org.apache.hyracks.control.common.config.ConfigManager;
+import org.apache.hyracks.util.file.FileUtil;
 import org.ini4j.Ini;
-import org.kohsuke.args4j.Argument;
-import org.kohsuke.args4j.Option;
-import org.kohsuke.args4j.spi.StopOptionHandler;
 
-public class CCConfig {
-    @Option(name = "-address", usage = "IP Address for CC (default: localhost)", required = false)
-    public String ipAddress = InetAddress.getLoopbackAddress().getHostAddress();
+public class CCConfig extends ControllerConfig {
 
-    @Option(name = "-client-net-ip-address",
-            usage = "Sets the IP Address to listen for connections from clients (default: same as -address)",
-            required = false)
-    public String clientNetIpAddress;
+    public static String defaultDir = System.getProperty("java.io.tmpdir");
 
-    @Option(name = "-client-net-port", usage = "Sets the port to listen for connections from clients (default 1098)")
-    public int clientNetPort = 1098;
+    public enum Option implements IOption {
+        APP_CLASS(STRING),
+        ADDRESS(STRING, InetAddress.getLoopbackAddress().getHostAddress()),
+        CLUSTER_LISTEN_ADDRESS(STRING, ADDRESS),
+        CLUSTER_LISTEN_PORT(INTEGER, 1099),
+        CLUSTER_PUBLIC_ADDRESS(STRING, CLUSTER_LISTEN_ADDRESS),
+        CLUSTER_PUBLIC_PORT(INTEGER, CLUSTER_LISTEN_PORT),
+        CLIENT_LISTEN_ADDRESS(STRING, ADDRESS),
+        CLIENT_LISTEN_PORT(INTEGER, 1098),
+        CONSOLE_LISTEN_ADDRESS(STRING, ADDRESS),
+        CONSOLE_LISTEN_PORT(INTEGER, 16001),
+        HEARTBEAT_PERIOD(INTEGER, 10000), // TODO (mblow): add time unit
+        HEARTBEAT_MAX_MISSES(INTEGER, 5),
+        PROFILE_DUMP_PERIOD(INTEGER, 0),
+        JOB_HISTORY_SIZE(INTEGER, 10),
+        RESULT_TTL(LONG, 86400000L), // TODO(mblow): add time unit
+        RESULT_SWEEP_THRESHOLD(LONG, 60000L), // TODO(mblow): add time unit
+        @SuppressWarnings("RedundantCast") // not redundant- false positive from IDEA
+        ROOT_DIR(STRING, (Supplier<String>)() -> FileUtil.joinPath(defaultDir, "ClusterControllerService")),
+        CLUSTER_TOPOLOGY(STRING),
+        JOB_QUEUE_CLASS(STRING, "org.apache.hyracks.control.cc.scheduler.FIFOJobQueue"),
+        JOB_MANAGER_CLASS(STRING, "org.apache.hyracks.control.cc.job.JobManager");
 
-    // QQQ Note that clusterNetIpAddress is *not directly used* yet. Both
-    // the cluster listener and the web server listen on "all interfaces".
-    // This IP address is only used to instruct the NC on which IP to call in.
-    @Option(name = "-cluster-net-ip-address",
-            usage = "Sets the IP Address to listen for connections from NCs (default: same as -address)",
-            required = false)
-    public String clusterNetIpAddress;
+        private final IOptionType parser;
+        private final Object defaultValue;
 
-    @Option(name = "-cluster-net-port",
-            usage = "Sets the port to listen for connections from node controllers (default 1099)")
-    public int clusterNetPort = 1099;
-
-    @Option(name = "-http-port", usage = "Sets the http port for the Cluster Controller (default: 16001)")
-    public int httpPort = 16001;
-
-    @Option(name = "-heartbeat-period",
-            usage = "Sets the time duration between two heartbeats from each node controller in milliseconds" +
-                    " (default: 10000)")
-    public int heartbeatPeriod = 10000;
-
-    @Option(name = "-max-heartbeat-lapse-periods",
-            usage = "Sets the maximum number of missed heartbeats before a node is marked as dead (default: 5)")
-    public int maxHeartbeatLapsePeriods = 5;
-
-    @Option(name = "-profile-dump-period", usage = "Sets the time duration between two profile dumps from each node " +
-            "controller in milliseconds. 0 to disable. (default: 0)")
-    public int profileDumpPeriod = 0;
-
-    @Option(name = "-default-max-job-attempts", usage = "Sets the default number of job attempts allowed if not " +
-            "specified in the job specification. (default: 5)")
-    public int defaultMaxJobAttempts = 5;
-
-    @Option(name = "-job-history-size", usage = "Limits the number of historical jobs remembered by the system to " +
-            "the specified value. (default: 10)")
-    public int jobHistorySize = 10;
-
-    @Option(name = "-result-time-to-live", usage = "Limits the amount of time results for asynchronous jobs should " +
-            "be retained by the system in milliseconds. (default: 24 hours)")
-    public long resultTTL = 86400000;
-
-    @Option(name = "-result-sweep-threshold", usage = "The duration within which an instance of the result cleanup " +
-            "should be invoked in milliseconds. (default: 1 minute)")
-    public long resultSweepThreshold = 60000;
-
-    @Option(name = "-cc-root",
-            usage = "Sets the root folder used for file operations. (default: ClusterControllerService)")
-    public String ccRoot = "ClusterControllerService";
-
-    @Option(name = "-cluster-topology", required = false,
-            usage = "Sets the XML file that defines the cluster topology. (default: null)")
-    public File clusterTopologyDefinition = null;
-
-    @Option(name = "-app-cc-main-class", required = false, usage = "Application CC Main Class")
-    public String appCCMainClass = null;
-
-    @Option(name = "-config-file",
-            usage = "Specify path to master configuration file (default: none)", required = false)
-    public String configFile = null;
-
-    @Option(name = "-job-queue-class-name", usage = "Specify the implementation class name for the job queue. (default:"
-            + " org.apache.hyracks.control.cc.scheduler.FIFOJobQueue)",
-            required = false)
-    public String jobQueueClassName = "org.apache.hyracks.control.cc.scheduler.FIFOJobQueue";
-
-    @Option(name = "-job-manager-class-name", usage = "Specify the implementation class name for the job manager. "
-            + "(default: org.apache.hyracks.control.cc.job.JobManager)", required = false)
-    public String jobManagerClassName = "org.apache.hyracks.control.cc.job.JobManager";
-
-    @Argument
-    @Option(name = "--", handler = StopOptionHandler.class)
-    public List<String> appArgs;
-
-    public URL configFileUrl = null;
-
-    private Ini ini = null;
-
-    private void loadINIFile() throws IOException {
-        // This method simply maps from the ini parameters to the CCConfig's fields.
-        // It does not apply defaults or any logic.
-        if (configFile != null) {
-            ini = IniUtils.loadINIFile(configFile);
-        } else if (configFileUrl != null) {
-            ini = IniUtils.loadINIFile(configFileUrl);
-        } else {
-            return;
+        <T> Option(IOptionType<T> parser) {
+            this(parser, (T)null);
         }
 
-        ipAddress = IniUtils.getString(ini, "cc", "address", ipAddress);
-        clientNetIpAddress = IniUtils.getString(ini, "cc", "client.address", clientNetIpAddress);
-        clientNetPort = IniUtils.getInt(ini, "cc", "client.port", clientNetPort);
-        clusterNetIpAddress = IniUtils.getString(ini, "cc", "cluster.address", clusterNetIpAddress);
-        clusterNetPort = IniUtils.getInt(ini, "cc", "cluster.port", clusterNetPort);
-        httpPort = IniUtils.getInt(ini, "cc", "http.port", httpPort);
-        heartbeatPeriod = IniUtils.getInt(ini, "cc", "heartbeat.period", heartbeatPeriod);
-        maxHeartbeatLapsePeriods = IniUtils.getInt(ini, "cc", "heartbeat.maxlapse", maxHeartbeatLapsePeriods);
-        profileDumpPeriod = IniUtils.getInt(ini, "cc", "profiledump.period", profileDumpPeriod);
-        defaultMaxJobAttempts = IniUtils.getInt(ini, "cc", "job.defaultattempts", defaultMaxJobAttempts);
-        jobHistorySize = IniUtils.getInt(ini, "cc", "job.historysize", jobHistorySize);
-        resultTTL = IniUtils.getLong(ini, "cc", "results.ttl", resultTTL);
-        resultSweepThreshold = IniUtils.getLong(ini, "cc", "results.sweepthreshold", resultSweepThreshold);
-        ccRoot = IniUtils.getString(ini, "cc", "rootfolder", ccRoot);
-        // QQQ clusterTopologyDefinition is a "File"; should support verifying that the file
-        // exists, as @Option likely does
-        appCCMainClass = IniUtils.getString(ini, "cc", "app.class", appCCMainClass);
+        <T> Option(IOptionType<T> parser, Option defaultOption) {
+            this.parser = parser;
+            this.defaultValue = defaultOption;
+        }
+
+        <T> Option(IOptionType<T> parser, T defaultValue) {
+            this.parser = parser;
+            this.defaultValue = defaultValue;
+        }
+
+        <T> Option(IOptionType<T> parser, Supplier<T> defaultValue) {
+            this.parser = parser;
+            this.defaultValue = defaultValue;
+        }
+
+        @Override
+        public Section section() {
+            return Section.CC;
+        }
+
+        @Override
+        public IOptionType type() {
+            return parser;
+        }
+
+        @Override
+        public Object defaultValue() {
+            return defaultValue;
+        }
+
+        @Override
+        public String description() {
+            switch (this) { // usage text per option; the default branch flags options added without one
+                case APP_CLASS:
+                    return "Application CC main class";
+                case ADDRESS:
+                    return "Default bind address for all services on this cluster controller";
+                case CLUSTER_LISTEN_ADDRESS:
+                    return "Sets the IP Address to listen for connections from NCs";
+                case CLUSTER_LISTEN_PORT:
+                    return "Sets the port to listen for connections from node controllers";
+                case CLUSTER_PUBLIC_ADDRESS:
+                    return "Address that NCs should use to contact this CC";
+                case CLUSTER_PUBLIC_PORT:
+                    return "Port that NCs should use to contact this CC";
+                case CLIENT_LISTEN_ADDRESS:
+                    return "Sets the IP Address to listen for connections from clients";
+                case CLIENT_LISTEN_PORT:
+                    return "Sets the port to listen for connections from clients";
+                case CONSOLE_LISTEN_ADDRESS:
+                    return "Sets the listen address for the Cluster Controller";
+                case CONSOLE_LISTEN_PORT:
+                    return "Sets the http port for the Cluster Controller";
+                case HEARTBEAT_PERIOD:
+                    return "Sets the time duration between two heartbeats from each node controller in milliseconds";
+                case HEARTBEAT_MAX_MISSES:
+                    return "Sets the maximum number of missed heartbeats before a node is marked as dead";
+                case PROFILE_DUMP_PERIOD:
+                    return "Sets the time duration between two profile dumps from each node controller in " +
+                            "milliseconds; 0 to disable";
+                case JOB_HISTORY_SIZE:
+                    return "Limits the number of historical jobs remembered by the system to the specified value";
+                case RESULT_TTL:
+                    return "Limits the amount of time results for asynchronous jobs should be retained by the system " +
+                            "in milliseconds";
+                case RESULT_SWEEP_THRESHOLD:
+                    return "The duration within which an instance of the result cleanup should be invoked in " +
+                            "milliseconds";
+                case ROOT_DIR:
+                    return "Sets the root folder used for file operations";
+                case CLUSTER_TOPOLOGY:
+                    return "Sets the XML file that defines the cluster topology";
+                case JOB_QUEUE_CLASS:
+                    return "Specify the implementation class name for the job queue";
+                case JOB_MANAGER_CLASS:
+                    return "Specify the implementation class name for the job manager";
+                default:
+                    throw new IllegalStateException("NYI: " + this);
+            }
+        }
     }
 
-    /**
-     * Once all @Option fields have been loaded from command-line or otherwise
-     * specified programmatically, call this method to:
-     * 1. Load options from a config file (as specified by -config-file)
-     * 2. Set default values for certain derived values, such as setting
-     *    clusterNetIpAddress to ipAddress
-     */
-    public void loadConfigAndApplyDefaults() throws IOException {
-        loadINIFile();
-        if (ini != null) {
-            // QQQ This way of passing overridden/defaulted values back into
-            // the ini feels clunky, and it's clearly incomplete
-            ini.add("cc", "cluster.address", clusterNetIpAddress);
-            ini.add("cc", "client.address", clientNetIpAddress);
-        }
+    private final ConfigManager configManager;
 
-        // "address" is the default for all IP addresses
-        clusterNetIpAddress = clusterNetIpAddress == null ? ipAddress : clusterNetIpAddress;
-        clientNetIpAddress = clientNetIpAddress == null ? ipAddress : clientNetIpAddress;
+    private List<String> appArgs = new ArrayList<>();
+
+    public CCConfig() {
+        this(new ConfigManager());
+    }
+
+    public CCConfig(ConfigManager configManager) {
+        super(configManager);
+        this.configManager = configManager;
+        configManager.register(Option.class);
+        configManager.registerArgsListener(appArgs::addAll);
+    }
+
+    public List<String> getAppArgs() {
+        return appArgs;
+    }
+
+    public String[] getAppArgsArray() {
+        return appArgs.toArray(new String[appArgs.size()]);
     }
 
     /**
@@ -175,15 +179,158 @@
      * if -config-file wasn't specified.
      */
     public Ini getIni() {
-        return ini;
+        return configManager.toIni(false);
     }
 
-    /**
-     * @return An IApplicationConfig representing this NCConfig.
-     * Note: Currently this only includes the values from the configuration
-     * file, not anything specified on the command-line. QQQ
-     */
-    public IApplicationConfig getAppConfig() {
-        return new IniApplicationConfig(ini);
+    public ConfigManager getConfigManager() {
+        return configManager;
+    }
+
+    // QQQ Note that clusterListenAddress is *not directly used* yet. Both
+    // the cluster listener and the web server listen on "all interfaces".
+    // This IP address is only used to instruct the NC on which IP to call in.
+    public String getClusterListenAddress() {
+        return getAppConfig().getString(Option.CLUSTER_LISTEN_ADDRESS);
+    }
+
+    public void setClusterListenAddress(String clusterListenAddress) {
+        configManager.set(Option.CLUSTER_LISTEN_ADDRESS, clusterListenAddress);
+    }
+
+    public int getClusterListenPort() {
+        return getAppConfig().getInt(Option.CLUSTER_LISTEN_PORT);
+    }
+
+    public void setClusterListenPort(int clusterListenPort) {
+        configManager.set(Option.CLUSTER_LISTEN_PORT, clusterListenPort);
+    }
+
+    public String getClusterPublicAddress() {
+        return getAppConfig().getString(Option.CLUSTER_PUBLIC_ADDRESS);
+    }
+
+    public void setClusterPublicAddress(String clusterPublicAddress) {
+        configManager.set(Option.CLUSTER_PUBLIC_ADDRESS, clusterPublicAddress);
+    }
+
+    public int getClusterPublicPort() {
+        return getAppConfig().getInt(Option.CLUSTER_PUBLIC_PORT);
+    }
+
+    public void setClusterPublicPort(int clusterPublicPort) {
+        configManager.set(Option.CLUSTER_PUBLIC_PORT, clusterPublicPort);
+    }
+
+    public String getClientListenAddress() {
+        return getAppConfig().getString(Option.CLIENT_LISTEN_ADDRESS);
+    }
+
+    public void setClientListenAddress(String clientListenAddress) {
+        configManager.set(Option.CLIENT_LISTEN_ADDRESS, clientListenAddress);
+    }
+
+    public int getClientListenPort() {
+        return getAppConfig().getInt(Option.CLIENT_LISTEN_PORT);
+    }
+
+    public void setClientListenPort(int clientListenPort) {
+        configManager.set(Option.CLIENT_LISTEN_PORT, clientListenPort);
+    }
+
+    public int getConsoleListenPort() {
+        return getAppConfig().getInt(Option.CONSOLE_LISTEN_PORT);
+    }
+
+    public void setConsoleListenPort(int consoleListenPort) {
+        configManager.set(Option.CONSOLE_LISTEN_PORT, consoleListenPort);
+    }
+
+    public int getHeartbeatPeriod() {
+        return getAppConfig().getInt(Option.HEARTBEAT_PERIOD);
+    }
+
+    public void setHeartbeatPeriod(int heartbeatPeriod) {
+        configManager.set(Option.HEARTBEAT_PERIOD, heartbeatPeriod);
+    }
+
+    public int getHeartbeatMaxMisses() {
+        return getAppConfig().getInt(Option.HEARTBEAT_MAX_MISSES);
+    }
+
+    public void setHeartbeatMaxMisses(int heartbeatMaxMisses) {
+        configManager.set(Option.HEARTBEAT_MAX_MISSES, heartbeatMaxMisses);
+    }
+
+    public int getProfileDumpPeriod() {
+        return getAppConfig().getInt(Option.PROFILE_DUMP_PERIOD);
+    }
+
+    public void setProfileDumpPeriod(int profileDumpPeriod) {
+        configManager.set(Option.PROFILE_DUMP_PERIOD, profileDumpPeriod);
+    }
+
+    public int getJobHistorySize() {
+        return getAppConfig().getInt(Option.JOB_HISTORY_SIZE);
+    }
+
+    public void setJobHistorySize(int jobHistorySize) {
+        configManager.set(Option.JOB_HISTORY_SIZE, jobHistorySize);
+    }
+
+    public long getResultTTL() {
+        return getAppConfig().getLong(Option.RESULT_TTL);
+    }
+
+    public void setResultTTL(long resultTTL) {
+        configManager.set(Option.RESULT_TTL, resultTTL);
+    }
+
+    public long getResultSweepThreshold() {
+        return getAppConfig().getLong(Option.RESULT_SWEEP_THRESHOLD);
+    }
+
+    public void setResultSweepThreshold(long resultSweepThreshold) {
+        configManager.set(Option.RESULT_SWEEP_THRESHOLD, resultSweepThreshold);
+    }
+
+    public String getRootDir() {
+        return getAppConfig().getString(Option.ROOT_DIR);
+    }
+
+    public void setRootDir(String rootDir) {
+        configManager.set(Option.ROOT_DIR, rootDir);
+    }
+
+    public File getClusterTopology() {
+        final String topologyPath = getAppConfig().getString(Option.CLUSTER_TOPOLOGY);
+        return topologyPath == null ? null : new File(topologyPath); // unset option -> no topology file
+    }
+
+    public void setClusterTopology(File clusterTopology) { // stored as path text: the option is STRING-typed
+        configManager.set(Option.CLUSTER_TOPOLOGY, clusterTopology == null ? null : clusterTopology.getPath());
+    }
+
+    public String getAppClass() {
+        return getAppConfig().getString(Option.APP_CLASS);
+    }
+
+    public void setAppClass(String appClass) {
+        configManager.set(Option.APP_CLASS, appClass);
+    }
+
+    public String getJobQueueClass() {
+        return getAppConfig().getString(Option.JOB_QUEUE_CLASS);
+    }
+
+    public void setJobQueueClass(String jobQueueClass) {
+        configManager.set(Option.JOB_QUEUE_CLASS, jobQueueClass);
+    }
+
+    public String getJobManagerClass() {
+        return getAppConfig().getString(Option.JOB_MANAGER_CLASS);
+    }
+
+    public void setJobManagerClass(String jobManagerClass) {
+        configManager.set(Option.JOB_MANAGER_CLASS, jobManagerClass);
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/ControllerConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/ControllerConfig.java
new file mode 100644
index 0000000..4aae6df
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/ControllerConfig.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.common.controllers;
+
+import java.io.Serializable;
+import java.net.URL;
+
+import org.apache.hyracks.api.config.IApplicationConfig;
+import org.apache.hyracks.api.config.IOption;
+import org.apache.hyracks.api.config.IOptionType;
+import org.apache.hyracks.api.config.Section;
+import org.apache.hyracks.control.common.config.ConfigManager;
+import org.apache.hyracks.control.common.config.OptionTypes;
+
+public class ControllerConfig implements Serializable {
+
+    public enum Option implements IOption {
+        CONFIG_FILE(OptionTypes.STRING, "Specify path to master configuration file"),
+        CONFIG_FILE_URL(OptionTypes.URL, "Specify URL to master configuration file");
+
+        private final IOptionType type;
+        private final String description;
+
+
+        Option(IOptionType type, String description) {
+            this.type = type;
+            this.description = description;
+        }
+
+        @Override
+        public Section section() {
+            return Section.COMMON;
+        }
+
+        @Override
+        public String description() {
+            return description;
+        }
+
+        @Override
+        public IOptionType type() {
+            return type;
+        }
+
+        @Override
+        public Object defaultValue() {
+            return null;
+        }
+    }
+
+    protected final ConfigManager configManager;
+
+    protected ControllerConfig(ConfigManager configManager) {
+        this.configManager = configManager;
+    }
+
+    public IApplicationConfig getAppConfig() {
+        return configManager.getAppConfig();
+    }
+
+    public String getConfigFile() {
+        return getAppConfig().getString(ControllerConfig.Option.CONFIG_FILE);
+    }
+
+    public void setConfigFile(String configFile) {
+        configManager.set(ControllerConfig.Option.CONFIG_FILE, configFile);
+    }
+
+    public URL getConfigFileUrl() {
+        return (URL) getAppConfig().get(ControllerConfig.Option.CONFIG_FILE_URL);
+    }
+
+    public void setConfigFileUrl(URL configFileUrl) {
+        configManager.set(ControllerConfig.Option.CONFIG_FILE_URL, configFileUrl);
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/IniUtils.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/IniUtils.java
deleted file mode 100644
index 451421d..0000000
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/IniUtils.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.control.common.controllers;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.lang.reflect.Array;
-import java.net.URL;
-
-import org.ini4j.Ini;
-import org.ini4j.Profile.Section;
-
-/**
- * Some utility functions for reading Ini4j objects with default values.
- * For all getXxx() methods: if the 'section' contains a slash, and the 'key'
- * is not found in that section, we will search for the key in the section named
- * by stripping the leaf of the section name (final slash and anything following).
- * eg. getInt(ini, "nc/red", "dir", null) will first look for the key "dir" in
- * the section "nc/red", but if it is not found, will look in the section "nc".
- */
-public class IniUtils {
-
-    private IniUtils() {
-    }
-
-    private static <T> T getIniValue(Ini ini, String section, String key, T defaultValue, Class<T> clazz) {
-        T value;
-        while (true) {
-            value = ini.get(section, key, clazz);
-            if (value == null) {
-                int idx = section.lastIndexOf('/');
-                if (idx > -1) {
-                    section = section.substring(0, idx);
-                    continue;
-                }
-            }
-            break;
-        }
-        return (value != null) ? value : defaultValue;
-    }
-
-    @SuppressWarnings("unchecked")
-    private static <T> T getIniArray(Ini ini, String section, String key, Class<T> clazz) {
-        Section sec = ini.get(section);
-        if (clazz.getComponentType() == null) {
-            return null;
-        }
-        if (sec == null) {
-            return (T) Array.newInstance(clazz.getComponentType(), 0);
-        } else {
-            return sec.getAll(key, clazz);
-        }
-    }
-
-    public static String getString(Ini ini, String section, String key, String defaultValue) {
-        return getIniValue(ini, section, key, defaultValue, String.class);
-    }
-
-    public static String[] getStringArray(Ini ini, String section, String key) {
-        return getIniArray(ini, section, key, String[].class);
-    }
-
-    public static int getInt(Ini ini, String section, String key, int defaultValue) {
-        return getIniValue(ini, section, key, defaultValue, Integer.class);
-    }
-
-    public static long getLong(Ini ini, String section, String key, long defaultValue) {
-        return getIniValue(ini, section, key, defaultValue, Long.class);
-    }
-
-    public static Ini loadINIFile(String configFile) throws IOException {
-        Ini ini = new Ini();
-        File conffile = new File(configFile);
-        if (!conffile.exists()) {
-            throw new FileNotFoundException(configFile);
-        }
-        ini.load(conffile);
-        return ini;
-    }
-
-    public static Ini loadINIFile(URL configURL) throws IOException {
-        Ini ini = new Ini();
-        ini.load(configURL);
-        return ini;
-    }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
index fa7d76a..7906b52 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
@@ -18,287 +18,485 @@
  */
 package org.apache.hyracks.control.common.controllers;
 
-import java.io.IOException;
-import java.io.Serializable;
+import static org.apache.hyracks.control.common.config.OptionTypes.BOOLEAN;
+import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER;
+import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER_BYTE_UNIT;
+import static org.apache.hyracks.control.common.config.OptionTypes.LONG;
+import static org.apache.hyracks.control.common.config.OptionTypes.STRING;
+import static org.apache.hyracks.control.common.config.OptionTypes.STRING_ARRAY;
+
 import java.net.InetAddress;
-import java.net.URL;
+import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
+import java.util.function.Supplier;
 
-import org.apache.hyracks.api.application.IApplicationConfig;
-import org.apache.hyracks.control.common.application.IniApplicationConfig;
-import org.ini4j.Ini;
-import org.kohsuke.args4j.Argument;
-import org.kohsuke.args4j.Option;
-import org.kohsuke.args4j.spi.StopOptionHandler;
+import org.apache.hyracks.api.config.IApplicationConfig;
+import org.apache.hyracks.api.config.IOption;
+import org.apache.hyracks.api.config.IOptionType;
+import org.apache.hyracks.api.config.Section;
+import org.apache.hyracks.control.common.config.ConfigManager;
+import org.apache.hyracks.util.file.FileUtil;
 
-public class NCConfig implements Serializable {
-    private static final long serialVersionUID = 2L;
+public class NCConfig extends ControllerConfig {
+    private static final long serialVersionUID = 3L;
 
-    @Option(name = "-cc-host", usage = "Cluster Controller host name (required unless specified in config file)",
-            required = false)
-    public String ccHost = null;
+    public static String defaultDir = System.getProperty("java.io.tmpdir");
+    public static String defaultAppClass = null;
 
-    @Option(name = "-cc-port", usage = "Cluster Controller port (default: 1099)", required = false)
-    public int ccPort = 1099;
+    public enum Option implements IOption {
+        ADDRESS(STRING, InetAddress.getLoopbackAddress().getHostAddress()),
+        PUBLIC_ADDRESS(STRING, ADDRESS),
+        CLUSTER_LISTEN_ADDRESS(STRING, ADDRESS),
+        CLUSTER_LISTEN_PORT(INTEGER, 0),
+        NCSERVICE_ADDRESS(STRING, PUBLIC_ADDRESS),
+        NCSERVICE_PORT(INTEGER, 9090),
+        CLUSTER_ADDRESS(STRING, (String)null),
+        CLUSTER_PORT(INTEGER, 1099),
+        CLUSTER_PUBLIC_ADDRESS(STRING, PUBLIC_ADDRESS),
+        CLUSTER_PUBLIC_PORT(INTEGER, CLUSTER_LISTEN_PORT),
+        NODE_ID(STRING, (String)null),
+        DATA_LISTEN_ADDRESS(STRING, ADDRESS),
+        DATA_LISTEN_PORT(INTEGER, 0),
+        DATA_PUBLIC_ADDRESS(STRING, PUBLIC_ADDRESS),
+        DATA_PUBLIC_PORT(INTEGER, DATA_LISTEN_PORT),
+        RESULT_LISTEN_ADDRESS(STRING, ADDRESS),
+        RESULT_LISTEN_PORT(INTEGER, 0),
+        RESULT_PUBLIC_ADDRESS(STRING, PUBLIC_ADDRESS),
+        RESULT_PUBLIC_PORT(INTEGER, RESULT_LISTEN_PORT),
+        MESSAGING_LISTEN_ADDRESS(STRING, ADDRESS),
+        MESSAGING_LISTEN_PORT(INTEGER, 0),
+        MESSAGING_PUBLIC_ADDRESS(STRING, PUBLIC_ADDRESS),
+        MESSAGING_PUBLIC_PORT(INTEGER, MESSAGING_LISTEN_PORT),
+        CLUSTER_CONNECT_RETRIES(INTEGER, 5),
+        @SuppressWarnings("RedundantCast") // not redundant- false positive from IDEA
+        IODEVICES(STRING_ARRAY, (Supplier<String []>)() -> new String [] { FileUtil.joinPath(defaultDir, "iodevice") }),
+        NET_THREAD_COUNT(INTEGER, 1),
+        NET_BUFFER_COUNT(INTEGER, 1),
+        RESULT_TTL(LONG, 86400000L),
+        RESULT_SWEEP_THRESHOLD(LONG, 60000L),
+        RESULT_MANAGER_MEMORY(INTEGER_BYTE_UNIT, -1),
+        @SuppressWarnings("RedundantCast") // not redundant- false positive from IDEA
+        APP_CLASS(STRING, (Supplier<String>)() -> defaultAppClass),
+        NCSERVICE_PID(INTEGER, -1),
+        COMMAND(STRING, "hyracksnc"),
+        JVM_ARGS(STRING, (String)null),
+        VIRTUAL_NC(BOOLEAN, false);
 
-    @Option(name = "-address", usage = "IP Address for NC (default: localhost)", required = false)
-    public String ipAddress = InetAddress.getLoopbackAddress().getHostAddress();
+        private final IOptionType parser;
+        private final Object defaultValue;
 
-    @Option(name = "-cluster-net-ip-address", usage = "IP Address to bind cluster listener (default: same as -address)",
-            required = false)
-    public String clusterNetIPAddress;
-
-    @Option(name = "-cluster-net-port", usage = "IP port to bind cluster listener (default: random port)",
-            required = false)
-    public int clusterNetPort = 0;
-
-    @Option(name = "-cluster-net-public-ip-address",
-            usage = "Public IP Address to announce cluster listener (default: same as -cluster-net-ip-address)",
-            required = false)
-    public String clusterNetPublicIPAddress;
-
-    @Option(name = "-cluster-net-public-port",
-            usage = "Public IP port to announce cluster listener (default: same as -cluster-net-port; " +
-                    "must set -cluster-net-public-ip-address also)", required = false)
-    public int clusterNetPublicPort = 0;
-
-    @Option(name = "-node-id", usage = "Logical name of node controller unique within the cluster (required unless " +
-            "specified in config file)", required = false)
-    public String nodeId = null;
-
-    @Option(name = "-data-ip-address", usage = "IP Address to bind data listener (default: same as -address)",
-            required = false)
-    public String dataIPAddress;
-
-    @Option(name = "-data-port", usage = "IP port to bind data listener (default: random port)", required = false)
-    public int dataPort = 0;
-
-    @Option(name = "-data-public-ip-address",
-            usage = "Public IP Address to announce data listener (default: same as -data-ip-address)", required = false)
-    public String dataPublicIPAddress;
-
-    @Option(name = "-data-public-port",
-            usage = "Public IP port to announce data listener (default: same as -data-port; must set " +
-                    "-data-public-ip-address also)", required = false)
-    public int dataPublicPort = 0;
-
-    @Option(name = "-result-ip-address",
-            usage = "IP Address to bind dataset result distribution listener (default: same as -address)",
-            required = false)
-    public String resultIPAddress;
-
-    @Option(name = "-result-port",
-            usage = "IP port to bind dataset result distribution listener (default: random port)",
-            required = false)
-    public int resultPort = 0;
-
-    @Option(name = "-result-public-ip-address",
-            usage = "Public IP Address to announce dataset result distribution listener (default: same as " +
-                    "-result-ip-address)", required = false)
-    public String resultPublicIPAddress;
-
-    @Option(name = "-result-public-port", usage = "Public IP port to announce dataset result distribution listener " +
-            "(default: same as -result-port; must set -result-public-ip-address also)", required = false)
-    public int resultPublicPort = 0;
-
-    @Option(name = "-retries", usage = "Number of attempts to contact CC before giving up (default: 5)")
-    public int retries = 5;
-
-    @Option(name = "-iodevices",
-            usage = "Comma separated list of IO Device mount points (default: One device in default temp folder)",
-            required = false)
-    public String ioDevices = System.getProperty("java.io.tmpdir");
-
-    @Option(name = "-net-thread-count", usage = "Number of threads to use for Network I/O (default: 1)")
-    public int nNetThreads = 1;
-
-    @Option(name = "-net-buffer-count", usage = "Number of network buffers per input/output channel (default: 1)",
-            required = false)
-    public int nNetBuffers = 1;
-
-    @Option(name = "-max-memory", usage = "Maximum memory usable at this Node Controller in bytes (default: -1 auto)")
-    public int maxMemory = -1;
-
-    @Option(name = "-result-time-to-live", usage = "Limits the amount of time results for asynchronous jobs should " +
-            "be retained by the system in milliseconds. (default: 24 hours)")
-    public long resultTTL = 86400000;
-
-    @Option(name = "-result-sweep-threshold", usage = "The duration within which an instance of the result cleanup " +
-            "should be invoked in milliseconds. (default: 1 minute)")
-    public long resultSweepThreshold = 60000;
-
-    @Option(name = "-result-manager-memory",
-            usage = "Memory usable for result caching at this Node Controller in bytes (default: -1 auto)")
-    public int resultManagerMemory = -1;
-
-    @Option(name = "-app-nc-main-class", usage = "Application NC Main Class")
-    public String appNCMainClass;
-
-    @Option(name = "-config-file", usage = "Specify path to local configuration file (default: no local config)",
-            required = false)
-    public String configFile = null;
-
-    @Option(name = "-messaging-ip-address", usage = "IP Address to bind messaging "
-            + "listener (default: same as -address)", required = false)
-    public String messagingIPAddress;
-
-    @Option(name = "-messaging-port", usage = "IP port to bind messaging listener "
-            + "(default: random port)", required = false)
-    public int messagingPort = 0;
-
-    @Option(name = "-messaging-public-ip-address", usage = "Public IP Address to announce messaging"
-            + " listener (default: same as -messaging-ip-address)", required = false)
-    public String messagingPublicIPAddress;
-
-    @Option(name = "-messaging-public-port", usage = "Public IP port to announce messaging listener"
-            + " (default: same as -messaging-port; must set -messaging-public-port also)", required = false)
-    public int messagingPublicPort = 0;
-
-    @Option(name = "-ncservice-pid", usage = "PID of the NCService which launched this NCDriver", required = false)
-    public int ncservicePid = -1;
-
-    @Argument
-    @Option(name = "--", handler = StopOptionHandler.class)
-    public List<String> appArgs;
-
-    public URL configFileUrl = null;
-
-    private transient Ini ini = null;
-
-    private void loadINIFile() throws IOException {
-        if (configFile != null) {
-            ini = IniUtils.loadINIFile(configFile);
-        } else if (configFileUrl != null) {
-            ini = IniUtils.loadINIFile(configFileUrl);
-        } else {
-            return;
+        <T> Option(IOptionType<T> parser, Option defaultOption) {
+            this.parser = parser;
+            this.defaultValue = defaultOption;
         }
 
-        // QQQ This should default to cc/address if cluster.address not set, but
-        // that logic really should be handled by the ini file sent from the CC
-        ccHost = IniUtils.getString(ini, "cc", "cluster.address", ccHost);
-        ccPort = IniUtils.getInt(ini, "cc", "cluster.port", ccPort);
-
-        // Get ID of *this* NC
-        nodeId = IniUtils.getString(ini, "localnc", "id", nodeId);
-        String nodeSection = "nc/" + nodeId;
-
-        // Network ports
-        ipAddress = IniUtils.getString(ini, nodeSection, "address", ipAddress);
-
-        clusterNetIPAddress = IniUtils.getString(ini, nodeSection, "cluster.address", clusterNetIPAddress);
-        clusterNetPort = IniUtils.getInt(ini, nodeSection, "cluster.port", clusterNetPort);
-        dataIPAddress = IniUtils.getString(ini, nodeSection, "data.address", dataIPAddress);
-        dataPort = IniUtils.getInt(ini, nodeSection, "data.port", dataPort);
-        resultIPAddress = IniUtils.getString(ini, nodeSection, "result.address", resultIPAddress);
-        resultPort = IniUtils.getInt(ini, nodeSection, "result.port", resultPort);
-
-        clusterNetPublicIPAddress = IniUtils.getString(ini, nodeSection, "public.cluster.address",
-                clusterNetPublicIPAddress);
-        clusterNetPublicPort = IniUtils.getInt(ini, nodeSection, "public.cluster.port", clusterNetPublicPort);
-        dataPublicIPAddress = IniUtils.getString(ini, nodeSection, "public.data.address", dataPublicIPAddress);
-        dataPublicPort = IniUtils.getInt(ini, nodeSection, "public.data.port", dataPublicPort);
-        resultPublicIPAddress = IniUtils.getString(ini, nodeSection, "public.result.address", resultPublicIPAddress);
-        resultPublicPort = IniUtils.getInt(ini, nodeSection, "public.result.port", resultPublicPort);
-
-        messagingIPAddress = IniUtils.getString(ini, nodeSection, "messaging.address", messagingIPAddress);
-        messagingPort = IniUtils.getInt(ini, nodeSection, "messaging.port", messagingPort);
-        messagingPublicIPAddress = IniUtils.getString(ini, nodeSection, "public.messaging.address",
-                messagingPublicIPAddress);
-        messagingPublicPort = IniUtils.getInt(ini, nodeSection, "public.messaging.port", messagingPublicPort);
-
-        retries = IniUtils.getInt(ini, nodeSection, "retries", retries);
-
-        // Directories
-        ioDevices = IniUtils.getString(ini, nodeSection, "iodevices", ioDevices);
-
-        // Hyracks client entrypoint
-        appNCMainClass = IniUtils.getString(ini, nodeSection, "app.class", appNCMainClass);
-    }
-
-    /*
-     * Once all @Option fields have been loaded from command-line or otherwise
-     * specified programmatically, call this method to:
-     * 1. Load options from a config file (as specified by -config-file)
-     * 2. Set default values for certain derived values, such as setting
-     *    clusterNetIpAddress to ipAddress
-     */
-    public void loadConfigAndApplyDefaults() throws IOException {
-        loadINIFile();
-
-        // "address" is the default for all IP addresses
-        if (clusterNetIPAddress == null) {
-            clusterNetIPAddress = ipAddress;
-        }
-        if (dataIPAddress == null) {
-            dataIPAddress = ipAddress;
-        }
-        if (resultIPAddress == null) {
-            resultIPAddress = ipAddress;
+        <T> Option(IOptionType<T> parser, T defaultValue) {
+            this.parser = parser;
+            this.defaultValue = defaultValue;
         }
 
-        // All "public" options default to their "non-public" versions
-        if (clusterNetPublicIPAddress == null) {
-            clusterNetPublicIPAddress = clusterNetIPAddress;
+        <T> Option(IOptionType<T> parser, Supplier<T> defaultValue) {
+            this.parser = parser;
+            this.defaultValue = defaultValue;
         }
-        if (clusterNetPublicPort == 0) {
-            clusterNetPublicPort = clusterNetPort;
+
+        @Override
+        public Section section() {
+            // NODE_ID is the only option placed in the LOCALNC section;
+            // all remaining options fall through to the NC section.
+            if (this == NODE_ID) {
+                return Section.LOCALNC;
+            }
+            return Section.NC;
         }
-        if (dataPublicIPAddress == null) {
-            dataPublicIPAddress = dataIPAddress;
+
+        @Override
+        public String description() {
+            switch (this) {
+                case ADDRESS:
+                    return "Default IP Address to bind listeners on this NC.  All services will bind on this address " +
+                            "unless a service-specific listen address is supplied.";
+                case CLUSTER_LISTEN_ADDRESS:
+                    return "IP Address to bind cluster listener on this NC";
+                case PUBLIC_ADDRESS:
+                    return "Default public address that other processes should use to contact this NC.  All services " +
+                            "will advertise this address unless a service-specific public address is supplied.";
+                case NCSERVICE_ADDRESS:
+                    return "Address the CC should use to contact the NCService associated with this NC";
+                case NCSERVICE_PORT:
+                    return "Port the CC should use to contact the NCService associated with this NC";
+                case CLUSTER_ADDRESS:
+                    return "Cluster Controller address (required unless specified in config file)";
+                case CLUSTER_PORT:
+                    return "Cluster Controller port";
+                case CLUSTER_LISTEN_PORT:
+                    return "IP port to bind cluster listener";
+                case CLUSTER_PUBLIC_ADDRESS:
+                    return "Public IP Address to announce cluster listener";
+                case CLUSTER_PUBLIC_PORT:
+                    return "Public IP port to announce cluster listener";
+                case NODE_ID:
+                    return "Logical name of node controller unique within the cluster (required unless specified in " +
+                            "config file)";
+                case DATA_LISTEN_ADDRESS:
+                    return "IP Address to bind data listener";
+                case DATA_LISTEN_PORT:
+                    return "IP port to bind data listener";
+                case DATA_PUBLIC_ADDRESS:
+                    return "Public IP Address to announce data listener";
+                case DATA_PUBLIC_PORT:
+                    return "Public IP port to announce data listener";
+                case RESULT_LISTEN_ADDRESS:
+                    return "IP Address to bind dataset result distribution listener";
+                case RESULT_LISTEN_PORT:
+                    return "IP port to bind dataset result distribution listener";
+                case RESULT_PUBLIC_ADDRESS:
+                    return "Public IP Address to announce dataset result distribution listener";
+                case RESULT_PUBLIC_PORT:
+                    return "Public IP port to announce dataset result distribution listener";
+                case MESSAGING_LISTEN_ADDRESS:
+                    return "IP Address to bind messaging listener";
+                case MESSAGING_LISTEN_PORT:
+                    return "IP port to bind messaging listener";
+                case MESSAGING_PUBLIC_ADDRESS:
+                    return "Public IP Address to announce messaging listener";
+                case MESSAGING_PUBLIC_PORT:
+                    return "Public IP port to announce messaging listener";
+                case CLUSTER_CONNECT_RETRIES:
+                    return "Number of attempts to contact CC before giving up";
+                case IODEVICES:
+                    return "Comma separated list of IO Device mount points";
+                case NET_THREAD_COUNT:
+                    return "Number of threads to use for Network I/O";
+                case NET_BUFFER_COUNT:
+                    return "Number of network buffers per input/output channel";
+                case RESULT_TTL:
+                    return "Limits the amount of time results for asynchronous jobs should be retained by the system " +
+                            "in milliseconds";
+                case RESULT_SWEEP_THRESHOLD:
+                    return "The duration within which an instance of the result cleanup should be invoked in " +
+                            "milliseconds";
+                case RESULT_MANAGER_MEMORY:
+                    return "Memory usable for result caching at this Node Controller in bytes";
+                case APP_CLASS:
+                    return "Application NC Main Class";
+                case NCSERVICE_PID:
+                    return "PID of the NCService which launched this NCDriver";
+                case COMMAND:
+                    return "Command NCService should invoke to start the NCDriver";
+                case JVM_ARGS:
+                    return "JVM args to pass to the NCDriver";
+                case VIRTUAL_NC:
+                    return "A flag indicating if this NC is running on virtual cluster";
+                default:
+                    throw new IllegalStateException("NYI: " + this);
+            }
         }
-        if (dataPublicPort == 0) {
-            dataPublicPort = dataPort;
+
+
+        @Override
+        public IOptionType type() {
+            return parser;
         }
-        if (resultPublicIPAddress == null) {
-            resultPublicIPAddress = resultIPAddress;
+
+        @Override
+        public Object defaultValue() {
+            return defaultValue;
         }
-        if (resultPublicPort == 0) {
-            resultPublicPort = resultPort;
+
+        @Override
+        public boolean hidden() {
+            return this == VIRTUAL_NC;
         }
     }
 
-    /**
-     * @return An IApplicationConfig representing this NCConfig.
-     *         Note: Currently this only includes the values from the configuration
-     *         file, not anything specified on the command-line. QQQ
-     */
+    private List<String> appArgs = new ArrayList<>();
+
+    private final IApplicationConfig appConfig;
+    private final String nodeId;
+
+    public NCConfig(String nodeId) {
+        this(nodeId, new ConfigManager(null));
+    }
+
+    public NCConfig(String nodeId, ConfigManager configManager) {
+        super(configManager);
+        this.nodeId = nodeId; // assigned first so the field is never observed unset
+        this.appConfig = configManager.getNodeEffectiveConfig(nodeId);
+        configManager.register(Option.class);
+        configManager.set(nodeId, Option.NODE_ID, nodeId); // direct call: no overridable method in a constructor
+        configManager.registerArgsListener(appArgs::addAll);
+    }
+
+    public List<String> getAppArgs() {
+        return appArgs;
+    }
+
+    public String[] getAppArgsArray() {
+        return appArgs.toArray(new String[0]); // toArray allocates a correctly sized array itself
+    }
+
+    public ConfigManager getConfigManager() {
+        return configManager;
+    }
+
     public IApplicationConfig getAppConfig() {
-        return new IniApplicationConfig(ini);
+        return appConfig;
     }
 
-    public void toMap(Map<String, String> configuration) {
-        configuration.put("cc-host", ccHost);
-        configuration.put("cc-port", (String.valueOf(ccPort)));
-        configuration.put("cluster-net-ip-address", clusterNetIPAddress);
-        configuration.put("cluster-net-port", String.valueOf(clusterNetPort));
-        configuration.put("cluster-net-public-ip-address", clusterNetPublicIPAddress);
-        configuration.put("cluster-net-public-port", String.valueOf(clusterNetPublicPort));
-        configuration.put("node-id", nodeId);
-        configuration.put("data-ip-address", dataIPAddress);
-        configuration.put("data-port", String.valueOf(dataPort));
-        configuration.put("data-public-ip-address", dataPublicIPAddress);
-        configuration.put("data-public-port", String.valueOf(dataPublicPort));
-        configuration.put("result-ip-address", resultIPAddress);
-        configuration.put("result-port", String.valueOf(resultPort));
-        configuration.put("result-public-ip-address", resultPublicIPAddress);
-        configuration.put("result-public-port", String.valueOf(resultPublicPort));
-        configuration.put("retries", String.valueOf(retries));
-        configuration.put("iodevices", ioDevices);
-        configuration.put("net-thread-count", String.valueOf(nNetThreads));
-        configuration.put("net-buffer-count", String.valueOf(nNetBuffers));
-        configuration.put("max-memory", String.valueOf(maxMemory));
-        configuration.put("result-time-to-live", String.valueOf(resultTTL));
-        configuration.put("result-sweep-threshold", String.valueOf(resultSweepThreshold));
-        configuration.put("result-manager-memory", String.valueOf(resultManagerMemory));
-        configuration.put("messaging-ip-address", messagingIPAddress);
-        configuration.put("messaging-port", String.valueOf(messagingPort));
-        configuration.put("messaging-public-ip-address", messagingPublicIPAddress);
-        configuration.put("messaging-public-port", String.valueOf(messagingPublicPort));
-        configuration.put("ncservice-pid", String.valueOf(ncservicePid));
-        if (appNCMainClass != null) {
-            configuration.put("app-nc-main-class", appNCMainClass);
-        }
+    public String getPublicAddress() {
+        return appConfig.getString(Option.PUBLIC_ADDRESS);
+    }
+
+    public void setPublicAddress(String publicAddress) {
+        configManager.set(nodeId, Option.PUBLIC_ADDRESS, publicAddress);
+    }
+
+    public String getNCServiceAddress() {
+        return appConfig.getString(Option.NCSERVICE_ADDRESS);
+    }
+
+    public void setNCServiceAddress(String ncserviceAddress) {
+        configManager.set(nodeId, Option.NCSERVICE_ADDRESS, ncserviceAddress);
+    }
+
+    public int getNCServicePort() {
+        return appConfig.getInt(Option.NCSERVICE_PORT);
+    }
+
+    public void setNCServicePort(int ncservicePort) {
+        configManager.set(nodeId, Option.NCSERVICE_PORT, ncservicePort);
+    }
+
+    public String getClusterAddress() {
+        return appConfig.getString(Option.CLUSTER_ADDRESS);
+    }
+
+    public void setClusterAddress(String clusterAddress) {
+        configManager.set(nodeId, Option.CLUSTER_ADDRESS, clusterAddress);
+    }
+
+    public int getClusterPort() {
+        return appConfig.getInt(Option.CLUSTER_PORT);
+    }
+
+    public void setClusterPort(int clusterPort) {
+        configManager.set(nodeId, Option.CLUSTER_PORT, clusterPort);
+    }
+
+    public String getClusterListenAddress() {
+        return appConfig.getString(Option.CLUSTER_LISTEN_ADDRESS);
+    }
+
+    public void setClusterListenAddress(String clusterListenAddress) {
+        configManager.set(nodeId, Option.CLUSTER_LISTEN_ADDRESS, clusterListenAddress);
+    }
+
+    public int getClusterListenPort() {
+        return appConfig.getInt(Option.CLUSTER_LISTEN_PORT);
+    }
+
+    public void setClusterListenPort(int clusterListenPort) {
+        configManager.set(nodeId, Option.CLUSTER_LISTEN_PORT, clusterListenPort);
+    }
+
+    public String getClusterPublicAddress() {
+        return appConfig.getString(Option.CLUSTER_PUBLIC_ADDRESS);
+    }
+
+    public void setClusterPublicAddress(String clusterPublicAddress) {
+        configManager.set(nodeId, Option.CLUSTER_PUBLIC_ADDRESS, clusterPublicAddress);
+    }
+
+    public int getClusterPublicPort() {
+        return appConfig.getInt(Option.CLUSTER_PUBLIC_PORT);
+    }
+
+    public void setClusterPublicPort(int clusterPublicPort) {
+        configManager.set(nodeId, Option.CLUSTER_PUBLIC_PORT, clusterPublicPort);
+    }
+
+    public String getNodeId() {
+        return appConfig.getString(Option.NODE_ID);
+    }
+
+    public void setNodeId(String nodeId) {
+        configManager.set(nodeId, Option.NODE_ID, nodeId); // parameter shadows the field: the new id is also the key
+    }
+
+    public String getDataListenAddress() {
+        return appConfig.getString(Option.DATA_LISTEN_ADDRESS);
+    }
+
+    public void setDataListenAddress(String dataListenAddress) {
+        configManager.set(nodeId, Option.DATA_LISTEN_ADDRESS, dataListenAddress);
+    }
+
+    public int getDataListenPort() {
+        return appConfig.getInt(Option.DATA_LISTEN_PORT);
+    }
+
+    public void setDataListenPort(int dataListenPort) {
+        configManager.set(nodeId, Option.DATA_LISTEN_PORT, dataListenPort);
+    }
+
+    public String getDataPublicAddress() {
+        return appConfig.getString(Option.DATA_PUBLIC_ADDRESS);
+    }
+
+    public void setDataPublicAddress(String dataPublicAddress) {
+        configManager.set(nodeId, Option.DATA_PUBLIC_ADDRESS, dataPublicAddress);
+    }
+
+    public int getDataPublicPort() {
+        return appConfig.getInt(Option.DATA_PUBLIC_PORT);
+    }
+
+    public void setDataPublicPort(int dataPublicPort) {
+        configManager.set(nodeId, Option.DATA_PUBLIC_PORT, dataPublicPort);
+    }
+
+    public String getResultListenAddress() {
+        return appConfig.getString(Option.RESULT_LISTEN_ADDRESS);
+    }
+
+    public void setResultListenAddress(String resultListenAddress) {
+        configManager.set(nodeId, Option.RESULT_LISTEN_ADDRESS, resultListenAddress);
+    }
+
+    public int getResultListenPort() {
+        return appConfig.getInt(Option.RESULT_LISTEN_PORT);
+    }
+
+    public void setResultListenPort(int resultListenPort) {
+        configManager.set(nodeId, Option.RESULT_LISTEN_PORT, resultListenPort);
+    }
+
+    public String getResultPublicAddress() {
+        return appConfig.getString(Option.RESULT_PUBLIC_ADDRESS);
+    }
+
+    public void setResultPublicAddress(String resultPublicAddress) {
+        configManager.set(nodeId, Option.RESULT_PUBLIC_ADDRESS, resultPublicAddress);
+    }
+
+    public int getResultPublicPort() {
+        return appConfig.getInt(Option.RESULT_PUBLIC_PORT);
+    }
+
+    public void setResultPublicPort(int resultPublicPort) {
+        configManager.set(nodeId, Option.RESULT_PUBLIC_PORT, resultPublicPort);
+    }
+
+    public String getMessagingListenAddress() {
+        return appConfig.getString(Option.MESSAGING_LISTEN_ADDRESS);
+    }
+
+    public void setMessagingListenAddress(String messagingListenAddress) {
+        configManager.set(nodeId, Option.MESSAGING_LISTEN_ADDRESS, messagingListenAddress);
+    }
+
+    public int getMessagingListenPort() {
+        return appConfig.getInt(Option.MESSAGING_LISTEN_PORT);
+    }
+
+    public void setMessagingListenPort(int messagingListenPort) {
+        configManager.set(nodeId, Option.MESSAGING_LISTEN_PORT, messagingListenPort);
+    }
+
+    public String getMessagingPublicAddress() {
+        return appConfig.getString(Option.MESSAGING_PUBLIC_ADDRESS);
+    }
+
+    public void setMessagingPublicAddress(String messagingPublicAddress) {
+        configManager.set(nodeId, Option.MESSAGING_PUBLIC_ADDRESS, messagingPublicAddress);
+    }
+
+    public int getMessagingPublicPort() {
+        return appConfig.getInt(Option.MESSAGING_PUBLIC_PORT);
+    }
+
+    public void setMessagingPublicPort(int messagingPublicPort) {
+        configManager.set(nodeId, Option.MESSAGING_PUBLIC_PORT, messagingPublicPort);
+    }
+
+    public int getClusterConnectRetries() {
+        return appConfig.getInt(Option.CLUSTER_CONNECT_RETRIES);
+    }
+
+    public void setClusterConnectRetries(int clusterConnectRetries) {
+        configManager.set(nodeId, Option.CLUSTER_CONNECT_RETRIES, clusterConnectRetries);
+    }
+
+    public String[] getIODevices() {
+        return appConfig.getStringArray(Option.IODEVICES);
+    }
+
+    public void setIODevices(String[] iodevices) {
+        configManager.set(nodeId, Option.IODEVICES, iodevices);
+    }
+
+    public int getNetThreadCount() {
+        return appConfig.getInt(Option.NET_THREAD_COUNT);
+    }
+
+    public void setNetThreadCount(int netThreadCount) {
+        configManager.set(nodeId, Option.NET_THREAD_COUNT, netThreadCount);
+    }
+
+    public int getNetBufferCount() {
+        return appConfig.getInt(Option.NET_BUFFER_COUNT);
+    }
+
+    public void setNetBufferCount(int netBufferCount) {
+        configManager.set(nodeId, Option.NET_BUFFER_COUNT, netBufferCount);
+    }
+
+    public long getResultTTL() {
+        return appConfig.getLong(Option.RESULT_TTL);
+    }
+
+    public void setResultTTL(long resultTTL) {
+        configManager.set(nodeId, Option.RESULT_TTL, resultTTL);
+    }
+
+    public long getResultSweepThreshold() {
+        return appConfig.getLong(Option.RESULT_SWEEP_THRESHOLD);
+    }
+
+    public void setResultSweepThreshold(long resultSweepThreshold) {
+        configManager.set(nodeId, Option.RESULT_SWEEP_THRESHOLD, resultSweepThreshold);
+    }
+
+    public int getResultManagerMemory() {
+        return appConfig.getInt(Option.RESULT_MANAGER_MEMORY);
+    }
+
+    public void setResultManagerMemory(int resultManagerMemory) {
+        configManager.set(nodeId, Option.RESULT_MANAGER_MEMORY, resultManagerMemory);
+    }
+
+    public String getAppClass() {
+        return appConfig.getString(Option.APP_CLASS);
+    }
+
+    public void setAppClass(String appClass) {
+        configManager.set(nodeId, Option.APP_CLASS, appClass);
+    }
+
+    public int getNCServicePid() {
+        return appConfig.getInt(Option.NCSERVICE_PID);
+    }
+
+    public void setNCServicePid(int ncservicePid) {
+        configManager.set(nodeId, Option.NCSERVICE_PID, ncservicePid);
+    }
+
+    public boolean getVirtualNC() {
+        return appConfig.getBoolean(Option.VIRTUAL_NC);
+    }
+
+    public void setVirtualNC(boolean virtualNC) {
+        configManager.set(nodeId, Option.VIRTUAL_NC, virtualNC);
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml
index 148cf18..a7e3fa9 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml
@@ -60,7 +60,6 @@
     <dependency>
       <groupId>args4j</groupId>
       <artifactId>args4j</artifactId>
-      <version>2.0.12</version>
     </dependency>
     <dependency>
       <groupId>org.apache.hyracks</groupId>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCApplicationEntryPoint.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCApplicationEntryPoint.java
new file mode 100644
index 0000000..d4e67fd
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCApplicationEntryPoint.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc;
+
+import java.lang.management.ManagementFactory;
+import java.util.Arrays;
+
+import org.apache.hyracks.api.application.INCApplicationContext;
+import org.apache.hyracks.api.application.INCApplicationEntryPoint;
+import org.apache.hyracks.api.config.IConfigManager;
+import org.apache.hyracks.api.config.Section;
+import org.apache.hyracks.api.job.resource.NodeCapacity;
+import org.apache.hyracks.control.common.controllers.CCConfig;
+import org.apache.hyracks.control.common.controllers.ControllerConfig;
+import org.apache.hyracks.control.common.controllers.NCConfig;
+
+public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
+    public static final NCApplicationEntryPoint INSTANCE = new NCApplicationEntryPoint();
+
+    protected NCApplicationEntryPoint() { // not public: use INSTANCE; protected so subclasses can extend
+    }
+
+    @Override
+    public void start(INCApplicationContext ncAppCtx, String[] args) throws Exception {
+        if (args.length > 0) { // this implementation accepts no application arguments
+            throw new IllegalArgumentException("Unrecognized argument(s): " + Arrays.toString(args));
+        }
+    }
+
+    @Override
+    public void notifyStartupComplete() throws Exception {
+        // no-op
+    }
+
+    @Override
+    public void stop() throws Exception {
+        // no-op
+    }
+
+    @Override
+    public NodeCapacity getCapacity() { // JVM max memory, and all cores but one (or the only core)
+        int allCores = ManagementFactory.getOperatingSystemMXBean().getAvailableProcessors();
+        return new NodeCapacity(Runtime.getRuntime().maxMemory(), allCores > 1 ? allCores - 1 : allCores);
+    }
+
+    @Override
+    public void registerConfigOptions(IConfigManager configManager) { // sets up configManager for an NC
+        configManager.addIniParamOptions(ControllerConfig.Option.CONFIG_FILE, ControllerConfig.Option.CONFIG_FILE_URL);
+        configManager.addCmdLineSections(Section.NC, Section.COMMON, Section.LOCALNC);
+        configManager.setUsageFilter(getUsageFilter());
+        configManager.register(ControllerConfig.Option.class, CCConfig.Option.class, NCConfig.Option.class);
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCDriver.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCDriver.java
index 2323d71..b52064e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCDriver.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCDriver.java
@@ -18,28 +18,32 @@
  */
 package org.apache.hyracks.control.nc;
 
+import java.io.IOException;
+import java.util.Arrays;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.hyracks.api.application.INCApplicationEntryPoint;
+import org.apache.hyracks.control.common.config.ConfigManager;
+import org.apache.hyracks.control.common.config.ConfigUtils;
 import org.apache.hyracks.control.common.controllers.NCConfig;
-import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.CmdLineException;
 
+@SuppressWarnings("InfiniteLoopStatement")
 public class NCDriver {
     private static final Logger LOGGER = Logger.getLogger(NCDriver.class.getName());
 
-    public static void main(String args[]) throws Exception {
+    private NCDriver() {
+    }
+
+    public static void main(String[] args) {
         try {
-            NCConfig ncConfig = new NCConfig();
-            CmdLineParser cp = new CmdLineParser(ncConfig);
-            try {
-                cp.parseArgument(args);
-            } catch (Exception e) {
-                e.printStackTrace();
-                cp.printUsage(System.err);
-                System.exit(1);
-            }
-            ncConfig.loadConfigAndApplyDefaults();
-            final NodeControllerService ncService = new NodeControllerService(ncConfig);
+            final String nodeId = ConfigUtils.getOptionValue(args, NCConfig.Option.NODE_ID);
+            final ConfigManager configManager = new ConfigManager(args);
+            INCApplicationEntryPoint appEntryPoint = getAppEntryPoint(args);
+            appEntryPoint.registerConfigOptions(configManager);
+            NCConfig ncConfig = new NCConfig(nodeId, configManager);
+            final NodeControllerService ncService = new NodeControllerService(ncConfig, appEntryPoint);
             if (LOGGER.isLoggable(Level.SEVERE)) {
                 LOGGER.severe("Setting uncaught exception handler " + ncService.getLifeCycleComponentManager());
             }
@@ -49,9 +53,20 @@
             while (true) {
                 Thread.sleep(10000);
             }
+        } catch (CmdLineException e) {
+            LOGGER.log(Level.FINE, "Exception parsing command line: " + Arrays.toString(args), e);
+            System.exit(2);
         } catch (Exception e) {
-            e.printStackTrace();
+            LOGGER.log(Level.SEVERE, "Exiting NCDriver due to exception", e);
             System.exit(1);
         }
     }
+
+    private static INCApplicationEntryPoint getAppEntryPoint(String[] args)
+            throws ClassNotFoundException, InstantiationException, IllegalAccessException, IOException {
+        // the app class, when given, is instantiated reflectively; otherwise the default entry point is used
+        final String entryPointClass = ConfigUtils.getOptionValue(args, NCConfig.Option.APP_CLASS);
+        return entryPointClass == null ? NCApplicationEntryPoint.INSTANCE
+                : (INCApplicationEntryPoint) Class.forName(entryPointClass).newInstance();
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index bf0ddb6..2ee9161 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.control.nc;
 
 import java.io.File;
+import java.io.IOException;
 import java.lang.management.GarbageCollectorMXBean;
 import java.lang.management.ManagementFactory;
 import java.lang.management.MemoryMXBean;
@@ -51,11 +52,11 @@
 import org.apache.hyracks.api.io.IODeviceHandle;
 import org.apache.hyracks.api.job.ActivityClusterGraph;
 import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.resource.NodeCapacity;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
 import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.control.common.base.IClusterController;
+import org.apache.hyracks.control.common.config.ConfigManager;
 import org.apache.hyracks.control.common.context.ServerContext;
 import org.apache.hyracks.control.common.controllers.NCConfig;
 import org.apache.hyracks.control.common.controllers.NodeParameters;
@@ -84,6 +85,7 @@
 import org.apache.hyracks.ipc.impl.IPCSystem;
 import org.apache.hyracks.net.protocols.muxdemux.FullFrameChannelInterfaceFactory;
 import org.apache.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
+import org.kohsuke.args4j.CmdLineException;
 
 public class NodeControllerService implements IControllerService {
     private static final Logger LOGGER = Logger.getLogger(NodeControllerService.class.getName());
@@ -130,7 +132,7 @@
 
     private NCApplicationContext appCtx;
 
-    private INCApplicationEntryPoint ncAppEntryPoint;
+    private final INCApplicationEntryPoint ncAppEntryPoint;
 
     private final ILifeCycleComponentManager lccm;
 
@@ -154,13 +156,25 @@
 
     private MessagingNetworkManager messagingNetManager;
 
-    public NodeControllerService(NCConfig ncConfig) throws Exception {
-        this.ncConfig = ncConfig;
-        id = ncConfig.nodeId;
+    private final ConfigManager configManager;
 
-        ioManager = new IOManager(IODeviceHandle.getDevices(ncConfig.ioDevices));
+    public NodeControllerService(NCConfig config) throws Exception { // entry point is derived from config
+        this(config, getApplicationEntryPoint(config));
+    }
+
+    public NodeControllerService(NCConfig config, INCApplicationEntryPoint aep) throws IOException, CmdLineException {
+        this.ncConfig = config;
+        this.configManager = ncConfig.getConfigManager();
+        if (aep == null) {
+            throw new IllegalArgumentException("INCApplicationEntryPoint cannot be null");
+        }
+        configManager.processConfig();
+        this.ncAppEntryPoint = aep;
+        id = ncConfig.getNodeId();
+
+        ioManager = new IOManager(IODeviceHandle.getDevices(ncConfig.getIODevices()));
         if (id == null) {
-            throw new Exception("id not set");
+            throw new HyracksException("id not set");
         }
 
         lccm = new LifeCycleComponentManager();
@@ -224,14 +238,16 @@
 
     private void init() throws Exception {
         ioManager.setExecutor(executor);
-        datasetPartitionManager = new DatasetPartitionManager(this, executor, ncConfig.resultManagerMemory,
-                ncConfig.resultTTL, ncConfig.resultSweepThreshold);
-        datasetNetworkManager = new DatasetNetworkManager(ncConfig.resultIPAddress, ncConfig.resultPort,
-                datasetPartitionManager, ncConfig.nNetThreads, ncConfig.nNetBuffers, ncConfig.resultPublicIPAddress,
-                ncConfig.resultPublicPort, FullFrameChannelInterfaceFactory.INSTANCE);
-        if (ncConfig.messagingIPAddress != null && appCtx.getMessagingChannelInterfaceFactory() != null) {
-            messagingNetManager = new MessagingNetworkManager(this, ncConfig.messagingIPAddress, ncConfig.messagingPort,
-                    ncConfig.nNetThreads, ncConfig.messagingPublicIPAddress, ncConfig.messagingPublicPort,
+        datasetPartitionManager = new DatasetPartitionManager(this, executor, ncConfig.getResultManagerMemory(),
+                ncConfig.getResultTTL(), ncConfig.getResultSweepThreshold());
+        datasetNetworkManager = new DatasetNetworkManager(ncConfig.getResultListenAddress(),
+                ncConfig.getResultListenPort(), datasetPartitionManager, ncConfig.getNetThreadCount(),
+                ncConfig.getNetBufferCount(), ncConfig.getResultPublicAddress(), ncConfig.getResultPublicPort(),
+                FullFrameChannelInterfaceFactory.INSTANCE);
+        if (ncConfig.getMessagingListenAddress() != null && appCtx.getMessagingChannelInterfaceFactory() != null) {
+            messagingNetManager = new MessagingNetworkManager(this, ncConfig.getMessagingListenAddress(),
+                    ncConfig.getMessagingListenPort(), ncConfig.getNetThreadCount(),
+                    ncConfig.getMessagingPublicAddress(), ncConfig.getMessagingPublicPort(),
                     appCtx.getMessagingChannelInterfaceFactory());
         }
     }
@@ -239,12 +255,13 @@
     @Override
     public void start() throws Exception {
         LOGGER.log(Level.INFO, "Starting NodeControllerService");
-        ipc = new IPCSystem(new InetSocketAddress(ncConfig.clusterNetIPAddress, ncConfig.clusterNetPort),
+        ipc = new IPCSystem(new InetSocketAddress(ncConfig.getClusterListenAddress(), ncConfig.getClusterListenPort()),
                 new NodeControllerIPCI(this), new CCNCFunctions.SerializerDeserializer());
         ipc.start();
         partitionManager = new PartitionManager(this);
-        netManager = new NetworkManager(ncConfig.dataIPAddress, ncConfig.dataPort, partitionManager,
-                ncConfig.nNetThreads, ncConfig.nNetBuffers, ncConfig.dataPublicIPAddress, ncConfig.dataPublicPort,
+        netManager = new NetworkManager(ncConfig.getDataListenAddress(), ncConfig.getDataListenPort(), partitionManager,
+                ncConfig.getNetThreadCount(), ncConfig.getNetBufferCount(), ncConfig.getDataPublicAddress(),
+                ncConfig.getDataPublicPort(),
                 FullFrameChannelInterfaceFactory.INSTANCE);
         netManager.start();
 
@@ -255,8 +272,9 @@
         if (messagingNetManager != null) {
             messagingNetManager.start();
         }
-        IIPCHandle ccIPCHandle = ipc.getHandle(new InetSocketAddress(ncConfig.ccHost, ncConfig.ccPort),
-                ncConfig.retries);
+        IIPCHandle ccIPCHandle = ipc.getHandle(
+                new InetSocketAddress(ncConfig.getClusterAddress(), ncConfig.getClusterPort()),
+                ncConfig.getClusterConnectRetries());
         this.ccs = new ClusterControllerRemoteProxy(ccIPCHandle);
         HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
         for (int i = 0; i < gcInfos.length; ++i) {
@@ -274,10 +292,7 @@
                 runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(),
                 runtimeMXBean.getClassPath(), runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(),
                 runtimeMXBean.getInputArguments(), runtimeMXBean.getSystemProperties(), hbSchema, meesagingPort,
-                ncAppEntryPoint == null
-                        ? new NodeCapacity(Runtime.getRuntime().maxMemory(), allCores > 1 ? allCores - 1 : allCores)
-                        : ncAppEntryPoint.getCapacity(),
-                PidHelper.getPid()));
+                ncAppEntryPoint.getCapacity(), PidHelper.getPid()));
 
         synchronized (this) {
             while (registrationPending) {
@@ -307,21 +322,12 @@
         }
 
         LOGGER.log(Level.INFO, "Started NodeControllerService");
-        if (ncAppEntryPoint != null) {
-            ncAppEntryPoint.notifyStartupComplete();
-        }
+        ncAppEntryPoint.notifyStartupComplete();
     }
 
     private void startApplication() throws Exception {
         appCtx = new NCApplicationContext(this, serverCtx, ioManager, id, memoryManager, lccm, ncConfig.getAppConfig());
-        String className = ncConfig.appNCMainClass;
-        if (className != null) {
-            Class<?> c = Class.forName(className);
-            ncAppEntryPoint = (INCApplicationEntryPoint) c.newInstance();
-            String[] args = ncConfig.appArgs == null ? new String[0]
-                    : ncConfig.appArgs.toArray(new String[ncConfig.appArgs.size()]);
-            ncAppEntryPoint.start(appCtx, args);
-        }
+        ncAppEntryPoint.start(appCtx, ncConfig.getAppArgsArray());
         executor = Executors.newCachedThreadPool(appCtx.getThreadFactory());
     }
 
@@ -341,10 +347,8 @@
                 messagingNetManager.stop();
             }
             workQueue.stop();
-            if (ncAppEntryPoint != null) {
-                ncAppEntryPoint.stop();
-            }
-            /**
+            ncAppEntryPoint.stop();
+            /*
              * Stop heartbeat after NC has stopped to avoid false node failure detection
              * on CC if an NC takes a long time to stop.
              */
@@ -525,4 +529,14 @@
     public MessagingNetworkManager getMessagingNetworkManager() {
         return messagingNetManager;
     }
+
+    private static INCApplicationEntryPoint getApplicationEntryPoint(NCConfig config)
+            throws ClassNotFoundException, IllegalAccessException, InstantiationException {
+        final String appClassName = config.getAppClass();
+        if (appClassName == null) {
+            return NCApplicationEntryPoint.INSTANCE;
+        }
+        // an app class is configured: load it by name and create it via its no-arg constructor
+        return (INCApplicationEntryPoint) Class.forName(appClassName).newInstance();
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCApplicationContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCApplicationContext.java
index f52099d..6d549c2 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCApplicationContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCApplicationContext.java
@@ -22,10 +22,10 @@
 import java.io.OutputStream;
 import java.io.Serializable;
 
-import org.apache.hyracks.api.application.IApplicationConfig;
 import org.apache.hyracks.api.application.INCApplicationContext;
 import org.apache.hyracks.api.application.IStateDumpHandler;
 import org.apache.hyracks.api.comm.IChannelInterfaceFactory;
+import org.apache.hyracks.api.config.IApplicationConfig;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
 import org.apache.hyracks.api.resources.memory.IMemoryManager;
 import org.apache.hyracks.api.service.IControllerService;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/pom.xml b/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/pom.xml
index 8d9a93b..ae7f272 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/pom.xml
@@ -31,9 +31,6 @@
     <dependency>
       <groupId>args4j</groupId>
       <artifactId>args4j</artifactId>
-      <version>2.0.12</version>
-      <type>jar</type>
-      <scope>compile</scope>
     </dependency>
     <dependency>
       <groupId>org.ini4j</groupId>
@@ -42,6 +39,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-api</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
       <artifactId>hyracks-control-common</artifactId>
       <version>${project.version}</version>
     </dependency>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/src/main/java/org/apache/hyracks/control/nc/service/NCService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/src/main/java/org/apache/hyracks/control/nc/service/NCService.java
index 9b00cc2..6b11ecc 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/src/main/java/org/apache/hyracks/control/nc/service/NCService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/src/main/java/org/apache/hyracks/control/nc/service/NCService.java
@@ -23,6 +23,7 @@
 import java.io.InputStream;
 import java.io.ObjectInputStream;
 import java.io.StringReader;
+import java.io.StringWriter;
 import java.lang.management.ManagementFactory;
 import java.lang.management.OperatingSystemMXBean;
 import java.net.InetAddress;
@@ -35,7 +36,9 @@
 import java.util.logging.Logger;
 
 import org.apache.commons.lang3.SystemUtils;
-import org.apache.hyracks.control.common.controllers.IniUtils;
+import org.apache.hyracks.control.common.config.ConfigUtils;
+import org.apache.hyracks.control.common.controllers.NCConfig;
+import org.apache.hyracks.api.config.Section;
 import org.apache.hyracks.control.common.controllers.ServiceConstants;
 import org.apache.hyracks.control.common.controllers.ServiceConstants.ServiceCommand;
 import org.ini4j.Ini;
@@ -85,7 +88,7 @@
         // Find the command to run. For now, we allow overriding the name, but
         // still assume it's located in the bin/ directory of the deployment.
         // Even this is likely more configurability than we need.
-        String command = IniUtils.getString(ini, nodeSection, "command", "hyracksnc");
+        String command = ConfigUtils.getString(ini, nodeSection, NCConfig.Option.COMMAND.ini(), "hyracksnc");
         // app.home is specified by the Maven appassembler plugin. If it isn't set,
         // fall back to user's home dir. Again this is likely more flexibility
         // than we need.
@@ -112,7 +115,7 @@
     }
 
     private static void configEnvironment(Map<String, String> env) {
-        String jvmargs = IniUtils.getString(ini, nodeSection, "jvm.args", null);
+        String jvmargs = ConfigUtils.getString(ini, nodeSection, NCConfig.Option.JVM_ARGS.ini(), null);
         if (jvmargs != null) {
             LOGGER.info("Using JAVA_OPTS from conf file (jvm.args)");
         } else {
@@ -188,7 +191,13 @@
             return retval == 0;
         } catch (Exception e) {
             if (LOGGER.isLoggable(Level.SEVERE)) {
-                LOGGER.log(Level.SEVERE, "Configuration from CC broken", e);
+                StringWriter sw = new StringWriter();
+                try {
+                    ini.store(sw);
+                    LOGGER.log(Level.SEVERE, "Configuration from CC broken: \n" + sw.toString(), e);
+                } catch (IOException e1) { // still report the original failure 'e', not just the dump error
+                    LOGGER.log(Level.SEVERE, "Configuration from CC broken, failed to serialize: " + e1, e);
+                }
             }
             return false;
         }
@@ -213,7 +222,7 @@
                 case START_NC:
                     String iniString = ois.readUTF();
                     ini = new Ini(new StringReader(iniString));
-                    ncId = IniUtils.getString(ini, "localnc", "id", "");
+                    ncId = ConfigUtils.getString(ini, Section.LOCALNC, NCConfig.Option.NODE_ID, "");
                     nodeSection = "nc/" + ncId;
                     return launchNCProcess();
                 case TERMINATE:
@@ -272,7 +281,7 @@
             try (ServerSocket listener = new ServerSocket(port, 5, addr)) {
                 boolean launched = false;
                 while (!launched) {
-                    LOGGER.info("Waiting for connection from CC on " + addr + ":" + port);
+                    LOGGER.info("Waiting for connection from CC on " + (addr == null ? "*" : addr) + ":" + port);
                     try (Socket socket = listener.accept()) {
                         // QQQ Because acceptConnection() doesn't return if the
                         // service is started appropriately, the socket remains
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/src/main/java/org/apache/hyracks/control/nc/service/NCServiceConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/src/main/java/org/apache/hyracks/control/nc/service/NCServiceConfig.java
index 91c9f6d..10fa679 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/src/main/java/org/apache/hyracks/control/nc/service/NCServiceConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/src/main/java/org/apache/hyracks/control/nc/service/NCServiceConfig.java
@@ -18,7 +18,7 @@
  */
 package org.apache.hyracks.control.nc.service;
 
-import org.apache.hyracks.control.common.controllers.IniUtils;
+import org.apache.hyracks.control.common.config.ConfigUtils;
 import org.ini4j.Ini;
 import org.kohsuke.args4j.Option;
 
@@ -58,10 +58,10 @@
      * It does not apply defaults or any logic.
      */
     private void loadINIFile() throws IOException {
-        ini = IniUtils.loadINIFile(configFile);
-        address = IniUtils.getString(ini, "ncservice", "address", address);
-        port = IniUtils.getInt(ini, "ncservice", "port", port);
-        logdir = IniUtils.getString(ini, "ncservice", "logdir", logdir);
+        ini = ConfigUtils.loadINIFile(configFile);
+        address = ConfigUtils.getString(ini, "ncservice", "address", address);
+        port = ConfigUtils.getInt(ini, "ncservice", "port", port);
+        logdir = ConfigUtils.getString(ini, "ncservice", "logdir", logdir);
     }
 
     /**
