Configuration Revamp
- Ini section of node / cc details now returns ini param names instead of
managix option names
- Normalized command-line vs. ini-file configuration parameter names
- Eliminated unused parameters
- Ini validation
- Migrate *DB parameters out of [app] and into nc / cc sections as
appropriate
- Eliminate [app] section. Cluster-wide configuration lives in [common]
- Sort properties alphabetically when returned by HTTP API
Change-Id: I95b7e0bd4538ef42817c8826e76412150074b754
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1487
Reviewed-by: Michael Blow <mblow@apache.org>
Integration-Tests: Michael Blow <mblow@apache.org>
Tested-by: Michael Blow <mblow@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/pom.xml b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/pom.xml
index 9dc3c1a..ba086ad 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/pom.xml
@@ -63,7 +63,7 @@
<licenseFamilies combine.children="append">
<licenseFamily implementation="org.apache.rat.license.MITLicenseFamily"/>
</licenseFamilies>
- <excludes>
+ <excludes combine.children="append">
<!-- See hyracks-fullstack-license/src/main/licenses/templates/source_licenses.ftl -->
<exclude>src/main/resources/static/javascript/flot/jquery.flot.resize.min.js</exclude>
<exclude>src/main/resources/static/javascript/jsplumb/jquery.jsPlumb-1.3.5-all-min.js</exclude>
@@ -104,7 +104,6 @@
<dependency>
<groupId>args4j</groupId>
<artifactId>args4j</artifactId>
- <version>2.0.12</version>
</dependency>
<dependency>
<groupId>org.apache.hyracks</groupId>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCApplicationEntryPoint.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCApplicationEntryPoint.java
new file mode 100644
index 0000000..07008df
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCApplicationEntryPoint.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.cc;
+
+import java.util.Arrays;
+
+import org.apache.hyracks.api.application.ICCApplicationContext;
+import org.apache.hyracks.api.application.ICCApplicationEntryPoint;
+import org.apache.hyracks.api.config.IConfigManager;
+import org.apache.hyracks.api.config.Section;
+import org.apache.hyracks.api.job.resource.DefaultJobCapacityController;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
+import org.apache.hyracks.control.common.controllers.CCConfig;
+import org.apache.hyracks.control.common.controllers.ControllerConfig;
+import org.apache.hyracks.control.common.controllers.NCConfig;
+
+public class CCApplicationEntryPoint implements ICCApplicationEntryPoint { // default CC entry point, used when no application class is configured
+ public static final ICCApplicationEntryPoint INSTANCE = new CCApplicationEntryPoint(); // shared default instance
+
+ protected CCApplicationEntryPoint() { // not public: the default is obtained through INSTANCE
+ }
+
+ @Override
+ public void start(ICCApplicationContext ccAppCtx, String[] args) throws Exception {
+ if (args.length > 0) { // the default entry point accepts no application arguments
+ throw new IllegalArgumentException("Unrecognized argument(s): " + Arrays.toString(args));
+ }
+ }
+
+ @Override
+ public void stop() throws Exception {
+ // no-op
+ }
+
+ @Override
+ public void startupCompleted() throws Exception {
+ // no-op
+ }
+
+ @Override
+ public IJobCapacityController getJobCapacityController() {
+ return DefaultJobCapacityController.INSTANCE; // no application-specific capacity control
+ }
+
+ @Override
+ public void registerConfig(IConfigManager configManager) {
+ configManager.addIniParamOptions(ControllerConfig.Option.CONFIG_FILE, ControllerConfig.Option.CONFIG_FILE_URL);
+ configManager.addCmdLineSections(Section.CC, Section.COMMON);
+ configManager.setUsageFilter(getUsageFilter()); // NOTE(review): getUsageFilter() is not declared in this class — presumably inherited from ICCApplicationEntryPoint; confirm
+ configManager.register(ControllerConfig.Option.class, CCConfig.Option.class, NCConfig.Option.class);
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCDriver.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCDriver.java
index dff3107..754deac 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCDriver.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCDriver.java
@@ -18,32 +18,50 @@
*/
package org.apache.hyracks.control.cc;
-import org.kohsuke.args4j.CmdLineParser;
+import static org.apache.hyracks.control.common.controllers.CCConfig.Option.APP_CLASS;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.api.application.ICCApplicationEntryPoint;
+import org.apache.hyracks.control.common.config.ConfigManager;
+import org.apache.hyracks.control.common.config.ConfigUtils;
import org.apache.hyracks.control.common.controllers.CCConfig;
+import org.kohsuke.args4j.CmdLineException;
public class CCDriver {
- public static void main(String args []) throws Exception {
- try {
- CCConfig ccConfig = new CCConfig();
- CmdLineParser cp = new CmdLineParser(ccConfig);
- try {
- cp.parseArgument(args);
- } catch (Exception e) {
- System.err.println(e.getMessage());
- cp.printUsage(System.err);
- return;
- }
- ccConfig.loadConfigAndApplyDefaults();
+ private static final Logger LOGGER = Logger.getLogger(CCDriver.class.getName());
- ClusterControllerService ccService = new ClusterControllerService(ccConfig);
+ private CCDriver() {
+ }
+
+ public static void main(String[] args) throws Exception { // boots the cluster controller; exits 2 on a command-line error, 1 on any other failure
+ try {
+ final ConfigManager configManager = new ConfigManager(args);
+ ICCApplicationEntryPoint appEntryPoint = getAppEntryPoint(args); // resolved before CCConfig is built so it can register its options
+ appEntryPoint.registerConfig(configManager);
+ CCConfig ccConfig = new CCConfig(configManager);
+ ClusterControllerService ccService = new ClusterControllerService(ccConfig, appEntryPoint);
ccService.start();
while (true) {
Thread.sleep(100000);
}
+ } catch (CmdLineException e) {
+ LOGGER.log(Level.FINE, "Exception parsing command line: " + Arrays.toString(args), e);
+ System.exit(2);
} catch (Exception e) {
- e.printStackTrace();
+ LOGGER.log(Level.SEVERE, "Exiting CCDriver due to exception", e);
System.exit(1);
}
}
+
+ private static ICCApplicationEntryPoint getAppEntryPoint(String[] args) // picks the entry point implementation from the raw arguments
+ throws ClassNotFoundException, InstantiationException, IllegalAccessException, IOException {
+ // determine app class so that we can use the correct implementation of the configuration...
+ String appClassName = ConfigUtils.getOptionValue(args, APP_CLASS);
+ return appClassName != null ? (ICCApplicationEntryPoint) (Class.forName(appClassName)).newInstance() // named class, created via its no-arg constructor
+ : CCApplicationEntryPoint.INSTANCE; // no app class given: fall back to the default entry point
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index c47284c..21b9dcf 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -20,6 +20,7 @@
import java.io.File;
import java.io.FileReader;
+import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.InetAddress;
@@ -32,20 +33,22 @@
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.TreeMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.api.application.ICCApplicationEntryPoint;
import org.apache.hyracks.api.client.ClusterControllerInfo;
import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.config.IApplicationConfig;
import org.apache.hyracks.api.context.ICCContext;
import org.apache.hyracks.api.deployment.DeploymentId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.api.job.resource.DefaultJobCapacityController;
import org.apache.hyracks.api.job.resource.IJobCapacityController;
import org.apache.hyracks.api.service.IControllerService;
import org.apache.hyracks.api.topology.ClusterTopology;
@@ -66,9 +69,10 @@
import org.apache.hyracks.control.cc.work.RemoveDeadNodesWork;
import org.apache.hyracks.control.cc.work.ShutdownNCServiceWork;
import org.apache.hyracks.control.cc.work.TriggerNCWork;
+import org.apache.hyracks.control.common.config.ConfigManager;
import org.apache.hyracks.control.common.context.ServerContext;
import org.apache.hyracks.control.common.controllers.CCConfig;
-import org.apache.hyracks.control.common.controllers.IniUtils;
+import org.apache.hyracks.control.common.controllers.NCConfig;
import org.apache.hyracks.control.common.deployment.DeploymentRun;
import org.apache.hyracks.control.common.ipc.CCNCFunctions;
import org.apache.hyracks.control.common.logs.LogFile;
@@ -77,7 +81,6 @@
import org.apache.hyracks.ipc.api.IIPCI;
import org.apache.hyracks.ipc.impl.IPCSystem;
import org.apache.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
-import org.ini4j.Ini;
import org.xml.sax.InputSource;
public class ClusterControllerService implements IControllerService {
@@ -85,6 +88,8 @@
private final CCConfig ccConfig;
+ private final ConfigManager configManager;
+
private IPCSystem clusterIPC;
private IPCSystem clientIPC;
@@ -127,11 +132,22 @@
private ShutdownRun shutdownCallback;
- private ICCApplicationEntryPoint aep;
+ private final ICCApplicationEntryPoint aep;
- public ClusterControllerService(final CCConfig ccConfig) throws Exception {
- this.ccConfig = ccConfig;
- File jobLogFolder = new File(ccConfig.ccRoot, "logs/jobs");
+ public ClusterControllerService(final CCConfig config) throws Exception {
+ this(config, getApplicationEntryPoint(config)); // entry point comes from the configured app class, or the default when none is set
+ }
+
+ public ClusterControllerService(final CCConfig config,
+ final ICCApplicationEntryPoint aep) throws Exception {
+ this.ccConfig = config;
+ this.configManager = ccConfig.getConfigManager();
+ if (aep == null) {
+ throw new IllegalArgumentException("ICCApplicationEntryPoint cannot be null");
+ }
+ this.aep = aep;
+ configManager.processConfig();
+ File jobLogFolder = new File(ccConfig.getRootDir(), "logs/jobs");
jobLog = new LogFile(jobLogFolder);
// WorkQueue is in charge of heartbeat as well as other events.
@@ -140,7 +156,8 @@
final ClusterTopology topology = computeClusterTopology(ccConfig);
ccContext = new ClusterControllerContext(topology);
sweeper = new DeadNodeSweeper();
- datasetDirectoryService = new DatasetDirectoryService(ccConfig.resultTTL, ccConfig.resultSweepThreshold);
+ datasetDirectoryService = new DatasetDirectoryService(ccConfig.getResultTTL(),
+ ccConfig.getResultSweepThreshold());
deploymentRunMap = new HashMap<>();
stateDumpRunMap = new HashMap<>();
@@ -151,10 +168,10 @@
}
private static ClusterTopology computeClusterTopology(CCConfig ccConfig) throws Exception {
- if (ccConfig.clusterTopologyDefinition == null) {
+ if (ccConfig.getClusterTopology() == null) {
return null;
}
- FileReader fr = new FileReader(ccConfig.clusterTopologyDefinition);
+ FileReader fr = new FileReader(ccConfig.getClusterTopology());
InputSource in = new InputSource(fr);
try {
return TopologyDefinitionParser.parse(in);
@@ -166,20 +183,21 @@
@Override
public void start() throws Exception {
LOGGER.log(Level.INFO, "Starting ClusterControllerService: " + this);
- serverCtx = new ServerContext(ServerContext.ServerType.CLUSTER_CONTROLLER, new File(ccConfig.ccRoot));
+ serverCtx = new ServerContext(ServerContext.ServerType.CLUSTER_CONTROLLER, new File(ccConfig.getRootDir()));
IIPCI ccIPCI = new ClusterControllerIPCI(this);
- clusterIPC = new IPCSystem(new InetSocketAddress(ccConfig.clusterNetPort), ccIPCI,
+ clusterIPC = new IPCSystem(new InetSocketAddress(ccConfig.getClusterListenPort()), ccIPCI,
new CCNCFunctions.SerializerDeserializer());
IIPCI ciIPCI = new ClientInterfaceIPCI(this);
- clientIPC = new IPCSystem(new InetSocketAddress(ccConfig.clientNetIpAddress, ccConfig.clientNetPort), ciIPCI,
- new JavaSerializationBasedPayloadSerializerDeserializer());
- webServer = new WebServer(this, ccConfig.httpPort);
+ clientIPC = new IPCSystem(
+ new InetSocketAddress(ccConfig.getClientListenAddress(), ccConfig.getClientListenPort()),
+ ciIPCI, new JavaSerializationBasedPayloadSerializerDeserializer());
+ webServer = new WebServer(this, ccConfig.getConsoleListenPort());
clusterIPC.start();
clientIPC.start();
webServer.start();
- info = new ClusterControllerInfo(ccConfig.clientNetIpAddress, ccConfig.clientNetPort,
+ info = new ClusterControllerInfo(ccConfig.getClientListenAddress(), ccConfig.getClientListenPort(),
webServer.getListeningPort());
- timer.schedule(sweeper, 0, ccConfig.heartbeatPeriod);
+ timer.schedule(sweeper, 0, ccConfig.getHeartbeatPeriod());
jobLog.open();
startApplication();
@@ -194,84 +212,62 @@
appCtx = new CCApplicationContext(this, serverCtx, ccContext, ccConfig.getAppConfig());
appCtx.addJobLifecycleListener(datasetDirectoryService);
executor = Executors.newCachedThreadPool(appCtx.getThreadFactory());
- String className = ccConfig.appCCMainClass;
-
- IJobCapacityController jobCapacityController = DefaultJobCapacityController.INSTANCE;
- if (className != null) {
- Class<?> c = Class.forName(className);
- aep = (ICCApplicationEntryPoint) c.newInstance();
- String[] args = ccConfig.appArgs == null ? null
- : ccConfig.appArgs.toArray(new String[ccConfig.appArgs.size()]);
- aep.start(appCtx, args);
- jobCapacityController = aep.getJobCapacityController();
- }
+ aep.start(appCtx, ccConfig.getAppArgsArray());
+ IJobCapacityController jobCapacityController = aep.getJobCapacityController();
// Job manager is in charge of job lifecycle management.
try {
Constructor<?> jobManagerConstructor = this.getClass().getClassLoader()
- .loadClass(ccConfig.jobManagerClassName)
+ .loadClass(ccConfig.getJobManagerClass())
.getConstructor(CCConfig.class, ClusterControllerService.class, IJobCapacityController.class);
jobManager = (IJobManager) jobManagerConstructor.newInstance(ccConfig, this, jobCapacityController);
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException
| InvocationTargetException e) {
if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.log(Level.WARNING, "class " + ccConfig.jobManagerClassName + " could not be used: ", e);
+ LOGGER.log(Level.WARNING, "class " + ccConfig.getJobManagerClass() + " could not be used: ", e);
}
// Falls back to the default implementation if the user-provided class name is not valid.
jobManager = new JobManager(ccConfig, this, jobCapacityController);
}
}
- private void connectNCs() throws Exception {
- Ini ini = ccConfig.getIni();
- if (ini == null || Boolean.parseBoolean(ini.get("cc", "virtual.cluster"))) {
- return;
- }
- for (String section : ini.keySet()) {
- if (!section.startsWith("nc/")) {
- continue;
+ private Map<String, Pair<String, Integer>> getNCServices() throws IOException { // node id -> (NC service address, NC service port)
+ Map<String, Pair<String, Integer>> ncMap = new TreeMap<>(); // TreeMap keeps entries ordered by node id
+ for (String ncId : configManager.getNodeNames()) {
+ IApplicationConfig ncConfig = configManager.getNodeEffectiveConfig(ncId);
+ if (!ncConfig.getBoolean(NCConfig.Option.VIRTUAL_NC)) { // nodes flagged VIRTUAL_NC are left out of the map
+ ncMap.put(ncId, Pair.of(ncConfig.getString(NCConfig.Option.NCSERVICE_ADDRESS),
+ ncConfig.getInt(NCConfig.Option.NCSERVICE_PORT)));
}
- String ncid = section.substring(3);
- String address = IniUtils.getString(ini, section, "address", null);
- int port = IniUtils.getInt(ini, section, "port", 9090);
- if (address == null) {
- address = InetAddress.getLoopbackAddress().getHostAddress();
- }
- workQueue.schedule(new TriggerNCWork(this, address, port, ncid));
}
+ return ncMap;
+ }
+
+ private void connectNCs() throws IOException {
+ getNCServices().entrySet().forEach(ncService -> { // one TriggerNCWork per NC service: key is the node id, value is (address, port)
+ final TriggerNCWork triggerWork = new TriggerNCWork(ClusterControllerService.this,
+ ncService.getValue().getLeft(), ncService.getValue().getRight(), ncService.getKey());
+ workQueue.schedule(triggerWork);
+ });
}
private void terminateNCServices() throws Exception {
- Ini ini = ccConfig.getIni();
- if (ini == null || Boolean.parseBoolean(ini.get("cc", "virtual.cluster"))) {
- return;
- }
List<ShutdownNCServiceWork> shutdownNCServiceWorks = new ArrayList<>();
- for (String section : ini.keySet()) {
- if (!section.startsWith("nc/")) {
- continue;
- }
- String ncid = section.substring(3);
- String address = IniUtils.getString(ini, section, "address", null);
- int port = IniUtils.getInt(ini, section, "port", 9090);
- if (address == null) {
- address = InetAddress.getLoopbackAddress().getHostAddress();
- }
- ShutdownNCServiceWork shutdownWork = new ShutdownNCServiceWork(address, port, ncid);
+ getNCServices().entrySet().forEach(ncService -> { // schedule a shutdown for every NC service from getNCServices()
+ ShutdownNCServiceWork shutdownWork = new ShutdownNCServiceWork(ncService.getValue().getLeft(),
+ ncService.getValue().getRight(), ncService.getKey());
workQueue.schedule(shutdownWork);
shutdownNCServiceWorks.add(shutdownWork);
- }
+ }); // every shutdown is scheduled first; the loop that follows waits on each via sync()
for (ShutdownNCServiceWork shutdownWork : shutdownNCServiceWorks) {
shutdownWork.sync();
}
}
private void notifyApplication() throws Exception {
- if (aep != null) {
- // Sometimes, there is no application entry point. Check hyracks-client project
- aep.startupCompleted();
- }
+ aep.startupCompleted(); // aep is final and rejected if null in the constructor, so no guard is needed
}
+
public void stop(boolean terminateNCService) throws Exception {
if (terminateNCService) {
terminateNCServices();
@@ -294,9 +290,7 @@
}
private void stopApplication() throws Exception {
- if (aep != null) {
- aep.stop();
- }
+ aep.stop(); // aep is final and rejected if null in the constructor, so no guard is needed
}
public ServerContext getServerContext() {
@@ -360,7 +354,7 @@
}
public NetworkAddress getDatasetDirectoryServiceInfo() {
- return new NetworkAddress(ccConfig.clientNetIpAddress, ccConfig.clientNetPort);
+ return new NetworkAddress(ccConfig.getClientListenAddress(), ccConfig.getClientListenPort()); // same address/port the client IPC is bound to in start()
}
private final class ClusterControllerContext implements ICCContext {
@@ -390,6 +384,7 @@
public ClusterTopology getClusterTopology() {
return topology;
}
+
}
private class DeadNodeSweeper extends TimerTask {
@@ -458,4 +453,14 @@
public ThreadDumpRun removeThreadDumpRun(String requestKey) {
return threadDumpRunMap.remove(requestKey);
}
+
+ private static ICCApplicationEntryPoint getApplicationEntryPoint(CCConfig ccConfig) // resolves the entry point for the single-argument constructor
+ throws ClassNotFoundException, IllegalAccessException, InstantiationException {
+ if (ccConfig.getAppClass() != null) {
+ Class<?> c = Class.forName(ccConfig.getAppClass());
+ return (ICCApplicationEntryPoint) c.newInstance(); // configured class, created via its no-arg constructor
+ } else {
+ return CCApplicationEntryPoint.INSTANCE; // no app class configured: use the default entry point
+ }
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
index 955b7f2..8400a59 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
@@ -282,7 +282,7 @@
public synchronized ObjectNode toSummaryJSON() {
ObjectMapper om = new ObjectMapper();
ObjectNode o = om.createObjectNode();
- o.put("node-id", ncConfig.nodeId);
+ o.put("node-id", ncConfig.getNodeId());
o.put("heap-used", heapUsedSize[(rrdPtr + RRD_SIZE - 1) % RRD_SIZE]);
o.put("system-load-average", systemLoadAverage[(rrdPtr + RRD_SIZE - 1) % RRD_SIZE]);
@@ -293,7 +293,7 @@
ObjectMapper om = new ObjectMapper();
ObjectNode o = om.createObjectNode();
- o.put("node-id", ncConfig.nodeId);
+ o.put("node-id", ncConfig.getNodeId());
if (includeConfig) {
o.put("os-name", osName);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
index 77b9b17..a8b03bc 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
@@ -27,9 +27,10 @@
import java.util.Map;
import java.util.Set;
-import org.apache.hyracks.api.application.IApplicationConfig;
import org.apache.hyracks.api.application.ICCApplicationContext;
import org.apache.hyracks.api.application.IClusterLifecycleListener;
+import org.apache.hyracks.api.config.IApplicationConfig;
+import org.apache.hyracks.api.config.IOption;
import org.apache.hyracks.api.context.ICCContext;
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.IJobLifecycleListener;
@@ -105,7 +106,7 @@
clusterLifecycleListeners.add(clusterLifecycleListener);
}
- public void notifyNodeJoin(String nodeId, Map<String, String> ncConfiguration) throws HyracksException {
+ public void notifyNodeJoin(String nodeId, Map<IOption, Object> ncConfiguration) throws HyracksException {
for (IClusterLifecycleListener l : clusterLifecycleListeners) {
l.notifyNodeJoin(nodeId, ncConfiguration);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
index 354019c..d6d8bc4 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
@@ -137,7 +137,7 @@
Map.Entry<String, NodeControllerState> entry = nodeIterator.next();
String nodeId = entry.getKey();
NodeControllerState state = entry.getValue();
- if (state.incrementLastHeartbeatDuration() >= ccConfig.maxHeartbeatLapsePeriods) {
+ if (state.incrementLastHeartbeatDuration() >= ccConfig.getHeartbeatMaxMisses()) {
deadNodes.add(nodeId);
affectedJobIds.addAll(state.getActiveJobIds());
// Removes the node from node map.
@@ -172,10 +172,7 @@
// Retrieves the IP address for a given node.
private InetAddress getIpAddress(NodeControllerState ncState) throws HyracksException {
- String ipAddress = ncState.getNCConfig().dataIPAddress;
- if (ncState.getNCConfig().dataPublicIPAddress != null) {
- ipAddress = ncState.getNCConfig().dataPublicIPAddress;
- }
+ String ipAddress = ncState.getNCConfig().getDataPublicAddress();
try {
return InetAddress.getByName(ipAddress);
} catch (UnknownHostException e) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index 031303b..b35de3d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -68,13 +68,13 @@
this.ccs = ccs;
this.jobCapacityController = jobCapacityController;
try {
- Constructor<?> jobQueueConstructor = this.getClass().getClassLoader().loadClass(ccConfig.jobQueueClassName)
+ Constructor<?> jobQueueConstructor = this.getClass().getClassLoader().loadClass(ccConfig.getJobQueueClass())
.getConstructor(IJobManager.class, IJobCapacityController.class);
jobQueue = (IJobQueue) jobQueueConstructor.newInstance(this, this.jobCapacityController);
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException
| InvocationTargetException e) {
if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.log(Level.WARNING, "class " + ccConfig.jobQueueClassName + " could not be used: ", e);
+ LOGGER.log(Level.WARNING, "class " + ccConfig.getJobQueueClass() + " could not be used: ", e);
}
// Falls back to the default implementation if the user-provided class name is not valid.
jobQueue = new FIFOJobQueue(this, jobCapacityController);
@@ -85,13 +85,13 @@
@Override
protected boolean removeEldestEntry(Map.Entry<JobId, JobRun> eldest) {
- return size() > ccConfig.jobHistorySize;
+ return size() > ccConfig.getJobHistorySize();
}
};
runMapHistory = new LinkedHashMap<JobId, List<Exception>>() {
private static final long serialVersionUID = 1L;
/** history size + 1 is for the case when history size = 0 */
- private int allowedSize = 100 * (ccConfig.jobHistorySize + 1);
+ private int allowedSize = 100 * (ccConfig.getJobHistorySize() + 1);
@Override
protected boolean removeEldestEntry(Map.Entry<JobId, List<Exception>> eldest) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeDetailsJSONWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeDetailsJSONWork.java
index 0577002..3dec959 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeDetailsJSONWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeDetailsJSONWork.java
@@ -27,28 +27,26 @@
import java.lang.management.OperatingSystemMXBean;
import java.lang.management.RuntimeMXBean;
import java.lang.management.ThreadMXBean;
-import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Date;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.hyracks.control.cc.NodeControllerState;
-import org.apache.hyracks.control.cc.cluster.INodeManager;
-import org.apache.hyracks.control.common.controllers.CCConfig;
-import org.apache.hyracks.control.common.utils.PidHelper;
-import org.apache.hyracks.control.common.work.IPCResponder;
-import org.apache.hyracks.control.common.work.SynchronizableWork;
-import org.kohsuke.args4j.Option;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.hyracks.api.config.Section;
+import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.cluster.INodeManager;
+import org.apache.hyracks.control.common.config.ConfigUtils;
+import org.apache.hyracks.control.common.controllers.CCConfig;
+import org.apache.hyracks.control.common.controllers.NCConfig;
+import org.apache.hyracks.control.common.utils.PidHelper;
+import org.apache.hyracks.control.common.work.IPCResponder;
+import org.apache.hyracks.control.common.work.SynchronizableWork;
public class GetNodeDetailsJSONWork extends SynchronizableWork {
- private static final Logger LOGGER = Logger.getLogger(GetNodeDetailsJSONWork.class.getName());
+ private static final Section [] CC_SECTIONS = { Section.CC, Section.COMMON };
+ private static final Section [] NC_SECTIONS = { Section.NC, Section.COMMON };
+
private final INodeManager nodeManager;
private final CCConfig ccConfig;
private final String nodeId;
@@ -59,7 +57,7 @@
private ObjectMapper om = new ObjectMapper();
public GetNodeDetailsJSONWork(INodeManager nodeManager, CCConfig ccConfig, String nodeId, boolean includeStats,
- boolean includeConfig, IPCResponder<String> callback) {
+ boolean includeConfig, IPCResponder<String> callback) {
this.nodeManager = nodeManager;
this.ccConfig = ccConfig;
this.nodeId = nodeId;
@@ -69,7 +67,7 @@
}
public GetNodeDetailsJSONWork(INodeManager nodeManager, CCConfig ccConfig, String nodeId, boolean includeStats,
- boolean includeConfig) {
+ boolean includeConfig) {
this(nodeManager, ccConfig, nodeId, includeStats, includeConfig, null);
}
@@ -79,14 +77,18 @@
// null nodeId is a request for CC
detail = getCCDetails();
if (includeConfig) {
- addIni(detail, ccConfig);
+ ConfigUtils.addConfigToJSON(detail, ccConfig.getAppConfig(), CC_SECTIONS);
+ detail.putPOJO("app.args", ccConfig.getAppArgs());
}
} else {
NodeControllerState ncs = nodeManager.getNodeControllerState(nodeId);
if (ncs != null) {
detail = ncs.toDetailedJSON(includeStats, includeConfig);
if (includeConfig) {
- addIni(detail, ncs.getNCConfig());
+ final NCConfig ncConfig = ncs.getNCConfig();
+ ConfigUtils.addConfigToJSON(detail, ncConfig.getConfigManager().getNodeEffectiveConfig(nodeId),
+ NC_SECTIONS);
+ detail.putPOJO("app.args", ncConfig.getAppArgs());
}
}
}
@@ -96,7 +98,7 @@
}
}
- private ObjectNode getCCDetails() {
+ private ObjectNode getCCDetails() {
ObjectNode o = om.createObjectNode();
MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
List<GarbageCollectorMXBean> gcMXBeans = ManagementFactory.getGarbageCollectorMXBeans();
@@ -151,33 +153,6 @@
return o;
}
- private static void addIni(ObjectNode o, Object configBean) {
- Map<String, Object> iniMap = new HashMap<>();
- for (Field f : configBean.getClass().getFields()) {
- Option option = f.getAnnotation(Option.class);
- if (option == null) {
- continue;
- }
- final String optionName = option.name();
- Object value = null;
- try {
- value = f.get(configBean);
- } catch (IllegalAccessException e) {
- LOGGER.log(Level.WARNING, "Unable to access ini option " + optionName, e);
- }
- if (value != null) {
- if ("--".equals(optionName)) {
- iniMap.put("app_args", value);
- } else {
- iniMap.put(optionName.substring(1).replace('-', '_'),
- "-iodevices".equals(optionName)
- ? String.valueOf(value).split(",")
- : value);
- }
- }
- }
- o.putPOJO("ini", iniMap);
- }
public ObjectNode getDetail() {
return detail;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
index dc93515..e97950e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
@@ -23,6 +23,8 @@
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.apache.hyracks.api.config.IApplicationConfig;
+import org.apache.hyracks.api.config.IOption;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.cc.NodeControllerState;
import org.apache.hyracks.control.cc.cluster.INodeManager;
@@ -50,19 +52,22 @@
String id = reg.getNodeId();
IIPCHandle ncIPCHandle = ccs.getClusterIPC().getHandle(reg.getNodeControllerAddress());
CCNCFunctions.NodeRegistrationResult result;
- Map<String, String> ncConfiguration = new HashMap<>();
+ Map<IOption, Object> ncConfiguration = new HashMap<>();
try {
INodeController nodeController = new NodeControllerRemoteProxy(ncIPCHandle);
NodeControllerState state = new NodeControllerState(nodeController, reg);
INodeManager nodeManager = ccs.getNodeManager();
nodeManager.addNode(id, state);
- state.getNCConfig().toMap(ncConfiguration);
+ IApplicationConfig cfg = state.getNCConfig().getConfigManager().getNodeEffectiveConfig(id);
+ for (IOption option : cfg.getOptions()) {
+ ncConfiguration.put(option, cfg.get(option));
+ }
LOGGER.log(Level.INFO, "Registered INodeController: id = " + id);
NodeParameters params = new NodeParameters();
params.setClusterControllerInfo(ccs.getClusterControllerInfo());
params.setDistributedState(ccs.getApplicationContext().getDistributedState());
- params.setHeartbeatPeriod(ccs.getCCConfig().heartbeatPeriod);
- params.setProfileDumpPeriod(ccs.getCCConfig().profileDumpPeriod);
+ params.setHeartbeatPeriod(ccs.getCCConfig().getHeartbeatPeriod());
+ params.setProfileDumpPeriod(ccs.getCCConfig().getProfileDumpPeriod());
result = new CCNCFunctions.NodeRegistrationResult(params, null);
} catch (Exception e) {
result = new CCNCFunctions.NodeRegistrationResult(null, e);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TriggerNCWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TriggerNCWork.java
index a7bca25..ab526e8 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TriggerNCWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TriggerNCWork.java
@@ -27,7 +27,9 @@
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.apache.hyracks.api.config.Section;
import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.common.controllers.NCConfig;
import org.apache.hyracks.control.common.controllers.ServiceConstants.ServiceCommand;
import org.apache.hyracks.control.common.work.AbstractWork;
import org.ini4j.Ini;
@@ -79,14 +81,18 @@
/**
* Given an Ini object, serialize it to String with some enhancements.
- * @param ccini
+ * @param ccini the ini file to decorate and forward to NC
*/
- String serializeIni(Ini ccini) throws IOException {
+ private String serializeIni(Ini ccini) throws IOException {
StringWriter iniString = new StringWriter();
- ccini.store(iniString);
+ ccini.get(Section.NC.sectionName()).putIfAbsent(NCConfig.Option.CLUSTER_ADDRESS.ini(),
+ ccs.getCCConfig().getClusterPublicAddress());
+ ccini.get(Section.NC.sectionName()).putIfAbsent(NCConfig.Option.CLUSTER_PORT.ini(),
+ String.valueOf(ccs.getCCConfig().getClusterPublicPort()));
// Finally insert *this* NC's name into localnc section - this is a fixed
// entry point so that NCs can determine where all their config is.
- iniString.append("\n[localnc]\nid=").append(ncId).append("\n");
+ ccini.put(Section.LOCALNC.sectionName(), NCConfig.Option.NODE_ID.ini(), ncId);
+ ccini.store(iniString);
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.fine("Returning Ini file:\n" + iniString.toString());
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
index c742a4a..dde3bad 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
@@ -46,8 +46,8 @@
public void testNormal() throws HyracksException {
IResourceManager resourceManager = new ResourceManager();
INodeManager nodeManager = new NodeManager(makeCCConfig(), resourceManager);
- NodeControllerState ncState1 = mockNodeControllerState(false);
- NodeControllerState ncState2 = mockNodeControllerState(false);
+ NodeControllerState ncState1 = mockNodeControllerState(NODE1, false);
+ NodeControllerState ncState2 = mockNodeControllerState(NODE2, false);
// Verifies states after adding nodes.
nodeManager.addNode(NODE1, ncState1);
@@ -71,7 +71,7 @@
public void testException() throws HyracksException {
IResourceManager resourceManager = new ResourceManager();
INodeManager nodeManager = new NodeManager(makeCCConfig(), resourceManager);
- NodeControllerState ncState1 = mockNodeControllerState(true);
+ NodeControllerState ncState1 = mockNodeControllerState(NODE1, true);
boolean invalidNetworkAddress = false;
// Verifies states after a failure during adding nodes.
@@ -106,11 +106,11 @@
private CCConfig makeCCConfig() {
CCConfig ccConfig = new CCConfig();
- ccConfig.maxHeartbeatLapsePeriods = 0;
+ ccConfig.setHeartbeatMaxMisses(0);
return ccConfig;
}
- private NodeControllerState mockNodeControllerState(boolean invalidIpAddr) {
+ private NodeControllerState mockNodeControllerState(String nodeId, boolean invalidIpAddr) {
NodeControllerState ncState = mock(NodeControllerState.class);
String ipAddr = invalidIpAddr ? "255.255.255:255" : "127.0.0.2";
NetworkAddress dataAddr = new NetworkAddress(ipAddr, 1001);
@@ -120,8 +120,8 @@
when(ncState.getDataPort()).thenReturn(dataAddr);
when(ncState.getDatasetPort()).thenReturn(resultAddr);
when(ncState.getMessagingPort()).thenReturn(msgAddr);
- NCConfig ncConfig = new NCConfig();
- ncConfig.dataIPAddress = ipAddr;
+ NCConfig ncConfig = new NCConfig(nodeId);
+ ncConfig.setDataPublicAddress(ipAddr);
when(ncState.getNCConfig()).thenReturn(ncConfig);
return ncState;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
index 3bb08bd..97b05e7 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
@@ -27,6 +27,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -48,14 +49,23 @@
import org.apache.hyracks.control.common.controllers.CCConfig;
import org.apache.hyracks.control.common.logs.LogFile;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
+import org.kohsuke.args4j.CmdLineException;
import org.mockito.Mockito;
public class JobManagerTest {
+ private CCConfig ccConfig;
+
+ @Before
+ public void setup() throws IOException, CmdLineException {
+ ccConfig = new CCConfig();
+ ccConfig.getConfigManager().processConfig();
+ }
+
@Test
- public void test() throws HyracksException {
- CCConfig ccConfig = new CCConfig();
+ public void test() throws IOException, CmdLineException {
IJobCapacityController jobCapacityController = mock(IJobCapacityController.class);
IJobManager jobManager = spy(new JobManager(ccConfig, mockClusterControllerService(), jobCapacityController));
@@ -114,7 +124,7 @@
}
Assert.assertTrue(jobManager.getRunningJobs().size() == 4096);
Assert.assertTrue(jobManager.getPendingJobs().isEmpty());
- Assert.assertTrue(jobManager.getArchivedJobs().size() == ccConfig.jobHistorySize);
+ Assert.assertTrue(jobManager.getArchivedJobs().size() == ccConfig.getJobHistorySize());
// Completes deferred jobs.
for (JobRun run : deferredRuns) {
@@ -123,14 +133,13 @@
}
Assert.assertTrue(jobManager.getRunningJobs().isEmpty());
Assert.assertTrue(jobManager.getPendingJobs().isEmpty());
- Assert.assertTrue(jobManager.getArchivedJobs().size() == ccConfig.jobHistorySize);
+ Assert.assertTrue(jobManager.getArchivedJobs().size() == ccConfig.getJobHistorySize());
verify(jobManager, times(8192)).prepareComplete(any(), any(), any());
verify(jobManager, times(8192)).finalComplete(any());
}
@Test
public void testExceedMax() throws HyracksException {
- CCConfig ccConfig = new CCConfig();
IJobCapacityController jobCapacityController = mock(IJobCapacityController.class);
IJobManager jobManager = spy(new JobManager(ccConfig, mockClusterControllerService(), jobCapacityController));
boolean rejected = false;
@@ -154,7 +163,6 @@
@Test
public void testAdmitThenReject() throws HyracksException {
- CCConfig ccConfig = new CCConfig();
IJobCapacityController jobCapacityController = mock(IJobCapacityController.class);
IJobManager jobManager = spy(new JobManager(ccConfig, mockClusterControllerService(), jobCapacityController));
@@ -185,7 +193,6 @@
@Test
public void testNullJob() throws HyracksException {
- CCConfig ccConfig = new CCConfig();
IJobCapacityController jobCapacityController = mock(IJobCapacityController.class);
IJobManager jobManager = new JobManager(ccConfig, mockClusterControllerService(), jobCapacityController);
boolean invalidParameter = false;
@@ -249,7 +256,7 @@
}
Assert.assertTrue(jobManager.getPendingJobs().isEmpty());
- Assert.assertTrue(jobManager.getArchivedJobs().size() == ccConfig.jobHistorySize);
+ Assert.assertTrue(jobManager.getArchivedJobs().size() == ccConfig.getJobHistorySize());
verify(jobManager, times(0)).prepareComplete(any(), any(), any());
verify(jobManager, times(0)).finalComplete(any());
}