Configuration Revamp

- Ini section of node / cc details now returns ini param names instead of
  managix option names
- Normalized command line -vs- ini file configuration parameter names
- Eliminated unused parameters
- Ini validation
- Migrate *DB parameters out of [app] and into nc / cc sections as
  appropriate
- Eliminate [app] section.  Cluster-wide configuration lives in [common]
- Sort properties alphabetically when returned by HTTP api

Change-Id: I95b7e0bd4538ef42817c8826e76412150074b754
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1487
Reviewed-by: Michael Blow <mblow@apache.org>
Integration-Tests: Michael Blow <mblow@apache.org>
Tested-by: Michael Blow <mblow@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/pom.xml b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/pom.xml
index 9dc3c1a..ba086ad 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/pom.xml
@@ -63,7 +63,7 @@
           <licenseFamilies combine.children="append">
             <licenseFamily implementation="org.apache.rat.license.MITLicenseFamily"/>
           </licenseFamilies>
-          <excludes>
+          <excludes combine.children="append">
             <!-- See hyracks-fullstack-license/src/main/licenses/templates/source_licenses.ftl -->
             <exclude>src/main/resources/static/javascript/flot/jquery.flot.resize.min.js</exclude>
             <exclude>src/main/resources/static/javascript/jsplumb/jquery.jsPlumb-1.3.5-all-min.js</exclude>
@@ -104,7 +104,6 @@
     <dependency>
       <groupId>args4j</groupId>
       <artifactId>args4j</artifactId>
-      <version>2.0.12</version>
     </dependency>
     <dependency>
       <groupId>org.apache.hyracks</groupId>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCApplicationEntryPoint.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCApplicationEntryPoint.java
