Configuration Revamp
- Ini section of node / cc details now returns ini param names instead of
managix option names
- Normalized command line -vs- ini file configuration parameter names
- Eliminated unused parameters
- Ini validation
- Migrate *DB parameters out of [app] and into nc / cc sections as
appropriate
- Eliminate [app] section. Cluster-wide configuration lives in [common]
- Sort properties alphabetically when returned by HTTP api
Change-Id: I95b7e0bd4538ef42817c8826e76412150074b754
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1487
Reviewed-by: Michael Blow <mblow@apache.org>
Integration-Tests: Michael Blow <mblow@apache.org>
Tested-by: Michael Blow <mblow@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/pom.xml b/hyracks-fullstack/hyracks/hyracks-api/pom.xml
index cde607f..ec60022 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-api/pom.xml
@@ -94,5 +94,9 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>args4j</groupId>
+ <artifactId>args4j</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplicationConfig.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplicationConfig.java
deleted file mode 100644
index 278a9d7..0000000
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplicationConfig.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.api.application;
-
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Set;
-
-/**
- * Accessor for the data contained in the global application configuration file.
- */
-public interface IApplicationConfig {
- String getString(String section, String key);
-
- String getString(String section, String key, String defaultValue);
-
- int getInt(String section, String key);
-
- int getInt(String section, String key, int defaultValue);
-
- long getLong(String section, String key);
-
- long getLong(String section, String key, long defaultValue);
-
- Set<String> getSections();
-
- Set<String> getKeys(String section);
-
- String[] getStringArray(String section, String key);
-
- List<Set<Entry<String, String>>> getMultiSections(String section);
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplicationContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplicationContext.java
index 7b07174..c933d9d 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplicationContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplicationContext.java
@@ -21,6 +21,7 @@
import java.io.Serializable;
import java.util.concurrent.ThreadFactory;
+import org.apache.hyracks.api.config.IApplicationConfig;
import org.apache.hyracks.api.job.IJobSerializerDeserializerContainer;
import org.apache.hyracks.api.messages.IMessageBroker;
import org.apache.hyracks.api.service.IControllerService;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationEntryPoint.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationEntryPoint.java
index c11cc7a..4f6f450 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationEntryPoint.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationEntryPoint.java
@@ -18,7 +18,9 @@
*/
package org.apache.hyracks.api.application;
+import org.apache.hyracks.api.config.IConfigManager;
import org.apache.hyracks.api.job.resource.IJobCapacityController;
+import org.kohsuke.args4j.OptionHandlerFilter;
public interface ICCApplicationEntryPoint {
void start(ICCApplicationContext ccAppCtx, String[] args) throws Exception;
@@ -28,4 +30,10 @@
void startupCompleted() throws Exception;
IJobCapacityController getJobCapacityController();
+
+ void registerConfig(IConfigManager configManager);
+
+ default OptionHandlerFilter getUsageFilter() {
+ return OptionHandlerFilter.PUBLIC;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IClusterLifecycleListener.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IClusterLifecycleListener.java
index 191a4af..2c53e1c 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IClusterLifecycleListener.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IClusterLifecycleListener.java
@@ -20,8 +20,8 @@
import java.util.Collection;
import java.util.Map;
-import java.util.Set;
+import org.apache.hyracks.api.config.IOption;
import org.apache.hyracks.api.exceptions.HyracksException;
/**
@@ -40,9 +40,8 @@
* @param nodeId
* A unique identifier of a Node Controller
* @param ncConfiguration
- * A map containing the set of configuration parameters that were used to start the Node Controller
*/
- public void notifyNodeJoin(String nodeId, Map<String, String> ncConfiguration) throws HyracksException;
+ public void notifyNodeJoin(String nodeId, Map<IOption, Object> ncConfiguration) throws HyracksException;
/**
* @param deadNodeIds
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplicationEntryPoint.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplicationEntryPoint.java
index dea6e4b..a92cd4a 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplicationEntryPoint.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/INCApplicationEntryPoint.java
@@ -18,7 +18,9 @@
*/
package org.apache.hyracks.api.application;
+import org.apache.hyracks.api.config.IConfigManager;
import org.apache.hyracks.api.job.resource.NodeCapacity;
+import org.kohsuke.args4j.OptionHandlerFilter;
public interface INCApplicationEntryPoint {
void start(INCApplicationContext ncAppCtx, String[] args) throws Exception;
@@ -28,4 +30,10 @@
void stop() throws Exception;
NodeCapacity getCapacity();
+
+ void registerConfigOptions(IConfigManager configManager);
+
+ default OptionHandlerFilter getUsageFilter() {
+ return OptionHandlerFilter.PUBLIC;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IApplicationConfig.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IApplicationConfig.java
new file mode 100644
index 0000000..0335b80
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IApplicationConfig.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.config;
+
+import java.util.List;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.logging.Level;
+
+/**
+ * Accessor for the data contained in the global application configuration file.
+ */
+public interface IApplicationConfig {
+ String getString(String section, String key);
+
+ int getInt(String section, String key);
+
+ long getLong(String section, String key);
+
+ Set<String> getSectionNames();
+
+ Set<String> getKeys(String section);
+
+ Object getStatic(IOption option);
+
+ List<String> getNCNames();
+
+ IOption lookupOption(String sectionName, String propertyName);
+
+ Set<IOption> getOptions();
+
+ Set<IOption> getOptions(Section section);
+
+ IApplicationConfig getNCEffectiveConfig(String nodeId);
+
+ Set<Section> getSections();
+
+ Set<Section> getSections(Predicate<Section> predicate);
+
+ default Object get(IOption option) {
+ return option.get(this);
+ }
+
+ default long getLong(IOption option) {
+ return (long)get(option);
+ }
+
+ default int getInt(IOption option) {
+ return (int)get(option);
+ }
+
+ default String getString(IOption option) {
+ return (String)get(option);
+ }
+
+ default boolean getBoolean(IOption option) {
+ return (boolean)get(option);
+ }
+
+ default Level getLoggingLevel(IOption option) {
+ return (Level)get(option);
+ }
+
+ default double getDouble(IOption option) {
+ return (double)get(option);
+ }
+
+ default String [] getStringArray(IOption option) {
+ return (String [])get(option);
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IConfigManager.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IConfigManager.java
new file mode 100644
index 0000000..fb1332b
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IConfigManager.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.config;
+
+import java.util.Set;
+import java.util.function.Predicate;
+
+import org.kohsuke.args4j.OptionHandlerFilter;
+
+public interface IConfigManager {
+ int PARSE_INI_POINTERS_METRIC = 100;
+ int PARSE_INI_METRIC = 200;
+ int PARSE_COMMAND_LINE_METRIC = 300;
+ int APPLY_DEFAULTS_METRIC = 400;
+
+ void register(IOption... options);
+
+ @SuppressWarnings("unchecked")
+ void register(Class<? extends IOption>... optionClasses);
+
+ Set<Section> getSections(Predicate<Section> predicate);
+
+ Set<Section> getSections();
+
+ Set<IOption> getOptions(Section section);
+
+ IApplicationConfig getAppConfig();
+
+ void addConfigurator(int metric, IConfigurator configurator);
+
+ void addIniParamOptions(IOption... options);
+
+ void addCmdLineSections(Section... sections);
+
+ void setUsageFilter(OptionHandlerFilter usageFilter);
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IConfigurator.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IConfigurator.java
new file mode 100644
index 0000000..54f780d
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IConfigurator.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.config;
+
+import java.io.IOException;
+
+import org.kohsuke.args4j.CmdLineException;
+
+public interface IConfigurator {
+ void run() throws IOException, CmdLineException;
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IOption.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IOption.java
new file mode 100644
index 0000000..834d73c
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IOption.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.config;
+
+import java.util.function.Function;
+
+public interface IOption {
+
+ String name();
+
+ Section section();
+
+ String description();
+
+ IOptionType type();
+
+ /**
+ * @return the unresolved default value of this option-
+ */
+ Object defaultValue();
+
+ /**
+ * @return a string to describe the default value, or null if the default should be used
+ */
+ default String usageDefaultOverride(IApplicationConfig accessor, Function<IOption, String> optionPrinter) {
+ return null;
+ }
+
+ /**
+ * Implementations should override this default implementation if this property value is non-static and should be
+ * calculated on every call
+ * @return the current value of this property
+ */
+ default Object get(IApplicationConfig appConfig) {
+ return appConfig.getStatic(this);
+ }
+
+ /**
+ * @return a true value indicates this option should not be advertised (e.g. command-line usage, documentation)
+ */
+ default boolean hidden() { return false; }
+
+ default String cmdline() {
+ return "-" + name().toLowerCase().replace("_", "-");
+ }
+
+ default String ini() {
+ return name().toLowerCase().replace("_", ".");
+ }
+
+ default String toIniString() {
+ return "[" + section().sectionName() + "] " + ini();
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IOptionType.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IOptionType.java
new file mode 100644
index 0000000..2a98fdc
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/IOptionType.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.config;
+
+public interface IOptionType<T> {
+ /**
+ * @throws IllegalArgumentException when the supplied string cannot be interpreted
+ */
+ T parse(String s);
+
+ Class<T> targetType();
+
+ default Object serializeToJSON(Object value) {
+ return value;
+ }
+
+ default String serializeToIni(Object value) {
+ return String.valueOf(value);
+ }
+
+ default String serializeToString(Object value) {
+ return serializeToIni(value);
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/Section.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/Section.java
new file mode 100644
index 0000000..8b19d80
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/config/Section.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.config;
+
+public enum Section {
+ CC,
+ NC,
+ COMMON,
+ LOCALNC,
+ EXTENSION,
+ VIRTUAL; // virtual section indicates options which are not accessible from the cmd-line nor ini file
+
+ public static Section parseSectionName(String name) {
+ for (Section section : values()) {
+ if (section.sectionName().equals(name)) {
+ return section;
+ }
+ }
+ return null;
+ }
+
+ public String sectionName() {
+ return name().toLowerCase();
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/ICCContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/ICCContext.java
index f9618cf..83e0482 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/ICCContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/ICCContext.java
@@ -27,9 +27,9 @@
import org.apache.hyracks.api.topology.ClusterTopology;
public interface ICCContext {
- public ClusterControllerInfo getClusterControllerInfo();
+ ClusterControllerInfo getClusterControllerInfo();
- public void getIPAddressNodeMap(Map<InetAddress, Set<String>> map) throws HyracksDataException;
+ void getIPAddressNodeMap(Map<InetAddress, Set<String>> map) throws HyracksDataException;
- public ClusterTopology getClusterTopology();
+ ClusterTopology getClusterTopology();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IODeviceHandle.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IODeviceHandle.java
index 552fbeb..34c58f8 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IODeviceHandle.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IODeviceHandle.java
@@ -22,7 +22,6 @@
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
-import java.util.StringTokenizer;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -79,11 +78,10 @@
* comma separated list of devices
* @return
*/
- public static List<IODeviceHandle> getDevices(String ioDevices) {
+ public static List<IODeviceHandle> getDevices(String [] ioDevices) {
List<IODeviceHandle> devices = new ArrayList<>();
- StringTokenizer tok = new StringTokenizer(ioDevices, ",");
- while (tok.hasMoreElements()) {
- String devPath = tok.nextToken().trim();
+ for (String ioDevice : ioDevices) {
+ String devPath = ioDevice.trim();
devices.add(new IODeviceHandle(new File(devPath), "."));
}
return devices;
diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/test/java/org/apache/hyracks/client/stats/HyracksUtils.java b/hyracks-fullstack/hyracks/hyracks-client/src/test/java/org/apache/hyracks/client/stats/HyracksUtils.java
index a41ddd9..54ae838 100644
--- a/hyracks-fullstack/hyracks/hyracks-client/src/test/java/org/apache/hyracks/client/stats/HyracksUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-client/src/test/java/org/apache/hyracks/client/stats/HyracksUtils.java
@@ -50,37 +50,34 @@
public static void init() throws Exception {
CCConfig ccConfig = new CCConfig();
- ccConfig.clientNetIpAddress = CC_HOST;
- ccConfig.clusterNetIpAddress = CC_HOST;
- ccConfig.clusterNetPort = TEST_HYRACKS_CC_PORT;
- ccConfig.clientNetPort = TEST_HYRACKS_CC_CLIENT_PORT;
- ccConfig.defaultMaxJobAttempts = 0;
- ccConfig.jobHistorySize = 0;
- ccConfig.profileDumpPeriod = -1;
- ccConfig.heartbeatPeriod = 50;
+ ccConfig.setClientListenAddress(CC_HOST);
+ ccConfig.setClusterListenAddress(CC_HOST);
+ ccConfig.setClusterListenPort(TEST_HYRACKS_CC_PORT);
+ ccConfig.setClientListenPort(TEST_HYRACKS_CC_CLIENT_PORT);
+ ccConfig.setJobHistorySize(0);
+ ccConfig.setProfileDumpPeriod(-1);
+ ccConfig.setHeartbeatPeriod(50);
// cluster controller
cc = new ClusterControllerService(ccConfig);
cc.start();
// two node controllers
- NCConfig ncConfig1 = new NCConfig();
- ncConfig1.ccHost = "localhost";
- ncConfig1.clusterNetIPAddress = "localhost";
- ncConfig1.ccPort = TEST_HYRACKS_CC_PORT;
- ncConfig1.dataIPAddress = "127.0.0.1";
- ncConfig1.resultIPAddress = "127.0.0.1";
- ncConfig1.nodeId = NC1_ID;
+ NCConfig ncConfig1 = new NCConfig(NC1_ID);
+ ncConfig1.setClusterAddress("localhost");
+ ncConfig1.setClusterListenAddress("localhost");
+ ncConfig1.setClusterPort(TEST_HYRACKS_CC_PORT);
+ ncConfig1.setDataListenAddress("127.0.0.1");
+ ncConfig1.setResultListenAddress("127.0.0.1");
nc1 = new NodeControllerService(ncConfig1);
nc1.start();
- NCConfig ncConfig2 = new NCConfig();
- ncConfig2.ccHost = "localhost";
- ncConfig2.clusterNetIPAddress = "localhost";
- ncConfig2.ccPort = TEST_HYRACKS_CC_PORT;
- ncConfig2.dataIPAddress = "127.0.0.1";
- ncConfig2.resultIPAddress = "127.0.0.1";
- ncConfig2.nodeId = NC2_ID;
+ NCConfig ncConfig2 = new NCConfig(NC2_ID);
+ ncConfig2.setClusterAddress("localhost");
+ ncConfig2.setClusterListenAddress("localhost");
+ ncConfig2.setClusterPort(TEST_HYRACKS_CC_PORT);
+ ncConfig2.setDataListenAddress("127.0.0.1");
+ ncConfig2.setResultListenAddress("127.0.0.1");
nc2 = new NodeControllerService(ncConfig2);
nc2.start();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/pom.xml b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/pom.xml
index 9dc3c1a..ba086ad 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/pom.xml
@@ -63,7 +63,7 @@
<licenseFamilies combine.children="append">
<licenseFamily implementation="org.apache.rat.license.MITLicenseFamily"/>
</licenseFamilies>
- <excludes>
+ <excludes combine.children="append">
<!-- See hyracks-fullstack-license/src/main/licenses/templates/source_licenses.ftl -->
<exclude>src/main/resources/static/javascript/flot/jquery.flot.resize.min.js</exclude>
<exclude>src/main/resources/static/javascript/jsplumb/jquery.jsPlumb-1.3.5-all-min.js</exclude>
@@ -104,7 +104,6 @@
<dependency>
<groupId>args4j</groupId>
<artifactId>args4j</artifactId>
- <version>2.0.12</version>
</dependency>
<dependency>
<groupId>org.apache.hyracks</groupId>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCApplicationEntryPoint.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCApplicationEntryPoint.java
new file mode 100644
index 0000000..07008df
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCApplicationEntryPoint.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.cc;
+
+import java.util.Arrays;
+
+import org.apache.hyracks.api.application.ICCApplicationContext;
+import org.apache.hyracks.api.application.ICCApplicationEntryPoint;
+import org.apache.hyracks.api.config.IConfigManager;
+import org.apache.hyracks.api.config.Section;
+import org.apache.hyracks.api.job.resource.DefaultJobCapacityController;
+import org.apache.hyracks.api.job.resource.IJobCapacityController;
+import org.apache.hyracks.control.common.controllers.CCConfig;
+import org.apache.hyracks.control.common.controllers.ControllerConfig;
+import org.apache.hyracks.control.common.controllers.NCConfig;
+
+public class CCApplicationEntryPoint implements ICCApplicationEntryPoint {
+ public static final ICCApplicationEntryPoint INSTANCE = new CCApplicationEntryPoint();
+
+ protected CCApplicationEntryPoint() {
+ }
+
+ @Override
+ public void start(ICCApplicationContext ccAppCtx, String[] args) throws Exception {
+ if (args.length > 0) {
+ throw new IllegalArgumentException("Unrecognized argument(s): " + Arrays.toString(args));
+ }
+ }
+
+ @Override
+ public void stop() throws Exception {
+ // no-op
+ }
+
+ @Override
+ public void startupCompleted() throws Exception {
+ // no-op
+ }
+
+ @Override
+ public IJobCapacityController getJobCapacityController() {
+ return DefaultJobCapacityController.INSTANCE;
+ }
+
+ @Override
+ public void registerConfig(IConfigManager configManager) {
+ configManager.addIniParamOptions(ControllerConfig.Option.CONFIG_FILE, ControllerConfig.Option.CONFIG_FILE_URL);
+ configManager.addCmdLineSections(Section.CC, Section.COMMON);
+ configManager.setUsageFilter(getUsageFilter());
+ configManager.register(ControllerConfig.Option.class, CCConfig.Option.class, NCConfig.Option.class);
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCDriver.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCDriver.java
index dff3107..754deac 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCDriver.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCDriver.java
@@ -18,32 +18,50 @@
*/
package org.apache.hyracks.control.cc;
-import org.kohsuke.args4j.CmdLineParser;
+import static org.apache.hyracks.control.common.controllers.CCConfig.Option.APP_CLASS;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.api.application.ICCApplicationEntryPoint;
+import org.apache.hyracks.control.common.config.ConfigManager;
+import org.apache.hyracks.control.common.config.ConfigUtils;
import org.apache.hyracks.control.common.controllers.CCConfig;
+import org.kohsuke.args4j.CmdLineException;
public class CCDriver {
- public static void main(String args []) throws Exception {
- try {
- CCConfig ccConfig = new CCConfig();
- CmdLineParser cp = new CmdLineParser(ccConfig);
- try {
- cp.parseArgument(args);
- } catch (Exception e) {
- System.err.println(e.getMessage());
- cp.printUsage(System.err);
- return;
- }
- ccConfig.loadConfigAndApplyDefaults();
+ private static final Logger LOGGER = Logger.getLogger(CCDriver.class.getName());
- ClusterControllerService ccService = new ClusterControllerService(ccConfig);
+ private CCDriver() {
+ }
+
+ public static void main(String[] args) throws Exception {
+ try {
+ final ConfigManager configManager = new ConfigManager(args);
+ ICCApplicationEntryPoint appEntryPoint = getAppEntryPoint(args);
+ appEntryPoint.registerConfig(configManager);
+ CCConfig ccConfig = new CCConfig(configManager);
+ ClusterControllerService ccService = new ClusterControllerService(ccConfig, appEntryPoint);
ccService.start();
while (true) {
Thread.sleep(100000);
}
+ } catch (CmdLineException e) {
+ LOGGER.log(Level.FINE, "Exception parsing command line: " + Arrays.toString(args), e);
+ System.exit(2);
} catch (Exception e) {
- e.printStackTrace();
+ LOGGER.log(Level.SEVERE, "Exiting NCDriver due to exception", e);
System.exit(1);
}
}
+
+ private static ICCApplicationEntryPoint getAppEntryPoint(String[] args)
+ throws ClassNotFoundException, InstantiationException, IllegalAccessException, IOException {
+ // determine app class so that we can use the correct implementation of the configuration...
+ String appClassName = ConfigUtils.getOptionValue(args, APP_CLASS);
+ return appClassName != null ? (ICCApplicationEntryPoint) (Class.forName(appClassName)).newInstance()
+ : CCApplicationEntryPoint.INSTANCE;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index c47284c..21b9dcf 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -20,6 +20,7 @@
import java.io.File;
import java.io.FileReader;
+import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.InetAddress;
@@ -32,20 +33,22 @@
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.TreeMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.api.application.ICCApplicationEntryPoint;
import org.apache.hyracks.api.client.ClusterControllerInfo;
import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.config.IApplicationConfig;
import org.apache.hyracks.api.context.ICCContext;
import org.apache.hyracks.api.deployment.DeploymentId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.api.job.resource.DefaultJobCapacityController;
import org.apache.hyracks.api.job.resource.IJobCapacityController;
import org.apache.hyracks.api.service.IControllerService;
import org.apache.hyracks.api.topology.ClusterTopology;
@@ -66,9 +69,10 @@
import org.apache.hyracks.control.cc.work.RemoveDeadNodesWork;
import org.apache.hyracks.control.cc.work.ShutdownNCServiceWork;
import org.apache.hyracks.control.cc.work.TriggerNCWork;
+import org.apache.hyracks.control.common.config.ConfigManager;
import org.apache.hyracks.control.common.context.ServerContext;
import org.apache.hyracks.control.common.controllers.CCConfig;
-import org.apache.hyracks.control.common.controllers.IniUtils;
+import org.apache.hyracks.control.common.controllers.NCConfig;
import org.apache.hyracks.control.common.deployment.DeploymentRun;
import org.apache.hyracks.control.common.ipc.CCNCFunctions;
import org.apache.hyracks.control.common.logs.LogFile;
@@ -77,7 +81,6 @@
import org.apache.hyracks.ipc.api.IIPCI;
import org.apache.hyracks.ipc.impl.IPCSystem;
import org.apache.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
-import org.ini4j.Ini;
import org.xml.sax.InputSource;
public class ClusterControllerService implements IControllerService {
@@ -85,6 +88,8 @@
private final CCConfig ccConfig;
+ private final ConfigManager configManager;
+
private IPCSystem clusterIPC;
private IPCSystem clientIPC;
@@ -127,11 +132,22 @@
private ShutdownRun shutdownCallback;
- private ICCApplicationEntryPoint aep;
+ private final ICCApplicationEntryPoint aep;
- public ClusterControllerService(final CCConfig ccConfig) throws Exception {
- this.ccConfig = ccConfig;
- File jobLogFolder = new File(ccConfig.ccRoot, "logs/jobs");
+ public ClusterControllerService(final CCConfig config) throws Exception {
+ this(config, getApplicationEntryPoint(config));
+ }
+
+ public ClusterControllerService(final CCConfig config,
+ final ICCApplicationEntryPoint aep) throws Exception {
+ this.ccConfig = config;
+ this.configManager = ccConfig.getConfigManager();
+ if (aep == null) {
+ throw new IllegalArgumentException("ICCApplicationEntryPoint cannot be null");
+ }
+ this.aep = aep;
+ configManager.processConfig();
+ File jobLogFolder = new File(ccConfig.getRootDir(), "logs/jobs");
jobLog = new LogFile(jobLogFolder);
// WorkQueue is in charge of heartbeat as well as other events.
@@ -140,7 +156,8 @@
final ClusterTopology topology = computeClusterTopology(ccConfig);
ccContext = new ClusterControllerContext(topology);
sweeper = new DeadNodeSweeper();
- datasetDirectoryService = new DatasetDirectoryService(ccConfig.resultTTL, ccConfig.resultSweepThreshold);
+ datasetDirectoryService = new DatasetDirectoryService(ccConfig.getResultTTL(),
+ ccConfig.getResultSweepThreshold());
deploymentRunMap = new HashMap<>();
stateDumpRunMap = new HashMap<>();
@@ -151,10 +168,10 @@
}
private static ClusterTopology computeClusterTopology(CCConfig ccConfig) throws Exception {
- if (ccConfig.clusterTopologyDefinition == null) {
+ if (ccConfig.getClusterTopology() == null) {
return null;
}
- FileReader fr = new FileReader(ccConfig.clusterTopologyDefinition);
+ FileReader fr = new FileReader(ccConfig.getClusterTopology());
InputSource in = new InputSource(fr);
try {
return TopologyDefinitionParser.parse(in);
@@ -166,20 +183,21 @@
@Override
public void start() throws Exception {
LOGGER.log(Level.INFO, "Starting ClusterControllerService: " + this);
- serverCtx = new ServerContext(ServerContext.ServerType.CLUSTER_CONTROLLER, new File(ccConfig.ccRoot));
+ serverCtx = new ServerContext(ServerContext.ServerType.CLUSTER_CONTROLLER, new File(ccConfig.getRootDir()));
IIPCI ccIPCI = new ClusterControllerIPCI(this);
- clusterIPC = new IPCSystem(new InetSocketAddress(ccConfig.clusterNetPort), ccIPCI,
+ clusterIPC = new IPCSystem(new InetSocketAddress(ccConfig.getClusterListenPort()), ccIPCI,
new CCNCFunctions.SerializerDeserializer());
IIPCI ciIPCI = new ClientInterfaceIPCI(this);
- clientIPC = new IPCSystem(new InetSocketAddress(ccConfig.clientNetIpAddress, ccConfig.clientNetPort), ciIPCI,
- new JavaSerializationBasedPayloadSerializerDeserializer());
- webServer = new WebServer(this, ccConfig.httpPort);
+ clientIPC = new IPCSystem(
+ new InetSocketAddress(ccConfig.getClientListenAddress(), ccConfig.getClientListenPort()),
+ ciIPCI, new JavaSerializationBasedPayloadSerializerDeserializer());
+ webServer = new WebServer(this, ccConfig.getConsoleListenPort());
clusterIPC.start();
clientIPC.start();
webServer.start();
- info = new ClusterControllerInfo(ccConfig.clientNetIpAddress, ccConfig.clientNetPort,
+ info = new ClusterControllerInfo(ccConfig.getClientListenAddress(), ccConfig.getClientListenPort(),
webServer.getListeningPort());
- timer.schedule(sweeper, 0, ccConfig.heartbeatPeriod);
+ timer.schedule(sweeper, 0, ccConfig.getHeartbeatPeriod());
jobLog.open();
startApplication();
@@ -194,84 +212,62 @@
appCtx = new CCApplicationContext(this, serverCtx, ccContext, ccConfig.getAppConfig());
appCtx.addJobLifecycleListener(datasetDirectoryService);
executor = Executors.newCachedThreadPool(appCtx.getThreadFactory());
- String className = ccConfig.appCCMainClass;
-
- IJobCapacityController jobCapacityController = DefaultJobCapacityController.INSTANCE;
- if (className != null) {
- Class<?> c = Class.forName(className);
- aep = (ICCApplicationEntryPoint) c.newInstance();
- String[] args = ccConfig.appArgs == null ? null
- : ccConfig.appArgs.toArray(new String[ccConfig.appArgs.size()]);
- aep.start(appCtx, args);
- jobCapacityController = aep.getJobCapacityController();
- }
+ aep.start(appCtx, ccConfig.getAppArgsArray());
+ IJobCapacityController jobCapacityController = aep.getJobCapacityController();
// Job manager is in charge of job lifecycle management.
try {
Constructor<?> jobManagerConstructor = this.getClass().getClassLoader()
- .loadClass(ccConfig.jobManagerClassName)
+ .loadClass(ccConfig.getJobManagerClass())
.getConstructor(CCConfig.class, ClusterControllerService.class, IJobCapacityController.class);
jobManager = (IJobManager) jobManagerConstructor.newInstance(ccConfig, this, jobCapacityController);
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException
| InvocationTargetException e) {
if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.log(Level.WARNING, "class " + ccConfig.jobManagerClassName + " could not be used: ", e);
+ LOGGER.log(Level.WARNING, "class " + ccConfig.getJobManagerClass() + " could not be used: ", e);
}
// Falls back to the default implementation if the user-provided class name is not valid.
jobManager = new JobManager(ccConfig, this, jobCapacityController);
}
}
- private void connectNCs() throws Exception {
- Ini ini = ccConfig.getIni();
- if (ini == null || Boolean.parseBoolean(ini.get("cc", "virtual.cluster"))) {
- return;
- }
- for (String section : ini.keySet()) {
- if (!section.startsWith("nc/")) {
- continue;
+ private Map<String, Pair<String, Integer>> getNCServices() throws IOException {
+ Map<String, Pair<String, Integer>> ncMap = new TreeMap<>();
+ for (String ncId : configManager.getNodeNames()) {
+ IApplicationConfig ncConfig = configManager.getNodeEffectiveConfig(ncId);
+ if (!ncConfig.getBoolean(NCConfig.Option.VIRTUAL_NC)) {
+ ncMap.put(ncId, Pair.of(ncConfig.getString(NCConfig.Option.NCSERVICE_ADDRESS),
+ ncConfig.getInt(NCConfig.Option.NCSERVICE_PORT)));
}
- String ncid = section.substring(3);
- String address = IniUtils.getString(ini, section, "address", null);
- int port = IniUtils.getInt(ini, section, "port", 9090);
- if (address == null) {
- address = InetAddress.getLoopbackAddress().getHostAddress();
- }
- workQueue.schedule(new TriggerNCWork(this, address, port, ncid));
}
+ return ncMap;
+ }
+
+ private void connectNCs() throws IOException {
+ getNCServices().entrySet().forEach(ncService -> {
+ final TriggerNCWork triggerWork = new TriggerNCWork(ClusterControllerService.this,
+ ncService.getValue().getLeft(), ncService.getValue().getRight(), ncService.getKey());
+ workQueue.schedule(triggerWork);
+ });
}
private void terminateNCServices() throws Exception {
- Ini ini = ccConfig.getIni();
- if (ini == null || Boolean.parseBoolean(ini.get("cc", "virtual.cluster"))) {
- return;
- }
List<ShutdownNCServiceWork> shutdownNCServiceWorks = new ArrayList<>();
- for (String section : ini.keySet()) {
- if (!section.startsWith("nc/")) {
- continue;
- }
- String ncid = section.substring(3);
- String address = IniUtils.getString(ini, section, "address", null);
- int port = IniUtils.getInt(ini, section, "port", 9090);
- if (address == null) {
- address = InetAddress.getLoopbackAddress().getHostAddress();
- }
- ShutdownNCServiceWork shutdownWork = new ShutdownNCServiceWork(address, port, ncid);
+ getNCServices().entrySet().forEach(ncService -> {
+ ShutdownNCServiceWork shutdownWork = new ShutdownNCServiceWork(ncService.getValue().getLeft(),
+ ncService.getValue().getRight(), ncService.getKey());
workQueue.schedule(shutdownWork);
shutdownNCServiceWorks.add(shutdownWork);
- }
+ });
for (ShutdownNCServiceWork shutdownWork : shutdownNCServiceWorks) {
shutdownWork.sync();
}
}
private void notifyApplication() throws Exception {
- if (aep != null) {
- // Sometimes, there is no application entry point. Check hyracks-client project
- aep.startupCompleted();
- }
+ aep.startupCompleted();
}
+
public void stop(boolean terminateNCService) throws Exception {
if (terminateNCService) {
terminateNCServices();
@@ -294,9 +290,7 @@
}
private void stopApplication() throws Exception {
- if (aep != null) {
- aep.stop();
- }
+ aep.stop();
}
public ServerContext getServerContext() {
@@ -360,7 +354,7 @@
}
public NetworkAddress getDatasetDirectoryServiceInfo() {
- return new NetworkAddress(ccConfig.clientNetIpAddress, ccConfig.clientNetPort);
+ return new NetworkAddress(ccConfig.getClientListenAddress(), ccConfig.getClientListenPort());
}
private final class ClusterControllerContext implements ICCContext {
@@ -390,6 +384,7 @@
public ClusterTopology getClusterTopology() {
return topology;
}
+
}
private class DeadNodeSweeper extends TimerTask {
@@ -458,4 +453,14 @@
public ThreadDumpRun removeThreadDumpRun(String requestKey) {
return threadDumpRunMap.remove(requestKey);
}
+
+ private static ICCApplicationEntryPoint getApplicationEntryPoint(CCConfig ccConfig)
+ throws ClassNotFoundException, IllegalAccessException, InstantiationException {
+ if (ccConfig.getAppClass() != null) {
+ Class<?> c = Class.forName(ccConfig.getAppClass());
+ return (ICCApplicationEntryPoint) c.newInstance();
+ } else {
+ return CCApplicationEntryPoint.INSTANCE;
+ }
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
index 955b7f2..8400a59 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
@@ -282,7 +282,7 @@
public synchronized ObjectNode toSummaryJSON() {
ObjectMapper om = new ObjectMapper();
ObjectNode o = om.createObjectNode();
- o.put("node-id", ncConfig.nodeId);
+ o.put("node-id", ncConfig.getNodeId());
o.put("heap-used", heapUsedSize[(rrdPtr + RRD_SIZE - 1) % RRD_SIZE]);
o.put("system-load-average", systemLoadAverage[(rrdPtr + RRD_SIZE - 1) % RRD_SIZE]);
@@ -293,7 +293,7 @@
ObjectMapper om = new ObjectMapper();
ObjectNode o = om.createObjectNode();
- o.put("node-id", ncConfig.nodeId);
+ o.put("node-id", ncConfig.getNodeId());
if (includeConfig) {
o.put("os-name", osName);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
index 77b9b17..a8b03bc 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
@@ -27,9 +27,10 @@
import java.util.Map;
import java.util.Set;
-import org.apache.hyracks.api.application.IApplicationConfig;
import org.apache.hyracks.api.application.ICCApplicationContext;
import org.apache.hyracks.api.application.IClusterLifecycleListener;
+import org.apache.hyracks.api.config.IApplicationConfig;
+import org.apache.hyracks.api.config.IOption;
import org.apache.hyracks.api.context.ICCContext;
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.IJobLifecycleListener;
@@ -105,7 +106,7 @@
clusterLifecycleListeners.add(clusterLifecycleListener);
}
- public void notifyNodeJoin(String nodeId, Map<String, String> ncConfiguration) throws HyracksException {
+ public void notifyNodeJoin(String nodeId, Map<IOption, Object> ncConfiguration) throws HyracksException {
for (IClusterLifecycleListener l : clusterLifecycleListeners) {
l.notifyNodeJoin(nodeId, ncConfiguration);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
index 354019c..d6d8bc4 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
@@ -137,7 +137,7 @@
Map.Entry<String, NodeControllerState> entry = nodeIterator.next();
String nodeId = entry.getKey();
NodeControllerState state = entry.getValue();
- if (state.incrementLastHeartbeatDuration() >= ccConfig.maxHeartbeatLapsePeriods) {
+ if (state.incrementLastHeartbeatDuration() >= ccConfig.getHeartbeatMaxMisses()) {
deadNodes.add(nodeId);
affectedJobIds.addAll(state.getActiveJobIds());
// Removes the node from node map.
@@ -172,10 +172,7 @@
// Retrieves the IP address for a given node.
private InetAddress getIpAddress(NodeControllerState ncState) throws HyracksException {
- String ipAddress = ncState.getNCConfig().dataIPAddress;
- if (ncState.getNCConfig().dataPublicIPAddress != null) {
- ipAddress = ncState.getNCConfig().dataPublicIPAddress;
- }
+ String ipAddress = ncState.getNCConfig().getDataPublicAddress();
try {
return InetAddress.getByName(ipAddress);
} catch (UnknownHostException e) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index 031303b..b35de3d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -68,13 +68,13 @@
this.ccs = ccs;
this.jobCapacityController = jobCapacityController;
try {
- Constructor<?> jobQueueConstructor = this.getClass().getClassLoader().loadClass(ccConfig.jobQueueClassName)
+ Constructor<?> jobQueueConstructor = this.getClass().getClassLoader().loadClass(ccConfig.getJobQueueClass())
.getConstructor(IJobManager.class, IJobCapacityController.class);
jobQueue = (IJobQueue) jobQueueConstructor.newInstance(this, this.jobCapacityController);
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException
| InvocationTargetException e) {
if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.log(Level.WARNING, "class " + ccConfig.jobQueueClassName + " could not be used: ", e);
+ LOGGER.log(Level.WARNING, "class " + ccConfig.getJobQueueClass() + " could not be used: ", e);
}
// Falls back to the default implementation if the user-provided class name is not valid.
jobQueue = new FIFOJobQueue(this, jobCapacityController);
@@ -85,13 +85,13 @@
@Override
protected boolean removeEldestEntry(Map.Entry<JobId, JobRun> eldest) {
- return size() > ccConfig.jobHistorySize;
+ return size() > ccConfig.getJobHistorySize();
}
};
runMapHistory = new LinkedHashMap<JobId, List<Exception>>() {
private static final long serialVersionUID = 1L;
/** history size + 1 is for the case when history size = 0 */
- private int allowedSize = 100 * (ccConfig.jobHistorySize + 1);
+ private int allowedSize = 100 * (ccConfig.getJobHistorySize() + 1);
@Override
protected boolean removeEldestEntry(Map.Entry<JobId, List<Exception>> eldest) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeDetailsJSONWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeDetailsJSONWork.java
index 0577002..3dec959 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeDetailsJSONWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeDetailsJSONWork.java
@@ -27,28 +27,26 @@
import java.lang.management.OperatingSystemMXBean;
import java.lang.management.RuntimeMXBean;
import java.lang.management.ThreadMXBean;
-import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Date;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.hyracks.control.cc.NodeControllerState;
-import org.apache.hyracks.control.cc.cluster.INodeManager;
-import org.apache.hyracks.control.common.controllers.CCConfig;
-import org.apache.hyracks.control.common.utils.PidHelper;
-import org.apache.hyracks.control.common.work.IPCResponder;
-import org.apache.hyracks.control.common.work.SynchronizableWork;
-import org.kohsuke.args4j.Option;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.hyracks.api.config.Section;
+import org.apache.hyracks.control.cc.NodeControllerState;
+import org.apache.hyracks.control.cc.cluster.INodeManager;
+import org.apache.hyracks.control.common.config.ConfigUtils;
+import org.apache.hyracks.control.common.controllers.CCConfig;
+import org.apache.hyracks.control.common.controllers.NCConfig;
+import org.apache.hyracks.control.common.utils.PidHelper;
+import org.apache.hyracks.control.common.work.IPCResponder;
+import org.apache.hyracks.control.common.work.SynchronizableWork;
public class GetNodeDetailsJSONWork extends SynchronizableWork {
- private static final Logger LOGGER = Logger.getLogger(GetNodeDetailsJSONWork.class.getName());
+ private static final Section [] CC_SECTIONS = { Section.CC, Section.COMMON };
+ private static final Section [] NC_SECTIONS = { Section.NC, Section.COMMON };
+
private final INodeManager nodeManager;
private final CCConfig ccConfig;
private final String nodeId;
@@ -59,7 +57,7 @@
private ObjectMapper om = new ObjectMapper();
public GetNodeDetailsJSONWork(INodeManager nodeManager, CCConfig ccConfig, String nodeId, boolean includeStats,
- boolean includeConfig, IPCResponder<String> callback) {
+ boolean includeConfig, IPCResponder<String> callback) {
this.nodeManager = nodeManager;
this.ccConfig = ccConfig;
this.nodeId = nodeId;
@@ -69,7 +67,7 @@
}
public GetNodeDetailsJSONWork(INodeManager nodeManager, CCConfig ccConfig, String nodeId, boolean includeStats,
- boolean includeConfig) {
+ boolean includeConfig) {
this(nodeManager, ccConfig, nodeId, includeStats, includeConfig, null);
}
@@ -79,14 +77,18 @@
// null nodeId is a request for CC
detail = getCCDetails();
if (includeConfig) {
- addIni(detail, ccConfig);
+ ConfigUtils.addConfigToJSON(detail, ccConfig.getAppConfig(), CC_SECTIONS);
+ detail.putPOJO("app.args", ccConfig.getAppArgs());
}
} else {
NodeControllerState ncs = nodeManager.getNodeControllerState(nodeId);
if (ncs != null) {
detail = ncs.toDetailedJSON(includeStats, includeConfig);
if (includeConfig) {
- addIni(detail, ncs.getNCConfig());
+ final NCConfig ncConfig = ncs.getNCConfig();
+ ConfigUtils.addConfigToJSON(detail, ncConfig.getConfigManager().getNodeEffectiveConfig(nodeId),
+ NC_SECTIONS);
+ detail.putPOJO("app.args", ncConfig.getAppArgs());
}
}
}
@@ -96,7 +98,7 @@
}
}
- private ObjectNode getCCDetails() {
+ private ObjectNode getCCDetails() {
ObjectNode o = om.createObjectNode();
MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
List<GarbageCollectorMXBean> gcMXBeans = ManagementFactory.getGarbageCollectorMXBeans();
@@ -151,33 +153,6 @@
return o;
}
- private static void addIni(ObjectNode o, Object configBean) {
- Map<String, Object> iniMap = new HashMap<>();
- for (Field f : configBean.getClass().getFields()) {
- Option option = f.getAnnotation(Option.class);
- if (option == null) {
- continue;
- }
- final String optionName = option.name();
- Object value = null;
- try {
- value = f.get(configBean);
- } catch (IllegalAccessException e) {
- LOGGER.log(Level.WARNING, "Unable to access ini option " + optionName, e);
- }
- if (value != null) {
- if ("--".equals(optionName)) {
- iniMap.put("app_args", value);
- } else {
- iniMap.put(optionName.substring(1).replace('-', '_'),
- "-iodevices".equals(optionName)
- ? String.valueOf(value).split(",")
- : value);
- }
- }
- }
- o.putPOJO("ini", iniMap);
- }
public ObjectNode getDetail() {
return detail;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
index dc93515..e97950e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
@@ -23,6 +23,8 @@
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.apache.hyracks.api.config.IApplicationConfig;
+import org.apache.hyracks.api.config.IOption;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.cc.NodeControllerState;
import org.apache.hyracks.control.cc.cluster.INodeManager;
@@ -50,19 +52,22 @@
String id = reg.getNodeId();
IIPCHandle ncIPCHandle = ccs.getClusterIPC().getHandle(reg.getNodeControllerAddress());
CCNCFunctions.NodeRegistrationResult result;
- Map<String, String> ncConfiguration = new HashMap<>();
+ Map<IOption, Object> ncConfiguration = new HashMap<>();
try {
INodeController nodeController = new NodeControllerRemoteProxy(ncIPCHandle);
NodeControllerState state = new NodeControllerState(nodeController, reg);
INodeManager nodeManager = ccs.getNodeManager();
nodeManager.addNode(id, state);
- state.getNCConfig().toMap(ncConfiguration);
+ IApplicationConfig cfg = state.getNCConfig().getConfigManager().getNodeEffectiveConfig(id);
+ for (IOption option : cfg.getOptions()) {
+ ncConfiguration.put(option, cfg.get(option));
+ }
LOGGER.log(Level.INFO, "Registered INodeController: id = " + id);
NodeParameters params = new NodeParameters();
params.setClusterControllerInfo(ccs.getClusterControllerInfo());
params.setDistributedState(ccs.getApplicationContext().getDistributedState());
- params.setHeartbeatPeriod(ccs.getCCConfig().heartbeatPeriod);
- params.setProfileDumpPeriod(ccs.getCCConfig().profileDumpPeriod);
+ params.setHeartbeatPeriod(ccs.getCCConfig().getHeartbeatPeriod());
+ params.setProfileDumpPeriod(ccs.getCCConfig().getProfileDumpPeriod());
result = new CCNCFunctions.NodeRegistrationResult(params, null);
} catch (Exception e) {
result = new CCNCFunctions.NodeRegistrationResult(null, e);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TriggerNCWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TriggerNCWork.java
index a7bca25..ab526e8 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TriggerNCWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TriggerNCWork.java
@@ -27,7 +27,9 @@
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.apache.hyracks.api.config.Section;
import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.common.controllers.NCConfig;
import org.apache.hyracks.control.common.controllers.ServiceConstants.ServiceCommand;
import org.apache.hyracks.control.common.work.AbstractWork;
import org.ini4j.Ini;
@@ -79,14 +81,18 @@
/**
* Given an Ini object, serialize it to String with some enhancements.
- * @param ccini
+ * @param ccini the ini file to decorate and forward to NC
*/
- String serializeIni(Ini ccini) throws IOException {
+ private String serializeIni(Ini ccini) throws IOException {
StringWriter iniString = new StringWriter();
- ccini.store(iniString);
+ ccini.get(Section.NC.sectionName()).putIfAbsent(NCConfig.Option.CLUSTER_ADDRESS.ini(),
+ ccs.getCCConfig().getClusterPublicAddress());
+ ccini.get(Section.NC.sectionName()).putIfAbsent(NCConfig.Option.CLUSTER_PORT.ini(),
+ String.valueOf(ccs.getCCConfig().getClusterPublicPort()));
// Finally insert *this* NC's name into localnc section - this is a fixed
// entry point so that NCs can determine where all their config is.
- iniString.append("\n[localnc]\nid=").append(ncId).append("\n");
+ ccini.put(Section.LOCALNC.sectionName(), NCConfig.Option.NODE_ID.ini(), ncId);
+ ccini.store(iniString);
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.fine("Returning Ini file:\n" + iniString.toString());
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
index c742a4a..dde3bad 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java
@@ -46,8 +46,8 @@
public void testNormal() throws HyracksException {
IResourceManager resourceManager = new ResourceManager();
INodeManager nodeManager = new NodeManager(makeCCConfig(), resourceManager);
- NodeControllerState ncState1 = mockNodeControllerState(false);
- NodeControllerState ncState2 = mockNodeControllerState(false);
+ NodeControllerState ncState1 = mockNodeControllerState(NODE1, false);
+ NodeControllerState ncState2 = mockNodeControllerState(NODE2, false);
// Verifies states after adding nodes.
nodeManager.addNode(NODE1, ncState1);
@@ -71,7 +71,7 @@
public void testException() throws HyracksException {
IResourceManager resourceManager = new ResourceManager();
INodeManager nodeManager = new NodeManager(makeCCConfig(), resourceManager);
- NodeControllerState ncState1 = mockNodeControllerState(true);
+ NodeControllerState ncState1 = mockNodeControllerState(NODE1, true);
boolean invalidNetworkAddress = false;
// Verifies states after a failure during adding nodes.
@@ -106,11 +106,11 @@
private CCConfig makeCCConfig() {
CCConfig ccConfig = new CCConfig();
- ccConfig.maxHeartbeatLapsePeriods = 0;
+ ccConfig.setHeartbeatMaxMisses(0);
return ccConfig;
}
- private NodeControllerState mockNodeControllerState(boolean invalidIpAddr) {
+ private NodeControllerState mockNodeControllerState(String nodeId, boolean invalidIpAddr) {
NodeControllerState ncState = mock(NodeControllerState.class);
String ipAddr = invalidIpAddr ? "255.255.255:255" : "127.0.0.2";
NetworkAddress dataAddr = new NetworkAddress(ipAddr, 1001);
@@ -120,8 +120,8 @@
when(ncState.getDataPort()).thenReturn(dataAddr);
when(ncState.getDatasetPort()).thenReturn(resultAddr);
when(ncState.getMessagingPort()).thenReturn(msgAddr);
- NCConfig ncConfig = new NCConfig();
- ncConfig.dataIPAddress = ipAddr;
+ NCConfig ncConfig = new NCConfig(nodeId);
+ ncConfig.setDataPublicAddress(ipAddr);
when(ncState.getNCConfig()).thenReturn(ncConfig);
return ncState;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
index 3bb08bd..97b05e7 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/job/JobManagerTest.java
@@ -27,6 +27,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -48,14 +49,23 @@
import org.apache.hyracks.control.common.controllers.CCConfig;
import org.apache.hyracks.control.common.logs.LogFile;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
+import org.kohsuke.args4j.CmdLineException;
import org.mockito.Mockito;
public class JobManagerTest {
+ private CCConfig ccConfig;
+
+ @Before
+ public void setup() throws IOException, CmdLineException {
+ ccConfig = new CCConfig();
+ ccConfig.getConfigManager().processConfig();
+ }
+
@Test
- public void test() throws HyracksException {
- CCConfig ccConfig = new CCConfig();
+ public void test() throws IOException, CmdLineException {
IJobCapacityController jobCapacityController = mock(IJobCapacityController.class);
IJobManager jobManager = spy(new JobManager(ccConfig, mockClusterControllerService(), jobCapacityController));
@@ -114,7 +124,7 @@
}
Assert.assertTrue(jobManager.getRunningJobs().size() == 4096);
Assert.assertTrue(jobManager.getPendingJobs().isEmpty());
- Assert.assertTrue(jobManager.getArchivedJobs().size() == ccConfig.jobHistorySize);
+ Assert.assertTrue(jobManager.getArchivedJobs().size() == ccConfig.getJobHistorySize());
// Completes deferred jobs.
for (JobRun run : deferredRuns) {
@@ -123,14 +133,13 @@
}
Assert.assertTrue(jobManager.getRunningJobs().isEmpty());
Assert.assertTrue(jobManager.getPendingJobs().isEmpty());
- Assert.assertTrue(jobManager.getArchivedJobs().size() == ccConfig.jobHistorySize);
+ Assert.assertTrue(jobManager.getArchivedJobs().size() == ccConfig.getJobHistorySize());
verify(jobManager, times(8192)).prepareComplete(any(), any(), any());
verify(jobManager, times(8192)).finalComplete(any());
}
@Test
public void testExceedMax() throws HyracksException {
- CCConfig ccConfig = new CCConfig();
IJobCapacityController jobCapacityController = mock(IJobCapacityController.class);
IJobManager jobManager = spy(new JobManager(ccConfig, mockClusterControllerService(), jobCapacityController));
boolean rejected = false;
@@ -154,7 +163,6 @@
@Test
public void testAdmitThenReject() throws HyracksException {
- CCConfig ccConfig = new CCConfig();
IJobCapacityController jobCapacityController = mock(IJobCapacityController.class);
IJobManager jobManager = spy(new JobManager(ccConfig, mockClusterControllerService(), jobCapacityController));
@@ -185,7 +193,6 @@
@Test
public void testNullJob() throws HyracksException {
- CCConfig ccConfig = new CCConfig();
IJobCapacityController jobCapacityController = mock(IJobCapacityController.class);
IJobManager jobManager = new JobManager(ccConfig, mockClusterControllerService(), jobCapacityController);
boolean invalidParameter = false;
@@ -249,7 +256,7 @@
}
Assert.assertTrue(jobManager.getPendingJobs().isEmpty());
- Assert.assertTrue(jobManager.getArchivedJobs().size() == ccConfig.jobHistorySize);
+ Assert.assertTrue(jobManager.getArchivedJobs().size() == ccConfig.getJobHistorySize());
verify(jobManager, times(0)).prepareComplete(any(), any(), any());
verify(jobManager, times(0)).finalComplete(any());
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/pom.xml b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/pom.xml
index 08783cc..bd6960a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/pom.xml
@@ -48,7 +48,6 @@
<dependency>
<groupId>args4j</groupId>
<artifactId>args4j</artifactId>
- <version>2.0.12</version>
</dependency>
<dependency>
<groupId>org.apache.hyracks</groupId>
@@ -67,5 +66,14 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-util</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-collections4</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ApplicationContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ApplicationContext.java
index 06bcda3..42bc636 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ApplicationContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ApplicationContext.java
@@ -21,7 +21,7 @@
import java.io.Serializable;
import java.util.concurrent.ThreadFactory;
-import org.apache.hyracks.api.application.IApplicationConfig;
+import org.apache.hyracks.api.config.IApplicationConfig;
import org.apache.hyracks.api.application.IApplicationContext;
import org.apache.hyracks.api.job.IJobSerializerDeserializerContainer;
import org.apache.hyracks.api.job.JobSerializerDeserializerContainer;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ConfigManagerApplicationConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ConfigManagerApplicationConfig.java
new file mode 100644
index 0000000..92e90e7
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ConfigManagerApplicationConfig.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.common.application;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Predicate;
+
+import org.apache.hyracks.api.config.IApplicationConfig;
+import org.apache.hyracks.api.config.IOption;
+import org.apache.hyracks.api.config.Section;
+import org.apache.hyracks.control.common.config.ConfigManager;
+
+/**
+ * An implementation of IApplicationConfig which is backed by the Config Manager.
+ */
+public class ConfigManagerApplicationConfig implements IApplicationConfig, Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final ConfigManager configManager;
+
+ public ConfigManagerApplicationConfig(ConfigManager configManager) {
+ this.configManager = configManager;
+ }
+
+ @Override
+ public String getString(String section, String key) {
+ return (String)get(section, key);
+ }
+
+ @Override
+ public int getInt(String section, String key) {
+ return (int)get(section, key);
+ }
+
+ @Override
+ public long getLong(String section, String key) {
+ return (long)get(section, key);
+ }
+
+ @Override
+ public Set<String> getSectionNames() {
+ return configManager.getSectionNames();
+ }
+
+ @Override
+ public Set<Section> getSections() {
+ return configManager.getSections();
+ }
+
+ @Override
+ public Set<Section> getSections(Predicate<Section> predicate) {
+ return configManager.getSections(predicate);
+ }
+
+ @Override
+ public Set<String> getKeys(String section) {
+ return configManager.getOptionNames(section);
+ }
+
+ private Object get(String section, String key) {
+ return get(configManager.lookupOption(section, key));
+ }
+
+ @Override
+ public Object getStatic(IOption option) {
+ return configManager.get(option);
+ }
+
+ @Override
+ public List<String> getNCNames() {
+ return configManager.getNodeNames();
+ }
+
+ @Override
+ public IOption lookupOption(String sectionName, String propertyName) {
+ return configManager.lookupOption(sectionName, propertyName);
+ }
+
+ @Override
+ public Set<IOption> getOptions() {
+ return configManager.getOptions();
+ }
+
+ @Override
+ public Set<IOption> getOptions(Section section) {
+ return configManager.getOptions(section);
+ }
+
+ @Override
+ public IApplicationConfig getNCEffectiveConfig(String nodeId) {
+ return configManager.getNodeEffectiveConfig(nodeId);
+ }
+
+ public ConfigManager getConfigManager() {
+ return configManager;
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/IniApplicationConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/IniApplicationConfig.java
deleted file mode 100644
index 53db11c..0000000
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/IniApplicationConfig.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.control.common.application;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hyracks.api.application.IApplicationConfig;
-import org.apache.hyracks.control.common.controllers.IniUtils;
-import org.ini4j.Ini;
-import org.ini4j.Profile.Section;
-
-/**
- * An implementation of IApplicationConfig which is backed by Ini4j.
- */
-public class IniApplicationConfig implements IApplicationConfig {
- private final Ini ini;
-
- public IniApplicationConfig(Ini ini) {
- if (ini != null) {
- this.ini = ini;
- } else {
- this.ini = new Ini();
- }
- }
-
- @Override
- public String getString(String section, String key) {
- return IniUtils.getString(ini, section, key, null);
- }
-
- @Override
- public String getString(String section, String key, String defaultValue) {
- return IniUtils.getString(ini, section, key, defaultValue);
- }
-
- @Override
- public String[] getStringArray(String section, String key) {
- return IniUtils.getStringArray(ini, section, key);
- }
-
- @Override
- public int getInt(String section, String key) {
- return IniUtils.getInt(ini, section, key, 0);
- }
-
- @Override
- public int getInt(String section, String key, int defaultValue) {
- return IniUtils.getInt(ini, section, key, defaultValue);
- }
-
- @Override
- public long getLong(String section, String key) {
- return IniUtils.getLong(ini, section, key, 0);
- }
-
- @Override
- public long getLong(String section, String key, long defaultValue) {
- return IniUtils.getLong(ini, section, key, defaultValue);
- }
-
- @Override
- public Set<String> getSections() {
- return ini.keySet();
- }
-
- @Override
- public Set<String> getKeys(String section) {
- return ini.get(section).keySet();
- }
-
- @Override
- public List<Set<Map.Entry<String, String>>> getMultiSections(String section) {
- List<Set<Map.Entry<String, String>>> list = new ArrayList<>();
- List<Section> secs = getMulti(section);
- if (secs != null) {
- for (Section sec : secs) {
- list.add(sec.entrySet());
- }
- }
- return list;
- }
-
- private List<Section> getMulti(String section) {
- return ini.getAll(section);
- }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/Args4jArgument.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/Args4jArgument.java
new file mode 100644
index 0000000..de9b543
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/Args4jArgument.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.common.config;
+
+import java.lang.annotation.Annotation;
+
+import org.kohsuke.args4j.Argument;
+import org.kohsuke.args4j.spi.OptionHandler;
+import org.kohsuke.args4j.spi.StringOptionHandler;
+
+@SuppressWarnings("ClassExplicitlyAnnotation")
+public class Args4jArgument implements Argument {
+ @Override
+ public String usage() {
+ return "";
+ }
+
+ @Override
+ public String metaVar() {
+ return "";
+ }
+
+ @Override
+ public boolean required() {
+ return false;
+ }
+
+ @Override
+ public boolean hidden() {
+ return false;
+ }
+
+ @Override
+ public Class<? extends OptionHandler> handler() {
+ return StringOptionHandler.class;
+ }
+
+ @Override
+ public int index() {
+ return 0;
+ }
+
+ @Override
+ public boolean multiValued() {
+ return true;
+ }
+
+ @Override
+ public Class<? extends Annotation> annotationType() {
+ return Argument.class;
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/Args4jOption.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/Args4jOption.java
new file mode 100644
index 0000000..c904d0b
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/Args4jOption.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.common.config;
+
+import java.lang.annotation.Annotation;
+
+import org.apache.hyracks.api.config.IOption;
+import org.kohsuke.args4j.Option;
+import org.kohsuke.args4j.spi.ExplicitBooleanOptionHandler;
+import org.kohsuke.args4j.spi.IntOptionHandler;
+import org.kohsuke.args4j.spi.OptionHandler;
+import org.kohsuke.args4j.spi.StringOptionHandler;
+
+@SuppressWarnings("ClassExplicitlyAnnotation")
+class Args4jOption implements Option {
+ private final IOption option;
+ private final ConfigManager configManager;
+ private final Class targetType;
+
+ Args4jOption(IOption option, ConfigManager configManager, Class targetType) {
+ this.option = option;
+ this.targetType = targetType;
+ this.configManager = configManager;
+ }
+
+ @Override
+ public String name() {
+ return option.cmdline();
+ }
+
+ @Override
+ public String[] aliases() {
+ return new String[0];
+ }
+
+ @Override
+ public String usage() {
+ return configManager.getUsage(option);
+ }
+
+ @Override
+ public String metaVar() {
+ return "";
+ }
+
+ @Override
+ public boolean required() {
+ return false;
+ }
+
+ @Override
+ public boolean help() {
+ return false;
+ }
+
+ @Override
+ public boolean hidden() {
+ return option.hidden();
+ }
+
+ @Override
+ public Class<? extends OptionHandler> handler() {
+ if (targetType.equals(Boolean.class)) {
+ return ExplicitBooleanOptionHandler.class;
+ } else if (targetType.equals(Integer.class)) {
+ return IntOptionHandler.class;
+ } else {
+ return StringOptionHandler.class;
+ }
+ }
+
+ @Override
+ public String[] depends() {
+ return new String[0];
+ }
+
+ @Override
+ public String[] forbids() {
+ return new String[0];
+ }
+
+ @Override
+ public Class<? extends Annotation> annotationType() {
+ return Option.class;
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/Args4jSetter.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/Args4jSetter.java
new file mode 100644
index 0000000..1367ef0
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/Args4jSetter.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.common.config;
+
+import java.lang.reflect.AnnotatedElement;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+import org.apache.hyracks.api.config.IOption;
+import org.kohsuke.args4j.CmdLineException;
+import org.kohsuke.args4j.spi.FieldSetter;
+import org.kohsuke.args4j.spi.Setter;
+
+class Args4jSetter implements Setter {
+ private final IOption option;
+ private BiConsumer<IOption, Object> consumer;
+ private final boolean multiValued;
+ private final Class type;
+
+ Args4jSetter(IOption option, BiConsumer<IOption, Object> consumer, boolean multiValued) {
+ this.option = option;
+ this.consumer = consumer;
+ this.multiValued = multiValued;
+ this.type = option.type().targetType();
+ }
+
+ Args4jSetter(Consumer<Object> consumer, boolean multiValued, Class type) {
+ this.option = null;
+ this.consumer = (o, value) -> consumer.accept(value);
+ this.multiValued = multiValued;
+ this.type = type;
+ }
+
+ @Override
+ public void addValue(Object value) throws CmdLineException {
+ consumer.accept(option, value);
+ }
+
+ @Override
+ public Class getType() {
+ return type;
+ }
+
+ @Override
+ public boolean isMultiValued() {
+ return multiValued;
+ }
+
+ @Override
+ public FieldSetter asFieldSetter() {
+ return null;
+ }
+
+ @Override
+ public AnnotatedElement asAnnotatedElement() {
+ return null;
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java
new file mode 100644
index 0000000..bfe759e
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigManager.java
@@ -0,0 +1,549 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.common.config;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.commons.collections4.map.CompositeMap;
+import org.apache.commons.collections4.multimap.ArrayListValuedHashMap;
+import org.apache.hyracks.api.config.IApplicationConfig;
+import org.apache.hyracks.api.config.IConfigManager;
+import org.apache.hyracks.api.config.IConfigurator;
+import org.apache.hyracks.api.config.IOption;
+import org.apache.hyracks.api.config.Section;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.control.common.application.ConfigManagerApplicationConfig;
+import org.ini4j.Ini;
+import org.ini4j.Profile;
+import org.kohsuke.args4j.CmdLineException;
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
+import org.kohsuke.args4j.OptionHandlerFilter;
+
+public class ConfigManager implements IConfigManager, Serializable {
+
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOGGER = Logger.getLogger(ConfigManager.class.getName());
+
+ private HashSet<IOption> registeredOptions = new HashSet<>();
+ private HashMap<IOption, Object> definedMap = new HashMap<>();
+ private HashMap<IOption, Object> defaultMap = new HashMap<>();
+ private CompositeMap<IOption, Object> configurationMap = new CompositeMap<>(definedMap, defaultMap,
+ new NoOpMapMutator());
+ private EnumMap<Section, Map<String, IOption>> sectionMap = new EnumMap<>(Section.class);
+ private TreeMap<String, Map<IOption, Object>> nodeSpecificMap = new TreeMap<>();
+ private transient ArrayListValuedHashMap<IOption, IConfigSetter> optionSetters = new ArrayListValuedHashMap<>();
+ private final String[] args;
+ private ConfigManagerApplicationConfig appConfig = new ConfigManagerApplicationConfig(this);
+ private Set<String> allSections = new HashSet<>();
+ private transient Collection<Consumer<List<String>>> argListeners = new ArrayList<>();
+ private transient Collection<IOption> iniPointerOptions = new ArrayList<>();
+ private transient Collection<Section> cmdLineSections = new ArrayList<>();;
+ private transient OptionHandlerFilter usageFilter;
+ private transient SortedMap<Integer, List<IConfigurator>> configurators = new TreeMap<>();
+ private boolean configured;
+
+ public ConfigManager() {
+ this(null);
+ }
+
+ public ConfigManager(String[] args) {
+ this.args = args;
+ for (Section section : Section.values()) {
+ allSections.add(section.sectionName());
+ }
+ addConfigurator(PARSE_INI_POINTERS_METRIC, this::extractIniPointersFromCommandLine);
+ addConfigurator(PARSE_INI_METRIC, this::parseIni);
+ addConfigurator(PARSE_COMMAND_LINE_METRIC, this::processCommandLine);
+ addConfigurator(APPLY_DEFAULTS_METRIC, this::applyDefaults);
+ }
+
+ @Override
+ public void addConfigurator(int metric, IConfigurator configurator) {
+ configurators.computeIfAbsent(metric, metric1 -> new ArrayList<>()).add(configurator);
+ }
+
+ @Override
+ public void addIniParamOptions(IOption... options) {
+ Stream.of(options).forEach(iniPointerOptions::add);
+ }
+
+ @Override
+ public void addCmdLineSections(Section... sections) {
+ Stream.of(sections).forEach(cmdLineSections::add);
+ }
+
+ @Override
+ public void setUsageFilter(OptionHandlerFilter usageFilter) {
+ this.usageFilter = usageFilter;
+ }
+
+ @Override
+ public void register(IOption... options) {
+ for (IOption option : options) {
+ if (option.section() == Section.VIRTUAL || registeredOptions.contains(option)) {
+ continue;
+ }
+ if (configured) {
+ throw new IllegalStateException("configuration already processed");
+ }
+ LOGGER.fine("registering option: " + option.toIniString());
+ Map<String, IOption> optionMap = sectionMap.computeIfAbsent(option.section(), section -> new HashMap<>());
+ IOption prev = optionMap.put(option.ini(), option);
+ if (prev != null) {
+ if (prev != option) {
+ throw new IllegalStateException("An option cannot be defined multiple times: "
+ + option.toIniString() + ": " + Arrays.asList(option.getClass(), prev.getClass()));
+ }
+ } else {
+ registeredOptions.add(option);
+ optionSetters.put(option, (node, value, isDefault) -> correctedMap(node, isDefault).put(option, value));
+ if (LOGGER.isLoggable(Level.FINE)) {
+ optionSetters.put(option, (node, value, isDefault) -> LOGGER
+ .fine((isDefault ? "defaulting" : "setting ") + option.toIniString() + " to " + value));
+ }
+ }
+ }
+ }
+
+ private Map<IOption, Object> correctedMap(String node, boolean isDefault) {
+ return node == null ? (isDefault ? defaultMap : definedMap)
+ : nodeSpecificMap.computeIfAbsent(node, this::createNodeSpecificMap);
+ }
+
+ public void registerVirtualNode(String nodeId) {
+ LOGGER.fine("registerVirtualNode: " + nodeId);
+ nodeSpecificMap.computeIfAbsent(nodeId, this::createNodeSpecificMap);
+ }
+
+ private Map<IOption, Object> createNodeSpecificMap(String nodeId) {
+ LOGGER.fine("createNodeSpecificMap: " + nodeId);
+ return new HashMap<>();
+ }
+
+ @Override
+ @SafeVarargs
+ public final void register(final Class<? extends IOption>... optionClasses) {
+ for (Class<? extends IOption> optionClass : optionClasses) {
+ register(optionClass.getEnumConstants());
+ }
+ }
+
+ public IOption lookupOption(String section, String key) {
+ Map<String, IOption> map = getSectionOptionMap(Section.parseSectionName(section));
+ return map == null ? null : map.get(key);
+ }
+
+ public void processConfig()
+ throws CmdLineException, IOException {
+ if (!configured) {
+ for (List<IConfigurator> configuratorList : configurators.values()) {
+ for (IConfigurator configurator : configuratorList) {
+ configurator.run();
+ }
+ }
+ configured = true;
+ }
+ }
+
+ private void processCommandLine() throws CmdLineException {
+ List<String> appArgs = processCommandLine(cmdLineSections, usageFilter, this::cmdLineSet);
+ // now propagate the app args to the listeners...
+ argListeners.forEach(l -> l.accept(appArgs));
+ }
+
+ private void extractIniPointersFromCommandLine() throws CmdLineException {
+ Map<IOption, Object> cmdLineOptions = new HashMap<>();
+ processCommandLine(cmdLineSections, usageFilter, cmdLineOptions::put);
+ for (IOption option : iniPointerOptions) {
+ if (cmdLineOptions.containsKey(option)) {
+ set(option, cmdLineOptions.get(option));
+ }
+ }
+ }
+
+ private void cmdLineSet(IOption option, Object value) {
+ invokeSetters(option, option.type().parse(String.valueOf(value)), null);
+ }
+
+ private void invokeSetters(IOption option, Object value, String nodeId) {
+ optionSetters.get(option).forEach(setter -> setter.set(nodeId, value, false));
+ }
+
+ @SuppressWarnings({ "squid:S106", "squid:S1147" }) // use of System.err, System.exit()
+ private List<String> processCommandLine(Collection<Section> sections, OptionHandlerFilter usageFilter,
+ BiConsumer<IOption, Object> setAction)
+ throws CmdLineException {
+ final Args4jBean bean = new Args4jBean();
+ CmdLineParser cmdLineParser = new CmdLineParser(bean);
+ final List<String> appArgs = new ArrayList<>();
+ List<IOption> commandLineOptions = new ArrayList<>();
+ for (Map.Entry<Section, Map<String, IOption>> sectionMapEntry : sectionMap.entrySet()) {
+ if (!sections.contains(sectionMapEntry.getKey())) {
+ continue;
+ }
+ for (IOption option : sectionMapEntry.getValue().values()) {
+ if (option.section() != Section.VIRTUAL) {
+ commandLineOptions.add(option);
+ }
+ }
+ }
+ commandLineOptions.sort(Comparator.comparing(IOption::cmdline));
+
+ commandLineOptions.forEach(option -> cmdLineParser.addOption(new Args4jSetter(option, setAction, false),
+ new Args4jOption(option, this, option.type().targetType())));
+
+ if (!argListeners.isEmpty()) {
+ cmdLineParser.addArgument(new Args4jSetter(o -> appArgs.add(String.valueOf(o)), true, String.class),
+ new Args4jArgument());
+ }
+ LOGGER.fine("parsing cmdline: " + Arrays.toString(args));
+ try {
+ if (args == null || args.length == 0) {
+ LOGGER.info("no command line args supplied");
+ return appArgs;
+ }
+ cmdLineParser.parseArgument(args);
+ if (bean.help) {
+ ConfigUtils.printUsage(cmdLineParser, usageFilter, System.err);
+ System.exit(0);
+ }
+ } catch (CmdLineException e) {
+ if (bean.help) {
+ ConfigUtils.printUsage(cmdLineParser, usageFilter, System.err);
+ System.exit(0);
+ } else {
+ ConfigUtils.printUsage(e, usageFilter, System.err);
+ throw e;
+ }
+ }
+ return appArgs;
+ }
+
+ private void parseIni() throws IOException {
+ Ini ini = null;
+ for (IOption option : iniPointerOptions) {
+ Object pointer = get(option);
+ if (pointer instanceof String) {
+ ini = ConfigUtils.loadINIFile((String)pointer);
+ } else if (pointer instanceof URL) {
+ ini = ConfigUtils.loadINIFile((URL)pointer);
+ } else if (pointer != null) {
+ throw new IllegalArgumentException("config file pointer options must be of type String (for file) or " +
+ "URL, instead of " + option.type().targetType());
+ }
+ }
+ if (ini == null) {
+ LOGGER.info("no INI file specified; skipping parsing");
+ return;
+ }
+ LOGGER.info("parsing INI file: " + ini);
+ for (Profile.Section section : ini.values()) {
+ allSections.add(section.getName());
+ final Section rootSection = Section
+ .parseSectionName(section.getParent() == null ? section.getName() : section.getParent().getName());
+ String node;
+ if (rootSection == Section.EXTENSION) {
+ parseExtensionIniSection(section);
+ continue;
+ } else if (rootSection == Section.NC) {
+ node = section.getName().equals(section.getSimpleName()) ? null : section.getSimpleName();
+ } else if (Section.parseSectionName(section.getName()) != null) {
+ node = null;
+ } else {
+ throw new HyracksException("Unknown section in ini: " + section.getName());
+ }
+ Map<String, IOption> optionMap = getSectionOptionMap(rootSection);
+ for (Map.Entry<String, String> iniOption : section.entrySet()) {
+ String name = iniOption.getKey();
+ final IOption option = optionMap == null ? null : optionMap.get(name);
+ if (option == null) {
+ handleUnknownOption(section, name);
+ return;
+ }
+ final String value = iniOption.getValue();
+ LOGGER.fine("setting " + option.toIniString() + " to " + value);
+ final Object parsed = option.type().parse(value);
+ invokeSetters(option, parsed, node);
+ }
+ }
+ }
+
+ private void parseExtensionIniSection(Profile.Section section) {
+ // TODO(mblow): parse extensions
+ }
+
+ private void handleUnknownOption(Profile.Section section, String name) throws HyracksException {
+ Set<String> matches = new HashSet<>();
+ for (IOption registeredOption : registeredOptions) {
+ if (registeredOption.ini().equals(name)) {
+ matches.add(registeredOption.section().sectionName());
+ }
+ }
+ if (!matches.isEmpty()) {
+ throw new HyracksException(
+ "Section mismatch for [" + section.getName() + "] " + name + ", expected section(s) " + matches);
+ } else {
+ throw new HyracksException("Unknown option in ini: [" + section.getName() + "] " + name);
+ }
+ }
+
+ private void applyDefaults() {
+ LOGGER.fine("applying defaults");
+ for (Map.Entry<Section, Map<String, IOption>> entry : sectionMap.entrySet()) {
+ if (entry.getKey() == Section.NC) {
+ entry.getValue().values().forEach(option -> getNodeNames()
+ .forEach(node -> getOrDefault(getNodeEffectiveMap(node), option, node)));
+ for (Map.Entry<String, Map<IOption, Object>> nodeMap : nodeSpecificMap.entrySet()) {
+ entry.getValue().values()
+ .forEach(option -> getOrDefault(
+ new CompositeMap<>(nodeMap.getValue(), definedMap, new NoOpMapMutator()), option,
+ nodeMap.getKey()));
+ }
+ // also push the defaults to the shared map, if the CC requests NC properties, they should receive the
+ // defaults -- TODO (mblow): seems lame, should log warning on access
+ }
+ entry.getValue().values().forEach(option -> getOrDefault(configurationMap, option, null));
+ }
+ }
+
+ private Object getOrDefault(Map<IOption, Object> map, IOption option, String nodeId) {
+ if (map.containsKey(option)) {
+ return map.get(option);
+ } else {
+ Object value = resolveDefault(option, new ConfigManagerApplicationConfig(this) {
+ @Override
+ public Object getStatic(IOption option) {
+ return getOrDefault(map, option, nodeId);
+ }
+ });
+ if (value != null && optionSetters != null) {
+ optionSetters.get(option).forEach(setter -> setter.set(nodeId, value, true));
+ }
+ return value;
+ }
+ }
+
+ public Object resolveDefault(IOption option, IApplicationConfig applicationConfig) {
+ final Object value = option.defaultValue();
+ if (value instanceof IOption) {
+ return applicationConfig.get((IOption) value);
+ } else if (value instanceof Supplier) {
+ //noinspection unchecked
+ return ((Supplier<?>) value).get();
+ } else if (value instanceof Function) {
+ //noinspection unchecked
+ return ((Function<IApplicationConfig, ?>) value).apply(applicationConfig);
+ } else {
+ return value;
+ }
+ }
+
+ @Override
+ public Set<Section> getSections(Predicate<Section> predicate) {
+ return Arrays.stream(Section.values()).filter(predicate).collect(Collectors.toSet());
+ }
+
+ @Override
+ public Set<Section> getSections() {
+ return getSections(section -> true);
+ }
+
+ public Set<String> getSectionNames() {
+ return Collections.unmodifiableSet(allSections);
+ }
+
+ public Set<String> getOptionNames(String sectionName) {
+ Set<String> optionNames = new HashSet<>();
+ Section section = Section.parseSectionName(sectionName);
+ for (IOption option : getSectionOptionMap(section).values()) {
+ optionNames.add(option.ini());
+ }
+ return optionNames;
+ }
+
+ @Override
+ public Set<IOption> getOptions(Section section) {
+ return getSectionOptionMap(section).values().stream().collect(Collectors.toSet());
+ }
+
+ private Map<String, IOption> getSectionOptionMap(Section section) {
+ final Map<String, IOption> map = sectionMap.get(section);
+ return map != null ? map : Collections.emptyMap();
+ }
+
+ public List<String> getNodeNames() {
+ return Collections.unmodifiableList(new ArrayList(nodeSpecificMap.keySet()));
+ }
+
+ public IApplicationConfig getNodeEffectiveConfig(String nodeId) {
+ final Map<IOption, Object> nodeMap = nodeSpecificMap.computeIfAbsent(nodeId, this::createNodeSpecificMap);
+ Map<IOption, Object> nodeEffectiveMap = getNodeEffectiveMap(nodeId);
+ return new ConfigManagerApplicationConfig(this) {
+ @Override
+ public Object getStatic(IOption option) {
+ if (!nodeEffectiveMap.containsKey(option)) {
+ // we need to calculate the default the the context of the node specific map...
+ nodeMap.put(option, getOrDefault(nodeEffectiveMap, option, nodeId));
+ }
+ return nodeEffectiveMap.get(option);
+ }
+ };
+ }
+
+ private CompositeMap<IOption, Object> getNodeEffectiveMap(String nodeId) {
+ return new CompositeMap<>(nodeSpecificMap.get(nodeId), definedMap, new NoOpMapMutator());
+ }
+
+ public Ini toIni(boolean includeDefaults) {
+ Ini ini = new Ini();
+ for (Map.Entry<IOption, Object> entry : (includeDefaults ? configurationMap : definedMap).entrySet()) {
+ if (entry.getValue() != null) {
+ final IOption option = entry.getKey();
+ ini.add(option.section().sectionName(), option.ini(), option.type().serializeToIni(entry.getValue()));
+ }
+ }
+ for (Map.Entry<String, Map<IOption, Object>> nodeMapEntry : nodeSpecificMap.entrySet()) {
+ String section = Section.NC.sectionName() + "/" + nodeMapEntry.getKey();
+ for (Map.Entry<IOption, Object> entry : nodeMapEntry.getValue().entrySet()) {
+ if (entry.getValue() != null) {
+ final IOption option = entry.getKey();
+ ini.add(section, option.ini(), option.type().serializeToIni(entry.getValue()));
+ }
+ }
+ }
+ return ini;
+ }
+
+ public void set(IOption option, Object value) {
+ set(null, option, value);
+ }
+
+ public void set(String nodeId, IOption option, Object value) {
+ invokeSetters(option, value, nodeId);
+ }
+
+ public Object get(IOption option) {
+ if (!registeredOptions.contains(option)) {
+ throw new IllegalStateException("Option not registered with ConfigManager: " + option.toIniString() + "("
+ + option.getClass() + "." + option + ")");
+ } else if (option.section() == Section.NC) {
+ LOGGER.warning("NC option " + option.toIniString() + " being accessed outside of NC-scoped configuration.");
+ }
+ return getOrDefault(configurationMap, option, null);
+ }
+
+ public Set<IOption> getOptions() {
+ return Collections.unmodifiableSet(registeredOptions);
+ }
+
+ @Override
+ public IApplicationConfig getAppConfig() {
+ return appConfig;
+ }
+
+ public void registerArgsListener(Consumer<List<String>> argListener) {
+ argListeners.add(argListener);
+ }
+
+ String getUsage(IOption option) {
+ final String description = option.description();
+ StringBuilder usage = new StringBuilder();
+ if (description != null && !"".equals(description)) {
+ usage.append(description).append(" ");
+ } else {
+ LOGGER.warning("missing description for option: "
+ + option.getClass().getName().substring(option.getClass().getName().lastIndexOf(".") + 1) + "."
+ + option.name());
+ }
+ usage.append("(default: ");
+ usage.append(defaultTextForUsage(option, IOption::cmdline));
+ usage.append(")");
+ return usage.toString();
+ }
+
+ public String defaultTextForUsage(IOption option, Function<IOption, String> optionPrinter) {
+ StringBuilder buf = new StringBuilder();
+ String override = option.usageDefaultOverride(appConfig, optionPrinter);
+ if (override != null) {
+ buf.append(override);
+ } else {
+ final Object value = option.defaultValue();
+ if (value instanceof IOption) {
+ buf.append("same as ").append(optionPrinter.apply((IOption) value));
+ } else if (value instanceof Function) {
+ // TODO(mblow): defer usage calculation to enable evaluation of function
+ buf.append("<function>");
+ } else {
+ buf.append(option.type().serializeToString(resolveDefault(option, appConfig)));
+ }
+ // TODO(mblow): defer usage calculation to enable inclusion of evaluated actual default
+ }
+ return buf.toString();
+ }
+
+ private static class NoOpMapMutator implements CompositeMap.MapMutator<IOption, Object> {
+ @Override
+ public Object put(CompositeMap<IOption, Object> compositeMap, Map<IOption, Object>[] maps, IOption iOption,
+ Object o) {
+ throw new UnsupportedOperationException("mutations are not allowed");
+ }
+
+ @Override
+ public void putAll(CompositeMap<IOption, Object> compositeMap, Map<IOption, Object>[] maps,
+ Map<? extends IOption, ?> map) {
+ throw new UnsupportedOperationException("mutations are not allowed");
+ }
+
+ @Override
+ public void resolveCollision(CompositeMap<IOption, Object> compositeMap, Map<IOption, Object> map,
+ Map<IOption, Object> map1, Collection<IOption> collection) {
+ // no-op
+ }
+ }
+
+ private static class Args4jBean {
+ @Option(name = "-help", help = true)
+ boolean help;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigUtils.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigUtils.java
new file mode 100644
index 0000000..adf1774
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/ConfigUtils.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.common.config;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintStream;
+import java.lang.reflect.Field;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.function.Predicate;
+
+import org.apache.hyracks.api.config.IApplicationConfig;
+import org.apache.hyracks.api.config.IOption;
+import org.apache.hyracks.control.common.controllers.ControllerConfig;
+import org.ini4j.Ini;
+import org.kohsuke.args4j.CmdLineException;
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.OptionHandlerFilter;
+
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+/**
+ * Some utility functions for reading Ini4j objects with default values.
+ * For all getXxx() methods: if the 'section' contains a slash, and the 'key'
+ * is not found in that section, we will search for the key in the section named
+ * by stripping the leaf of the section name (final slash and anything following).
+ * eg. getInt(ini, "nc/red", "dir", null) will first look for the key "dir" in
+ * the section "nc/red", but if it is not found, will look in the section "nc".
+ */
+public class ConfigUtils {
+ private static final int USAGE_WIDTH = 120;
+
+ private ConfigUtils() {
+ }
+
+ private static <T> T getIniValue(Ini ini, String section, String key, T defaultValue, Class<T> clazz) {
+ T value;
+ while (true) {
+ value = ini.get(section, key, clazz);
+ if (value == null) {
+ int idx = section.lastIndexOf('/');
+ if (idx > -1) {
+ section = section.substring(0, idx);
+ continue;
+ }
+ }
+ return (value != null) ? value : defaultValue;
+ }
+ }
+
+ public static String getString(Ini ini, String section, String key, String defaultValue) {
+ return getIniValue(ini, section, key, defaultValue, String.class);
+ }
+
+ public static int getInt(Ini ini, String section, String key, int defaultValue) {
+ return getIniValue(ini, section, key, defaultValue, Integer.class);
+ }
+
+ public static long getLong(Ini ini, String section, String key, long defaultValue) {
+ return getIniValue(ini, section, key, defaultValue, Long.class);
+ }
+
+ public static Ini loadINIFile(String configFile) throws IOException {
+ Ini ini = new Ini();
+ File conffile = new File(configFile);
+ if (!conffile.exists()) {
+ throw new FileNotFoundException(configFile);
+ }
+ ini.load(conffile);
+ return ini;
+ }
+
+ public static Ini loadINIFile(URL configURL) throws IOException {
+ Ini ini = new Ini();
+ ini.load(configURL);
+ return ini;
+ }
+
+ public static Field[] getFields(final Class beanClass, Predicate<Field> predicate) {
+ List<Field> fields = new ArrayList<>();
+ for (Class clazz = beanClass; clazz != null && clazz.getClassLoader() != null
+ && clazz.getClassLoader().getParent() != null; clazz = clazz.getSuperclass()) {
+ for (Field f : clazz.getDeclaredFields()) {
+ if (predicate.test(f)) {
+ fields.add(f);
+ }
+ }
+ }
+ return fields.toArray(new Field[fields.size()]);
+ }
+
+ public static void printUsage(CmdLineParser parser, OptionHandlerFilter filter, PrintStream out) {
+ parser.getProperties().withUsageWidth(USAGE_WIDTH);
+ parser.printUsage(new OutputStreamWriter(out), null, filter);
+ }
+
+ public static void printUsage(CmdLineException e, OptionHandlerFilter filter, PrintStream out) {
+ out.println("ERROR: " + e.getMessage());
+ printUsage(e.getParser(), filter, out);
+ }
+
+ private static String getOptionValue(String[] args, String optionName) {
+ for (int i = 0; i < (args.length - 1); i++) {
+ if (args[i].equals(optionName)) {
+ return args[i + 1];
+ }
+ }
+ return null;
+ }
+
+ public static String getOptionValue(String[] args, IOption option) throws IOException {
+ String value = getOptionValue(args, option.cmdline());
+ if (value == null) {
+ Ini iniFile = null;
+ String configFile = getOptionValue(args, ControllerConfig.Option.CONFIG_FILE.cmdline());
+ String configFileUrl = getOptionValue(args, ControllerConfig.Option.CONFIG_FILE_URL.cmdline());
+ if (configFile != null) {
+ iniFile = loadINIFile(configFile);
+ } else if (configFileUrl != null) {
+ iniFile = loadINIFile(new URL(configFileUrl));
+ }
+ if (iniFile != null) {
+ value = iniFile.get(option.section().sectionName(), option.ini());
+ }
+ }
+ return value;
+ }
+
+ public static String getString(Ini ini, org.apache.hyracks.api.config.Section section,
+ IOption option, String defaultValue) {
+ return getString(ini, section.sectionName(), option.ini(), defaultValue);
+ }
+
+ public static void addConfigToJSON(ObjectNode o, IApplicationConfig cfg,
+ org.apache.hyracks.api.config.Section... sections) {
+ ArrayNode configArray = o.putArray("config");
+ for (org.apache.hyracks.api.config.Section section : cfg.getSections(Arrays.asList(sections)::contains)) {
+ ObjectNode sectionNode = configArray.addObject();
+ Map<String, Object> sectionConfig = getSectionOptionsForJSON(cfg, section, option -> true);
+ sectionNode.put("section", section.sectionName())
+ .putPOJO("properties", sectionConfig);
+ }
+ }
+
+ public static Map<String, Object> getSectionOptionsForJSON(IApplicationConfig cfg,
+ org.apache.hyracks.api.config.Section section,
+ Predicate<IOption> selector) {
+ Map<String, Object> sectionConfig = new TreeMap<>();
+ for (IOption option : cfg.getOptions(section)) {
+ if (selector.test(option)) {
+ sectionConfig.put(option.ini(), option.type().serializeToJSON(cfg.get(option)));
+ }
+ }
+ return sectionConfig;
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/IConfigSetter.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/IConfigSetter.java
new file mode 100644
index 0000000..2234cca
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/IConfigSetter.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.common.config;
+
+@FunctionalInterface
+public interface IConfigSetter {
+ void set(String nodeId, Object value, boolean isDefault) throws SetException;
+
+ class SetException extends RuntimeException {
+ public SetException(Exception e) {
+ super(e);
+ }
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java
new file mode 100644
index 0000000..fc26b5e
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.common.config;
+
+import java.net.MalformedURLException;
+import java.util.logging.Level;
+
+import org.apache.hyracks.api.config.IOptionType;
+import org.apache.hyracks.util.StorageUtil;
+
+public class OptionTypes {
+
+ public static final IOptionType<Integer> INTEGER_BYTE_UNIT = new IOptionType<Integer>() {
+ @Override
+ public Integer parse(String s) {
+ long result1 = StorageUtil.getByteValue(s);
+ if (result1 > Integer.MAX_VALUE || result1 < Integer.MIN_VALUE) {
+ throw new IllegalArgumentException(
+ "The given value: " + result1 + " is not within the int range.");
+ }
+ return (int) result1;
+ }
+
+ @Override
+ public Class<Integer> targetType() {
+ return Integer.class;
+ }
+ };
+
+ public static final IOptionType<Long> LONG_BYTE_UNIT = new IOptionType<Long>() {
+ @Override
+ public Long parse(String s) {
+ return StorageUtil.getByteValue(s);
+ }
+
+ @Override
+ public Class<Long> targetType() {
+ return Long.class;
+ }
+ };
+
+ public static final IOptionType<Integer> INTEGER = new IOptionType<Integer>() {
+ @Override
+ public Integer parse(String s) {
+ return Integer.parseInt(s);
+ }
+
+ @Override
+ public Class<Integer> targetType() {
+ return Integer.class;
+ }
+ };
+
+ public static final IOptionType<Double> DOUBLE = new IOptionType<Double>() {
+ @Override
+ public Double parse(String s) {
+ return Double.parseDouble(s);
+ }
+
+ @Override
+ public Class<Double> targetType() {
+ return Double.class;
+ }
+ };
+
+ public static final IOptionType<String> STRING = new IOptionType<String>() {
+ @Override
+ public String parse(String s) {
+ return s;
+ }
+
+ @Override
+ public Class<String> targetType() {
+ return String.class;
+ }
+ };
+
+ public static final IOptionType<Long> LONG = new IOptionType<Long>() {
+ @Override
+ public Long parse(String s) {
+ return Long.parseLong(s);
+ }
+
+ @Override
+ public Class<Long> targetType() {
+ return Long.class;
+ }
+ };
+
+ public static final IOptionType<Boolean> BOOLEAN = new IOptionType<Boolean>() {
+ @Override
+ public Boolean parse(String s) {
+ return Boolean.parseBoolean(s);
+ }
+
+ @Override
+ public Class<Boolean> targetType() {
+ return Boolean.class;
+ }
+ };
+
+ public static final IOptionType<Level> LEVEL = new IOptionType<Level>() {
+ @Override
+ public Level parse(String s) {
+ return Level.parse(s);
+ }
+
+ @Override
+ public Class<Level> targetType() {
+ return Level.class;
+ }
+
+ @Override
+ public Object serializeToJSON(Object value) {
+ return ((Level)value).getName();
+ }
+
+ @Override
+ public String serializeToIni(Object value) {
+ return ((Level)value).getName();
+ }
+ };
+
+ public static final IOptionType<String []> STRING_ARRAY = new IOptionType<String []>() {
+ @Override
+ public String [] parse(String s) {
+ return s.split("\\s*,\\s*");
+ }
+
+ @Override
+ public Class<String []> targetType() {
+ return String [].class;
+ }
+
+ @Override
+ public String serializeToIni(Object value) {
+ return String.join(",", (String [])value);
+ }
+ };
+
+ public static final IOptionType<java.net.URL> URL = new IOptionType<java.net.URL>() {
+ @Override
+ public java.net.URL parse(String s) {
+ try {
+ return new java.net.URL(s);
+ } catch (MalformedURLException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ @Override
+ public Class<java.net.URL> targetType() {
+ return java.net.URL.class;
+ }
+ };
+
+
+ private OptionTypes() {
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/context/ServerContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/context/ServerContext.java
index d4dc054..ff037f0 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/context/ServerContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/context/ServerContext.java
@@ -29,7 +29,7 @@
private final ServerType type;
private final File baseDir;
- public ServerContext(ServerType type, File baseDir) throws Exception {
+ public ServerContext(ServerType type, File baseDir) {
this.type = type;
this.baseDir = baseDir;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
index b636a096..fbde58c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
@@ -18,156 +18,160 @@
*/
package org.apache.hyracks.control.common.controllers;
+import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER;
+import static org.apache.hyracks.control.common.config.OptionTypes.LONG;
+import static org.apache.hyracks.control.common.config.OptionTypes.STRING;
+
import java.io.File;
-import java.io.IOException;
import java.net.InetAddress;
-import java.net.URL;
+import java.util.ArrayList;
import java.util.List;
+import java.util.function.Supplier;
-import org.apache.hyracks.api.application.IApplicationConfig;
-import org.apache.hyracks.control.common.application.IniApplicationConfig;
+import org.apache.hyracks.api.config.IOption;
+import org.apache.hyracks.api.config.IOptionType;
+import org.apache.hyracks.api.config.Section;
+import org.apache.hyracks.control.common.config.ConfigManager;
+import org.apache.hyracks.util.file.FileUtil;
import org.ini4j.Ini;
-import org.kohsuke.args4j.Argument;
-import org.kohsuke.args4j.Option;
-import org.kohsuke.args4j.spi.StopOptionHandler;
-public class CCConfig {
- @Option(name = "-address", usage = "IP Address for CC (default: localhost)", required = false)
- public String ipAddress = InetAddress.getLoopbackAddress().getHostAddress();
+public class CCConfig extends ControllerConfig {
- @Option(name = "-client-net-ip-address",
- usage = "Sets the IP Address to listen for connections from clients (default: same as -address)",
- required = false)
- public String clientNetIpAddress;
+ public static String defaultDir = System.getProperty("java.io.tmpdir");
- @Option(name = "-client-net-port", usage = "Sets the port to listen for connections from clients (default 1098)")
- public int clientNetPort = 1098;
+ public enum Option implements IOption {
+ APP_CLASS(STRING),
+ ADDRESS(STRING, InetAddress.getLoopbackAddress().getHostAddress()),
+ CLUSTER_LISTEN_ADDRESS(STRING, ADDRESS),
+ CLUSTER_LISTEN_PORT(INTEGER, 1099),
+ CLUSTER_PUBLIC_ADDRESS(STRING, CLUSTER_LISTEN_ADDRESS),
+ CLUSTER_PUBLIC_PORT(INTEGER, CLUSTER_LISTEN_PORT),
+ CLIENT_LISTEN_ADDRESS(STRING, ADDRESS),
+ CLIENT_LISTEN_PORT(INTEGER, 1098),
+ CONSOLE_LISTEN_ADDRESS(STRING, ADDRESS),
+ CONSOLE_LISTEN_PORT(INTEGER, 16001),
+ HEARTBEAT_PERIOD(INTEGER, 10000), // TODO (mblow): add time unit
+ HEARTBEAT_MAX_MISSES(INTEGER, 5),
+ PROFILE_DUMP_PERIOD(INTEGER, 0),
+ JOB_HISTORY_SIZE(INTEGER, 10),
+ RESULT_TTL(LONG, 86400000L), // TODO(mblow): add time unit
+ RESULT_SWEEP_THRESHOLD(LONG, 60000L), // TODO(mblow): add time unit
+ @SuppressWarnings("RedundantCast") // not redundant- false positive from IDEA
+ ROOT_DIR(STRING, (Supplier<String>)() -> FileUtil.joinPath(defaultDir, "ClusterControllerService")),
+ CLUSTER_TOPOLOGY(STRING),
+ JOB_QUEUE_CLASS(STRING, "org.apache.hyracks.control.cc.scheduler.FIFOJobQueue"),
+ JOB_MANAGER_CLASS(STRING, "org.apache.hyracks.control.cc.job.JobManager");
- // QQQ Note that clusterNetIpAddress is *not directly used* yet. Both
- // the cluster listener and the web server listen on "all interfaces".
- // This IP address is only used to instruct the NC on which IP to call in.
- @Option(name = "-cluster-net-ip-address",
- usage = "Sets the IP Address to listen for connections from NCs (default: same as -address)",
- required = false)
- public String clusterNetIpAddress;
+ private final IOptionType parser;
+ private final Object defaultValue;
- @Option(name = "-cluster-net-port",
- usage = "Sets the port to listen for connections from node controllers (default 1099)")
- public int clusterNetPort = 1099;
-
- @Option(name = "-http-port", usage = "Sets the http port for the Cluster Controller (default: 16001)")
- public int httpPort = 16001;
-
- @Option(name = "-heartbeat-period",
- usage = "Sets the time duration between two heartbeats from each node controller in milliseconds" +
- " (default: 10000)")
- public int heartbeatPeriod = 10000;
-
- @Option(name = "-max-heartbeat-lapse-periods",
- usage = "Sets the maximum number of missed heartbeats before a node is marked as dead (default: 5)")
- public int maxHeartbeatLapsePeriods = 5;
-
- @Option(name = "-profile-dump-period", usage = "Sets the time duration between two profile dumps from each node " +
- "controller in milliseconds. 0 to disable. (default: 0)")
- public int profileDumpPeriod = 0;
-
- @Option(name = "-default-max-job-attempts", usage = "Sets the default number of job attempts allowed if not " +
- "specified in the job specification. (default: 5)")
- public int defaultMaxJobAttempts = 5;
-
- @Option(name = "-job-history-size", usage = "Limits the number of historical jobs remembered by the system to " +
- "the specified value. (default: 10)")
- public int jobHistorySize = 10;
-
- @Option(name = "-result-time-to-live", usage = "Limits the amount of time results for asynchronous jobs should " +
- "be retained by the system in milliseconds. (default: 24 hours)")
- public long resultTTL = 86400000;
-
- @Option(name = "-result-sweep-threshold", usage = "The duration within which an instance of the result cleanup " +
- "should be invoked in milliseconds. (default: 1 minute)")
- public long resultSweepThreshold = 60000;
-
- @Option(name = "-cc-root",
- usage = "Sets the root folder used for file operations. (default: ClusterControllerService)")
- public String ccRoot = "ClusterControllerService";
-
- @Option(name = "-cluster-topology", required = false,
- usage = "Sets the XML file that defines the cluster topology. (default: null)")
- public File clusterTopologyDefinition = null;
-
- @Option(name = "-app-cc-main-class", required = false, usage = "Application CC Main Class")
- public String appCCMainClass = null;
-
- @Option(name = "-config-file",
- usage = "Specify path to master configuration file (default: none)", required = false)
- public String configFile = null;
-
- @Option(name = "-job-queue-class-name", usage = "Specify the implementation class name for the job queue. (default:"
- + " org.apache.hyracks.control.cc.scheduler.FIFOJobQueue)",
- required = false)
- public String jobQueueClassName = "org.apache.hyracks.control.cc.scheduler.FIFOJobQueue";
-
- @Option(name = "-job-manager-class-name", usage = "Specify the implementation class name for the job manager. "
- + "(default: org.apache.hyracks.control.cc.job.JobManager)", required = false)
- public String jobManagerClassName = "org.apache.hyracks.control.cc.job.JobManager";
-
- @Argument
- @Option(name = "--", handler = StopOptionHandler.class)
- public List<String> appArgs;
-
- public URL configFileUrl = null;
-
- private Ini ini = null;
-
- private void loadINIFile() throws IOException {
- // This method simply maps from the ini parameters to the CCConfig's fields.
- // It does not apply defaults or any logic.
- if (configFile != null) {
- ini = IniUtils.loadINIFile(configFile);
- } else if (configFileUrl != null) {
- ini = IniUtils.loadINIFile(configFileUrl);
- } else {
- return;
+ <T> Option(IOptionType<T> parser) {
+ this(parser, (T)null);
}
- ipAddress = IniUtils.getString(ini, "cc", "address", ipAddress);
- clientNetIpAddress = IniUtils.getString(ini, "cc", "client.address", clientNetIpAddress);
- clientNetPort = IniUtils.getInt(ini, "cc", "client.port", clientNetPort);
- clusterNetIpAddress = IniUtils.getString(ini, "cc", "cluster.address", clusterNetIpAddress);
- clusterNetPort = IniUtils.getInt(ini, "cc", "cluster.port", clusterNetPort);
- httpPort = IniUtils.getInt(ini, "cc", "http.port", httpPort);
- heartbeatPeriod = IniUtils.getInt(ini, "cc", "heartbeat.period", heartbeatPeriod);
- maxHeartbeatLapsePeriods = IniUtils.getInt(ini, "cc", "heartbeat.maxlapse", maxHeartbeatLapsePeriods);
- profileDumpPeriod = IniUtils.getInt(ini, "cc", "profiledump.period", profileDumpPeriod);
- defaultMaxJobAttempts = IniUtils.getInt(ini, "cc", "job.defaultattempts", defaultMaxJobAttempts);
- jobHistorySize = IniUtils.getInt(ini, "cc", "job.historysize", jobHistorySize);
- resultTTL = IniUtils.getLong(ini, "cc", "results.ttl", resultTTL);
- resultSweepThreshold = IniUtils.getLong(ini, "cc", "results.sweepthreshold", resultSweepThreshold);
- ccRoot = IniUtils.getString(ini, "cc", "rootfolder", ccRoot);
- // QQQ clusterTopologyDefinition is a "File"; should support verifying that the file
- // exists, as @Option likely does
- appCCMainClass = IniUtils.getString(ini, "cc", "app.class", appCCMainClass);
+ <T> Option(IOptionType<T> parser, Option defaultOption) {
+ this.parser = parser;
+ this.defaultValue = defaultOption;
+ }
+
+ <T> Option(IOptionType<T> parser, T defaultValue) {
+ this.parser = parser;
+ this.defaultValue = defaultValue;
+ }
+
+ <T> Option(IOptionType<T> parser, Supplier<T> defaultValue) {
+ this.parser = parser;
+ this.defaultValue = defaultValue;
+ }
+
+ @Override
+ public Section section() {
+ return Section.CC;
+ }
+
+ @Override
+ public IOptionType type() {
+ return parser;
+ }
+
+ @Override
+ public Object defaultValue() {
+ return defaultValue;
+ }
+
+ @Override
+ public String description() {
+ switch (this) {
+ case APP_CLASS:
+ return "Application CC main class";
+ case ADDRESS:
+ return "Default bind address for all services on this cluster controller";
+ case CLUSTER_LISTEN_ADDRESS:
+ return "Sets the IP Address to listen for connections from NCs";
+ case CLUSTER_LISTEN_PORT:
+ return "Sets the port to listen for connections from node controllers";
+ case CLUSTER_PUBLIC_ADDRESS:
+ return "Address that NCs should use to contact this CC";
+ case CLUSTER_PUBLIC_PORT:
+ return "Port that NCs should use to contact this CC";
+ case CLIENT_LISTEN_ADDRESS:
+ return "Sets the IP Address to listen for connections from clients";
+ case CLIENT_LISTEN_PORT:
+ return "Sets the port to listen for connections from clients";
+ case CONSOLE_LISTEN_ADDRESS:
+ return "Sets the listen address for the Cluster Controller";
+ case CONSOLE_LISTEN_PORT:
+ return "Sets the http port for the Cluster Controller)";
+ case HEARTBEAT_PERIOD:
+ return "Sets the time duration between two heartbeats from each node controller in milliseconds";
+ case HEARTBEAT_MAX_MISSES:
+ return "Sets the maximum number of missed heartbeats before a node is marked as dead";
+ case PROFILE_DUMP_PERIOD:
+ return "Sets the time duration between two profile dumps from each node controller in " +
+ "milliseconds; 0 to disable";
+ case JOB_HISTORY_SIZE:
+ return "Limits the number of historical jobs remembered by the system to the specified value";
+ case RESULT_TTL:
+ return "Limits the amount of time results for asynchronous jobs should be retained by the system " +
+ "in milliseconds";
+ case RESULT_SWEEP_THRESHOLD:
+ return "The duration within which an instance of the result cleanup should be invoked in " +
+ "milliseconds";
+ case ROOT_DIR:
+ return "Sets the root folder used for file operations";
+ case CLUSTER_TOPOLOGY:
+ return "Sets the XML file that defines the cluster topology";
+ case JOB_QUEUE_CLASS:
+ return "Specify the implementation class name for the job queue";
+ case JOB_MANAGER_CLASS:
+ return "Specify the implementation class name for the job manager";
+ default:
+ throw new IllegalStateException("NYI: " + this);
+ }
+ }
}
- /**
- * Once all @Option fields have been loaded from command-line or otherwise
- * specified programmatically, call this method to:
- * 1. Load options from a config file (as specified by -config-file)
- * 2. Set default values for certain derived values, such as setting
- * clusterNetIpAddress to ipAddress
- */
- public void loadConfigAndApplyDefaults() throws IOException {
- loadINIFile();
- if (ini != null) {
- // QQQ This way of passing overridden/defaulted values back into
- // the ini feels clunky, and it's clearly incomplete
- ini.add("cc", "cluster.address", clusterNetIpAddress);
- ini.add("cc", "client.address", clientNetIpAddress);
- }
+ private final ConfigManager configManager;
- // "address" is the default for all IP addresses
- clusterNetIpAddress = clusterNetIpAddress == null ? ipAddress : clusterNetIpAddress;
- clientNetIpAddress = clientNetIpAddress == null ? ipAddress : clientNetIpAddress;
+ private List<String> appArgs = new ArrayList<>();
+
+ public CCConfig() {
+ this(new ConfigManager());
+ }
+
+ public CCConfig(ConfigManager configManager) {
+ super(configManager);
+ this.configManager = configManager;
+ configManager.register(Option.class);
+ configManager.registerArgsListener(appArgs::addAll);
+ }
+
+ public List<String> getAppArgs() {
+ return appArgs;
+ }
+
+ public String[] getAppArgsArray() {
+ return appArgs.toArray(new String[appArgs.size()]);
}
/**
@@ -175,15 +179,158 @@
* if -config-file wasn't specified.
*/
public Ini getIni() {
- return ini;
+ return configManager.toIni(false);
}
- /**
- * @return An IApplicationConfig representing this NCConfig.
- * Note: Currently this only includes the values from the configuration
- * file, not anything specified on the command-line. QQQ
- */
- public IApplicationConfig getAppConfig() {
- return new IniApplicationConfig(ini);
+ public ConfigManager getConfigManager() {
+ return configManager;
+ }
+
+ // QQQ Note that clusterListenAddress is *not directly used* yet. Both
+ // the cluster listener and the web server listen on "all interfaces".
+ // This IP address is only used to instruct the NC on which IP to call in.
+ public String getClusterListenAddress() {
+ return getAppConfig().getString(Option.CLUSTER_LISTEN_ADDRESS);
+ }
+
+ public void setClusterListenAddress(String clusterListenAddress) {
+ configManager.set(Option.CLUSTER_LISTEN_ADDRESS, clusterListenAddress);
+ }
+
+ public int getClusterListenPort() {
+ return getAppConfig().getInt(Option.CLUSTER_LISTEN_PORT);
+ }
+
+ public void setClusterListenPort(int clusterListenPort) {
+ configManager.set(Option.CLUSTER_LISTEN_PORT, clusterListenPort);
+ }
+
+ public String getClusterPublicAddress() {
+ return getAppConfig().getString(Option.CLUSTER_PUBLIC_ADDRESS);
+ }
+
+ public void setClusterPublicAddress(String clusterPublicAddress) {
+ configManager.set(Option.CLUSTER_PUBLIC_ADDRESS, clusterPublicAddress);
+ }
+
+ public int getClusterPublicPort() {
+ return getAppConfig().getInt(Option.CLUSTER_PUBLIC_PORT);
+ }
+
+ public void setClusterPublicPort(int clusterPublicPort) {
+ configManager.set(Option.CLUSTER_PUBLIC_PORT, clusterPublicPort);
+ }
+
+ public String getClientListenAddress() {
+ return getAppConfig().getString(Option.CLIENT_LISTEN_ADDRESS);
+ }
+
+ public void setClientListenAddress(String clientListenAddress) {
+ configManager.set(Option.CLIENT_LISTEN_ADDRESS, clientListenAddress);
+ }
+
+ public int getClientListenPort() {
+ return getAppConfig().getInt(Option.CLIENT_LISTEN_PORT);
+ }
+
+ public void setClientListenPort(int clientListenPort) {
+ configManager.set(Option.CLIENT_LISTEN_PORT, clientListenPort);
+ }
+
+ public int getConsoleListenPort() {
+ return getAppConfig().getInt(Option.CONSOLE_LISTEN_PORT);
+ }
+
+ public void setConsoleListenPort(int consoleListenPort) {
+ configManager.set(Option.CONSOLE_LISTEN_PORT, consoleListenPort);
+ }
+
+ public int getHeartbeatPeriod() {
+ return getAppConfig().getInt(Option.HEARTBEAT_PERIOD);
+ }
+
+ public void setHeartbeatPeriod(int heartbeatPeriod) {
+ configManager.set(Option.HEARTBEAT_PERIOD, heartbeatPeriod);
+ }
+
+ public int getHeartbeatMaxMisses() {
+ return getAppConfig().getInt(Option.HEARTBEAT_MAX_MISSES);
+ }
+
+ public void setHeartbeatMaxMisses(int heartbeatMaxMisses) {
+ configManager.set(Option.HEARTBEAT_MAX_MISSES, heartbeatMaxMisses);
+ }
+
+ public int getProfileDumpPeriod() {
+ return getAppConfig().getInt(Option.PROFILE_DUMP_PERIOD);
+ }
+
+ public void setProfileDumpPeriod(int profileDumpPeriod) {
+ configManager.set(Option.PROFILE_DUMP_PERIOD, profileDumpPeriod);
+ }
+
+ public int getJobHistorySize() {
+ return getAppConfig().getInt(Option.JOB_HISTORY_SIZE);
+ }
+
+ public void setJobHistorySize(int jobHistorySize) {
+ configManager.set(Option.JOB_HISTORY_SIZE, jobHistorySize);
+ }
+
+ public long getResultTTL() {
+ return getAppConfig().getLong(Option.RESULT_TTL);
+ }
+
+ public void setResultTTL(long resultTTL) {
+ configManager.set(Option.RESULT_TTL, resultTTL);
+ }
+
+ public long getResultSweepThreshold() {
+ return getAppConfig().getLong(Option.RESULT_SWEEP_THRESHOLD);
+ }
+
+ public void setResultSweepThreshold(long resultSweepThreshold) {
+ configManager.set(Option.RESULT_SWEEP_THRESHOLD, resultSweepThreshold);
+ }
+
+ public String getRootDir() {
+ return getAppConfig().getString(Option.ROOT_DIR);
+ }
+
+ public void setRootDir(String rootDir) {
+ configManager.set(Option.ROOT_DIR, rootDir);
+ }
+
+ public File getClusterTopology() {
+ return getAppConfig().getString(Option.CLUSTER_TOPOLOGY) == null ? null
+ : new File(getAppConfig().getString(Option.CLUSTER_TOPOLOGY));
+ }
+
+ public void setClusterTopology(File clusterTopology) {
+ configManager.set(Option.CLUSTER_TOPOLOGY, clusterTopology);
+ }
+
+ public String getAppClass() {
+ return getAppConfig().getString(Option.APP_CLASS);
+ }
+
+ public void setAppClass(String appClass) {
+ configManager.set(Option.APP_CLASS, appClass);
+ }
+
+ public String getJobQueueClass() {
+ return getAppConfig().getString(Option.JOB_QUEUE_CLASS);
+ }
+
+ public void setJobQueueClass(String jobQueueClass) {
+ configManager.set(Option.JOB_QUEUE_CLASS, jobQueueClass);
+ }
+
+ public String getJobManagerClass() {
+ return getAppConfig().getString(Option.JOB_MANAGER_CLASS);
+ }
+
+ public void setJobManagerClass(String jobManagerClass) {
+ configManager.set(Option.JOB_MANAGER_CLASS, jobManagerClass);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/ControllerConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/ControllerConfig.java
new file mode 100644
index 0000000..4aae6df
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/ControllerConfig.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.common.controllers;
+
+import java.io.Serializable;
+import java.net.URL;
+
+import org.apache.hyracks.api.config.IApplicationConfig;
+import org.apache.hyracks.api.config.IOption;
+import org.apache.hyracks.api.config.IOptionType;
+import org.apache.hyracks.api.config.Section;
+import org.apache.hyracks.control.common.config.ConfigManager;
+import org.apache.hyracks.control.common.config.OptionTypes;
+
+public class ControllerConfig implements Serializable {
+
+ public enum Option implements IOption {
+ CONFIG_FILE(OptionTypes.STRING, "Specify path to master configuration file"),
+ CONFIG_FILE_URL(OptionTypes.URL, "Specify URL to master configuration file");
+
+ private final IOptionType type;
+ private final String description;
+
+
+ Option(IOptionType type, String description) {
+ this.type = type;
+ this.description = description;
+ }
+
+ @Override
+ public Section section() {
+ return Section.COMMON;
+ }
+
+ @Override
+ public String description() {
+ return description;
+ }
+
+ @Override
+ public IOptionType type() {
+ return type;
+ }
+
+ @Override
+ public Object defaultValue() {
+ return null;
+ }
+ }
+
+ protected final ConfigManager configManager;
+
+ protected ControllerConfig(ConfigManager configManager) {
+ this.configManager = configManager;
+ }
+
+ public IApplicationConfig getAppConfig() {
+ return configManager.getAppConfig();
+ }
+
+ public String getConfigFile() {
+ return getAppConfig().getString(ControllerConfig.Option.CONFIG_FILE);
+ }
+
+ public void setConfigFile(String configFile) {
+ configManager.set(ControllerConfig.Option.CONFIG_FILE, configFile);
+ }
+
+ public URL getConfigFileUrl() {
+ return (URL) getAppConfig().get(ControllerConfig.Option.CONFIG_FILE_URL);
+ }
+
+ public void setConfigFileUrl(URL configFileUrl) {
+ configManager.set(ControllerConfig.Option.CONFIG_FILE_URL, configFileUrl);
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/IniUtils.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/IniUtils.java
deleted file mode 100644
index 451421d..0000000
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/IniUtils.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.control.common.controllers;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.lang.reflect.Array;
-import java.net.URL;
-
-import org.ini4j.Ini;
-import org.ini4j.Profile.Section;
-
-/**
- * Some utility functions for reading Ini4j objects with default values.
- * For all getXxx() methods: if the 'section' contains a slash, and the 'key'
- * is not found in that section, we will search for the key in the section named
- * by stripping the leaf of the section name (final slash and anything following).
- * eg. getInt(ini, "nc/red", "dir", null) will first look for the key "dir" in
- * the section "nc/red", but if it is not found, will look in the section "nc".
- */
-public class IniUtils {
-
- private IniUtils() {
- }
-
- private static <T> T getIniValue(Ini ini, String section, String key, T defaultValue, Class<T> clazz) {
- T value;
- while (true) {
- value = ini.get(section, key, clazz);
- if (value == null) {
- int idx = section.lastIndexOf('/');
- if (idx > -1) {
- section = section.substring(0, idx);
- continue;
- }
- }
- break;
- }
- return (value != null) ? value : defaultValue;
- }
-
- @SuppressWarnings("unchecked")
- private static <T> T getIniArray(Ini ini, String section, String key, Class<T> clazz) {
- Section sec = ini.get(section);
- if (clazz.getComponentType() == null) {
- return null;
- }
- if (sec == null) {
- return (T) Array.newInstance(clazz.getComponentType(), 0);
- } else {
- return sec.getAll(key, clazz);
- }
- }
-
- public static String getString(Ini ini, String section, String key, String defaultValue) {
- return getIniValue(ini, section, key, defaultValue, String.class);
- }
-
- public static String[] getStringArray(Ini ini, String section, String key) {
- return getIniArray(ini, section, key, String[].class);
- }
-
- public static int getInt(Ini ini, String section, String key, int defaultValue) {
- return getIniValue(ini, section, key, defaultValue, Integer.class);
- }
-
- public static long getLong(Ini ini, String section, String key, long defaultValue) {
- return getIniValue(ini, section, key, defaultValue, Long.class);
- }
-
- public static Ini loadINIFile(String configFile) throws IOException {
- Ini ini = new Ini();
- File conffile = new File(configFile);
- if (!conffile.exists()) {
- throw new FileNotFoundException(configFile);
- }
- ini.load(conffile);
- return ini;
- }
-
- public static Ini loadINIFile(URL configURL) throws IOException {
- Ini ini = new Ini();
- ini.load(configURL);
- return ini;
- }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
index fa7d76a..7906b52 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
@@ -18,287 +18,485 @@
*/
package org.apache.hyracks.control.common.controllers;
-import java.io.IOException;
-import java.io.Serializable;
+import static org.apache.hyracks.control.common.config.OptionTypes.BOOLEAN;
+import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER;
+import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER_BYTE_UNIT;
+import static org.apache.hyracks.control.common.config.OptionTypes.LONG;
+import static org.apache.hyracks.control.common.config.OptionTypes.STRING;
+import static org.apache.hyracks.control.common.config.OptionTypes.STRING_ARRAY;
+
import java.net.InetAddress;
-import java.net.URL;
+import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
+import java.util.function.Supplier;
-import org.apache.hyracks.api.application.IApplicationConfig;
-import org.apache.hyracks.control.common.application.IniApplicationConfig;
-import org.ini4j.Ini;
-import org.kohsuke.args4j.Argument;
-import org.kohsuke.args4j.Option;
-import org.kohsuke.args4j.spi.StopOptionHandler;
+import org.apache.hyracks.api.config.IApplicationConfig;
+import org.apache.hyracks.api.config.IOption;
+import org.apache.hyracks.api.config.IOptionType;
+import org.apache.hyracks.api.config.Section;
+import org.apache.hyracks.control.common.config.ConfigManager;
+import org.apache.hyracks.util.file.FileUtil;
-public class NCConfig implements Serializable {
- private static final long serialVersionUID = 2L;
+public class NCConfig extends ControllerConfig {
+ private static final long serialVersionUID = 3L;
- @Option(name = "-cc-host", usage = "Cluster Controller host name (required unless specified in config file)",
- required = false)
- public String ccHost = null;
+ public static String defaultDir = System.getProperty("java.io.tmpdir");
+ public static String defaultAppClass = null;
- @Option(name = "-cc-port", usage = "Cluster Controller port (default: 1099)", required = false)
- public int ccPort = 1099;
+ public enum Option implements IOption {
+ ADDRESS(STRING, InetAddress.getLoopbackAddress().getHostAddress()),
+ PUBLIC_ADDRESS(STRING, ADDRESS),
+ CLUSTER_LISTEN_ADDRESS(STRING, ADDRESS),
+ CLUSTER_LISTEN_PORT(INTEGER, 0),
+ NCSERVICE_ADDRESS(STRING, PUBLIC_ADDRESS),
+ NCSERVICE_PORT(INTEGER, 9090),
+ CLUSTER_ADDRESS(STRING, (String)null),
+ CLUSTER_PORT(INTEGER, 1099),
+ CLUSTER_PUBLIC_ADDRESS(STRING, PUBLIC_ADDRESS),
+ CLUSTER_PUBLIC_PORT(INTEGER, CLUSTER_LISTEN_PORT),
+ NODE_ID(STRING, (String)null),
+ DATA_LISTEN_ADDRESS(STRING, ADDRESS),
+ DATA_LISTEN_PORT(INTEGER, 0),
+ DATA_PUBLIC_ADDRESS(STRING, PUBLIC_ADDRESS),
+ DATA_PUBLIC_PORT(INTEGER, DATA_LISTEN_PORT),
+ RESULT_LISTEN_ADDRESS(STRING, ADDRESS),
+ RESULT_LISTEN_PORT(INTEGER, 0),
+ RESULT_PUBLIC_ADDRESS(STRING, PUBLIC_ADDRESS),
+ RESULT_PUBLIC_PORT(INTEGER, RESULT_LISTEN_PORT),
+ MESSAGING_LISTEN_ADDRESS(STRING, ADDRESS),
+ MESSAGING_LISTEN_PORT(INTEGER, 0),
+ MESSAGING_PUBLIC_ADDRESS(STRING, PUBLIC_ADDRESS),
+ MESSAGING_PUBLIC_PORT(INTEGER, MESSAGING_LISTEN_PORT),
+ CLUSTER_CONNECT_RETRIES(INTEGER, 5),
+ @SuppressWarnings("RedundantCast") // not redundant- false positive from IDEA
+ IODEVICES(STRING_ARRAY, (Supplier<String []>)() -> new String [] { FileUtil.joinPath(defaultDir, "iodevice") }),
+ NET_THREAD_COUNT(INTEGER, 1),
+ NET_BUFFER_COUNT(INTEGER, 1),
+ RESULT_TTL(LONG, 86400000L),
+ RESULT_SWEEP_THRESHOLD(LONG, 60000L),
+ RESULT_MANAGER_MEMORY(INTEGER_BYTE_UNIT, -1),
+ @SuppressWarnings("RedundantCast") // not redundant- false positive from IDEA
+ APP_CLASS(STRING, (Supplier<String>)() -> defaultAppClass),
+ NCSERVICE_PID(INTEGER, -1),
+ COMMAND(STRING, "hyracksnc"),
+ JVM_ARGS(STRING, (String)null),
+ VIRTUAL_NC(BOOLEAN, false);
- @Option(name = "-address", usage = "IP Address for NC (default: localhost)", required = false)
- public String ipAddress = InetAddress.getLoopbackAddress().getHostAddress();
+ private final IOptionType parser;
+ private final Object defaultValue;
- @Option(name = "-cluster-net-ip-address", usage = "IP Address to bind cluster listener (default: same as -address)",
- required = false)
- public String clusterNetIPAddress;
-
- @Option(name = "-cluster-net-port", usage = "IP port to bind cluster listener (default: random port)",
- required = false)
- public int clusterNetPort = 0;
-
- @Option(name = "-cluster-net-public-ip-address",
- usage = "Public IP Address to announce cluster listener (default: same as -cluster-net-ip-address)",
- required = false)
- public String clusterNetPublicIPAddress;
-
- @Option(name = "-cluster-net-public-port",
- usage = "Public IP port to announce cluster listener (default: same as -cluster-net-port; " +
- "must set -cluster-net-public-ip-address also)", required = false)
- public int clusterNetPublicPort = 0;
-
- @Option(name = "-node-id", usage = "Logical name of node controller unique within the cluster (required unless " +
- "specified in config file)", required = false)
- public String nodeId = null;
-
- @Option(name = "-data-ip-address", usage = "IP Address to bind data listener (default: same as -address)",
- required = false)
- public String dataIPAddress;
-
- @Option(name = "-data-port", usage = "IP port to bind data listener (default: random port)", required = false)
- public int dataPort = 0;
-
- @Option(name = "-data-public-ip-address",
- usage = "Public IP Address to announce data listener (default: same as -data-ip-address)", required = false)
- public String dataPublicIPAddress;
-
- @Option(name = "-data-public-port",
- usage = "Public IP port to announce data listener (default: same as -data-port; must set " +
- "-data-public-ip-address also)", required = false)
- public int dataPublicPort = 0;
-
- @Option(name = "-result-ip-address",
- usage = "IP Address to bind dataset result distribution listener (default: same as -address)",
- required = false)
- public String resultIPAddress;
-
- @Option(name = "-result-port",
- usage = "IP port to bind dataset result distribution listener (default: random port)",
- required = false)
- public int resultPort = 0;
-
- @Option(name = "-result-public-ip-address",
- usage = "Public IP Address to announce dataset result distribution listener (default: same as " +
- "-result-ip-address)", required = false)
- public String resultPublicIPAddress;
-
- @Option(name = "-result-public-port", usage = "Public IP port to announce dataset result distribution listener " +
- "(default: same as -result-port; must set -result-public-ip-address also)", required = false)
- public int resultPublicPort = 0;
-
- @Option(name = "-retries", usage = "Number of attempts to contact CC before giving up (default: 5)")
- public int retries = 5;
-
- @Option(name = "-iodevices",
- usage = "Comma separated list of IO Device mount points (default: One device in default temp folder)",
- required = false)
- public String ioDevices = System.getProperty("java.io.tmpdir");
-
- @Option(name = "-net-thread-count", usage = "Number of threads to use for Network I/O (default: 1)")
- public int nNetThreads = 1;
-
- @Option(name = "-net-buffer-count", usage = "Number of network buffers per input/output channel (default: 1)",
- required = false)
- public int nNetBuffers = 1;
-
- @Option(name = "-max-memory", usage = "Maximum memory usable at this Node Controller in bytes (default: -1 auto)")
- public int maxMemory = -1;
-
- @Option(name = "-result-time-to-live", usage = "Limits the amount of time results for asynchronous jobs should " +
- "be retained by the system in milliseconds. (default: 24 hours)")
- public long resultTTL = 86400000;
-
- @Option(name = "-result-sweep-threshold", usage = "The duration within which an instance of the result cleanup " +
- "should be invoked in milliseconds. (default: 1 minute)")
- public long resultSweepThreshold = 60000;
-
- @Option(name = "-result-manager-memory",
- usage = "Memory usable for result caching at this Node Controller in bytes (default: -1 auto)")
- public int resultManagerMemory = -1;
-
- @Option(name = "-app-nc-main-class", usage = "Application NC Main Class")
- public String appNCMainClass;
-
- @Option(name = "-config-file", usage = "Specify path to local configuration file (default: no local config)",
- required = false)
- public String configFile = null;
-
- @Option(name = "-messaging-ip-address", usage = "IP Address to bind messaging "
- + "listener (default: same as -address)", required = false)
- public String messagingIPAddress;
-
- @Option(name = "-messaging-port", usage = "IP port to bind messaging listener "
- + "(default: random port)", required = false)
- public int messagingPort = 0;
-
- @Option(name = "-messaging-public-ip-address", usage = "Public IP Address to announce messaging"
- + " listener (default: same as -messaging-ip-address)", required = false)
- public String messagingPublicIPAddress;
-
- @Option(name = "-messaging-public-port", usage = "Public IP port to announce messaging listener"
- + " (default: same as -messaging-port; must set -messaging-public-port also)", required = false)
- public int messagingPublicPort = 0;
-
- @Option(name = "-ncservice-pid", usage = "PID of the NCService which launched this NCDriver", required = false)
- public int ncservicePid = -1;
-
- @Argument
- @Option(name = "--", handler = StopOptionHandler.class)
- public List<String> appArgs;
-
- public URL configFileUrl = null;
-
- private transient Ini ini = null;
-
- private void loadINIFile() throws IOException {
- if (configFile != null) {
- ini = IniUtils.loadINIFile(configFile);
- } else if (configFileUrl != null) {
- ini = IniUtils.loadINIFile(configFileUrl);
- } else {
- return;
+ <T> Option(IOptionType<T> parser, Option defaultOption) {
+ this.parser = parser;
+ this.defaultValue = defaultOption;
}
- // QQQ This should default to cc/address if cluster.address not set, but
- // that logic really should be handled by the ini file sent from the CC
- ccHost = IniUtils.getString(ini, "cc", "cluster.address", ccHost);
- ccPort = IniUtils.getInt(ini, "cc", "cluster.port", ccPort);
-
- // Get ID of *this* NC
- nodeId = IniUtils.getString(ini, "localnc", "id", nodeId);
- String nodeSection = "nc/" + nodeId;
-
- // Network ports
- ipAddress = IniUtils.getString(ini, nodeSection, "address", ipAddress);
-
- clusterNetIPAddress = IniUtils.getString(ini, nodeSection, "cluster.address", clusterNetIPAddress);
- clusterNetPort = IniUtils.getInt(ini, nodeSection, "cluster.port", clusterNetPort);
- dataIPAddress = IniUtils.getString(ini, nodeSection, "data.address", dataIPAddress);
- dataPort = IniUtils.getInt(ini, nodeSection, "data.port", dataPort);
- resultIPAddress = IniUtils.getString(ini, nodeSection, "result.address", resultIPAddress);
- resultPort = IniUtils.getInt(ini, nodeSection, "result.port", resultPort);
-
- clusterNetPublicIPAddress = IniUtils.getString(ini, nodeSection, "public.cluster.address",
- clusterNetPublicIPAddress);
- clusterNetPublicPort = IniUtils.getInt(ini, nodeSection, "public.cluster.port", clusterNetPublicPort);
- dataPublicIPAddress = IniUtils.getString(ini, nodeSection, "public.data.address", dataPublicIPAddress);
- dataPublicPort = IniUtils.getInt(ini, nodeSection, "public.data.port", dataPublicPort);
- resultPublicIPAddress = IniUtils.getString(ini, nodeSection, "public.result.address", resultPublicIPAddress);
- resultPublicPort = IniUtils.getInt(ini, nodeSection, "public.result.port", resultPublicPort);
-
- messagingIPAddress = IniUtils.getString(ini, nodeSection, "messaging.address", messagingIPAddress);
- messagingPort = IniUtils.getInt(ini, nodeSection, "messaging.port", messagingPort);
- messagingPublicIPAddress = IniUtils.getString(ini, nodeSection, "public.messaging.address",
- messagingPublicIPAddress);
- messagingPublicPort = IniUtils.getInt(ini, nodeSection, "public.messaging.port", messagingPublicPort);
-
- retries = IniUtils.getInt(ini, nodeSection, "retries", retries);
-
- // Directories
- ioDevices = IniUtils.getString(ini, nodeSection, "iodevices", ioDevices);
-
- // Hyracks client entrypoint
- appNCMainClass = IniUtils.getString(ini, nodeSection, "app.class", appNCMainClass);
- }
-
- /*
- * Once all @Option fields have been loaded from command-line or otherwise
- * specified programmatically, call this method to:
- * 1. Load options from a config file (as specified by -config-file)
- * 2. Set default values for certain derived values, such as setting
- * clusterNetIpAddress to ipAddress
- */
- public void loadConfigAndApplyDefaults() throws IOException {
- loadINIFile();
-
- // "address" is the default for all IP addresses
- if (clusterNetIPAddress == null) {
- clusterNetIPAddress = ipAddress;
- }
- if (dataIPAddress == null) {
- dataIPAddress = ipAddress;
- }
- if (resultIPAddress == null) {
- resultIPAddress = ipAddress;
+ <T> Option(IOptionType<T> parser, T defaultValue) {
+ this.parser = parser;
+ this.defaultValue = defaultValue;
}
- // All "public" options default to their "non-public" versions
- if (clusterNetPublicIPAddress == null) {
- clusterNetPublicIPAddress = clusterNetIPAddress;
+ <T> Option(IOptionType<T> parser, Supplier<T> defaultValue) {
+ this.parser = parser;
+ this.defaultValue = defaultValue;
}
- if (clusterNetPublicPort == 0) {
- clusterNetPublicPort = clusterNetPort;
+
+ @Override
+ public Section section() {
+ switch (this) {
+ case NODE_ID:
+ return Section.LOCALNC;
+ default:
+ return Section.NC;
+ }
}
- if (dataPublicIPAddress == null) {
- dataPublicIPAddress = dataIPAddress;
+
+ @Override
+ public String description() {
+ switch (this) {
+ case ADDRESS:
+ return "Default IP Address to bind listeners on this NC. All services will bind on this address " +
+ "unless a service-specific listen address is supplied.";
+ case CLUSTER_LISTEN_ADDRESS:
+ return "IP Address to bind cluster listener on this NC";
+ case PUBLIC_ADDRESS:
+ return "Default public address that other processes should use to contact this NC. All services " +
+ "will advertise this address unless a service-specific public address is supplied.";
+ case NCSERVICE_ADDRESS:
+ return "Address the CC should use to contact the NCService associated with this NC";
+ case NCSERVICE_PORT:
+ return "Port the CC should use to contact the NCService associated with this NC";
+ case CLUSTER_ADDRESS:
+ return "Cluster Controller address (required unless specified in config file)";
+ case CLUSTER_PORT:
+ return "Cluster Controller port";
+ case CLUSTER_LISTEN_PORT:
+ return "IP port to bind cluster listener";
+ case CLUSTER_PUBLIC_ADDRESS:
+ return "Public IP Address to announce cluster listener";
+ case CLUSTER_PUBLIC_PORT:
+ return "Public IP port to announce cluster listener";
+ case NODE_ID:
+ return "Logical name of node controller unique within the cluster (required unless specified in " +
+ "config file)";
+ case DATA_LISTEN_ADDRESS:
+ return "IP Address to bind data listener";
+ case DATA_LISTEN_PORT:
+ return "IP port to bind data listener";
+ case DATA_PUBLIC_ADDRESS:
+ return "Public IP Address to announce data listener";
+ case DATA_PUBLIC_PORT:
+ return "Public IP port to announce data listener";
+ case RESULT_LISTEN_ADDRESS:
+ return "IP Address to bind dataset result distribution listener";
+ case RESULT_LISTEN_PORT:
+ return "IP port to bind dataset result distribution listener";
+ case RESULT_PUBLIC_ADDRESS:
+ return "Public IP Address to announce dataset result distribution listener";
+ case RESULT_PUBLIC_PORT:
+ return "Public IP port to announce dataset result distribution listener";
+ case MESSAGING_LISTEN_ADDRESS:
+ return "IP Address to bind messaging listener";
+ case MESSAGING_LISTEN_PORT:
+ return "IP port to bind messaging listener";
+ case MESSAGING_PUBLIC_ADDRESS:
+ return "Public IP Address to announce messaging listener";
+ case MESSAGING_PUBLIC_PORT:
+ return "Public IP port to announce messaging listener";
+ case CLUSTER_CONNECT_RETRIES:
+ return "Number of attempts to contact CC before giving up";
+ case IODEVICES:
+ return "Comma separated list of IO Device mount points";
+ case NET_THREAD_COUNT:
+ return "Number of threads to use for Network I/O";
+ case NET_BUFFER_COUNT:
+ return "Number of network buffers per input/output channel";
+ case RESULT_TTL:
+ return "Limits the amount of time results for asynchronous jobs should be retained by the system " +
+ "in milliseconds";
+ case RESULT_SWEEP_THRESHOLD:
+ return "The duration within which an instance of the result cleanup should be invoked in " +
+ "milliseconds";
+ case RESULT_MANAGER_MEMORY:
+ return "Memory usable for result caching at this Node Controller in bytes";
+ case APP_CLASS:
+ return "Application NC Main Class";
+ case NCSERVICE_PID:
+ return "PID of the NCService which launched this NCDriver";
+ case COMMAND:
+ return "Command NCService should invoke to start the NCDriver";
+ case JVM_ARGS:
+ return "JVM args to pass to the NCDriver";
+ case VIRTUAL_NC:
+ return "A flag indicating if this NC is running on virtual cluster";
+ default:
+ throw new IllegalStateException("NYI: " + this);
+ }
}
- if (dataPublicPort == 0) {
- dataPublicPort = dataPort;
+
+
+ @Override
+ public IOptionType type() {
+ return parser;
}
- if (resultPublicIPAddress == null) {
- resultPublicIPAddress = resultIPAddress;
+
+ @Override
+ public Object defaultValue() {
+ return defaultValue;
}
- if (resultPublicPort == 0) {
- resultPublicPort = resultPort;
+
+ @Override
+ public boolean hidden() {
+ return this == VIRTUAL_NC;
}
}
- /**
- * @return An IApplicationConfig representing this NCConfig.
- * Note: Currently this only includes the values from the configuration
- * file, not anything specified on the command-line. QQQ
- */
+ private List<String> appArgs = new ArrayList<>();
+
+ private final IApplicationConfig appConfig;
+ private final String nodeId;
+
+ public NCConfig(String nodeId) {
+ this(nodeId, new ConfigManager(null));
+ }
+
+ public NCConfig(String nodeId, ConfigManager configManager) {
+ super(configManager);
+ this.appConfig = configManager.getNodeEffectiveConfig(nodeId);
+ configManager.register(Option.class);
+ setNodeId(nodeId);
+ this.nodeId = nodeId;
+ configManager.registerArgsListener(appArgs::addAll);
+ }
+
+ public List<String> getAppArgs() {
+ return appArgs;
+ }
+
+ public String[] getAppArgsArray() {
+ return appArgs.toArray(new String[appArgs.size()]);
+ }
+
+ public ConfigManager getConfigManager() {
+ return configManager;
+ }
+
public IApplicationConfig getAppConfig() {
- return new IniApplicationConfig(ini);
+ return appConfig;
}
- public void toMap(Map<String, String> configuration) {
- configuration.put("cc-host", ccHost);
- configuration.put("cc-port", (String.valueOf(ccPort)));
- configuration.put("cluster-net-ip-address", clusterNetIPAddress);
- configuration.put("cluster-net-port", String.valueOf(clusterNetPort));
- configuration.put("cluster-net-public-ip-address", clusterNetPublicIPAddress);
- configuration.put("cluster-net-public-port", String.valueOf(clusterNetPublicPort));
- configuration.put("node-id", nodeId);
- configuration.put("data-ip-address", dataIPAddress);
- configuration.put("data-port", String.valueOf(dataPort));
- configuration.put("data-public-ip-address", dataPublicIPAddress);
- configuration.put("data-public-port", String.valueOf(dataPublicPort));
- configuration.put("result-ip-address", resultIPAddress);
- configuration.put("result-port", String.valueOf(resultPort));
- configuration.put("result-public-ip-address", resultPublicIPAddress);
- configuration.put("result-public-port", String.valueOf(resultPublicPort));
- configuration.put("retries", String.valueOf(retries));
- configuration.put("iodevices", ioDevices);
- configuration.put("net-thread-count", String.valueOf(nNetThreads));
- configuration.put("net-buffer-count", String.valueOf(nNetBuffers));
- configuration.put("max-memory", String.valueOf(maxMemory));
- configuration.put("result-time-to-live", String.valueOf(resultTTL));
- configuration.put("result-sweep-threshold", String.valueOf(resultSweepThreshold));
- configuration.put("result-manager-memory", String.valueOf(resultManagerMemory));
- configuration.put("messaging-ip-address", messagingIPAddress);
- configuration.put("messaging-port", String.valueOf(messagingPort));
- configuration.put("messaging-public-ip-address", messagingPublicIPAddress);
- configuration.put("messaging-public-port", String.valueOf(messagingPublicPort));
- configuration.put("ncservice-pid", String.valueOf(ncservicePid));
- if (appNCMainClass != null) {
- configuration.put("app-nc-main-class", appNCMainClass);
- }
+ public String getPublicAddress() {
+ return appConfig.getString(Option.PUBLIC_ADDRESS);
+ }
+
+ public void setPublicAddress(String publicAddress) {
+ configManager.set(nodeId, Option.PUBLIC_ADDRESS, publicAddress);
+ }
+
+ public String getNCServiceAddress() {
+ return appConfig.getString(Option.NCSERVICE_ADDRESS);
+ }
+
+ public void setNCServiceAddress(String ncserviceAddress) {
+ configManager.set(nodeId, Option.NCSERVICE_ADDRESS, ncserviceAddress);
+ }
+
+ public int getNCServicePort() {
+ return appConfig.getInt(Option.NCSERVICE_PORT);
+ }
+
+ public void setNCServicePort(int ncservicePort) {
+ configManager.set(nodeId, Option.NCSERVICE_PORT, ncservicePort);
+ }
+
+ public String getClusterAddress() {
+ return appConfig.getString(Option.CLUSTER_ADDRESS);
+ }
+
+ public void setClusterAddress(String clusterAddress) {
+ configManager.set(nodeId, Option.CLUSTER_ADDRESS, clusterAddress);
+ }
+
+ public int getClusterPort() {
+ return appConfig.getInt(Option.CLUSTER_PORT);
+ }
+
+ public void setClusterPort(int clusterPort) {
+ configManager.set(nodeId, Option.CLUSTER_PORT, clusterPort);
+ }
+
+ public String getClusterListenAddress() {
+ return appConfig.getString(Option.CLUSTER_LISTEN_ADDRESS);
+ }
+
+ public void setClusterListenAddress(String clusterListenAddress) {
+ configManager.set(nodeId, Option.CLUSTER_LISTEN_ADDRESS, clusterListenAddress);
+ }
+
+ public int getClusterListenPort() {
+ return appConfig.getInt(Option.CLUSTER_LISTEN_PORT);
+ }
+
+ public void setClusterListenPort(int clusterListenPort) {
+ configManager.set(nodeId, Option.CLUSTER_LISTEN_PORT, clusterListenPort);
+ }
+
+ public String getClusterPublicAddress() {
+ return appConfig.getString(Option.CLUSTER_PUBLIC_ADDRESS);
+ }
+
+ public void setClusterPublicAddress(String clusterPublicAddress) {
+ configManager.set(nodeId, Option.CLUSTER_PUBLIC_ADDRESS, clusterPublicAddress);
+ }
+
+ public int getClusterPublicPort() {
+ return appConfig.getInt(Option.CLUSTER_PUBLIC_PORT);
+ }
+
+ public void setClusterPublicPort(int clusterPublicPort) {
+ configManager.set(nodeId, Option.CLUSTER_PUBLIC_PORT, clusterPublicPort);
+ }
+
+ public String getNodeId() {
+ return appConfig.getString(Option.NODE_ID);
+ }
+
+ public void setNodeId(String nodeId) {
+ configManager.set(nodeId, Option.NODE_ID, nodeId);
+ }
+
+ public String getDataListenAddress() {
+ return appConfig.getString(Option.DATA_LISTEN_ADDRESS);
+ }
+
+ public void setDataListenAddress(String dataListenAddress) {
+ configManager.set(nodeId, Option.DATA_LISTEN_ADDRESS, dataListenAddress);
+ }
+
+ public int getDataListenPort() {
+ return appConfig.getInt(Option.DATA_LISTEN_PORT);
+ }
+
+ public void setDataListenPort(int dataListenPort) {
+ configManager.set(nodeId, Option.DATA_LISTEN_PORT, dataListenPort);
+ }
+
+ public String getDataPublicAddress() {
+ return appConfig.getString(Option.DATA_PUBLIC_ADDRESS);
+ }
+
+ public void setDataPublicAddress(String dataPublicAddress) {
+ configManager.set(nodeId, Option.DATA_PUBLIC_ADDRESS, dataPublicAddress);
+ }
+
+ public int getDataPublicPort() {
+ return appConfig.getInt(Option.DATA_PUBLIC_PORT);
+ }
+
+ public void setDataPublicPort(int dataPublicPort) {
+ configManager.set(nodeId, Option.DATA_PUBLIC_PORT, dataPublicPort);
+ }
+
+ public String getResultListenAddress() {
+ return appConfig.getString(Option.RESULT_LISTEN_ADDRESS);
+ }
+
+ public void setResultListenAddress(String resultListenAddress) {
+ configManager.set(nodeId, Option.RESULT_LISTEN_ADDRESS, resultListenAddress);
+ }
+
+ public int getResultListenPort() {
+ return appConfig.getInt(Option.RESULT_LISTEN_PORT);
+ }
+
+ public void setResultListenPort(int resultListenPort) {
+ configManager.set(nodeId, Option.RESULT_LISTEN_PORT, resultListenPort);
+ }
+
+ public String getResultPublicAddress() {
+ return appConfig.getString(Option.RESULT_PUBLIC_ADDRESS);
+ }
+
+ public void setResultPublicAddress(String resultPublicAddress) {
+ configManager.set(nodeId, Option.RESULT_PUBLIC_ADDRESS, resultPublicAddress);
+ }
+
+ public int getResultPublicPort() {
+ return appConfig.getInt(Option.RESULT_PUBLIC_PORT);
+ }
+
+ public void setResultPublicPort(int resultPublicPort) {
+ configManager.set(nodeId, Option.RESULT_PUBLIC_PORT, resultPublicPort);
+ }
+
+ public String getMessagingListenAddress() {
+ return appConfig.getString(Option.MESSAGING_LISTEN_ADDRESS);
+ }
+
+ public void setMessagingListenAddress(String messagingListenAddress) {
+ configManager.set(nodeId, Option.MESSAGING_LISTEN_ADDRESS, messagingListenAddress);
+ }
+
+ public int getMessagingListenPort() {
+ return appConfig.getInt(Option.MESSAGING_LISTEN_PORT);
+ }
+
+ public void setMessagingListenPort(int messagingListenPort) {
+ configManager.set(nodeId, Option.MESSAGING_LISTEN_PORT, messagingListenPort);
+ }
+
+ public String getMessagingPublicAddress() {
+ return appConfig.getString(Option.MESSAGING_PUBLIC_ADDRESS);
+ }
+
+ public void setMessagingPublicAddress(String messagingPublicAddress) {
+ configManager.set(nodeId, Option.MESSAGING_PUBLIC_ADDRESS, messagingPublicAddress);
+ }
+
+ public int getMessagingPublicPort() {
+ return appConfig.getInt(Option.MESSAGING_PUBLIC_PORT);
+ }
+
+ public void setMessagingPublicPort(int messagingPublicPort) {
+ configManager.set(nodeId, Option.MESSAGING_PUBLIC_PORT, messagingPublicPort);
+ }
+
+ public int getClusterConnectRetries() {
+ return appConfig.getInt(Option.CLUSTER_CONNECT_RETRIES);
+ }
+
+ public void setClusterConnectRetries(int clusterConnectRetries) {
+ configManager.set(nodeId, Option.CLUSTER_CONNECT_RETRIES, clusterConnectRetries);
+ }
+
+ public String[] getIODevices() {
+ return appConfig.getStringArray(Option.IODEVICES);
+ }
+
+ public void setIODevices(String[] iodevices) {
+ configManager.set(nodeId, Option.IODEVICES, iodevices);
+ }
+
+ public int getNetThreadCount() {
+ return appConfig.getInt(Option.NET_THREAD_COUNT);
+ }
+
+ public void setNetThreadCount(int netThreadCount) {
+ configManager.set(nodeId, Option.NET_THREAD_COUNT, netThreadCount);
+ }
+
+ public int getNetBufferCount() {
+ return appConfig.getInt(Option.NET_BUFFER_COUNT);
+ }
+
+ public void setNetBufferCount(int netBufferCount) {
+ configManager.set(nodeId, Option.NET_BUFFER_COUNT, netBufferCount);
+ }
+
+ public long getResultTTL() {
+ return appConfig.getLong(Option.RESULT_TTL);
+ }
+
+ public void setResultTTL(long resultTTL) {
+ configManager.set(nodeId, Option.RESULT_TTL, resultTTL);
+ }
+
+ public long getResultSweepThreshold() {
+ return appConfig.getLong(Option.RESULT_SWEEP_THRESHOLD);
+ }
+
+ public void setResultSweepThreshold(long resultSweepThreshold) {
+ configManager.set(nodeId, Option.RESULT_SWEEP_THRESHOLD, resultSweepThreshold);
+ }
+
+ public int getResultManagerMemory() {
+ return appConfig.getInt(Option.RESULT_MANAGER_MEMORY);
+ }
+
+ public void setResultManagerMemory(int resultManagerMemory) {
+ configManager.set(nodeId, Option.RESULT_MANAGER_MEMORY, resultManagerMemory);
+ }
+
+ public String getAppClass() {
+ return appConfig.getString(Option.APP_CLASS);
+ }
+
+ public void setAppClass(String appClass) {
+ configManager.set(nodeId, Option.APP_CLASS, appClass);
+ }
+
+ public int getNCServicePid() {
+ return appConfig.getInt(Option.NCSERVICE_PID);
+ }
+
+ public void setNCServicePid(int ncservicePid) {
+ configManager.set(nodeId, Option.NCSERVICE_PID, ncservicePid);
+ }
+
+ public boolean getVirtualNC() {
+ return appConfig.getBoolean(Option.VIRTUAL_NC);
+ }
+
+ public void setVirtualNC(boolean virtualNC) {
+ configManager.set(nodeId, Option.VIRTUAL_NC, virtualNC);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml
index 148cf18..a7e3fa9 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml
@@ -60,7 +60,6 @@
<dependency>
<groupId>args4j</groupId>
<artifactId>args4j</artifactId>
- <version>2.0.12</version>
</dependency>
<dependency>
<groupId>org.apache.hyracks</groupId>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCApplicationEntryPoint.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCApplicationEntryPoint.java
new file mode 100644
index 0000000..d4e67fd
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCApplicationEntryPoint.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc;
+
+import java.lang.management.ManagementFactory;
+import java.util.Arrays;
+
+import org.apache.hyracks.api.application.INCApplicationContext;
+import org.apache.hyracks.api.application.INCApplicationEntryPoint;
+import org.apache.hyracks.api.config.IConfigManager;
+import org.apache.hyracks.api.config.Section;
+import org.apache.hyracks.api.job.resource.NodeCapacity;
+import org.apache.hyracks.control.common.controllers.CCConfig;
+import org.apache.hyracks.control.common.controllers.ControllerConfig;
+import org.apache.hyracks.control.common.controllers.NCConfig;
+
+public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
+ public static final NCApplicationEntryPoint INSTANCE = new NCApplicationEntryPoint();
+
+ protected NCApplicationEntryPoint() {
+ }
+
+ @Override
+ public void start(INCApplicationContext ncAppCtx, String[] args) throws Exception {
+ if (args.length > 0) {
+ throw new IllegalArgumentException("Unrecognized argument(s): " + Arrays.toString(args));
+ }
+ }
+
+ @Override
+ public void notifyStartupComplete() throws Exception {
+ // no-op
+ }
+
+ @Override
+ public void stop() throws Exception {
+ // no-op
+ }
+
+ @Override
+ public NodeCapacity getCapacity() {
+ int allCores = ManagementFactory.getOperatingSystemMXBean().getAvailableProcessors();
+ return new NodeCapacity(Runtime.getRuntime().maxMemory(), allCores > 1 ? allCores - 1 : allCores);
+ }
+
+ @Override
+ public void registerConfigOptions(IConfigManager configManager) {
+ configManager.addIniParamOptions(ControllerConfig.Option.CONFIG_FILE, ControllerConfig.Option.CONFIG_FILE_URL);
+ configManager.addCmdLineSections(Section.NC, Section.COMMON, Section.LOCALNC);
+ configManager.setUsageFilter(getUsageFilter());
+ configManager.register(ControllerConfig.Option.class, CCConfig.Option.class, NCConfig.Option.class);
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCDriver.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCDriver.java
index 2323d71..b52064e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCDriver.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCDriver.java
@@ -18,28 +18,32 @@
*/
package org.apache.hyracks.control.nc;
+import java.io.IOException;
+import java.util.Arrays;
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.apache.hyracks.api.application.INCApplicationEntryPoint;
+import org.apache.hyracks.control.common.config.ConfigManager;
+import org.apache.hyracks.control.common.config.ConfigUtils;
import org.apache.hyracks.control.common.controllers.NCConfig;
-import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.CmdLineException;
+@SuppressWarnings("InfiniteLoopStatement")
public class NCDriver {
private static final Logger LOGGER = Logger.getLogger(NCDriver.class.getName());
- public static void main(String args[]) throws Exception {
+ private NCDriver() {
+ }
+
+ public static void main(String[] args) {
try {
- NCConfig ncConfig = new NCConfig();
- CmdLineParser cp = new CmdLineParser(ncConfig);
- try {
- cp.parseArgument(args);
- } catch (Exception e) {
- e.printStackTrace();
- cp.printUsage(System.err);
- System.exit(1);
- }
- ncConfig.loadConfigAndApplyDefaults();
- final NodeControllerService ncService = new NodeControllerService(ncConfig);
+ final String nodeId = ConfigUtils.getOptionValue(args, NCConfig.Option.NODE_ID);
+ final ConfigManager configManager = new ConfigManager(args);
+ INCApplicationEntryPoint appEntryPoint = getAppEntryPoint(args);
+ appEntryPoint.registerConfigOptions(configManager);
+ NCConfig ncConfig = new NCConfig(nodeId, configManager);
+ final NodeControllerService ncService = new NodeControllerService(ncConfig, appEntryPoint);
if (LOGGER.isLoggable(Level.SEVERE)) {
LOGGER.severe("Setting uncaught exception handler " + ncService.getLifeCycleComponentManager());
}
@@ -49,9 +53,20 @@
while (true) {
Thread.sleep(10000);
}
+ } catch (CmdLineException e) {
+ LOGGER.log(Level.FINE, "Exception parsing command line: " + Arrays.toString(args), e);
+ System.exit(2);
} catch (Exception e) {
- e.printStackTrace();
+ LOGGER.log(Level.SEVERE, "Exiting NCDriver due to exception", e);
System.exit(1);
}
}
+
+ private static INCApplicationEntryPoint getAppEntryPoint(String[] args)
+ throws ClassNotFoundException, InstantiationException, IllegalAccessException, IOException {
+ // determine app class so that we can use the correct implementation of the configuration...
+ String appClassName = ConfigUtils.getOptionValue(args, NCConfig.Option.APP_CLASS);
+ return appClassName != null ? (INCApplicationEntryPoint) (Class.forName(appClassName)).newInstance()
+ : NCApplicationEntryPoint.INSTANCE;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index bf0ddb6..2ee9161 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -19,6 +19,7 @@
package org.apache.hyracks.control.nc;
import java.io.File;
+import java.io.IOException;
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
@@ -51,11 +52,11 @@
import org.apache.hyracks.api.io.IODeviceHandle;
import org.apache.hyracks.api.job.ActivityClusterGraph;
import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.resource.NodeCapacity;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
import org.apache.hyracks.api.service.IControllerService;
import org.apache.hyracks.control.common.base.IClusterController;
+import org.apache.hyracks.control.common.config.ConfigManager;
import org.apache.hyracks.control.common.context.ServerContext;
import org.apache.hyracks.control.common.controllers.NCConfig;
import org.apache.hyracks.control.common.controllers.NodeParameters;
@@ -84,6 +85,7 @@
import org.apache.hyracks.ipc.impl.IPCSystem;
import org.apache.hyracks.net.protocols.muxdemux.FullFrameChannelInterfaceFactory;
import org.apache.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
+import org.kohsuke.args4j.CmdLineException;
public class NodeControllerService implements IControllerService {
private static final Logger LOGGER = Logger.getLogger(NodeControllerService.class.getName());
@@ -130,7 +132,7 @@
private NCApplicationContext appCtx;
- private INCApplicationEntryPoint ncAppEntryPoint;
+ private final INCApplicationEntryPoint ncAppEntryPoint;
private final ILifeCycleComponentManager lccm;
@@ -154,13 +156,25 @@
private MessagingNetworkManager messagingNetManager;
- public NodeControllerService(NCConfig ncConfig) throws Exception {
- this.ncConfig = ncConfig;
- id = ncConfig.nodeId;
+ private final ConfigManager configManager;
- ioManager = new IOManager(IODeviceHandle.getDevices(ncConfig.ioDevices));
+ public NodeControllerService(NCConfig config) throws Exception {
+ this(config, getApplicationEntryPoint(config));
+ }
+
+ public NodeControllerService(NCConfig config, INCApplicationEntryPoint aep) throws IOException, CmdLineException {
+ this.ncConfig = config;
+ this.configManager = ncConfig.getConfigManager();
+ if (aep == null) {
+ throw new IllegalArgumentException("INCApplicationEntryPoint cannot be null");
+ }
+ configManager.processConfig();
+ this.ncAppEntryPoint = aep;
+ id = ncConfig.getNodeId();
+
+ ioManager = new IOManager(IODeviceHandle.getDevices(ncConfig.getIODevices()));
if (id == null) {
- throw new Exception("id not set");
+ throw new HyracksException("id not set");
}
lccm = new LifeCycleComponentManager();
@@ -224,14 +238,16 @@
private void init() throws Exception {
ioManager.setExecutor(executor);
- datasetPartitionManager = new DatasetPartitionManager(this, executor, ncConfig.resultManagerMemory,
- ncConfig.resultTTL, ncConfig.resultSweepThreshold);
- datasetNetworkManager = new DatasetNetworkManager(ncConfig.resultIPAddress, ncConfig.resultPort,
- datasetPartitionManager, ncConfig.nNetThreads, ncConfig.nNetBuffers, ncConfig.resultPublicIPAddress,
- ncConfig.resultPublicPort, FullFrameChannelInterfaceFactory.INSTANCE);
- if (ncConfig.messagingIPAddress != null && appCtx.getMessagingChannelInterfaceFactory() != null) {
- messagingNetManager = new MessagingNetworkManager(this, ncConfig.messagingIPAddress, ncConfig.messagingPort,
- ncConfig.nNetThreads, ncConfig.messagingPublicIPAddress, ncConfig.messagingPublicPort,
+ datasetPartitionManager = new DatasetPartitionManager(this, executor, ncConfig.getResultManagerMemory(),
+ ncConfig.getResultTTL(), ncConfig.getResultSweepThreshold());
+ datasetNetworkManager = new DatasetNetworkManager(ncConfig.getResultListenAddress(),
+ ncConfig.getResultListenPort(), datasetPartitionManager, ncConfig.getNetThreadCount(),
+ ncConfig.getNetBufferCount(), ncConfig.getResultPublicAddress(), ncConfig.getResultPublicPort(),
+ FullFrameChannelInterfaceFactory.INSTANCE);
+ if (ncConfig.getMessagingListenAddress() != null && appCtx.getMessagingChannelInterfaceFactory() != null) {
+ messagingNetManager = new MessagingNetworkManager(this, ncConfig.getMessagingListenAddress(),
+ ncConfig.getMessagingListenPort(), ncConfig.getNetThreadCount(),
+ ncConfig.getMessagingPublicAddress(), ncConfig.getMessagingPublicPort(),
appCtx.getMessagingChannelInterfaceFactory());
}
}
@@ -239,12 +255,13 @@
@Override
public void start() throws Exception {
LOGGER.log(Level.INFO, "Starting NodeControllerService");
- ipc = new IPCSystem(new InetSocketAddress(ncConfig.clusterNetIPAddress, ncConfig.clusterNetPort),
+ ipc = new IPCSystem(new InetSocketAddress(ncConfig.getClusterListenAddress(), ncConfig.getClusterListenPort()),
new NodeControllerIPCI(this), new CCNCFunctions.SerializerDeserializer());
ipc.start();
partitionManager = new PartitionManager(this);
- netManager = new NetworkManager(ncConfig.dataIPAddress, ncConfig.dataPort, partitionManager,
- ncConfig.nNetThreads, ncConfig.nNetBuffers, ncConfig.dataPublicIPAddress, ncConfig.dataPublicPort,
+ netManager = new NetworkManager(ncConfig.getDataListenAddress(), ncConfig.getDataListenPort(), partitionManager,
+ ncConfig.getNetThreadCount(), ncConfig.getNetBufferCount(), ncConfig.getDataPublicAddress(),
+ ncConfig.getDataPublicPort(),
FullFrameChannelInterfaceFactory.INSTANCE);
netManager.start();
@@ -255,8 +272,9 @@
if (messagingNetManager != null) {
messagingNetManager.start();
}
- IIPCHandle ccIPCHandle = ipc.getHandle(new InetSocketAddress(ncConfig.ccHost, ncConfig.ccPort),
- ncConfig.retries);
+ IIPCHandle ccIPCHandle = ipc.getHandle(
+ new InetSocketAddress(ncConfig.getClusterAddress(), ncConfig.getClusterPort()),
+ ncConfig.getClusterConnectRetries());
this.ccs = new ClusterControllerRemoteProxy(ccIPCHandle);
HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
for (int i = 0; i < gcInfos.length; ++i) {
@@ -274,10 +292,7 @@
runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(),
runtimeMXBean.getClassPath(), runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(),
runtimeMXBean.getInputArguments(), runtimeMXBean.getSystemProperties(), hbSchema, meesagingPort,
- ncAppEntryPoint == null
- ? new NodeCapacity(Runtime.getRuntime().maxMemory(), allCores > 1 ? allCores - 1 : allCores)
- : ncAppEntryPoint.getCapacity(),
- PidHelper.getPid()));
+ ncAppEntryPoint.getCapacity(), PidHelper.getPid()));
synchronized (this) {
while (registrationPending) {
@@ -307,21 +322,12 @@
}
LOGGER.log(Level.INFO, "Started NodeControllerService");
- if (ncAppEntryPoint != null) {
- ncAppEntryPoint.notifyStartupComplete();
- }
+ ncAppEntryPoint.notifyStartupComplete();
}
private void startApplication() throws Exception {
appCtx = new NCApplicationContext(this, serverCtx, ioManager, id, memoryManager, lccm, ncConfig.getAppConfig());
- String className = ncConfig.appNCMainClass;
- if (className != null) {
- Class<?> c = Class.forName(className);
- ncAppEntryPoint = (INCApplicationEntryPoint) c.newInstance();
- String[] args = ncConfig.appArgs == null ? new String[0]
- : ncConfig.appArgs.toArray(new String[ncConfig.appArgs.size()]);
- ncAppEntryPoint.start(appCtx, args);
- }
+ ncAppEntryPoint.start(appCtx, ncConfig.getAppArgsArray());
executor = Executors.newCachedThreadPool(appCtx.getThreadFactory());
}
@@ -341,10 +347,8 @@
messagingNetManager.stop();
}
workQueue.stop();
- if (ncAppEntryPoint != null) {
- ncAppEntryPoint.stop();
- }
- /**
+ ncAppEntryPoint.stop();
+ /*
* Stop heartbeat after NC has stopped to avoid false node failure detection
* on CC if an NC takes a long time to stop.
*/
@@ -525,4 +529,14 @@
public MessagingNetworkManager getMessagingNetworkManager() {
return messagingNetManager;
}
+
+ private static INCApplicationEntryPoint getApplicationEntryPoint(NCConfig config)
+ throws ClassNotFoundException, IllegalAccessException, InstantiationException {
+ if (config.getAppClass() != null) {
+ Class<?> c = Class.forName(config.getAppClass());
+ return (INCApplicationEntryPoint) c.newInstance();
+ } else {
+ return NCApplicationEntryPoint.INSTANCE;
+ }
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCApplicationContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCApplicationContext.java
index f52099d..6d549c2 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCApplicationContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCApplicationContext.java
@@ -22,10 +22,10 @@
import java.io.OutputStream;
import java.io.Serializable;
-import org.apache.hyracks.api.application.IApplicationConfig;
import org.apache.hyracks.api.application.INCApplicationContext;
import org.apache.hyracks.api.application.IStateDumpHandler;
import org.apache.hyracks.api.comm.IChannelInterfaceFactory;
+import org.apache.hyracks.api.config.IApplicationConfig;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
import org.apache.hyracks.api.resources.memory.IMemoryManager;
import org.apache.hyracks.api.service.IControllerService;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/pom.xml b/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/pom.xml
index 8d9a93b..ae7f272 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/pom.xml
@@ -31,9 +31,6 @@
<dependency>
<groupId>args4j</groupId>
<artifactId>args4j</artifactId>
- <version>2.0.12</version>
- <type>jar</type>
- <scope>compile</scope>
</dependency>
<dependency>
<groupId>org.ini4j</groupId>
@@ -42,6 +39,11 @@
</dependency>
<dependency>
<groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
<artifactId>hyracks-control-common</artifactId>
<version>${project.version}</version>
</dependency>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/src/main/java/org/apache/hyracks/control/nc/service/NCService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/src/main/java/org/apache/hyracks/control/nc/service/NCService.java
index 9b00cc2..6b11ecc 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/src/main/java/org/apache/hyracks/control/nc/service/NCService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/src/main/java/org/apache/hyracks/control/nc/service/NCService.java
@@ -23,6 +23,7 @@
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.StringReader;
+import java.io.StringWriter;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.net.InetAddress;
@@ -35,7 +36,9 @@
import java.util.logging.Logger;
import org.apache.commons.lang3.SystemUtils;
-import org.apache.hyracks.control.common.controllers.IniUtils;
+import org.apache.hyracks.control.common.config.ConfigUtils;
+import org.apache.hyracks.control.common.controllers.NCConfig;
+import org.apache.hyracks.api.config.Section;
import org.apache.hyracks.control.common.controllers.ServiceConstants;
import org.apache.hyracks.control.common.controllers.ServiceConstants.ServiceCommand;
import org.ini4j.Ini;
@@ -85,7 +88,7 @@
// Find the command to run. For now, we allow overriding the name, but
// still assume it's located in the bin/ directory of the deployment.
// Even this is likely more configurability than we need.
- String command = IniUtils.getString(ini, nodeSection, "command", "hyracksnc");
+ String command = ConfigUtils.getString(ini, nodeSection, NCConfig.Option.COMMAND.ini(), "hyracksnc");
// app.home is specified by the Maven appassembler plugin. If it isn't set,
// fall back to user's home dir. Again this is likely more flexibility
// than we need.
@@ -112,7 +115,7 @@
}
private static void configEnvironment(Map<String, String> env) {
- String jvmargs = IniUtils.getString(ini, nodeSection, "jvm.args", null);
+ String jvmargs = ConfigUtils.getString(ini, nodeSection, NCConfig.Option.JVM_ARGS.ini(), null);
if (jvmargs != null) {
LOGGER.info("Using JAVA_OPTS from conf file (jvm.args)");
} else {
@@ -188,7 +191,13 @@
return retval == 0;
} catch (Exception e) {
if (LOGGER.isLoggable(Level.SEVERE)) {
- LOGGER.log(Level.SEVERE, "Configuration from CC broken", e);
+ StringWriter sw = new StringWriter();
+ try {
+ ini.store(sw);
+ LOGGER.log(Level.SEVERE, "Configuration from CC broken: \n" + sw.toString(), e);
+ } catch (IOException e1) {
+ LOGGER.log(Level.SEVERE, "Configuration from CC broken, failed to serialize", e1);
+ }
}
return false;
}
@@ -213,7 +222,7 @@
case START_NC:
String iniString = ois.readUTF();
ini = new Ini(new StringReader(iniString));
- ncId = IniUtils.getString(ini, "localnc", "id", "");
+ ncId = ConfigUtils.getString(ini, Section.LOCALNC, NCConfig.Option.NODE_ID, "");
nodeSection = "nc/" + ncId;
return launchNCProcess();
case TERMINATE:
@@ -272,7 +281,7 @@
try (ServerSocket listener = new ServerSocket(port, 5, addr)) {
boolean launched = false;
while (!launched) {
- LOGGER.info("Waiting for connection from CC on " + addr + ":" + port);
+ LOGGER.info("Waiting for connection from CC on " + (addr == null ? "*" : addr) + ":" + port);
try (Socket socket = listener.accept()) {
// QQQ Because acceptConnection() doesn't return if the
// service is started appropriately, the socket remains
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/src/main/java/org/apache/hyracks/control/nc/service/NCServiceConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/src/main/java/org/apache/hyracks/control/nc/service/NCServiceConfig.java
index 91c9f6d..10fa679 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/src/main/java/org/apache/hyracks/control/nc/service/NCServiceConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/src/main/java/org/apache/hyracks/control/nc/service/NCServiceConfig.java
@@ -18,7 +18,7 @@
*/
package org.apache.hyracks.control.nc.service;
-import org.apache.hyracks.control.common.controllers.IniUtils;
+import org.apache.hyracks.control.common.config.ConfigUtils;
import org.ini4j.Ini;
import org.kohsuke.args4j.Option;
@@ -58,10 +58,10 @@
* It does not apply defaults or any logic.
*/
private void loadINIFile() throws IOException {
- ini = IniUtils.loadINIFile(configFile);
- address = IniUtils.getString(ini, "ncservice", "address", address);
- port = IniUtils.getInt(ini, "ncservice", "port", port);
- logdir = IniUtils.getString(ini, "ncservice", "logdir", logdir);
+ ini = ConfigUtils.loadINIFile(configFile);
+ address = ConfigUtils.getString(ini, "ncservice", "address", address);
+ port = ConfigUtils.getInt(ini, "ncservice", "port", port);
+ logdir = ConfigUtils.getString(ini, "ncservice", "logdir", logdir);
}
/**
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/pom.xml b/hyracks-fullstack/hyracks/hyracks-dataflow-std/pom.xml
index 6cfaaa5..e7ed599 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/pom.xml
@@ -47,7 +47,7 @@
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<configuration>
- <excludes>
+ <excludes combine.children="append">
<exclude>src/test/resources/data/beer.txt</exclude>
</excludes>
</configuration>
diff --git a/hyracks-fullstack/hyracks/hyracks-dist/pom.xml b/hyracks-fullstack/hyracks/hyracks-dist/pom.xml
index b88d292..7b6d7ec 100644
--- a/hyracks-fullstack/hyracks/hyracks-dist/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-dist/pom.xml
@@ -88,7 +88,7 @@
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<configuration>
- <excludes>
+ <excludes combine.children="append">
<exclude>src/main/resources/conf/master</exclude>
<exclude>src/main/resources/conf/slaves</exclude>
</excludes>
diff --git a/hyracks-fullstack/hyracks/hyracks-dist/src/main/resources/bin/startDebugNc.sh b/hyracks-fullstack/hyracks/hyracks-dist/src/main/resources/bin/startDebugNc.sh
index 4d2fc51..9794b07 100755
--- a/hyracks-fullstack/hyracks/hyracks-dist/src/main/resources/bin/startDebugNc.sh
+++ b/hyracks-fullstack/hyracks/hyracks-dist/src/main/resources/bin/startDebugNc.sh
@@ -65,4 +65,4 @@
cd $NCTMP_DIR2
#Launch hyracks nc
-$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR -data-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS2}" &> $NCLOGS_DIR2/$NODEID.log &
+$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cluster-address $CCHOST -cluster-port $CC_CLUSTERPORT -address $IPADDR -data-listen-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS2}" &> $NCLOGS_DIR2/$NODEID.log &
diff --git a/hyracks-fullstack/hyracks/hyracks-dist/src/main/resources/bin/startcc.sh b/hyracks-fullstack/hyracks/hyracks-dist/src/main/resources/bin/startcc.sh
index 5fa2b86..1bbbe10 100755
--- a/hyracks-fullstack/hyracks/hyracks-dist/src/main/resources/bin/startcc.sh
+++ b/hyracks-fullstack/hyracks/hyracks-dist/src/main/resources/bin/startcc.sh
@@ -42,8 +42,8 @@
chmod -R 755 $HYRACKS_HOME
if [ -f "conf/topology.xml" ]; then
#Launch hyracks cc script with topology
-$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyrackscc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 -job-history-size 0 -cluster-topology "conf/topology.xml" &> $CCLOGS_DIR/cc.log &
+$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyrackscc -client-listen-address $CCHOST -address $CCHOST -client-listen-port $CC_CLIENTPORT -cluster-listen-port $CC_CLUSTERPORT -heartbeat-max-misses 999999 -job-history-size 0 -cluster-topology "conf/topology.xml" &> $CCLOGS_DIR/cc.log &
else
#Launch hyracks cc script without toplogy
-$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyrackscc -client-net-ip-address $CCHOST -cluster-net-ip-address $CCHOST -client-net-port $CC_CLIENTPORT -cluster-net-port $CC_CLUSTERPORT -max-heartbeat-lapse-periods 999999 -default-max-job-attempts 0 -job-history-size 0 &> $CCLOGS_DIR/cc.log &
+$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyrackscc -client-listen-address $CCHOST -address $CCHOST -client-listen-port $CC_CLIENTPORT -cluster-listen-port $CC_CLUSTERPORT -heartbeat-max-misses 999999 -job-history-size 0 &> $CCLOGS_DIR/cc.log &
fi
diff --git a/hyracks-fullstack/hyracks/hyracks-dist/src/main/resources/bin/startnc.sh b/hyracks-fullstack/hyracks/hyracks-dist/src/main/resources/bin/startnc.sh
index d09b9c1..62d671f 100755
--- a/hyracks-fullstack/hyracks/hyracks-dist/src/main/resources/bin/startnc.sh
+++ b/hyracks-fullstack/hyracks/hyracks-dist/src/main/resources/bin/startnc.sh
@@ -64,4 +64,4 @@
cd $NCTMP_DIR
#Launch hyracks nc
-$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cc-host $CCHOST -cc-port $CC_CLUSTERPORT -cluster-net-ip-address $IPADDR -data-ip-address $IPADDR -result-ip-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS}" &> $NCLOGS_DIR/$NODEID.log &
+$HYRACKS_HOME/hyracks-server/target/appassembler/bin/hyracksnc -cluster-address $CCHOST -cluster-port $CC_CLUSTERPORT -address $IPADDR -data-listen-address $IPADDR -result-listen-address $IPADDR -node-id $NODEID -iodevices "${IO_DIRS}" &> $NCLOGS_DIR/$NODEID.log &
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/pom.xml b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/pom.xml
index db62a85..2486fe8 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/pom.xml
@@ -61,7 +61,6 @@
<dependency>
<groupId>args4j</groupId>
<artifactId>args4j</artifactId>
- <version>2.0.12</version>
</dependency>
<dependency>
<groupId>org.apache.hyracks</groupId>
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/NCApplicationEntryPoint.java b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/NCApplicationEntryPoint.java
index 5180f23..eec28a2 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/NCApplicationEntryPoint.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/NCApplicationEntryPoint.java
@@ -20,6 +20,7 @@
import org.apache.hyracks.api.application.INCApplicationContext;
import org.apache.hyracks.api.application.INCApplicationEntryPoint;
+import org.apache.hyracks.api.config.IConfigManager;
import org.apache.hyracks.api.job.resource.NodeCapacity;
public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
@@ -44,4 +45,10 @@
public NodeCapacity getCapacity() {
return new NodeCapacity(Runtime.getRuntime().maxMemory(), Runtime.getRuntime().availableProcessors() - 1);
}
+
+ @Override
+ public void registerConfigOptions(IConfigManager configManager) {
+ // no-op
+ }
+
}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/pom.xml b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/pom.xml
index 630b984..b26f00f 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/pom.xml
@@ -41,7 +41,7 @@
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<configuration>
- <excludes>
+ <excludes combine.children="append">
<exclude>data/**</exclude>
</excludes>
</configuration>
@@ -174,5 +174,10 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-util</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
</project>
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java
index a7677f8..82fd737 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractIntegrationTest.java
@@ -18,6 +18,8 @@
*/
package org.apache.hyracks.tests.integration;
+import static org.apache.hyracks.util.file.FileUtil.joinPath;
+
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
@@ -82,47 +84,43 @@
@BeforeClass
public static void init() throws Exception {
CCConfig ccConfig = new CCConfig();
- ccConfig.clientNetIpAddress = "127.0.0.1";
- ccConfig.clientNetPort = 39000;
- ccConfig.clusterNetIpAddress = "127.0.0.1";
- ccConfig.clusterNetPort = 39001;
- ccConfig.profileDumpPeriod = 10000;
+ ccConfig.setClientListenAddress("127.0.0.1");
+ ccConfig.setClientListenPort(39000);
+ ccConfig.setClusterListenAddress("127.0.0.1");
+ ccConfig.setClusterListenPort(39001);
+ ccConfig.setProfileDumpPeriod(10000);
FileUtils.deleteQuietly(new File("target" + File.separator + "data"));
- FileUtils.copyDirectory(new File("data"), new File("target" + File.separator + "data"));
+ FileUtils.copyDirectory(new File("data"), new File(joinPath("target", "data")));
File outDir = new File("target" + File.separator + "ClusterController");
outDir.mkdirs();
File ccRoot = File.createTempFile(AbstractIntegrationTest.class.getName(), ".data", outDir);
ccRoot.delete();
ccRoot.mkdir();
- ccConfig.ccRoot = ccRoot.getAbsolutePath();
+ ccConfig.setRootDir(ccRoot.getAbsolutePath());
cc = new ClusterControllerService(ccConfig);
cc.start();
- NCConfig ncConfig1 = new NCConfig();
- ncConfig1.ccHost = "localhost";
- ncConfig1.ccPort = 39001;
- ncConfig1.clusterNetIPAddress = "127.0.0.1";
- ncConfig1.dataIPAddress = "127.0.0.1";
- ncConfig1.resultIPAddress = "127.0.0.1";
- ncConfig1.nodeId = NC1_ID;
- ncConfig1.ioDevices = System.getProperty("user.dir") + File.separator + "target" + File.separator + "data"
- + File.separator + "device0";
+ NCConfig ncConfig1 = new NCConfig(NC1_ID);
+ ncConfig1.setClusterAddress("localhost");
+ ncConfig1.setClusterPort(39001);
+ ncConfig1.setClusterListenAddress("127.0.0.1");
+ ncConfig1.setDataListenAddress("127.0.0.1");
+ ncConfig1.setResultListenAddress("127.0.0.1");
+ ncConfig1.setIODevices(new String [] { joinPath(System.getProperty("user.dir"), "target", "data", "device0") });
nc1 = new NodeControllerService(ncConfig1);
nc1.start();
- NCConfig ncConfig2 = new NCConfig();
- ncConfig2.ccHost = "localhost";
- ncConfig2.ccPort = 39001;
- ncConfig2.clusterNetIPAddress = "127.0.0.1";
- ncConfig2.dataIPAddress = "127.0.0.1";
- ncConfig2.resultIPAddress = "127.0.0.1";
- ncConfig2.nodeId = NC2_ID;
- ncConfig2.ioDevices = System.getProperty("user.dir") + File.separator + "target" + File.separator + "data"
- + File.separator + "device1";
+ NCConfig ncConfig2 = new NCConfig(NC2_ID);
+ ncConfig2.setClusterAddress("localhost");
+ ncConfig2.setClusterPort(39001);
+ ncConfig2.setClusterListenAddress("127.0.0.1");
+ ncConfig2.setDataListenAddress("127.0.0.1");
+ ncConfig2.setResultListenAddress("127.0.0.1");
+ ncConfig2.setIODevices(new String [] { joinPath(System.getProperty("user.dir"), "target", "data", "device1") });
nc2 = new NodeControllerService(ncConfig2);
nc2.start();
- hcc = new HyracksConnection(ccConfig.clientNetIpAddress, ccConfig.clientNetPort);
+ hcc = new HyracksConnection(ccConfig.getClientListenAddress(), ccConfig.getClientListenPort());
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Starting CC in " + ccRoot.getAbsolutePath());
}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
index 3d6ac00..f7959d8 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
@@ -26,9 +26,9 @@
import java.util.logging.Level;
import java.util.logging.Logger;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.commons.io.FileUtils;
-import org.apache.hyracks.api.application.ICCApplicationContext;
-import org.apache.hyracks.api.application.ICCApplicationEntryPoint;
import org.apache.hyracks.api.client.HyracksConnection;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.comm.IFrameTupleAccessor;
@@ -42,6 +42,7 @@
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.resource.IJobCapacityController;
import org.apache.hyracks.client.dataset.HyracksDataset;
+import org.apache.hyracks.control.cc.CCApplicationEntryPoint;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.common.controllers.CCConfig;
import org.apache.hyracks.control.common.controllers.NCConfig;
@@ -54,9 +55,6 @@
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-
public abstract class AbstractMultiNCIntegrationTest {
private static final Logger LOGGER = Logger.getLogger(AbstractMultiNCIntegrationTest.class.getName());
@@ -82,18 +80,18 @@
@BeforeClass
public static void init() throws Exception {
CCConfig ccConfig = new CCConfig();
- ccConfig.clientNetIpAddress = "127.0.0.1";
- ccConfig.clientNetPort = 39000;
- ccConfig.clusterNetIpAddress = "127.0.0.1";
- ccConfig.clusterNetPort = 39001;
- ccConfig.profileDumpPeriod = 10000;
+ ccConfig.setClientListenAddress("127.0.0.1");
+ ccConfig.setClientListenPort(39000);
+ ccConfig.setClusterListenAddress("127.0.0.1");
+ ccConfig.setClusterListenPort(39001);
+ ccConfig.setProfileDumpPeriod(10000);
File outDir = new File("target" + File.separator + "ClusterController");
outDir.mkdirs();
File ccRoot = File.createTempFile(AbstractMultiNCIntegrationTest.class.getName(), ".data", outDir);
ccRoot.delete();
ccRoot.mkdir();
- ccConfig.ccRoot = ccRoot.getAbsolutePath();
- ccConfig.appCCMainClass = DummyApplicationEntryPoint.class.getName();
+ ccConfig.setRootDir(ccRoot.getAbsolutePath());
+ ccConfig.setAppClass(DummyApplicationEntryPoint.class.getName());
cc = new ClusterControllerService(ccConfig);
cc.start();
@@ -102,19 +100,18 @@
File ioDev = new File("target" + File.separator + ASTERIX_IDS[i] + File.separator + "ioDevice");
FileUtils.forceMkdir(ioDev);
FileUtils.copyDirectory(new File("data" + File.separator + "device0"), ioDev);
- NCConfig ncConfig = new NCConfig();
- ncConfig.ccHost = "localhost";
- ncConfig.ccPort = 39001;
- ncConfig.clusterNetIPAddress = "127.0.0.1";
- ncConfig.dataIPAddress = "127.0.0.1";
- ncConfig.resultIPAddress = "127.0.0.1";
- ncConfig.nodeId = ASTERIX_IDS[i];
- ncConfig.ioDevices = ioDev.getAbsolutePath();
+ NCConfig ncConfig = new NCConfig(ASTERIX_IDS[i]);
+ ncConfig.setClusterAddress("localhost");
+ ncConfig.setClusterPort(39001);
+ ncConfig.setClusterListenAddress("127.0.0.1");
+ ncConfig.setDataListenAddress("127.0.0.1");
+ ncConfig.setResultListenAddress("127.0.0.1");
+ ncConfig.setIODevices(new String [] { ioDev.getAbsolutePath() });
asterixNCs[i] = new NodeControllerService(ncConfig);
asterixNCs[i].start();
}
- hcc = new HyracksConnection(ccConfig.clientNetIpAddress, ccConfig.clientNetPort);
+ hcc = new HyracksConnection(ccConfig.getClientListenAddress(), ccConfig.getClientListenPort());
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Starting CC in " + ccRoot.getAbsolutePath());
}
@@ -219,22 +216,7 @@
return tempFile;
}
- public static class DummyApplicationEntryPoint implements ICCApplicationEntryPoint {
-
- @Override
- public void start(ICCApplicationContext ccAppCtx, String[] args) throws Exception {
-
- }
-
- @Override
- public void stop() throws Exception {
-
- }
-
- @Override
- public void startupCompleted() throws Exception {
-
- }
+ public static class DummyApplicationEntryPoint extends CCApplicationEntryPoint {
@Override
public IJobCapacityController getJobCapacityController() {
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/NodesAPIIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/NodesAPIIntegrationTest.java
index 6f05600..672d2c4 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/NodesAPIIntegrationTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/NodesAPIIntegrationTest.java
@@ -48,7 +48,7 @@
"net-signaling-bytes-written", "dataset-net-payload-bytes-read", "dataset-net-payload-bytes-written",
"dataset-net-signaling-bytes-read", "dataset-net-signaling-bytes-written", "ipc-messages-sent",
"ipc-message-bytes-sent", "ipc-messages-received", "ipc-message-bytes-received", "disk-reads",
- "disk-writes", "ini" };
+ "disk-writes", "config" };
public static final String ROOT_PATH = "/rest/nodes";
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
index 2509515..f911a75 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/PredistributedJobsTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.hyracks.tests.integration;
+import static org.apache.hyracks.util.file.FileUtil.joinPath;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.verify;
@@ -54,50 +55,46 @@
@BeforeClass
public static void init() throws Exception {
CCConfig ccConfig = new CCConfig();
- ccConfig.clientNetIpAddress = "127.0.0.1";
- ccConfig.clientNetPort = 39000;
- ccConfig.clusterNetIpAddress = "127.0.0.1";
- ccConfig.clusterNetPort = 39001;
- ccConfig.profileDumpPeriod = 10000;
- FileUtils.deleteQuietly(new File("target" + File.separator + "data"));
- FileUtils.copyDirectory(new File("data"), new File("target" + File.separator + "data"));
+ ccConfig.setClientListenAddress("127.0.0.1");
+ ccConfig.setClientListenPort(39000);
+ ccConfig.setClusterListenAddress("127.0.0.1");
+ ccConfig.setClusterListenPort(39001);
+ ccConfig.setProfileDumpPeriod(10000);
+ FileUtils.deleteQuietly(new File(joinPath("target", "data")));
+ FileUtils.copyDirectory(new File("data"), new File(joinPath("target", "data")));
File outDir = new File("target" + File.separator + "ClusterController");
outDir.mkdirs();
File ccRoot = File.createTempFile(AbstractIntegrationTest.class.getName(), ".data", outDir);
ccRoot.delete();
ccRoot.mkdir();
- ccConfig.ccRoot = ccRoot.getAbsolutePath();
+ ccConfig.setRootDir(ccRoot.getAbsolutePath());
ClusterControllerService ccBase = new ClusterControllerService(ccConfig);
cc = Mockito.spy(ccBase);
cc.start();
- NCConfig ncConfig1 = new NCConfig();
- ncConfig1.ccHost = "localhost";
- ncConfig1.ccPort = 39001;
- ncConfig1.clusterNetIPAddress = "127.0.0.1";
- ncConfig1.dataIPAddress = "127.0.0.1";
- ncConfig1.resultIPAddress = "127.0.0.1";
- ncConfig1.nodeId = NC1_ID;
- ncConfig1.ioDevices = System.getProperty("user.dir") + File.separator + "target" + File.separator + "data"
- + File.separator + "device0";
+ NCConfig ncConfig1 = new NCConfig(NC1_ID);
+ ncConfig1.setClusterAddress("localhost");
+ ncConfig1.setClusterPort(39001);
+ ncConfig1.setClusterListenAddress("127.0.0.1");
+ ncConfig1.setDataListenAddress("127.0.0.1");
+ ncConfig1.setResultListenAddress("127.0.0.1");
+ ncConfig1.setIODevices(new String [] { joinPath(System.getProperty("user.dir"), "target", "data", "device0") });
NodeControllerService nc1Base = new NodeControllerService(ncConfig1);
nc1 = Mockito.spy(nc1Base);
nc1.start();
- NCConfig ncConfig2 = new NCConfig();
- ncConfig2.ccHost = "localhost";
- ncConfig2.ccPort = 39001;
- ncConfig2.clusterNetIPAddress = "127.0.0.1";
- ncConfig2.dataIPAddress = "127.0.0.1";
- ncConfig2.resultIPAddress = "127.0.0.1";
- ncConfig2.nodeId = NC2_ID;
- ncConfig2.ioDevices = System.getProperty("user.dir") + File.separator + "target" + File.separator + "data"
- + File.separator + "device1";
+ NCConfig ncConfig2 = new NCConfig(NC2_ID);
+ ncConfig2.setClusterAddress("localhost");
+ ncConfig2.setClusterPort(39001);
+ ncConfig2.setClusterListenAddress("127.0.0.1");
+ ncConfig2.setDataListenAddress("127.0.0.1");
+ ncConfig2.setResultListenAddress("127.0.0.1");
+ ncConfig2.setIODevices(new String [] { joinPath(System.getProperty("user.dir"), "target", "data", "device1") });
NodeControllerService nc2Base = new NodeControllerService(ncConfig2);
nc2 = Mockito.spy(nc2Base);
nc2.start();
- hcc = new HyracksConnection(ccConfig.clientNetIpAddress, ccConfig.clientNetPort);
+ hcc = new HyracksConnection(ccConfig.getClientListenAddress(), ccConfig.getClientListenPort());
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Starting CC in " + ccRoot.getAbsolutePath());
}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-shutdown-test/pom.xml b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-shutdown-test/pom.xml
index 0e9d24e..42a104d 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-shutdown-test/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-shutdown-test/pom.xml
@@ -40,9 +40,10 @@
<artifactId>maven-dependency-plugin</artifactId>
<version>2.10</version>
<configuration>
- <failOnWarning>true</failOnWarning>
- <outputXML>true</outputXML>
- <usedDependencies>org.apache.hyracks:hyracks-control-nc,org.apache.hyracks:hyracks-control-cc</usedDependencies>
+ <usedDependencies combine.children="append">
+ <usedDependency>org.apache.hyracks:hyracks-control-nc</usedDependency>
+ <usedDependency>org.apache.hyracks:hyracks-control-cc</usedDependency>
+ </usedDependencies>
</configuration>
<executions>
<execution>
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/pom.xml b/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/pom.xml
index e1155b7..9003724 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/pom.xml
@@ -51,7 +51,6 @@
<dependency>
<groupId>args4j</groupId>
<artifactId>args4j</artifactId>
- <version>2.0.12</version>
</dependency>
<dependency>
<groupId>org.apache.hyracks</groupId>
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/text-example/textserver/pom.xml b/hyracks-fullstack/hyracks/hyracks-examples/text-example/textserver/pom.xml
index 394fa11..ed269ea 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/text-example/textserver/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-examples/text-example/textserver/pom.xml
@@ -40,10 +40,11 @@
<artifactId>maven-dependency-plugin</artifactId>
<version>2.10</version>
<configuration>
- <failOnWarning>true</failOnWarning>
- <outputXML>true</outputXML>
- <usedDependencies>
- org.apache.hyracks:hyracks-control-nc,org.apache.hyracks:hyracks-control-cc,org.apache.hyracks:hyracks-dataflow-std,org.apache.hyracks:texthelper
+ <usedDependencies combine.children="append">
+ <usedDependency>org.apache.hyracks:hyracks-control-nc</usedDependency>
+ <usedDependency>org.apache.hyracks:hyracks-control-cc</usedDependency>
+ <usedDependency>org.apache.hyracks:hyracks-dataflow-std</usedDependency>
+ <usedDependency>org.apache.hyracks:texthelper</usedDependency>
</usedDependencies>
</configuration>
<executions>
@@ -103,7 +104,7 @@
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<configuration>
- <excludes>
+ <excludes combine.children="append">
<exclude>data/file1.txt</exclude>
<exclude>data/file2.txt</exclude>
</excludes>
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml
index 5bd7796..f8378c6 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/pom.xml
@@ -45,7 +45,6 @@
<dependency>
<groupId>args4j</groupId>
<artifactId>args4j</artifactId>
- <version>2.0.12</version>
</dependency>
<dependency>
<groupId>org.apache.hyracks</groupId>
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml
index b4abc4a..a8778d6 100644
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/pom.xml
@@ -66,7 +66,7 @@
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<configuration>
- <excludes>
+ <excludes combine.children="append">
<exclude>src/test/resources/data/customer.tbl</exclude>
<exclude>src/test/resources/expected/part-0</exclude>
</excludes>
@@ -406,5 +406,11 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-util</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/dataflow/DataflowTest.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/dataflow/DataflowTest.java
index 2307a43..bf7f2a0 100644
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/dataflow/DataflowTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/dataflow/DataflowTest.java
@@ -19,8 +19,6 @@
package org.apache.hyracks.hdfs.dataflow;
-import static org.apache.hyracks.test.support.TestUtils.joinPath;
-
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
@@ -61,6 +59,7 @@
import org.apache.hyracks.hdfs.scheduler.Scheduler;
import org.apache.hyracks.hdfs.utils.HyracksUtils;
import org.apache.hyracks.hdfs.utils.TestUtils;
+import org.apache.hyracks.util.file.FileUtil;
/**
* Test the org.apache.hyracks.hdfs.dataflow package,
@@ -69,19 +68,19 @@
@SuppressWarnings({ "deprecation" })
public class DataflowTest extends TestCase {
- protected static final String ACTUAL_RESULT_DIR = joinPath("target", "actual");
- private static final String TEST_RESOURCES = joinPath("src", "test", "resources");
- protected static final String EXPECTED_RESULT_PATH = joinPath(TEST_RESOURCES, "expected");
- private static final String PATH_TO_HADOOP_CONF = joinPath(TEST_RESOURCES, "hadoop", "conf");
- protected static final String BUILD_DIR = joinPath("target", "build");
+ protected static final String ACTUAL_RESULT_DIR = FileUtil.joinPath("target", "actual");
+ private static final String TEST_RESOURCES = FileUtil.joinPath("src", "test", "resources");
+ protected static final String EXPECTED_RESULT_PATH = FileUtil.joinPath(TEST_RESOURCES, "expected");
+ private static final String PATH_TO_HADOOP_CONF = FileUtil.joinPath(TEST_RESOURCES, "hadoop", "conf");
+ protected static final String BUILD_DIR = FileUtil.joinPath("target", "build");
- private static final String DATA_PATH = joinPath(TEST_RESOURCES, "data", "customer.tbl");
+ private static final String DATA_PATH = FileUtil.joinPath(TEST_RESOURCES, "data", "customer.tbl");
protected static final String HDFS_INPUT_PATH = "/customer/";
protected static final String HDFS_OUTPUT_PATH = "/customer_result/";
private static final String HADOOP_CONF_PATH = ACTUAL_RESULT_DIR + File.separator + "conf.xml";
- private static final String MINIDFS_BASEDIR = joinPath("target", "hdfs");
+ private static final String MINIDFS_BASEDIR = FileUtil.joinPath("target", "hdfs");
private MiniDFSCluster dfsCluster;
private JobConf conf = new JobConf();
@@ -121,8 +120,8 @@
FileSystem lfs = FileSystem.getLocal(new Configuration());
lfs.delete(new Path(BUILD_DIR), true);
- System.setProperty("hadoop.log.dir", joinPath("target", "logs"));
- getConfiguration().set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, MINIDFS_BASEDIR);
+ System.setProperty("hadoop.log.dir", FileUtil.joinPath("target", "logs"));
+ getConfiguration().set("hdfs.minidfs.basedir", MINIDFS_BASEDIR);
dfsCluster = getMiniDFSCluster(getConfiguration(), numberOfNC);
FileSystem dfs = FileSystem.get(getConfiguration());
Path src = new Path(DATA_PATH);
@@ -197,8 +196,8 @@
Path actual = new Path(ACTUAL_RESULT_DIR);
dfs.copyToLocalFile(result, actual);
- TestUtils.compareWithResult(new File(joinPath(EXPECTED_RESULT_PATH, "part-0")), new File(
- joinPath(ACTUAL_RESULT_DIR, "customer_result", "part-0")));
+ TestUtils.compareWithResult(new File(FileUtil.joinPath(EXPECTED_RESULT_PATH, "part-0")), new File(
+ FileUtil.joinPath(ACTUAL_RESULT_DIR, "customer_result", "part-0")));
return true;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/utils/HyracksUtils.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/utils/HyracksUtils.java
index cc32527..1fddc46 100644
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/utils/HyracksUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/utils/HyracksUtils.java
@@ -50,36 +50,33 @@
public static void init() throws Exception {
CCConfig ccConfig = new CCConfig();
- ccConfig.clientNetIpAddress = CC_HOST;
- ccConfig.clusterNetIpAddress = CC_HOST;
- ccConfig.clusterNetPort = TEST_HYRACKS_CC_PORT;
- ccConfig.clientNetPort = TEST_HYRACKS_CC_CLIENT_PORT;
- ccConfig.defaultMaxJobAttempts = 0;
- ccConfig.jobHistorySize = 0;
- ccConfig.profileDumpPeriod = -1;
+ ccConfig.setClientListenAddress(CC_HOST);
+ ccConfig.setClusterListenAddress(CC_HOST);
+ ccConfig.setClusterListenPort(TEST_HYRACKS_CC_PORT);
+ ccConfig.setClientListenPort(TEST_HYRACKS_CC_CLIENT_PORT);
+ ccConfig.setJobHistorySize(0);
+ ccConfig.setProfileDumpPeriod(-1);
// cluster controller
cc = new ClusterControllerService(ccConfig);
cc.start();
// two node controllers
- NCConfig ncConfig1 = new NCConfig();
- ncConfig1.ccHost = "localhost";
- ncConfig1.clusterNetIPAddress = "localhost";
- ncConfig1.ccPort = TEST_HYRACKS_CC_PORT;
- ncConfig1.dataIPAddress = "127.0.0.1";
- ncConfig1.resultIPAddress = "127.0.0.1";
- ncConfig1.nodeId = NC1_ID;
+ NCConfig ncConfig1 = new NCConfig(NC1_ID);
+ ncConfig1.setClusterAddress("localhost");
+ ncConfig1.setClusterListenAddress("localhost");
+ ncConfig1.setClusterPort(TEST_HYRACKS_CC_PORT);
+ ncConfig1.setDataListenAddress("127.0.0.1");
+ ncConfig1.setResultListenAddress("127.0.0.1");
nc1 = new NodeControllerService(ncConfig1);
nc1.start();
- NCConfig ncConfig2 = new NCConfig();
- ncConfig2.ccHost = "localhost";
- ncConfig2.clusterNetIPAddress = "localhost";
- ncConfig2.ccPort = TEST_HYRACKS_CC_PORT;
- ncConfig2.dataIPAddress = "127.0.0.1";
- ncConfig2.resultIPAddress = "127.0.0.1";
- ncConfig2.nodeId = NC2_ID;
+ NCConfig ncConfig2 = new NCConfig(NC2_ID);
+ ncConfig2.setClusterAddress("localhost");
+ ncConfig2.setClusterListenAddress("localhost");
+ ncConfig2.setClusterPort(TEST_HYRACKS_CC_PORT);
+ ncConfig2.setDataListenAddress("127.0.0.1");
+ ncConfig2.setResultListenAddress("127.0.0.1");
nc2 = new NodeControllerService(ncConfig2);
nc2.start();
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AbstractServlet.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AbstractServlet.java
index cb9deea..e4d9005 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AbstractServlet.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/AbstractServlet.java
@@ -38,7 +38,7 @@
protected final ConcurrentMap<String, Object> ctx;
private final int[] trims;
- public AbstractServlet(ConcurrentMap<String, Object> ctx, String[] paths) {
+ public AbstractServlet(ConcurrentMap<String, Object> ctx, String... paths) {
this.paths = paths;
this.ctx = ctx;
trims = new int[paths.length];
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
index 9ee135b..fe2bcae 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
@@ -64,14 +64,14 @@
IPCConnectionManager(IPCSystem system, InetSocketAddress socketAddress) throws IOException {
this.system = system;
- this.networkThread = new NetworkThread();
- this.networkThread.setPriority(Thread.MAX_PRIORITY);
this.serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().setReuseAddress(true);
serverSocketChannel.configureBlocking(false);
ServerSocket socket = serverSocketChannel.socket();
socket.bind(socketAddress);
address = new InetSocketAddress(socket.getInetAddress(), socket.getLocalPort());
+ networkThread = new NetworkThread();
+ networkThread.setPriority(Thread.MAX_PRIORITY);
ipcHandleMap = new HashMap<>();
pendingConnections = new ArrayList<>();
workingPendingConnections = new ArrayList<>();
@@ -175,7 +175,7 @@
private final Selector selector;
public NetworkThread() {
- super("IPC Network Listener Thread");
+ super("IPC Network Listener Thread [" + address + "]");
setDaemon(true);
try {
selector = Selector.open();
@@ -323,7 +323,7 @@
failingLoops = 0;
} catch (Exception e) {
int sleepSecs = (int)Math.pow(2, Math.min(11, failingLoops++));
- LOGGER.log(Level.WARNING, "Exception processing message; sleeping " + sleepSecs
+ LOGGER.log(Level.SEVERE, "Exception processing message; sleeping " + sleepSecs
+ " seconds", e);
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(sleepSecs));
diff --git a/hyracks-fullstack/hyracks/hyracks-maven-plugins/hyracks-virtualcluster-maven-plugin/src/main/java/org/apache/hyracks/maven/plugin/HyracksCCStartMojo.java b/hyracks-fullstack/hyracks/hyracks-maven-plugins/hyracks-virtualcluster-maven-plugin/src/main/java/org/apache/hyracks/maven/plugin/HyracksCCStartMojo.java
index a461064..5a9d9dd 100644
--- a/hyracks-fullstack/hyracks/hyracks-maven-plugins/hyracks-virtualcluster-maven-plugin/src/main/java/org/apache/hyracks/maven/plugin/HyracksCCStartMojo.java
+++ b/hyracks-fullstack/hyracks/hyracks-maven-plugins/hyracks-virtualcluster-maven-plugin/src/main/java/org/apache/hyracks/maven/plugin/HyracksCCStartMojo.java
@@ -40,8 +40,8 @@
if (port != 0) {
cmdLineBuffer.append("-port ").append(port);
}
- cmdLineBuffer.append(" -client-net-ip-address 127.0.0.1");
- cmdLineBuffer.append(" -cluster-net-ip-address 127.0.0.1");
+ cmdLineBuffer.append(" -client-listen-address 127.0.0.1");
+ cmdLineBuffer.append(" -address 127.0.0.1");
String args = cmdLineBuffer.toString();
final Process proc = launch(new File(hyracksServerHome, makeScriptName(HYRACKS_CC_SCRIPT)), args, workingDir);
HyracksServiceRegistry.INSTANCE.addServiceProcess(proc);
diff --git a/hyracks-fullstack/hyracks/hyracks-maven-plugins/hyracks-virtualcluster-maven-plugin/src/main/java/org/apache/hyracks/maven/plugin/HyracksNCStartMojo.java b/hyracks-fullstack/hyracks/hyracks-maven-plugins/hyracks-virtualcluster-maven-plugin/src/main/java/org/apache/hyracks/maven/plugin/HyracksNCStartMojo.java
index 1f0e640..926e108 100644
--- a/hyracks-fullstack/hyracks/hyracks-maven-plugins/hyracks-virtualcluster-maven-plugin/src/main/java/org/apache/hyracks/maven/plugin/HyracksNCStartMojo.java
+++ b/hyracks-fullstack/hyracks/hyracks-maven-plugins/hyracks-virtualcluster-maven-plugin/src/main/java/org/apache/hyracks/maven/plugin/HyracksNCStartMojo.java
@@ -55,13 +55,13 @@
@Override
public void execute() throws MojoExecutionException, MojoFailureException {
StringBuilder cmdLineBuffer = new StringBuilder();
- cmdLineBuffer.append(" -cc-host ").append(ccHost);
- cmdLineBuffer.append(" -data-ip-address ").append(dataIpAddress);
+ cmdLineBuffer.append(" -cluster-address ").append(ccHost);
+ cmdLineBuffer.append(" -data-listen-address ").append(dataIpAddress);
cmdLineBuffer.append(" -node-id ").append(nodeId);
- cmdLineBuffer.append(" -cluster-net-ip-address 127.0.0.1");
- cmdLineBuffer.append(" -result-ip-address 127.0.0.1");
+ cmdLineBuffer.append(" -address 127.0.0.1");
+ cmdLineBuffer.append(" -result-listen-address 127.0.0.1");
if (ccPort != 0) {
- cmdLineBuffer.append(" -cc-port ").append(ccPort);
+ cmdLineBuffer.append(" -cluster-port ").append(ccPort);
}
String args = cmdLineBuffer.toString();
final Process proc = launch(new File(hyracksServerHome, makeScriptName(HYRACKS_NC_SCRIPT)), args, workingDir);
diff --git a/hyracks-fullstack/hyracks/hyracks-server/docs/README b/hyracks-fullstack/hyracks/hyracks-server/docs/README
index 06bb1e1..44cdcd8 100644
--- a/hyracks-fullstack/hyracks/hyracks-server/docs/README
+++ b/hyracks-fullstack/hyracks/hyracks-server/docs/README
@@ -19,15 +19,15 @@
The node controller is started by running bin/hyracksnc. It requires at least the following two command line arguments.
- -cc-host VAL : Cluster Controller host name
- -data-ip-address VAL : IP Address to bind data listener
+ -cluster-address VAL : Cluster Controller host name
+ -data-listen-address VAL : IP Address to bind data listener
If the cluster controller was directed to listen on a port other than the default, you will need to pass one more argument to hyracksnc.
- -cc-port N : Cluster Controller port (default: 1099)
+ -cluster-port N : Cluster Controller port (default: 1099)
-The data-ip-address is the interface on which the Node Controller must listen on -- in the event the machine is multi-homed it must listen on an IP that is reachable from
-other Node Controllers. Make sure that the value passed to the data-ip-address is a valid IPv4 address (four octets separated by .).
+The data-listen-address is the interface on which the Node Controller must listen on -- in the event the machine is multi-homed it must listen on an IP that is reachable from
+other Node Controllers. Make sure that the value passed to the data-listen-address is a valid IPv4 address (four octets separated by .).
3. Running a job on Hyracks
diff --git a/hyracks-fullstack/hyracks/hyracks-server/pom.xml b/hyracks-fullstack/hyracks/hyracks-server/pom.xml
index ded28ad..5e36b8a 100644
--- a/hyracks-fullstack/hyracks/hyracks-server/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-server/pom.xml
@@ -47,9 +47,9 @@
<artifactId>maven-dependency-plugin</artifactId>
<version>2.10</version>
<configuration>
- <failOnWarning>true</failOnWarning>
- <outputXML>true</outputXML>
- <usedDependencies>org.apache.hyracks:hyracks-control-nc</usedDependencies>
+ <usedDependencies combine.children="append">
+ <usedDependency>org.apache.hyracks:hyracks-control-nc</usedDependency>
+ </usedDependencies>
</configuration>
<executions>
<execution>
diff --git a/hyracks-fullstack/hyracks/hyracks-server/src/main/java/org/apache/hyracks/server/process/HyracksCCProcess.java b/hyracks-fullstack/hyracks/hyracks-server/src/main/java/org/apache/hyracks/server/process/HyracksCCProcess.java
index 4a70120..b2aa2d1 100644
--- a/hyracks-fullstack/hyracks/hyracks-server/src/main/java/org/apache/hyracks/server/process/HyracksCCProcess.java
+++ b/hyracks-fullstack/hyracks/hyracks-server/src/main/java/org/apache/hyracks/server/process/HyracksCCProcess.java
@@ -18,11 +18,11 @@
*/
package org.apache.hyracks.server.process;
-import org.apache.hyracks.control.cc.CCDriver;
-
import java.io.File;
import java.util.List;
+import org.apache.hyracks.control.cc.CCDriver;
+
public class HyracksCCProcess extends HyracksServerProcess {
public HyracksCCProcess(File configFile, File logFile, File appHome, File workingDir) {
@@ -41,5 +41,6 @@
protected void addJvmArgs(List<String> cList) {
// CC needs more than default memory
cList.add("-Xmx1024m");
+ //cList.add("-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005");
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-server/src/test/java/org/apache/hyracks/server/test/NCServiceIT.java b/hyracks-fullstack/hyracks/hyracks-server/src/test/java/org/apache/hyracks/server/test/NCServiceIT.java
index 2185826..fda099e 100644
--- a/hyracks-fullstack/hyracks/hyracks-server/src/test/java/org/apache/hyracks/server/test/NCServiceIT.java
+++ b/hyracks-fullstack/hyracks/hyracks-server/src/test/java/org/apache/hyracks/server/test/NCServiceIT.java
@@ -34,8 +34,6 @@
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.apache.hyracks.server.process.HyracksVirtualCluster;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
diff --git a/hyracks-fullstack/hyracks/hyracks-server/src/test/resources/NCServiceIT/cc.conf b/hyracks-fullstack/hyracks/hyracks-server/src/test/resources/NCServiceIT/cc.conf
index 2339efb..69676f7 100644
--- a/hyracks-fullstack/hyracks/hyracks-server/src/test/resources/NCServiceIT/cc.conf
+++ b/hyracks-fullstack/hyracks/hyracks-server/src/test/resources/NCServiceIT/cc.conf
@@ -16,13 +16,15 @@
; under the License.
[nc/red]
-address=127.0.0.1
+address = 127.0.0.1
+#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5006
[nc/blue]
-address=127.0.0.1
-port=9091
+address = 127.0.0.1
+ncservice.port = 9091
+#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5007
[cc]
-cluster.address = 127.0.0.1
-http.port = 12345
+address = 127.0.0.1
+console.listen.port = 12345
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/pom.xml b/hyracks-fullstack/hyracks/hyracks-test-support/pom.xml
index 9e0535b..9a51b6c 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/pom.xml
@@ -105,9 +105,5 @@
<artifactId>hyracks-util</artifactId>
<version>${project.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-lang3</artifactId>
- </dependency>
</dependencies>
</project>
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestNCApplicationContext.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestNCApplicationContext.java
index 87739ea..81ee47b 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestNCApplicationContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestNCApplicationContext.java
@@ -21,10 +21,10 @@
import java.io.Serializable;
import java.util.concurrent.ThreadFactory;
-import org.apache.hyracks.api.application.IApplicationConfig;
import org.apache.hyracks.api.application.INCApplicationContext;
import org.apache.hyracks.api.application.IStateDumpHandler;
import org.apache.hyracks.api.comm.IChannelInterfaceFactory;
+import org.apache.hyracks.api.config.IApplicationConfig;
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.api.job.IJobSerializerDeserializerContainer;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
@@ -32,7 +32,6 @@
import org.apache.hyracks.api.messages.IMessageBroker;
import org.apache.hyracks.api.resources.memory.IMemoryManager;
import org.apache.hyracks.api.service.IControllerService;
-import org.apache.hyracks.control.nc.io.IOManager;
public class TestNCApplicationContext implements INCApplicationContext {
private final ILifeCycleComponentManager lccm;
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
index 039cf7d..ab87f93 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java
@@ -23,7 +23,6 @@
import java.util.List;
import java.util.concurrent.Executors;
-import org.apache.commons.lang3.StringUtils;
import org.apache.hyracks.api.application.INCApplicationContext;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.ActivityId;
@@ -54,8 +53,4 @@
devices.add(new IODeviceHandle(new File(System.getProperty("java.io.tmpdir")), "."));
return new IOManager(devices, Executors.newCachedThreadPool());
}
-
- public static String joinPath(String... pathElements) {
- return StringUtils.join(pathElements, File.separatorChar);
- }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/StorageUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/StorageUtil.java
index 31bce7a..a9a529ca 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/StorageUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/StorageUtil.java
@@ -66,7 +66,7 @@
throw new AssertionError("This util class should not be initialized.");
}
- public static int getSizeInBytes(final int size, final StorageUnit unit) {
+ public static int getIntSizeInBytes(final int size, final StorageUnit unit) {
double result = unit.toBytes(size);
if (result > Integer.MAX_VALUE || result < Integer.MIN_VALUE) {
throw new IllegalArgumentException("The given value:" + result + " is not within the integer range.");
@@ -75,7 +75,7 @@
}
}
- public static long getSizeInBytes(final long size, final StorageUnit unit) {
+ public static long getLongSizeInBytes(final long size, final StorageUnit unit) {
double result = unit.toBytes(size);
if (result > Long.MAX_VALUE || result < Long.MIN_VALUE) {
throw new IllegalArgumentException("The given value:" + result + " is not within the long range.");
@@ -85,8 +85,7 @@
}
/**
- * Helper method to parse a byte unit string to its double value and unit
- * (e.g., 10,345.8MB becomes Pair<10345.8, StorageUnit.MB>.)
+ * Helper method to parse a byte unit string to its double value in bytes
*
* @throws IllegalArgumentException
*/
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/file/FileUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/file/FileUtil.java
new file mode 100644
index 0000000..d44f1b4
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/file/FileUtil.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.util.file;
+
+import java.io.File;
+
+public class FileUtil {
+ private FileUtil() {
+ }
+
+ public static String joinPath(String... elements) {
+ return String.join(File.separator, elements)
+ .replaceAll("([^:])(" + File.separator + ")+", "$1$2")
+ .replaceAll(File.separator + "$", "");
+ }
+}