new file mode 100644
index 0000000..07008df
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCApplicationEntryPoint.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.cc;
+
+import java.util.Arrays;
+
+import org.apache.hyracks.api.application.ICCApplicationContext;
+import org.apache.hyracks.api.application.ICCApplicationEntryPoint;
+import org.apache.hyracks.api.config.IConfigManager;
+import org.apache.hyracks.api.config.Section;
+import org.apache.hyracks.api.job.resource.DefaultJobCapacityController;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
+import org.apache.hyracks.control.common.controllers.CCConfig;
+import org.apache.hyracks.control.common.controllers.ControllerConfig;
+import org.apache.hyracks.control.common.controllers.NCConfig;
+
+public class CCApplicationEntryPoint implements ICCApplicationEntryPoint {
+    public static final ICCApplicationEntryPoint INSTANCE = new CCApplicationEntryPoint();
+
+    protected CCApplicationEntryPoint() {
+    }
+
+    @Override
+    public void start(ICCApplicationContext ccAppCtx, String[] args) throws Exception {
+        if (args.length > 0) {
+            throw new IllegalArgumentException("Unrecognized argument(s): " + Arrays.toString(args));
+        }
+    }
+
+    @Override
+    public void stop() throws Exception {
+        // no-op
+    }
+
+    @Override
+    public void startupCompleted() throws Exception {
+        // no-op
+    }
+
+    @Override
+    public IJobCapacityController getJobCapacityController() {
+        return DefaultJobCapacityController.INSTANCE;
+    }
+
+    @Override
+    public void registerConfig(IConfigManager configManager) {
+        configManager.addIniParamOptions(ControllerConfig.Option.CONFIG_FILE, ControllerConfig.Option.CONFIG_FILE_URL);
+        configManager.addCmdLineSections(Section.CC, Section.COMMON);
+        configManager.setUsageFilter(getUsageFilter());
+        configManager.register(ControllerConfig.Option.class, CCConfig.Option.class, NCConfig.Option.class);
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCDriver.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCDriver.java
index dff3107..754deac 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCDriver.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCDriver.java
@@ -18,32 +18,50 @@
  */
 package org.apache.hyracks.control.cc;
 
-import org.kohsuke.args4j.CmdLineParser;
+import static org.apache.hyracks.control.common.controllers.CCConfig.Option.APP_CLASS;
 
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.api.application.ICCApplicationEntryPoint;
+import org.apache.hyracks.control.common.config.ConfigManager;
+import org.apache.hyracks.control.common.config.ConfigUtils;
 import org.apache.hyracks.control.common.controllers.CCConfig;
+import org.kohsuke.args4j.CmdLineException;
 
 public class CCDriver {
-    public static void main(String args []) throws Exception {
-        try {
-            CCConfig ccConfig = new CCConfig();
-            CmdLineParser cp = new CmdLineParser(ccConfig);
-            try {
-                cp.parseArgument(args);
-            } catch (Exception e) {
-                System.err.println(e.getMessage());
-                cp.printUsage(System.err);
-                return;
-            }
-            ccConfig.loadConfigAndApplyDefaults();
+    private static final Logger LOGGER = Logger.getLogger(CCDriver.class.getName());
 
-            ClusterControllerService ccService = new ClusterControllerService(ccConfig);
+    private CCDriver() {
+    }
+
+    public static void main(String[] args) throws Exception {
+        try {
+            final ConfigManager configManager = new ConfigManager(args);
+            ICCApplicationEntryPoint appEntryPoint = getAppEntryPoint(args);
+            appEntryPoint.registerConfig(configManager);
+            CCConfig ccConfig = new CCConfig(configManager);
+            ClusterControllerService ccService = new ClusterControllerService(ccConfig, appEntryPoint);
             ccService.start();
             while (true) {
                 Thread.sleep(100000);
             }
+        } catch (CmdLineException e) {
+            LOGGER.log(Level.FINE, "Exception parsing command line: " + Arrays.toString(args), e);
+            System.exit(2);
         } catch (Exception e) {
-            e.printStackTrace();
+            LOGGER.log(Level.SEVERE, "Exiting NCDriver due to exception", e);
             System.exit(1);
         }
     }
+
+    private static ICCApplicationEntryPoint getAppEntryPoint(String[] args)
+            throws ClassNotFoundException, InstantiationException, IllegalAccessException, IOException {
+        // determine app class so that we can use the correct implementation of the configuration...
+        String appClassName = ConfigUtils.getOptionValue(args, APP_CLASS);
+        return appClassName != null ? (ICCApplicationEntryPoint) (Class.forName(appClassName)).newInstance()
+                : CCApplicationEntryPoint.INSTANCE;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index c47284c..21b9dcf 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -20,6 +20,7 @@
 
 import java.io.File;
 import java.io.FileReader;
+import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.net.InetAddress;
@@ -32,20 +33,22 @@
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.TreeMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hyracks.api.application.ICCApplicationEntryPoint;
 import org.apache.hyracks.api.client.ClusterControllerInfo;
 import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.config.IApplicationConfig;
 import org.apache.hyracks.api.context.ICCContext;
 import org.apache.hyracks.api.deployment.DeploymentId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.api.job.resource.DefaultJobCapacityController;
 import org.apache.hyracks.api.job.resource.IJobCapacityController;
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.api.topology.ClusterTopology;
@@ -66,9 +69,10 @@
 import org.apache.hyracks.control.cc.work.RemoveDeadNodesWork;
 import org.apache.hyracks.control.cc.work.ShutdownNCServiceWork;
 import org.apache.hyracks.control.cc.work.TriggerNCWork;
+import org.apache.hyracks.control.common.config.ConfigManager;
 import org.apache.hyracks.control.common.context.ServerContext;
 import org.apache.hyracks.control.common.controllers.CCConfig;
-import org.apache.hyracks.control.common.controllers.IniUtils;
+import org.apache.hyracks.control.common.controllers.NCConfig;
 import org.apache.hyracks.control.common.deployment.DeploymentRun;
 import org.apache.hyracks.control.common.ipc.CCNCFunctions;
 import org.apache.hyracks.control.common.logs.LogFile;
@@ -77,7 +81,6 @@
 import org.apache.hyracks.ipc.api.IIPCI;
 import org.apache.hyracks.ipc.impl.IPCSystem;
 import org.apache.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
-import org.ini4j.Ini;
 import org.xml.sax.InputSource;
 
 public class ClusterControllerService implements IControllerService {
@@ -85,6 +88,8 @@
 
     private final CCConfig ccConfig;
 
+    private final ConfigManager configManager;
+
     private IPCSystem clusterIPC;
 
     private IPCSystem clientIPC;
@@ -127,11 +132,22 @@
 
     private ShutdownRun shutdownCallback;
 
-    private ICCApplicationEntryPoint aep;
+    private final ICCApplicationEntryPoint aep;
 
-    public ClusterControllerService(final CCConfig ccConfig) throws Exception {
-        this.ccConfig = ccConfig;
-        File jobLogFolder = new File(ccConfig.ccRoot, "logs/jobs");
+    public ClusterControllerService(final CCConfig config) throws Exception {
+        this(config, getApplicationEntryPoint(config));
+    }
+
+    public ClusterControllerService(final CCConfig config,
+                                    final ICCApplicationEntryPoint aep) throws Exception {
+        this.ccConfig = config;
+        this.configManager = ccConfig.getConfigManager();
+        if (aep == null) {
+            throw new IllegalArgumentException("ICCApplicationEntryPoint cannot be null");
+        }
+        this.aep = aep;
+        configManager.processConfig();
+        File jobLogFolder = new File(ccConfig.getRootDir(), "logs/jobs");
         jobLog = new LogFile(jobLogFolder);
 
         // WorkQueue is in charge of heartbeat as well as other events.
@@ -140,7 +156,8 @@
         final ClusterTopology topology = computeClusterTopology(ccConfig);
         ccContext = new ClusterControllerContext(topology);
         sweeper = new DeadNodeSweeper();
-        datasetDirectoryService = new DatasetDirectoryService(ccConfig.resultTTL, ccConfig.resultSweepThreshold);
+        datasetDirectoryService = new DatasetDirectoryService(ccConfig.getResultTTL(),
+                ccConfig.getResultSweepThreshold());
 
         deploymentRunMap = new HashMap<>();
         stateDumpRunMap = new HashMap<>();
@@ -151,10 +168,10 @@
     }
 
     private static ClusterTopology computeClusterTopology(CCConfig ccConfig) throws Exception {
-        if (ccConfig.clusterTopologyDefinition == null) {
+        if (ccConfig.getClusterTopology() == null) {
             return null;
         }
-        FileReader fr = new FileReader(ccConfig.clusterTopologyDefinition);
+        FileReader fr = new FileReader(ccConfig.getClusterTopology());
         InputSource in = new InputSource(fr);
         try {
             return TopologyDefinitionParser.parse(in);
@@ -166,20 +183,21 @@
     @Override
     public void start() throws Exception {
         LOGGER.log(Level.INFO, "Starting ClusterControllerService: " + this);
-        serverCtx = new ServerContext(ServerContext.ServerType.CLUSTER_CONTROLLER, new File(ccConfig.ccRoot));
+        serverCtx = new ServerContext(ServerContext.ServerType.CLUSTER_CONTROLLER, new File(ccConfig.getRootDir()));
         IIPCI ccIPCI = new ClusterControllerIPCI(this);
-        clusterIPC = new IPCSystem(new InetSocketAddress(ccConfig.clusterNetPort), ccIPCI,
+        clusterIPC = new IPCSystem(new InetSocketAddress(ccConfig.getClusterListenPort()), ccIPCI,
                 new CCNCFunctions.SerializerDeserializer());
         IIPCI ciIPCI = new ClientInterfaceIPCI(this);
-        clientIPC = new IPCSystem(new InetSocketAddress(ccConfig.clientNetIpAddress, ccConfig.clientNetPort), ciIPCI,
-                new JavaSerializationBasedPayloadSerializerDeserializer());
-        webServer = new WebServer(this, ccConfig.httpPort);
+        clientIPC = new IPCSystem(
+                new InetSocketAddress(ccConfig.getClientListenAddress(), ccConfig.getClientListenPort()),
+                ciIPCI, new JavaSerializationBasedPayloadSerializerDeserializer());
+        webServer = new WebServer(this, ccConfig.getConsoleListenPort());
         clusterIPC.start();
         clientIPC.start();
         webServer.start();
-        info = new ClusterControllerInfo(ccConfig.clientNetIpAddress, ccConfig.clientNetPort,
+        info = new ClusterControllerInfo(ccConfig.getClientListenAddress(), ccConfig.getClientListenPort(),
                 webServer.getListeningPort());
-        timer.schedule(sweeper, 0, ccConfig.heartbeatPeriod);
+        timer.schedule(sweeper, 0, ccConfig.getHeartbeatPeriod());
         jobLog.open();
         startApplication();
 
@@ -194,84 +212,62 @@
         appCtx = new CCApplicationContext(this, serverCtx, ccContext, ccConfig.getAppConfig());
         appCtx.addJobLifecycleListener(datasetDirectoryService);
         executor = Executors.newCachedThreadPool(appCtx.getThreadFactory());
-        String className = ccConfig.appCCMainClass;
-
-        IJobCapacityController jobCapacityController = DefaultJobCapacityController.INSTANCE;
-        if (className != null) {
-            Class<?> c = Class.forName(className);
-            aep = (ICCApplicationEntryPoint) c.newInstance();
-            String[] args = ccConfig.appArgs == null ? null
-                    : ccConfig.appArgs.toArray(new String[ccConfig.appArgs.size()]);
-            aep.start(appCtx, args);
-            jobCapacityController = aep.getJobCapacityController();
-        }
+        aep.start(appCtx, ccConfig.getAppArgsArray());
+        IJobCapacityController jobCapacityController = aep.getJobCapacityController();
 
         // Job manager is in charge of job lifecycle management.
         try {
             Constructor<?> jobManagerConstructor = this.getClass().getClassLoader()
-                    .loadClass(ccConfig.jobManagerClassName)
+                    .loadClass(ccConfig.getJobManagerClass())
                     .getConstructor(CCConfig.class, ClusterControllerService.class, IJobCapacityController.class);
             jobManager = (IJobManager) jobManagerConstructor.newInstance(ccConfig, this, jobCapacityController);
         } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException
                 | InvocationTargetException e) {
             if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.log(Level.WARNING, "class " + ccConfig.jobManagerClassName + " could not be used: ", e);
+                LOGGER.log(Level.WARNING, "class " + ccConfig.getJobManagerClass() + " could not be used: ", e);
             }
             // Falls back to the default implementation if the user-provided class name is not valid.
             jobManager = new JobManager(ccConfig, this, jobCapacityController);
         }
     }
 
-    private void connectNCs() throws Exception {
-        Ini ini = ccConfig.getIni();
-        if (ini == null || Boolean.parseBoolean(ini.get("cc", "virtual.cluster"))) {
-            return;
-        }
-        for (String section : ini.keySet()) {
-            if (!section.startsWith("nc/")) {
-                continue;
+    private Map<String, Pair<String, Integer>> getNCServices() throws IOException {
+        Map<String, Pair<String, Integer>> ncMap = new TreeMap<>();
+        for (String ncId : configManager.getNodeNames()) {
+            IApplicationConfig ncConfig = configManager.getNodeEffectiveConfig(ncId);
+            if (!ncConfig.getBoolean(NCConfig.Option.VIRTUAL_NC)) {
+                ncMap.put(ncId, Pair.of(ncConfig.getString(NCConfig.Option.NCSERVICE_ADDRESS),
+                        ncConfig.getInt(NCConfig.Option.NCSERVICE_PORT)));
             }
-            String ncid = section.substring(3);
-            String address = IniUtils.getString(ini, section, "address", null);
-            int port = IniUtils.getInt(ini, section, "port", 9090);
-            if (address == null) {
-                address = InetAddress.getLoopbackAddress().getHostAddress();
-            }
-            workQueue.schedule(new TriggerNCWork(this, address, port, ncid));
         }
+        return ncMap;
+    }
+
+    private void connectNCs() throws IOException {
+        getNCServices().entrySet().forEach(ncService -> {
+            final TriggerNCWork triggerWork = new TriggerNCWork(ClusterControllerService.this,
+                    ncService.getValue().getLeft(), ncService.getValue().getRight(), ncService.getKey());
+            workQueue.schedule(triggerWork);
+        });
     }
 
     private void terminateNCServices() throws Exception {
-        Ini ini = ccConfig.getIni();
-        if (ini == null || Boolean.parseBoolean(ini.get("cc", "virtual.cluster"))) {
-            return;
-        }
         List<ShutdownNCServiceWork> shutdownNCServiceWorks = new ArrayList<>();
-        for (String section : ini.keySet()) {
-            if (!section.startsWith("nc/")) {
-                continue;
-            }
-            String ncid = section.substring(3);
-            String address = IniUtils.getString(ini, section, "address", null);
-            int port = IniUtils.getInt(ini, section, "port", 9090);
-            if (address == null) {
-                address = InetAddress.getLoopbackAddress().getHostAddress();
-            }
-            ShutdownNCServiceWork shutdownWork = new ShutdownNCServiceWork(address, port, ncid);
+        getNCServices().entrySet().forEach(ncService -> {
+            ShutdownNCServiceWork shutdownWork = new ShutdownNCServiceWork(ncService.getValue().getLeft(),
+                    ncService.getValue().getRight(), ncService.getKey());
             workQueue.schedule(shutdownWork);
             shutdownNCServiceWorks.add(shutdownWork);
-        }
+        });
         for (ShutdownNCServiceWork shutdownWork : shutdownNCServiceWorks) {
             shutdownWork.sync();
         }
     }
 
     private void notifyApplication() throws Exception {
-        if (aep != null) {
-            // Sometimes, there is no application entry point. Check hyracks-client project
-            aep.startupCompleted();
-        }
+        aep.startupCompleted();
     }
+
     public void stop(boolean terminateNCService) throws Exception {
         if (terminateNCService) {
             terminateNCServices();
@@ -294,9 +290,7 @@
     }
 
     private void stopApplication() throws Exception {
-        if (aep != null) {
-            aep.stop();
-        }
+        aep.stop();
     }
 
     public ServerContext getServerContext() {
@@ -360,7 +354,7 @@
     }
 
     public NetworkAddress getDatasetDirectoryServiceInfo() {
-        return new NetworkAddress(ccConfig.clientNetIpAddress, ccConfig.clientNetPort);
+        return new NetworkAddress(ccConfig.getClientListenAddress(), ccConfig.getClientListenPort());
     }
 
     private final class ClusterControllerContext implements ICCContext {
@@ -390,6 +384,7 @@
         public ClusterTopology getClusterTopology() {
             return topology;
         }
+
     }
 
     private class DeadNodeSweeper extends TimerTask {
@@ -458,4 +453,14 @@
     public ThreadDumpRun removeThreadDumpRun(String requestKey) {
         return threadDumpRunMap.remove(requestKey);
     }
+
+    private static ICCApplicationEntryPoint getApplicationEntryPoint(CCConfig ccConfig)
+            throws ClassNotFoundException, IllegalAccessException, InstantiationException {
+        if (ccConfig.getAppClass() != null) {
+            Class<?> c = Class.forName(ccConfig.getAppClass());
+            return (ICCApplicationEntryPoint) c.newInstance();
+        } else {
+            return CCApplicationEntryPoint.INSTANCE;
+        }
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
index 955b7f2..8400a59 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
@@ -282,7 +282,7 @@
     public synchronized ObjectNode toSummaryJSON()  {
         ObjectMapper om = new ObjectMapper();
         ObjectNode o = om.createObjectNode();
-        o.put("node-id", ncConfig.nodeId);
+        o.put("node-id", ncConfig.getNodeId());
         o.put("heap-used", heapUsedSize[(rrdPtr + RRD_SIZE - 1) % RRD_SIZE]);
         o.put("system-load-average", systemLoadAverage[(rrdPtr + RRD_SIZE - 1) % RRD_SIZE]);
 
@@ -293,7 +293,7 @@
         ObjectMapper om = new ObjectMapper();
         ObjectNode o = om.createObjectNode();
 
-        o.put("node-id", ncConfig.nodeId);
+        o.put("node-id", ncConfig.getNodeId());
 
         if (includeConfig) {
             o.put("os-name", osName);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
index 77b9b17..a8b03bc 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
@@ -27,9 +27,10 @@
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.hyracks.api.application.IApplicationConfig;
 import org.apache.hyracks.api.application.ICCApplicationContext;
 import org.apache.hyracks.api.application.IClusterLifecycleListener;
+import org.apache.hyracks.api.config.IApplicationConfig;
+import org.apache.hyracks.api.config.IOption;
 import org.apache.hyracks.api.context.ICCContext;
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.IJobLifecycleListener;
@@ -105,7 +106,7 @@
         clusterLifecycleListeners.add(clusterLifecycleListener);
     }
 
-    public void notifyNodeJoin(String nodeId, Map<String, String> ncConfiguration) throws HyracksException {
+    public void notifyNodeJoin(String nodeId, Map<IOption, Object> ncConfiguration) throws HyracksException {
         for (IClusterLifecycleListener l : clusterLifecycleListeners) {
             l.notifyNodeJoin(nodeId, ncConfiguration);
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
index 354019c..d6d8bc4 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
@@ -137,7 +137,7 @@
             Map.Entry<String, NodeControllerState> entry = nodeIterator.next();
             String nodeId = entry.getKey();
             NodeControllerState state = entry.getValue();
-            if (state.incrementLastHeartbeatDuration() >= ccConfig.maxHeartbeatLapsePeriods) {
+            if (state.incrementLastHeartbeatDuration() >= ccConfig.getHeartbeatMaxMisses()) {
                 deadNodes.add(nodeId);
                 affectedJobIds.addAll(state.getActiveJobIds());
                 // Removes the node from node map.
@@ -172,10 +172,7 @@
 
     // Retrieves the IP address for a given node.
     private InetAddress getIpAddress(NodeControllerState ncState) throws HyracksException {
-        String ipAddress = ncState.getNCConfig().dataIPAddress;
-        if (ncState.getNCConfig().dataPublicIPAddress != null) {
-            ipAddress = ncState.getNCConfig().dataPublicIPAddress;
-        }
+        String ipAddress = ncState.getNCConfig().getDataPublicAddress();
         try {
             return InetAddress.getByName(ipAddress);
         } catch (UnknownHostException e) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index 031303b..b35de3d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -68,13 +68,13 @@
         this.ccs = ccs;
         this.jobCapacityController = jobCapacityController;
         try {
-            Constructor<?> jobQueueConstructor = this.getClass().getClassLoader().loadClass(ccConfig.jobQueueClassName)
+            Constructor<?> jobQueueConstructor = this.getClass().getClassLoader().loadClass(ccConfig.getJobQueueClass())
                     .getConstructor(IJobManager.class, IJobCapacityController.class);
             jobQueue = (IJobQueue) jobQueueConstructor.newInstance(this, this.jobCapacityController);
         } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException
                 | InvocationTargetException e) {
             if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.log(Level.WARNING, "class " + ccConfig.jobQueueClassName + " could not be used: ", e);
+                LOGGER.log(Level.WARNING, "class " + ccConfig.getJobQueueClass() + " could not be used: ", e);
             }
             // Falls back to the default implementation if the user-provided class name is not valid.
             jobQueue = new FIFOJobQueue(this, jobCapacityController);
@@ -85,13 +85,13 @@
 
             @Override
             protected boolean removeEldestEntry(Map.Entry<JobId, JobRun> eldest) {
-                return size() > ccConfig.jobHistorySize;
+                return size() > ccConfig.getJobHistorySize();
             }
         };
         runMapHistory = new LinkedHashMap<JobId, List<Exception>>() {
             private static final long serialVersionUID = 1L;
             /** history size + 1 is for the case when history size = 0 */
-            private int allowedSize = 100 * (ccConfig.jobHistorySize + 1);
+            private int allowedSize = 100 * (ccConfig.getJobHistorySize() + 1);
 
             @Override
             protected boolean removeEldestEntry(Map.Entry<JobId, List<Exception>> eldest) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeDetailsJSONWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeDetailsJSONWork.java
index 0577002..3dec959 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeDetailsJSONWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeDetailsJSONWork.java
@@ -27,28 +27,26 @@
 import java.lang.management.OperatingSystemMXBean;
 import java.lang.management.RuntimeMXBean;
 import java.lang.management.ThreadMXBean;
-import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Date;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.hyracks.control.cc.NodeControllerState;
-import org.apache.hyracks.control.cc.cluster.INodeManager;
-import org.apache.hyracks.control.common.controllers.CCConfig;
-import org.apache.hyracks.control.common.utils.PidHelper;
-import org.apache.hyracks.control.common.work.IPCResponder;
-import org.apache.hyracks.control.common.work.SynchronizableWork;
-import org.kohsuke.args4j.Option;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.hyracks.api.config.Section;
+import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.cluster.INodeManager;
+import org.apache.hyracks.control.common.config.ConfigUtils;
+import org.apache.hyracks.control.common.controllers.CCConfig;
+import org.apache.hyracks.control.common.controllers.NCConfig;
+import org.apache.hyracks.control.common.utils.PidHelper;
+import org.apache.hyracks.control.common.work.IPCResponder;
+import org.apache.hyracks.control.common.work.SynchronizableWork;
 
 public class GetNodeDetailsJSONWork extends SynchronizableWork {
-    private static final Logger LOGGER = Logger.getLogger(GetNodeDetailsJSONWork.class.getName());
+    private static final Section [] CC_SECTIONS = { Section.CC, Section.COMMON };
+    private static final Section [] NC_SECTIONS = { Section.NC, Section.COMMON };
+
     private final INodeManager nodeManager;
     private final CCConfig ccConfig;
     private final String nodeId;
@@ -59,7 +57,7 @@
     private ObjectMapper om = new ObjectMapper();
 
     public GetNodeDetailsJSONWork(INodeManager nodeManager, CCConfig ccConfig, String nodeId, boolean includeStats,
-                                  boolean includeConfig, IPCResponder<String> callback) {
+            boolean includeConfig, IPCResponder<String> callback) {
         this.nodeManager = nodeManager;
         this.ccConfig = ccConfig;
         this.nodeId = nodeId;
@@ -69,7 +67,7 @@
     }
 
     public GetNodeDetailsJSONWork(INodeManager nodeManager, CCConfig ccConfig, String nodeId, boolean includeStats,
-                                  boolean includeConfig) {
+            boolean includeConfig) {
         this(nodeManager, ccConfig, nodeId, includeStats, includeConfig, null);
     }
 
@@ -79,14 +77,18 @@
             // null nodeId is a request for CC
             detail = getCCDetails();
             if (includeConfig) {
-                addIni(detail, ccConfig);
+                ConfigUtils.addConfigToJSON(detail, ccConfig.getAppConfig(), CC_SECTIONS);
+                detail.putPOJO("app.args", ccConfig.getAppArgs());
             }
         } else {
             NodeControllerState ncs = nodeManager.getNodeControllerState(nodeId);
             if (ncs != null) {
                 detail = ncs.toDetailedJSON(includeStats, includeConfig);
                 if (includeConfig) {
-                    addIni(detail, ncs.getNCConfig());
+                    final NCConfig ncConfig = ncs.getNCConfig();
+                    ConfigUtils.addConfigToJSON(detail, ncConfig.getConfigManager().getNodeEffectiveConfig(nodeId),
+                            NC_SECTIONS);
+                    detail.putPOJO("app.args", ncConfig.getAppArgs());
                 }
             }
         }
@@ -96,7 +98,7 @@
         }
     }
 
-    private ObjectNode getCCDetails()  {
+    private ObjectNode getCCDetails() {
         ObjectNode o = om.createObjectNode();
         MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
         List<GarbageCollectorMXBean> gcMXBeans = ManagementFactory.getGarbageCollectorMXBeans();
@@ -151,33 +153,6 @@
         return o;
     }
 
-    private static void addIni(ObjectNode o, Object configBean)  {
-        Map<String, Object> iniMap = new HashMap<>();
-        for (Field f : configBean.getClass().getFields()) {
-            Option option = f.getAnnotation(Option.class);
-            if (option == null) {
-                continue;
-            }
-            final String optionName = option.name();
-            Object value = null;
-            try {
-                value = f.get(configBean);
-            } catch (IllegalAccessException e) {
-                LOGGER.log(Level.WARNING, "Unable to access ini option " + optionName, e);
-            }
-            if (value != null) {
-                if ("--".equals(optionName)) {
-                    iniMap.put("app_args", value);
-                } else {
-                    iniMap.put(optionName.substring(1).replace('-', '_'),
-                            "-iodevices".equals(optionName)
-                            ? String.valueOf(value).split(",")
-                            : value);
-                }
-            }
-        }
-        o.putPOJO("ini", iniMap);
-    }
 
     public ObjectNode getDetail() {
         return detail;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
index dc93515..e97950e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
@@ -23,6 +23,8 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.hyracks.api.config.IApplicationConfig;
+import org.apache.hyracks.api.config.IOption;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.NodeControllerState;
 import org.apache.hyracks.control.cc.cluster.INodeManager;
@@ -50,19 +52,22 @@
         String id = reg.getNodeId();
         IIPCHandle ncIPCHandle = ccs.getClusterIPC().getHandle(reg.getNodeControllerAddress());
         CCNCFunctions.NodeRegistrationResult result;
-        Map<String, String> ncConfiguration = new HashMap<>();
+        Map<IOption, Object> ncConfiguration = new HashMap<>();
         try {
             INodeController nodeController = new NodeControllerRemoteProxy(ncIPCHandle);
             NodeControllerState state = new NodeControllerState(nodeController, reg);
             INodeManager nodeManager = ccs.getNodeManager();
             nodeManager.addNode(id, state);
-            state.getNCConfig().toMap(ncConfiguration);
+            IApplicationConfig cfg = state.getNCConfig().getConfigManager().getNodeEffectiveConfig(id);
+            for (IOption option : cfg.getOptions()) {
+                ncConfiguration.put(option, cfg.get(option));
+            }
             LOGGER.log(Level.INFO, "Registered INodeController: id = " + id);
             NodeParameters params = new NodeParameters();
             params.setClusterControllerInfo(ccs.getClusterControllerInfo());
             params.setDistributedState(ccs.getApplicationContext().getDistributedState());
-            params.setHeartbeatPeriod(ccs.getCCConfig().heartbeatPeriod);
-            params.setProfileDumpPeriod(ccs.getCCConfig().profileDumpPeriod);
+            params.setHeartbeatPeriod(ccs.getCCConfig().getHeartbeatPeriod());
+            params.setProfileDumpPeriod(ccs.getCCConfig().getProfileDumpPeriod());
             result = new CCNCFunctions.NodeRegistrationResult(params, null);
         } catch (Exception e) {
             result = new CCNCFunctions.NodeRegistrationResult(null, e);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TriggerNCWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TriggerNCWork.java
index a7bca25..ab526e8 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TriggerNCWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TriggerNCWork.java
@@ -27,7 +27,9 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.hyracks.api.config.Section;
 import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.common.controllers.NCConfig;
 import org.apache.hyracks.control.common.controllers.ServiceConstants.ServiceCommand;
 import org.apache.hyracks.control.common.work.AbstractWork;
 import org.ini4j.Ini;
@@ -79,14 +81,18 @@
 
     /**
      * Given an Ini object, serialize it to String with some enhancements.
-     * @param ccini
+     * @param ccini the ini file to decorate and forward to NC
      */
-    String serializeIni(Ini ccini) throws IOException {
+    private String serializeIni(Ini ccini) throws IOException {
         StringWriter iniString = new StringWriter();
-        ccini.store(iniString);
+        ccini.get(Section.NC.sectionName()).putIfAbsent(NCConfig.Option.CLUSTER_ADDRESS.ini(),
+                ccs.getCCConfig().getClusterPublicAddress());
+        ccini.get(Section.NC.sectionName()).putIfAbsent(NCConfig.Option.CLUSTER_PORT.ini(),
+                String.valueOf(ccs.getCCConfig().getClusterPublicPort()));
         // Finally insert *this* NC's name into localnc section - this is a fixed
         // entry point so that NCs can determine where all their config is.
-        iniString.append("\n[localnc]\nid=").append(ncId).append("\n");
+        ccini.put(Section.LOCALNC.sectionName(), NCConfig.Option.NODE_ID.ini(), ncId);
+        ccini.store(iniString);
         if (LOGGER.isLoggable(Level.FINE)) {
             LOGGER.fine("Returning Ini file:\n" + iniString.toString());
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
index c742a4a..dde3bad 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
@@ -46,8 +46,8 @@
     public void testNormal() throws HyracksException {
         IResourceManager resourceManager = new ResourceManager();
         INodeManager nodeManager = new NodeManager(makeCCConfig(), resourceManager);
-        NodeControllerState ncState1 = mockNodeControllerState(false);
-        NodeControllerState ncState2 = mockNodeControllerState(false);
+        NodeControllerState ncState1 = mockNodeControllerState(NODE1, false);
+        NodeControllerState ncState2 = mockNodeControllerState(NODE2, false);
 
         // Verifies states after adding nodes.
         nodeManager.addNode(NODE1, ncState1);
@@ -71,7 +71,7 @@
     public void testException() throws HyracksException {
         IResourceManager resourceManager = new ResourceManager();
         INodeManager nodeManager = new NodeManager(makeCCConfig(), resourceManager);
-        NodeControllerState ncState1 = mockNodeControllerState(true);
+        NodeControllerState ncState1 = mockNodeControllerState(NODE1, true);
 
         boolean invalidNetworkAddress = false;
         // Verifies states after a failure during adding nodes.
@@ -106,11 +106,11 @@
 
     private CCConfig makeCCConfig() {
         CCConfig ccConfig = new CCConfig();
-        ccConfig.maxHeartbeatLapsePeriods = 0;
+        ccConfig.setHeartbeatMaxMisses(0);
         return ccConfig;
     }
 
-    private NodeControllerState mockNodeControllerState(boolean invalidIpAddr) {
+    private NodeControllerState mockNodeControllerState(String nodeId, boolean invalidIpAddr) {
         NodeControllerState ncState = mock(NodeControllerState.class);
         String ipAddr = invalidIpAddr ? "255.255.255:255" : "127.0.0.2";
         NetworkAddress dataAddr = new NetworkAddress(ipAddr, 1001);
@@ -120,8 +120,8 @@
         when(ncState.getDataPort()).thenReturn(dataAddr);
         when(ncState.getDatasetPort()).thenReturn(resultAddr);
         when(ncState.getMessagingPort()).thenReturn(msgAddr);
-        NCConfig ncConfig = new NCConfig();
-        ncConfig.dataIPAddress = ipAddr;
+        NCConfig ncConfig = new NCConfig(nodeId);
+        ncConfig.setDataPublicAddress(ipAddr);
         when(ncState.getNCConfig()).thenReturn(ncConfig);
         return ncState;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
index 3bb08bd..97b05e7 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
@@ -27,6 +27,7 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -48,14 +49,23 @@
 import org.apache.hyracks.control.common.controllers.CCConfig;
 import org.apache.hyracks.control.common.logs.LogFile;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
+import org.kohsuke.args4j.CmdLineException;
 import org.mockito.Mockito;
 
 public class JobManagerTest {
 
+    private CCConfig ccConfig;
+
+    @Before
+    public void setup() throws IOException, CmdLineException {
+        ccConfig = new CCConfig();
+        ccConfig.getConfigManager().processConfig();
+    }
+
     @Test
-    public void test() throws HyracksException {
-        CCConfig ccConfig = new CCConfig();
+    public void test() throws IOException, CmdLineException {
         IJobCapacityController jobCapacityController = mock(IJobCapacityController.class);
         IJobManager jobManager = spy(new JobManager(ccConfig, mockClusterControllerService(), jobCapacityController));
 
@@ -114,7 +124,7 @@
         }
         Assert.assertTrue(jobManager.getRunningJobs().size() == 4096);
         Assert.assertTrue(jobManager.getPendingJobs().isEmpty());
-        Assert.assertTrue(jobManager.getArchivedJobs().size() == ccConfig.jobHistorySize);
+        Assert.assertTrue(jobManager.getArchivedJobs().size() == ccConfig.getJobHistorySize());
 
         // Completes deferred jobs.
         for (JobRun run : deferredRuns) {
@@ -123,14 +133,13 @@
         }
         Assert.assertTrue(jobManager.getRunningJobs().isEmpty());
         Assert.assertTrue(jobManager.getPendingJobs().isEmpty());
-        Assert.assertTrue(jobManager.getArchivedJobs().size() == ccConfig.jobHistorySize);
+        Assert.assertTrue(jobManager.getArchivedJobs().size() == ccConfig.getJobHistorySize());
         verify(jobManager, times(8192)).prepareComplete(any(), any(), any());
         verify(jobManager, times(8192)).finalComplete(any());
     }
 
     @Test
     public void testExceedMax() throws HyracksException {
-        CCConfig ccConfig = new CCConfig();
         IJobCapacityController jobCapacityController = mock(IJobCapacityController.class);
         IJobManager jobManager = spy(new JobManager(ccConfig, mockClusterControllerService(), jobCapacityController));
         boolean rejected = false;
@@ -154,7 +163,6 @@
 
     @Test
     public void testAdmitThenReject() throws HyracksException {
-        CCConfig ccConfig = new CCConfig();
         IJobCapacityController jobCapacityController = mock(IJobCapacityController.class);
         IJobManager jobManager = spy(new JobManager(ccConfig, mockClusterControllerService(), jobCapacityController));
 
@@ -185,7 +193,6 @@
 
     @Test
     public void testNullJob() throws HyracksException {
-        CCConfig ccConfig = new CCConfig();
         IJobCapacityController jobCapacityController = mock(IJobCapacityController.class);
         IJobManager jobManager = new JobManager(ccConfig, mockClusterControllerService(), jobCapacityController);
         boolean invalidParameter = false;
@@ -249,7 +256,7 @@
         }
 
         Assert.assertTrue(jobManager.getPendingJobs().isEmpty());
-        Assert.assertTrue(jobManager.getArchivedJobs().size() == ccConfig.jobHistorySize);
+        Assert.assertTrue(jobManager.getArchivedJobs().size() == ccConfig.getJobHistorySize());
         verify(jobManager, times(0)).prepareComplete(any(), any(), any());
         verify(jobManager, times(0)).finalComplete(any());
     }