Overhaul of Hyracks configuration management.
Includes Asterix changes to make use of new conf management as a
Hyracks application.
Change-Id: Ie3027c8c839f25ea858790bd3340187f4b11f212
Reviewed-on: https://asterix-gerrit.ics.uci.edu/336
Tested-by: Chris Hillery <ceej@lambda.nu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <michael.blow@couchbase.com>
Reviewed-by: Ian Maxon <imaxon@apache.org>
diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml
index bdd86ba..b33f1eb 100644
--- a/asterixdb/asterix-app/pom.xml
+++ b/asterixdb/asterix-app/pom.xml
@@ -145,6 +145,10 @@
<artifactId>hyracks-client</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-api</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.asterix</groupId>
<artifactId>asterix-algebra</artifactId>
<version>${project.version}</version>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
index c58462f..8342be5 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
@@ -68,6 +68,7 @@
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepositoryFactory;
import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem;
+import org.apache.hyracks.api.application.IApplicationConfig;
import org.apache.hyracks.api.application.INCApplicationContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.IIOManager;
@@ -95,16 +96,6 @@
public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAsterixPropertiesProvider {
private static final Logger LOGGER = Logger.getLogger(AsterixAppRuntimeContext.class.getName());
- private static final AsterixPropertiesAccessor ASTERIX_PROPERTIES_ACCESSOR;
-
- static {
- try {
- ASTERIX_PROPERTIES_ACCESSOR = new AsterixPropertiesAccessor();
- } catch (AsterixException e) {
- throw new ExceptionInInitializerError(e);
- }
- }
-
private ILSMMergePolicyFactory metadataMergePolicyFactory;
private final INCApplicationContext ncApplicationContext;
@@ -137,16 +128,27 @@
private IReplicaResourcesManager replicaResourcesManager;
private final int metadataRmiPort;
- public AsterixAppRuntimeContext(INCApplicationContext ncApplicationContext, int metadataRmiPort) {
+ public AsterixAppRuntimeContext(INCApplicationContext ncApplicationContext, int metadataRmiPort)
+ throws AsterixException {
this.ncApplicationContext = ncApplicationContext;
- compilerProperties = new AsterixCompilerProperties(ASTERIX_PROPERTIES_ACCESSOR);
- externalProperties = new AsterixExternalProperties(ASTERIX_PROPERTIES_ACCESSOR);
- metadataProperties = new AsterixMetadataProperties(ASTERIX_PROPERTIES_ACCESSOR);
- storageProperties = new AsterixStorageProperties(ASTERIX_PROPERTIES_ACCESSOR);
- txnProperties = new AsterixTransactionProperties(ASTERIX_PROPERTIES_ACCESSOR);
- feedProperties = new AsterixFeedProperties(ASTERIX_PROPERTIES_ACCESSOR);
- buildProperties = new AsterixBuildProperties(ASTERIX_PROPERTIES_ACCESSOR);
- replicationProperties = new AsterixReplicationProperties(ASTERIX_PROPERTIES_ACCESSOR,
+ // Determine whether to use old-style asterix-configuration.xml or new-style configuration.
+ // QQQ strip this out eventually
+ AsterixPropertiesAccessor propertiesAccessor;
+ IApplicationConfig cfg = ncApplicationContext.getAppConfig();
+ // QQQ this is NOT a good way to determine whether the config is valid
+ if (cfg.getString("cc", "cluster.address") != null) {
+ propertiesAccessor = new AsterixPropertiesAccessor(cfg);
+ } else {
+ propertiesAccessor = new AsterixPropertiesAccessor();
+ }
+ compilerProperties = new AsterixCompilerProperties(propertiesAccessor);
+ externalProperties = new AsterixExternalProperties(propertiesAccessor);
+ metadataProperties = new AsterixMetadataProperties(propertiesAccessor);
+ storageProperties = new AsterixStorageProperties(propertiesAccessor);
+ txnProperties = new AsterixTransactionProperties(propertiesAccessor);
+ feedProperties = new AsterixFeedProperties(propertiesAccessor);
+ buildProperties = new AsterixBuildProperties(propertiesAccessor);
+ replicationProperties = new AsterixReplicationProperties(propertiesAccessor,
AsterixClusterProperties.INSTANCE.getCluster());
this.metadataRmiPort = metadataRmiPort;
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
index 8ceee62..62bdbf5 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
@@ -43,21 +43,27 @@
import org.apache.asterix.common.configuration.Store;
import org.apache.asterix.common.configuration.TransactionLogDir;
import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.hyracks.api.application.IApplicationConfig;
public class AsterixPropertiesAccessor {
private static Logger LOGGER = Logger.getLogger(AsterixPropertiesAccessor.class.getName());
private final String instanceName;
private final String metadataNodeName;
- private final List<String> nodeNames;
- private final Map<String, String[]> stores;
- private final Map<String, String> coredumpConfig;
+ private final List<String> nodeNames = new ArrayList<>();;
+ private final Map<String, String[]> stores = new HashMap<>();;
+ private final Map<String, String> coredumpConfig = new HashMap<>();
private final Map<String, Property> asterixConfigurationParams;
- private final Map<String, String> transactionLogDirs;
+ private final IApplicationConfig cfg;
+ private final Map<String, String> transactionLogDirs = new HashMap<>();
private final Map<String, String> asterixBuildProperties;
private final Map<String, ClusterPartition[]> nodePartitionsMap;
- private final SortedMap<Integer, ClusterPartition> clusterPartitions;
+ private final SortedMap<Integer, ClusterPartition> clusterPartitions = new TreeMap<>();
+ /**
+ * Constructor which reads asterix-configuration.xml, the old way.
+ * @throws AsterixException
+ */
public AsterixPropertiesAccessor() throws AsterixException {
String fileName = System.getProperty(GlobalConfig.CONFIG_FILE_PROPERTY);
if (fileName == null) {
@@ -75,6 +81,7 @@
}
AsterixConfiguration asterixConfiguration = null;
+ cfg = null;
try {
JAXBContext ctx = JAXBContext.newInstance(AsterixConfiguration.class);
Unmarshaller unmarshaller = ctx.createUnmarshaller();
@@ -84,11 +91,8 @@
}
instanceName = asterixConfiguration.getInstanceName();
metadataNodeName = asterixConfiguration.getMetadataNode();
- stores = new HashMap<String, String[]>();
List<Store> configuredStores = asterixConfiguration.getStore();
- nodeNames = new ArrayList<String>();
nodePartitionsMap = new HashMap<>();
- clusterPartitions = new TreeMap<>();
int uniquePartitionId = 0;
for (Store store : configuredStores) {
String trimmedStoreDirs = store.getStoreDirs().trim();
@@ -107,11 +111,9 @@
for (Property p : asterixConfiguration.getProperty()) {
asterixConfigurationParams.put(p.getName(), p);
}
- coredumpConfig = new HashMap<String, String>();
for (Coredump cd : asterixConfiguration.getCoredump()) {
coredumpConfig.put(cd.getNcId(), cd.getCoredumpPath());
}
- transactionLogDirs = new HashMap<String, String>();
for (TransactionLogDir txnLogDir : asterixConfiguration.getTransactionLogDir()) {
transactionLogDirs.put(txnLogDir.getNcId(), txnLogDir.getTxnLogDirPath());
}
@@ -125,7 +127,44 @@
} catch (IOException e) {
throw new AsterixException(e);
}
+ }
+ /**
+ * Constructor which wraps an IApplicationConfig.
+ */
+ public AsterixPropertiesAccessor (IApplicationConfig cfg) {
+ this.cfg = cfg;
+ instanceName = cfg.getString("asterix", "instance", "DEFAULT_INSTANCE");
+ String mdNode = null;
+ for (String section : cfg.getSections()) {
+ if (!section.startsWith("nc/")) {
+ continue;
+ }
+ String ncId = section.substring(3);
+ nodeNames.add(ncId);
+
+ if (mdNode == null) {
+ // Default is first node == metadata node
+ mdNode = ncId;
+ }
+ if (cfg.getString(section, "metadata.port") != null) {
+ // QQQ But we don't actually *honor* metadata.port yet!
+ mdNode = ncId;
+ }
+
+ // QQQ Default values? Should they be specified here? Or should there
+ // be a default.ini? They can't be inserted by TriggerNCWork except
+ // possibly for hyracks-specified values. Certainly wherever they are,
+ // they should be platform-dependent.
+ stores.put(ncId, cfg.getString(section, "iodevices", "/var/lib/asterixdb/data").split(","));
+ coredumpConfig.put(ncId, cfg.getString(section, "coredumpdir", "/var/lib/asterixdb/coredump"));
+ transactionLogDirs.put(ncId, cfg.getString(section, "txnlogdir", "/var/lib/asterixdb/txn-log"));
+ }
+
+ metadataNodeName = mdNode;
+ asterixConfigurationParams = null;
+ asterixBuildProperties = null;
+ nodePartitionsMap = null;
}
public String getMetadataNodeName() {
@@ -171,15 +210,28 @@
}
public <T> T getProperty(String property, T defaultValue, IPropertyInterpreter<T> interpreter) {
- Property p = asterixConfigurationParams.get(property);
- if (p == null) {
+ String value;
+ Property p = null;
+ if (asterixConfigurationParams != null) {
+ p = asterixConfigurationParams.get(property);
+ value = (p == null) ? null : p.getValue();
+ } else {
+ value = cfg.getString("asterix", property);
+ }
+ if (value == null) {
return defaultValue;
}
-
try {
- return interpreter.interpret(p);
+ return interpreter.interpret(value);
} catch (IllegalArgumentException e) {
- logConfigurationError(p, defaultValue);
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ StringBuilder msg = new StringBuilder("Invalid property value '" + value + "' for property '" + property + "'.\n");
+ if (p != null) {
+ msg.append("See the description: \n" + p.getDescription() + "\n");
+ }
+ msg.append("Default = " + defaultValue);
+ LOGGER.severe(msg.toString());
+ }
throw e;
}
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/IPropertyInterpreter.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/IPropertyInterpreter.java
index b6793af..36f5716 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/IPropertyInterpreter.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/IPropertyInterpreter.java
@@ -18,8 +18,6 @@
*/
package org.apache.asterix.common.config;
-import org.apache.asterix.common.configuration.Property;
-
public interface IPropertyInterpreter<T> {
- public T interpret(Property p) throws IllegalArgumentException;
+ public T interpret(String s) throws IllegalArgumentException;
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertyInterpreters.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertyInterpreters.java
index 5e43971..b54bcc3 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertyInterpreters.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertyInterpreters.java
@@ -20,17 +20,14 @@
import java.util.logging.Level;
-import org.apache.asterix.common.configuration.Property;
-
public class PropertyInterpreters {
public static IPropertyInterpreter<Integer> getIntegerPropertyInterpreter() {
return new IPropertyInterpreter<Integer>() {
-
@Override
- public Integer interpret(Property p) throws IllegalArgumentException {
+ public Integer interpret(String s) throws IllegalArgumentException {
try {
- return Integer.parseInt(p.getValue());
+ return Integer.parseInt(s);
} catch (NumberFormatException e) {
throw new IllegalArgumentException(e);
}
@@ -40,20 +37,19 @@
public static IPropertyInterpreter<Boolean> getBooleanPropertyInterpreter() {
return new IPropertyInterpreter<Boolean>() {
-
- public Boolean interpret(Property p) throws IllegalArgumentException {
- return Boolean.parseBoolean(p.getValue());
+ @Override
+ public Boolean interpret(String s) throws IllegalArgumentException {
+ return Boolean.parseBoolean(s);
}
};
}
public static IPropertyInterpreter<Long> getLongPropertyInterpreter() {
return new IPropertyInterpreter<Long>() {
-
@Override
- public Long interpret(Property p) throws IllegalArgumentException {
+ public Long interpret(String s) throws IllegalArgumentException {
try {
- return Long.parseLong(p.getValue());
+ return Long.parseLong(s);
} catch (NumberFormatException e) {
throw new IllegalArgumentException(e);
}
@@ -63,31 +59,28 @@
public static IPropertyInterpreter<Level> getLevelPropertyInterpreter() {
return new IPropertyInterpreter<Level>() {
-
@Override
- public Level interpret(Property p) throws IllegalArgumentException {
- return Level.parse(p.getValue());
+ public Level interpret(String s) throws IllegalArgumentException {
+ return Level.parse(s);
}
};
}
public static IPropertyInterpreter<String> getStringPropertyInterpreter() {
return new IPropertyInterpreter<String>() {
-
@Override
- public String interpret(Property p) throws IllegalArgumentException {
- return p.getValue();
+ public String interpret(String s) throws IllegalArgumentException {
+ return s;
}
};
}
public static IPropertyInterpreter<Double> getDoublePropertyInterpreter() {
return new IPropertyInterpreter<Double>() {
-
@Override
- public Double interpret(Property p) throws IllegalArgumentException {
+ public Double interpret(String s) throws IllegalArgumentException {
try {
- return Double.parseDouble(p.getValue());
+ return Double.parseDouble(s);
} catch (NumberFormatException e) {
throw new IllegalArgumentException(e);
}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixAppContextInfo.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixAppContextInfo.java
index 33584d4..e26a92b 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixAppContextInfo.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixAppContextInfo.java
@@ -34,6 +34,7 @@
import org.apache.asterix.common.dataflow.IAsterixApplicationContextInfo;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
+import org.apache.hyracks.api.application.IApplicationConfig;
import org.apache.hyracks.api.application.ICCApplicationContext;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
@@ -62,10 +63,21 @@
public static void initialize(ICCApplicationContext ccAppCtx, IHyracksClientConnection hcc,
IGlobalRecoveryMaanger globalRecoveryMaanger) throws AsterixException {
- if (INSTANCE == null) {
- INSTANCE = new AsterixAppContextInfo(ccAppCtx, hcc, globalRecoveryMaanger);
+ if (INSTANCE != null) {
+ return;
}
- AsterixPropertiesAccessor propertiesAccessor = new AsterixPropertiesAccessor();
+ INSTANCE = new AsterixAppContextInfo(ccAppCtx, hcc, globalRecoveryMaanger);
+
+ // Determine whether to use old-style asterix-configuration.xml or new-style configuration.
+ // QQQ strip this out eventually
+ AsterixPropertiesAccessor propertiesAccessor;
+ IApplicationConfig cfg = ccAppCtx.getAppConfig();
+ // QQQ this is NOT a good way to determine whether the config is valid
+ if (cfg.getString("cc", "cluster.address") != null) {
+ propertiesAccessor = new AsterixPropertiesAccessor(cfg);
+ } else {
+ propertiesAccessor = new AsterixPropertiesAccessor();
+ }
INSTANCE.compilerProperties = new AsterixCompilerProperties(propertiesAccessor);
INSTANCE.externalProperties = new AsterixExternalProperties(propertiesAccessor);
INSTANCE.metadataProperties = new AsterixMetadataProperties(propertiesAccessor);
diff --git a/asterixdb/asterix-server/pom.xml b/asterixdb/asterix-server/pom.xml
index 7f29317..f22049b 100644
--- a/asterixdb/asterix-server/pom.xml
+++ b/asterixdb/asterix-server/pom.xml
@@ -74,6 +74,13 @@
<commandLineArgument>org.apache.asterix.hyracks.bootstrap.NCApplicationEntryPoint</commandLineArgument>
</commandLineArguments>
</program>
+ <program>
+ <platforms>
+ <platform>unix</platform>
+ </platforms>
+ <name>asterixncservice</name>
+ <mainClass>org.apache.hyracks.control.nc.service.NCService</mainClass>
+ </program>
</programs>
<daemons>
<daemon>
@@ -161,6 +168,12 @@
<scope>compile</scope>
</dependency>
<dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-nc-service</artifactId>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.asterix</groupId>
<artifactId>asterix-app</artifactId>
<version>0.8.9-SNAPSHOT</version>
diff --git a/asterixdb/pom.xml b/asterixdb/pom.xml
index 3ce25e1..59c5c96 100644
--- a/asterixdb/pom.xml
+++ b/asterixdb/pom.xml
@@ -639,6 +639,11 @@
</dependency>
<dependency>
<groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-nc-service</artifactId>
+ <version>${hyracks.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
<artifactId>hyracks-server</artifactId>
<version>${hyracks.version}</version>
</dependency>
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplicationConfig.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplicationConfig.java
new file mode 100644
index 0000000..bd40813
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplicationConfig.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.application;
+
+import java.util.Set;
+
+/**
+ * Accessor for the data contained in the global application configuration file.
+ */
+public interface IApplicationConfig {
+ String getString(String section, String key);
+ String getString(String section, String key, String defaultValue);
+ int getInt(String section, String key);
+ int getInt(String section, String key, int defaultValue);
+ long getLong(String section, String key);
+ long getLong(String section, String key, long defaultValue);
+ Set<String> getSections();
+ Set<String> getKeys(String section);
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplicationContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplicationContext.java
index 6c79abd..7b07174 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplicationContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplicationContext.java
@@ -49,6 +49,8 @@
public void setThreadFactory(ThreadFactory threadFactory);
+ public IApplicationConfig getAppConfig();
+
/**
* @return The controller service which the application context belongs to.
*/
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCDriver.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCDriver.java
index d2e7cb5..786a89f 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCDriver.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/CCDriver.java
@@ -34,6 +34,8 @@
cp.printUsage(System.err);
return;
}
+ ccConfig.loadConfigAndApplyDefaults();
+
ClusterControllerService ccService = new ClusterControllerService(ccConfig);
ccService.start();
while (true) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index e8b2c27..8fb83d1 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -34,7 +34,10 @@
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
-
+import org.apache.hyracks.control.cc.work.TriggerNCWork;
+import org.apache.hyracks.control.common.controllers.IniUtils;
+import org.ini4j.Ini;
+import org.xml.sax.InputSource;
import org.apache.hyracks.api.application.ICCApplicationEntryPoint;
import org.apache.hyracks.api.client.ClusterControllerInfo;
import org.apache.hyracks.api.client.HyracksClientInterfaceFunctions;
@@ -244,6 +247,7 @@
datasetDirectoryService.init(executor);
workQueue.start();
+ connectNCs();
LOGGER.log(Level.INFO, "Started ClusterControllerService");
if (aep != null) {
// Sometimes, there is no application entry point. Check hyracks-client project
@@ -252,7 +256,7 @@
}
private void startApplication() throws Exception {
- appCtx = new CCApplicationContext(this, serverCtx, ccContext);
+ appCtx = new CCApplicationContext(this, serverCtx, ccContext, ccConfig.getAppConfig());
appCtx.addJobLifecycleListener(datasetDirectoryService);
String className = ccConfig.appCCMainClass;
if (className != null) {
@@ -265,6 +269,25 @@
executor = Executors.newCachedThreadPool(appCtx.getThreadFactory());
}
+ private void connectNCs() throws Exception {
+ Ini ini = ccConfig.getIni();
+ if (ini == null) {
+ return;
+ }
+ for (String section : ini.keySet()) {
+ if (!section.startsWith("nc/")) {
+ continue;
+ }
+ String ncid = section.substring(3);
+ String address = ini.get(section, "address");
+ int port = IniUtils.getInt(ini, section, "port", 9090);
+ if (address == null) {
+ address = InetAddress.getLoopbackAddress().getHostAddress();
+ }
+ workQueue.schedule(new TriggerNCWork(this, address, port, ncid));
+ }
+ }
+
@Override
public void stop() throws Exception {
LOGGER.log(Level.INFO, "Stopping ClusterControllerService");
@@ -317,7 +340,6 @@
public Map<String, NodeControllerState> getNodeMap() {
return nodeRegistry;
}
-
public CCConfig getConfig() {
return ccConfig;
}
@@ -614,7 +636,7 @@
* Add a deployment run
*
* @param deploymentKey
- * @param nodeControllerIds
+ * @param dRun
*/
public synchronized void addDeploymentRun(DeploymentId deploymentKey, DeploymentRun dRun) {
deploymentRunMap.put(deploymentKey, dRun);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
index 61505eb..7917e4a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
@@ -26,6 +26,7 @@
import java.util.Map;
import java.util.Set;
+import org.apache.hyracks.api.application.IApplicationConfig;
import org.apache.hyracks.api.application.ICCApplicationContext;
import org.apache.hyracks.api.application.IClusterLifecycleListener;
import org.apache.hyracks.api.context.ICCContext;
@@ -52,9 +53,9 @@
private List<IClusterLifecycleListener> clusterLifecycleListeners;
private final ClusterControllerService ccs;
- public CCApplicationContext(ClusterControllerService ccs, ServerContext serverCtx, ICCContext ccContext)
- throws IOException {
- super(serverCtx);
+ public CCApplicationContext(ClusterControllerService ccs, ServerContext serverCtx, ICCContext ccContext,
+ IApplicationConfig appConfig) throws IOException {
+ super(serverCtx, appConfig);
this.ccContext = ccContext;
this.ccs = ccs;
initPendingNodeIds = new HashSet<String>();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/web/WebServer.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/web/WebServer.java
index 71ae37d..ac98b646 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/web/WebServer.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/web/WebServer.java
@@ -18,11 +18,10 @@
*/
package org.apache.hyracks.control.cc.web;
-import java.util.EnumSet;
-import java.util.logging.Logger;
-
-import javax.servlet.DispatcherType;
-
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.adminconsole.HyracksAdminConsoleApplication;
+import org.apache.hyracks.control.cc.web.util.JSONOutputRequestHandler;
+import org.apache.hyracks.control.cc.web.util.RoutingHandler;
import org.apache.wicket.Application;
import org.apache.wicket.RuntimeConfigurationType;
import org.apache.wicket.protocol.http.ContextParamWebApplicationFactory;
@@ -38,10 +37,9 @@
import org.eclipse.jetty.servlet.FilterHolder;
import org.eclipse.jetty.servlet.ServletContextHandler;
-import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.cc.adminconsole.HyracksAdminConsoleApplication;
-import org.apache.hyracks.control.cc.web.util.JSONOutputRequestHandler;
-import org.apache.hyracks.control.cc.web.util.RoutingHandler;
+import javax.servlet.DispatcherType;
+import java.util.EnumSet;
+import java.util.logging.Logger;
public class WebServer {
private final static Logger LOGGER = Logger.getLogger(WebServer.class.getName());
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java
index dca6bc2..b3a3065 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java
@@ -45,7 +45,7 @@
Map<String, NodeControllerState> nodeMap = ccs.getNodeMap();
for (Map.Entry<String, NodeControllerState> e : nodeMap.entrySet()) {
NodeControllerState state = e.getValue();
- if (state.incrementLastHeartbeatDuration() >= ccs.getConfig().maxHeartbeatLapsePeriods) {
+ if (state.incrementLastHeartbeatDuration() >= ccs.getCCConfig().maxHeartbeatLapsePeriods) {
deadNodes.add(e.getKey());
LOGGER.info(e.getKey() + " considered dead");
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TriggerNCWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TriggerNCWork.java
new file mode 100644
index 0000000..75d4dd2
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TriggerNCWork.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.cc.work;
+
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.common.work.AbstractWork;
+import org.ini4j.Ini;
+
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.io.StringWriter;
+import java.net.Socket;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A work which is run at CC startup for each NC specified in the configuration file.
+ * It contacts the NC service on each node and passes in the NC-specific configuration.
+ */
+public class TriggerNCWork extends AbstractWork {
+
+ // This constant must match the corresponding constant in
+ // hyracks-control/hyracks-nc-service/src/main/java/org/apache/hyracks/control/nc/service/NCService.java
+ // I didn't want to introduce a Maven-level dependency on the
+ // hyracks-nc-service package (or vice-versa).
+ public static final String NC_MAGIC_COOKIE = "hyncmagic";
+ private static final Logger LOGGER = Logger.getLogger(TriggerNCWork.class.getName());
+
+ private final ClusterControllerService ccs;
+ private final String ncHost;
+ private final int ncPort;
+ private final String ncId;
+
+ public TriggerNCWork(ClusterControllerService ccs, String ncHost, int ncPort, String ncId) {
+ this.ccs = ccs;
+ this.ncHost = ncHost;
+ this.ncPort = ncPort;
+ this.ncId = ncId;
+ }
+ @Override
+ public final void run() {
+ ccs.getExecutor().execute(new Runnable() {
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Connecting NC service '" + ncId + "' at " + ncHost + ":" + ncPort);
+ }
+ Socket s = new Socket(ncHost, ncPort);
+ ObjectOutputStream oos = new ObjectOutputStream(s.getOutputStream());
+ oos.writeUTF(NC_MAGIC_COOKIE);
+ oos.writeUTF(serializeIni(ccs.getCCConfig().getIni()));
+ oos.close();
+ break;
+ // QQQ Should probably have an ACK here
+ } catch (IOException e) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.log(Level.WARNING, "Failed to contact NC service at " + ncHost +
+ ":" + ncPort + "; will retry", e);
+ }
+ }
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ break;
+ }
+ }
+ }
+ });
+ }
+
+ /**
+ * Utility routine to copy all keys from a named section in Ini a
+ * to a named section in Ini b. We need to do this the hard way
+ * because Ini4j reacts inscrutably when attempting to copy
+ * Ini.Sections directly from one Ini to another.
+ */
+ private void copyIniSection(Ini a, String asect, Ini b, String bsect) {
+ Ini.Section source = a.get(asect);
+ for (String key : source.keySet()) {
+ b.put(bsect, key, source.get(key));
+ }
+ }
+ /**
+ * Given an Ini object, serialize it to String with some enhancements.
+ * @param ccini
+ */
+ String serializeIni(Ini ccini) throws IOException {
+ Ini ini = new Ini();
+ String ncsection = "nc/" + ncId;
+ for (String section : ccini.keySet()) {
+ if (section.equals(ncsection)) {
+ copyIniSection(ccini, ncsection, ini, "localnc");
+ ini.put("localnc", "id", ncId);
+ }
+ copyIniSection(ccini, section, ini, section);
+ }
+ StringWriter iniString = new StringWriter();
+ ini.store(iniString);
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine("Returning Ini file:\n" + iniString.toString());
+ }
+ return iniString.toString();
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/pom.xml b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/pom.xml
index 912e447..d885b38 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/pom.xml
@@ -44,5 +44,10 @@
<type>jar</type>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>org.ini4j</groupId>
+ <artifactId>ini4j</artifactId>
+ <version>0.5.4</version>
+ </dependency>
</dependencies>
</project>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ApplicationContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ApplicationContext.java
index c4e9ea2..23d287c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ApplicationContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ApplicationContext.java
@@ -22,6 +22,7 @@
import java.io.Serializable;
import java.util.concurrent.ThreadFactory;
+import org.apache.hyracks.api.application.IApplicationConfig;
import org.apache.hyracks.api.application.IApplicationContext;
import org.apache.hyracks.api.job.IJobSerializerDeserializerContainer;
import org.apache.hyracks.api.job.JobSerializerDeserializerContainer;
@@ -32,6 +33,7 @@
protected ServerContext serverCtx;
protected Serializable distributedState;
protected IMessageBroker messageBroker;
+ protected final IApplicationConfig appConfig;
protected IJobSerializerDeserializerContainer jobSerDeContainer = new JobSerializerDeserializerContainer();
protected ThreadFactory threadFactory = new ThreadFactory() {
public Thread newThread(Runnable r) {
@@ -41,8 +43,9 @@
}
};
- public ApplicationContext(ServerContext serverCtx) throws IOException {
+ public ApplicationContext(ServerContext serverCtx, IApplicationConfig appConfig) {
this.serverCtx = serverCtx;
+ this.appConfig = appConfig;
}
@Override
@@ -74,4 +77,9 @@
public void setThreadFactory(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
}
-}
+
+ @Override
+ public IApplicationConfig getAppConfig() {
+ return appConfig;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/IniApplicationConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/IniApplicationConfig.java
new file mode 100644
index 0000000..3a8a2de
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/IniApplicationConfig.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.common.application;
+
+import org.apache.hyracks.api.application.IApplicationConfig;
+import org.ini4j.Ini;
+
+import java.util.Set;
+
+/**
+ * An implementation of IApplicationConfig which is backed by Ini4j.
+ */
+public class IniApplicationConfig implements IApplicationConfig {
+ private final Ini ini;
+
+ public IniApplicationConfig(Ini ini) {
+ if (ini != null) {
+ this.ini = ini;
+ } else {
+ this.ini = new Ini();
+ }
+ }
+
+ private <T> T getIniValue(String section, String key, T default_value, Class<T> clazz) {
+ T value = ini.get(section, key, clazz);
+ return (value != null) ? value : default_value;
+ }
+
+ @Override
+ public String getString(String section, String key) {
+ return getIniValue(section, key, null, String.class);
+ }
+
+ @Override
+ public String getString(String section, String key, String defaultValue) {
+ return getIniValue(section, key, defaultValue, String.class);
+ }
+
+ @Override
+ public int getInt(String section, String key) {
+ return getIniValue(section, key, 0, Integer.class);
+ }
+
+ @Override
+ public int getInt(String section, String key, int defaultValue) {
+ return getIniValue(section, key, defaultValue, Integer.class);
+ }
+
+ @Override
+ public long getLong(String section, String key) {
+ return getIniValue(section, key, (long) 0, Long.class);
+ }
+
+ @Override
+ public long getLong(String section, String key, long defaultValue) {
+ return getIniValue(section, key, defaultValue, Long.class);
+ }
+
+ @Override
+ public Set<String> getSections() {
+ return ini.keySet();
+ }
+
+ @Override
+ public Set<String> getKeys(String section) {
+ return ini.get(section).keySet();
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
index 4e7c394..a04d750 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/CCConfig.java
@@ -19,20 +19,31 @@
package org.apache.hyracks.control.common.controllers;
import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
import java.util.List;
+import org.apache.hyracks.api.application.IApplicationConfig;
+import org.apache.hyracks.control.common.application.IniApplicationConfig;
+import org.ini4j.Ini;
import org.kohsuke.args4j.Argument;
import org.kohsuke.args4j.Option;
import org.kohsuke.args4j.spi.StopOptionHandler;
public class CCConfig {
- @Option(name = "-client-net-ip-address", usage = "Sets the IP Address to listen for connections from clients", required = true)
+ @Option(name = "-address", usage = "IP Address for CC (default: localhost)", required = false)
+ public String ipAddress = InetAddress.getLoopbackAddress().getHostAddress();
+
+ @Option(name = "-client-net-ip-address", usage = "Sets the IP Address to listen for connections from clients (default: same as -address)", required = false)
public String clientNetIpAddress;
@Option(name = "-client-net-port", usage = "Sets the port to listen for connections from clients (default 1098)")
public int clientNetPort = 1098;
- @Option(name = "-cluster-net-ip-address", usage = "Sets the IP Address to listen for connections from", required = true)
+ // QQQ Note that clusterNetIpAddress is *not directly used* yet. Both
+ // the cluster listener and the web server listen on "all interfaces".
+ // This IP address is only used to instruct the NC on which IP to call in.
+ @Option(name = "-cluster-net-ip-address", usage = "Sets the IP Address to listen for connections from NCs (default: same as -address)", required = false)
public String clusterNetIpAddress;
@Option(name = "-cluster-net-port", usage = "Sets the port to listen for connections from node controllers (default 1099)")
@@ -71,10 +82,77 @@
@Option(name = "-app-cc-main-class", required = false, usage = "Application CC Main Class")
public String appCCMainClass = null;
+ @Option(name = "-config-file", usage = "Specify path to master configuration file (default: none)", required = false)
+ public String configFile = null;
+
@Argument
@Option(name = "--", handler = StopOptionHandler.class)
public List<String> appArgs;
+ private Ini ini = null;
+
+ private void loadINIFile() throws IOException {
+ // This method simply maps from the ini parameters to the CCConfig's fields.
+ // It does not apply defaults or any logic.
+ ini = IniUtils.loadINIFile(configFile);
+
+ ipAddress = IniUtils.getString(ini, "cc", "address", ipAddress);
+ clientNetIpAddress = IniUtils.getString(ini, "cc", "client.address", clientNetIpAddress);
+ clientNetPort = IniUtils.getInt(ini, "cc", "client.port", clientNetPort);
+ clusterNetIpAddress = IniUtils.getString(ini, "cc", "cluster.address", clusterNetIpAddress);
+ clusterNetPort = IniUtils.getInt(ini, "cc", "cluster.port", clusterNetPort);
+ httpPort = IniUtils.getInt(ini, "cc", "http.port", httpPort);
+ heartbeatPeriod = IniUtils.getInt(ini, "cc", "heartbeat.period", heartbeatPeriod);
+ maxHeartbeatLapsePeriods = IniUtils.getInt(ini, "cc", "heartbeat.maxlapse", maxHeartbeatLapsePeriods);
+ profileDumpPeriod = IniUtils.getInt(ini, "cc", "profiledump.period", profileDumpPeriod);
+ defaultMaxJobAttempts = IniUtils.getInt(ini, "cc", "job.defaultattempts", defaultMaxJobAttempts);
+ jobHistorySize = IniUtils.getInt(ini, "cc", "job.historysize", jobHistorySize);
+ resultTTL = IniUtils.getLong(ini, "cc", "results.ttl", resultTTL);
+ resultSweepThreshold = IniUtils.getLong(ini, "cc", "results.sweepthreshold", resultSweepThreshold);
+ ccRoot = IniUtils.getString(ini, "cc", "rootfolder", ccRoot);
+ // QQQ clusterTopologyDefinition is a "File"; should support verifying that the file
+ // exists, as @Option likely does
+ appCCMainClass = IniUtils.getString(ini, "cc", "app.class", appCCMainClass);
+ }
+
+ /**
+ * Once all @Option fields have been loaded from command-line or otherwise
+ * specified programmatically, call this method to:
+ * 1. Load options from a config file (as specified by -config-file)
+ * 2. Set default values for certain derived values, such as setting
+ * clusterNetIpAddress to ipAddress
+ */
+ public void loadConfigAndApplyDefaults() throws IOException {
+ if (configFile != null) {
+ loadINIFile();
+ // QQQ This way of passing overridden/defaulted values back into
+ // the ini feels clunky, and it's clearly incomplete
+ ini.add("cc", "cluster.address", clusterNetIpAddress);
+ ini.add("cc", "client.address", clientNetIpAddress);
+ }
+
+ // "address" is the default for all IP addresses
+ if (clusterNetIpAddress == null) clusterNetIpAddress = ipAddress;
+ if (clientNetIpAddress == null) clientNetIpAddress = ipAddress;
+ }
+
+ /**
+ * Returns the global config Ini file. Note: this will be null
+ * if -config-file wasn't specified.
+ */
+ public Ini getIni() {
+ return ini;
+ }
+
+ /**
+ * @return An IApplicationConfig representing this NCConfig.
+ * Note: Currently this only includes the values from the configuration
+ * file, not anything specified on the command-line. QQQ
+ */
+ public IApplicationConfig getAppConfig() {
+ return new IniApplicationConfig(ini);
+ }
+
public void toCommandLine(List<String> cList) {
cList.add("-client-net-ip-address");
cList.add(clientNetIpAddress);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/IniUtils.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/IniUtils.java
new file mode 100644
index 0000000..9a5c9a0
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/IniUtils.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.common.controllers;
+
+import org.ini4j.Ini;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+/**
+ * Some utility functions for reading Ini4j objects with default values.
+ */
+public class IniUtils {
+ public static String getString(Ini ini, String section, String key, String defaultValue) {
+ String value = ini.get(section, key, String.class);
+ return (value != null) ? value : defaultValue;
+ }
+
+ public static int getInt(Ini ini, String section, String key, int defaultValue) {
+ Integer value = ini.get(section, key, Integer.class);
+ return (value != null) ? value : defaultValue;
+ }
+
+ public static long getLong(Ini ini, String section, String key, long defaultValue) {
+ Long value = ini.get(section, key, Long.class);
+ return (value != null) ? value : defaultValue;
+ }
+
+ public static Ini loadINIFile(String configFile) throws IOException {
+ Ini ini = new Ini();
+ File conffile = new File(configFile);
+ if (!conffile.exists()) {
+ throw new FileNotFoundException(configFile);
+ }
+ ini.load(conffile);
+ return ini;
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
index 547ffe7..4240e3a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
@@ -18,24 +18,32 @@
*/
package org.apache.hyracks.control.common.controllers;
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-
+import org.apache.hyracks.api.application.IApplicationConfig;
+import org.apache.hyracks.control.common.application.IniApplicationConfig;
+import org.ini4j.Ini;
import org.kohsuke.args4j.Argument;
import org.kohsuke.args4j.Option;
import org.kohsuke.args4j.spi.StopOptionHandler;
-public class NCConfig implements Serializable {
- private static final long serialVersionUID = 1L;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.util.List;
+import java.util.Map;
- @Option(name = "-cc-host", usage = "Cluster Controller host name", required = true)
- public String ccHost;
+public class NCConfig implements Serializable {
+ private static final long serialVersionUID = 2L;
+
+ @Option(name = "-cc-host", usage = "Cluster Controller host name (required unless specified in config file)", required = false)
+ public String ccHost = null;
@Option(name = "-cc-port", usage = "Cluster Controller port (default: 1099)", required = false)
public int ccPort = 1099;
- @Option(name = "-cluster-net-ip-address", usage = "IP Address to bind cluster listener", required = true)
+ @Option(name = "-address", usage = "IP Address for NC (default: localhost)", required = false)
+ public String ipAddress = InetAddress.getLoopbackAddress().getHostAddress();
+
+ @Option(name = "-cluster-net-ip-address", usage = "IP Address to bind cluster listener (default: same as -address)", required = false)
public String clusterNetIPAddress;
@Option(name = "-cluster-net-port", usage = "IP port to bind cluster listener (default: random port)", required = false)
@@ -47,10 +55,10 @@
@Option(name = "-cluster-net-public-port", usage = "Public IP port to announce cluster listener (default: same as -cluster-net-port; must set -cluser-net-public-ip-address also)", required = false)
public int clusterNetPublicPort = 0;
- @Option(name = "-node-id", usage = "Logical name of node controller unique within the cluster", required = true)
- public String nodeId;
+ @Option(name = "-node-id", usage = "Logical name of node controller unique within the cluster (required unless specified in config file)", required = false)
+ public String nodeId = null;
- @Option(name = "-data-ip-address", usage = "IP Address to bind data listener", required = true)
+ @Option(name = "-data-ip-address", usage = "IP Address to bind data listener (default: same as -address)", required = false)
public String dataIPAddress;
@Option(name = "-data-port", usage = "IP port to bind data listener (default: random port)", required = false)
@@ -62,7 +70,7 @@
@Option(name = "-data-public-port", usage = "Public IP port to announce data listener (default: same as -data-port; must set -data-public-ip-address also)", required = false)
public int dataPublicPort = 0;
- @Option(name = "-result-ip-address", usage = "IP Address to bind dataset result distribution listener", required = true)
+ @Option(name = "-result-ip-address", usage = "IP Address to bind dataset result distribution listener (default: same as -address)", required = false)
public String resultIPAddress;
@Option(name = "-result-port", usage = "IP port to bind dataset result distribution listener (default: random port)", required = false)
@@ -74,6 +82,9 @@
@Option(name = "-result-public-port", usage = "Public IP port to announce dataset result distribution listener (default: same as -result-port; must set -result-public-ip-address also)", required = false)
public int resultPublicPort = 0;
+ @Option(name = "-retries", usage ="Number of attempts to contact CC before giving up (default = 5)")
+ public int retries = 5;
+
@Option(name = "-iodevices", usage = "Comma separated list of IO Device mount points (default: One device in default temp folder)", required = false)
public String ioDevices = System.getProperty("java.io.tmpdir");
@@ -98,10 +109,85 @@
@Option(name = "-app-nc-main-class", usage = "Application NC Main Class")
public String appNCMainClass;
+ @Option(name = "-config-file", usage = "Specify path to local configuration file (default: no local config)", required = false)
+ public String configFile = null;
+
@Argument
@Option(name = "--", handler = StopOptionHandler.class)
public List<String> appArgs;
+ private transient Ini ini = null;
+
+ private void loadINIFile() throws IOException {
+ ini = IniUtils.loadINIFile(configFile);
+ // QQQ This should default to cc/address if cluster.address not set, but
+ // that logic really should be handled by the ini file sent from the CC
+ ccHost = IniUtils.getString(ini, "cc", "cluster.address", ccHost);
+ ccPort = IniUtils.getInt(ini, "cc", "cluster.port", ccPort);
+ nodeId = IniUtils.getString(ini, "localnc", "id", nodeId);
+
+ // Network ports
+
+ ipAddress = IniUtils.getString(ini, "localnc", "address", ipAddress);
+
+ clusterNetIPAddress = IniUtils.getString(ini, "localnc", "cluster.address", clusterNetIPAddress);
+ clusterNetPort = IniUtils.getInt(ini, "localnc", "cluster.port", clusterNetPort);
+ dataIPAddress = IniUtils.getString(ini, "localnc", "data.address", dataIPAddress);
+ dataPort = IniUtils.getInt(ini, "localnc", "data.port", dataPort);
+ resultIPAddress = IniUtils.getString(ini, "localnc", "result.address", resultIPAddress);
+ resultPort = IniUtils.getInt(ini, "localnc", "result.port", resultPort);
+
+ clusterNetPublicIPAddress = IniUtils.getString(ini, "localnc", "public.cluster.address", clusterNetPublicIPAddress);
+ clusterNetPublicPort = IniUtils.getInt(ini, "localnc", "public.cluster.port", clusterNetPublicPort);
+ dataPublicIPAddress = IniUtils.getString(ini, "localnc", "public.data.address", dataPublicIPAddress);
+ dataPublicPort = IniUtils.getInt(ini, "localnc", "public.data.port", dataPublicPort);
+ resultPublicIPAddress = IniUtils.getString(ini, "localnc", "public.result.address", resultPublicIPAddress);
+ resultPublicPort = IniUtils.getInt(ini, "localnc", "public.result.port", resultPublicPort);
+
+ retries = IniUtils.getInt(ini, "localnc", "retries", retries);
+
+ // Directories
+ ioDevices = IniUtils.getString(ini, "localnc", "iodevices", ioDevices);
+
+ // Hyracks client entrypoint
+ appNCMainClass = IniUtils.getString(ini, "localnc", "app.class", appNCMainClass);
+ }
+
+ /*
+ * Once all @Option fields have been loaded from command-line or otherwise
+ * specified programmatically, call this method to:
+ * 1. Load options from a config file (as specified by -config-file)
+ * 2. Set default values for certain derived values, such as setting
+ * clusterNetIpAddress to ipAddress
+ */
+ public void loadConfigAndApplyDefaults() throws IOException {
+ if (configFile != null) {
+ loadINIFile();
+ }
+
+ // "address" is the default for all IP addresses
+ if (clusterNetIPAddress == null) clusterNetIPAddress = ipAddress;
+ if (dataIPAddress == null) dataIPAddress = ipAddress;
+ if (resultIPAddress == null) resultIPAddress = ipAddress;
+
+ // All "public" options default to their "non-public" versions
+ if (clusterNetPublicIPAddress == null) clusterNetPublicIPAddress = clusterNetIPAddress;
+ if (clusterNetPublicPort == 0) clusterNetPublicPort = clusterNetPort;
+ if (dataPublicIPAddress == null) dataPublicIPAddress = dataIPAddress;
+ if (dataPublicPort == 0) dataPublicPort = dataPort;
+ if (resultPublicIPAddress == null) resultPublicIPAddress = resultIPAddress;
+ if (resultPublicPort == 0) resultPublicPort = resultPort;
+ }
+
+ /**
+ * @return An IApplicationConfig representing this NCConfig.
+ * Note: Currently this only includes the values from the configuration
+ * file, not anything specified on the command-line. QQQ
+ */
+ public IApplicationConfig getAppConfig() {
+ return new IniApplicationConfig(ini);
+ }
+
public void toCommandLine(List<String> cList) {
cList.add("-cc-host");
cList.add(ccHost);
@@ -133,6 +219,8 @@
cList.add(resultPublicIPAddress);
cList.add("-result-public-port");
cList.add(String.valueOf(resultPublicPort));
+ cList.add("-retries");
+ cList.add(String.valueOf(retries));
cList.add("-iodevices");
cList.add(ioDevices);
cList.add("-net-thread-count");
@@ -176,6 +264,7 @@
configuration.put("result-port", String.valueOf(resultPort));
configuration.put("result-public-ip-address", resultPublicIPAddress);
configuration.put("result-public-port", String.valueOf(resultPublicPort));
+ configuration.put("retries", String.valueOf(retries));
configuration.put("iodevices", ioDevices);
configuration.put("net-thread-count", String.valueOf(nNetThreads));
configuration.put("net-buffer-count", String.valueOf(nNetBuffers));
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCDriver.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCDriver.java
index db759cb..b015e3d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCDriver.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCDriver.java
@@ -18,12 +18,11 @@
*/
package org.apache.hyracks.control.nc;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
+import org.apache.hyracks.control.common.controllers.NCConfig;
import org.kohsuke.args4j.CmdLineParser;
-import org.apache.hyracks.control.common.controllers.NCConfig;
+import java.util.logging.Level;
+import java.util.logging.Logger;
public class NCDriver {
private static final Logger LOGGER = Logger.getLogger(NCDriver.class.getName());
@@ -35,13 +34,14 @@
try {
cp.parseArgument(args);
} catch (Exception e) {
- System.err.println(e.getMessage());
+ e.printStackTrace();
cp.printUsage(System.err);
- return;
+ System.exit(1);
}
+ ncConfig.loadConfigAndApplyDefaults();
final NodeControllerService nService = new NodeControllerService(ncConfig);
- if (LOGGER.isLoggable(Level.INFO)) {
+ if (LOGGER.isLoggable(Level.SEVERE)) {
LOGGER.severe("Setting uncaught exception handler " + nService.getLifeCycleComponentManager());
}
Thread.currentThread().setUncaughtExceptionHandler(nService.getLifeCycleComponentManager());
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 598d6db..2f8def1 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -260,7 +260,7 @@
init();
datasetNetworkManager.start();
- IIPCHandle ccIPCHandle = ipc.getHandle(new InetSocketAddress(ncConfig.ccHost, ncConfig.ccPort), -1);
+ IIPCHandle ccIPCHandle = ipc.getHandle(new InetSocketAddress(ncConfig.ccHost, ncConfig.ccPort), ncConfig.retries);
this.ccs = new ClusterControllerRemoteProxy(ccIPCHandle);
HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
for (int i = 0; i < gcInfos.length; ++i) {
@@ -270,14 +270,12 @@
// Use "public" versions of network addresses and ports
NetworkAddress datasetAddress = datasetNetworkManager.getPublicNetworkAddress();
NetworkAddress netAddress = netManager.getPublicNetworkAddress();
- if (ncConfig.dataPublicIPAddress != null) {
- netAddress = new NetworkAddress(ncConfig.dataPublicIPAddress, ncConfig.dataPublicPort);
- }
- ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netAddress, datasetAddress,
- osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), osMXBean.getAvailableProcessors(),
- runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(),
- runtimeMXBean.getClassPath(), runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(),
- runtimeMXBean.getInputArguments(), runtimeMXBean.getSystemProperties(), hbSchema));
+ ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netAddress,
+ datasetAddress, osMXBean.getName(), osMXBean.getArch(), osMXBean
+ .getVersion(), osMXBean.getAvailableProcessors(), runtimeMXBean.getVmName(), runtimeMXBean
+ .getVmVersion(), runtimeMXBean.getVmVendor(), runtimeMXBean.getClassPath(), runtimeMXBean
+ .getLibraryPath(), runtimeMXBean.getBootClassPath(), runtimeMXBean.getInputArguments(),
+ runtimeMXBean.getSystemProperties(), hbSchema));
synchronized (this) {
while (registrationPending) {
@@ -316,7 +314,7 @@
}
private void startApplication() throws Exception {
- appCtx = new NCApplicationContext(this, serverCtx, ctx, id, memoryManager, lccm);
+ appCtx = new NCApplicationContext(this, serverCtx, ctx, id, memoryManager, lccm, ncConfig.getAppConfig());
String className = ncConfig.appNCMainClass;
if (className != null) {
Class<?> c = Class.forName(className);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCApplicationContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCApplicationContext.java
index d0fd524..d23c701 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCApplicationContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCApplicationContext.java
@@ -22,6 +22,7 @@
import java.io.OutputStream;
import java.io.Serializable;
+import org.apache.hyracks.api.application.IApplicationConfig;
import org.apache.hyracks.api.application.INCApplicationContext;
import org.apache.hyracks.api.application.IStateDumpHandler;
import org.apache.hyracks.api.context.IHyracksRootContext;
@@ -42,10 +43,10 @@
private IStateDumpHandler sdh;
private final NodeControllerService ncs;
- public NCApplicationContext(NodeControllerService ncs, ServerContext serverCtx, IHyracksRootContext rootCtx,
- String nodeId, MemoryManager memoryManager, ILifeCycleComponentManager lifeCyclecomponentManager)
- throws IOException {
- super(serverCtx);
+ public NCApplicationContext(NodeControllerService ncs, ServerContext serverCtx, IHyracksRootContext rootCtx, String nodeId,
+ MemoryManager memoryManager, ILifeCycleComponentManager lifeCyclecomponentManager,
+ IApplicationConfig appConfig) throws IOException {
+ super(serverCtx, appConfig);
this.lccm = lifeCyclecomponentManager;
this.nodeId = nodeId;
this.rootCtx = rootCtx;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/pom.xml b/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/pom.xml
new file mode 100644
index 0000000..66d4d24
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/pom.xml
@@ -0,0 +1,69 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements. See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership. The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License. You may obtain a copy of the License at
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied. See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>hyracks-control</artifactId>
+ <groupId>org.apache.hyracks</groupId>
+ <version>0.2.18-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>hyracks-nc-service</artifactId>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.0.2</version>
+ <configuration>
+ <source>1.7</source>
+ <target>1.7</target>
+ <fork>true</fork>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>args4j</groupId>
+ <artifactId>args4j</artifactId>
+ <version>2.0.12</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.ini4j</groupId>
+ <artifactId>ini4j</artifactId>
+ <version>0.5.4</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-control-nc</artifactId>
+ <version>0.2.18-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/src/main/java/org/apache/hyracks/control/nc/service/NCService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/src/main/java/org/apache/hyracks/control/nc/service/NCService.java
new file mode 100644
index 0000000..df92d1a
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/src/main/java/org/apache/hyracks/control/nc/service/NCService.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.service;
+
+import org.apache.commons.lang3.SystemUtils;
+import org.ini4j.Ini;
+import org.kohsuke.args4j.CmdLineParser;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.StringReader;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Stand-alone process which listens for configuration information from the
+ * CC and starts an NC. Intended to be a constantly-running service.
+ */
+public class NCService {
+
+ private static final Logger LOGGER = Logger.getLogger(NCService.class.getName());
+
+ /**
+ * The .ini read from the CC (*not* the ncservice.ini file)
+ */
+ private static Ini ini = new Ini();
+
+ /**
+ * The NCServiceConfig
+ */
+ private static NCServiceConfig config;
+
+ /**
+ * The child Process, if one is active
+ */
+ private static Process proc = null;
+
+ private static final String MAGIC_COOKIE = "hyncmagic";
+
+ private static String getStringINIOpt(Ini ini, String section, String key, String default_value) {
+ String value = ini.get(section, key, String.class);
+ return (value != null) ? value : default_value;
+ }
+
+ private static int getIntINIOpt(Ini ini, String section, String key, int default_value) {
+ Integer value = ini.get(section, key, Integer.class);
+ return (value != null) ? value : default_value;
+ }
+
+ private static List<String> buildCommand() throws IOException {
+ List<String> cList = new ArrayList<String>();
+ if (SystemUtils.IS_OS_WINDOWS) {
+ cList.add(config.command + ".bat");
+ }
+ else {
+ cList.add(config.command);
+ }
+ cList.add("-config-file");
+ // Store the Ini file from the CC locally so NCConfig can read it.
+ // QQQ should arrange to delete this when done
+ File tempIni = File.createTempFile("ncconf", ".conf");
+ ini.store(tempIni);
+ cList.add(tempIni.getCanonicalPath());
+ return cList;
+ }
+
+ private static void configEnvironment(Map<String,String> env) {
+ if (env.containsKey("JAVA_OPTS")) {
+ return;
+ }
+ String jvmargs = getStringINIOpt(ini, "localnc", "jvm.args", "-Xmx1536m");
+ env.put("JAVA_OPTS", jvmargs);
+ }
+
+ /**
+ * Attempts to launch the "real" NCDriver, based on the configuration
+ * information gathered so far.
+ * @return true if the process was successfully launched and has now
+ * exited with a 0 (normal) exit code. false if some configuration error
+ * prevented the process from being launched or the process returned
+ * a non-0 (abnormal) exit code.
+ */
+ private static boolean launchNCProcess() {
+ try {
+ ProcessBuilder pb = new ProcessBuilder(buildCommand());
+ configEnvironment(pb.environment());
+ // QQQ inheriting probably isn't right
+ pb.inheritIO();
+
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Launching NCDriver process");
+ }
+ proc = pb.start();
+
+ boolean waiting = true;
+ int retval = 0;
+ while (waiting) {
+ try {
+ retval = proc.waitFor();
+ waiting = false;
+ } catch (InterruptedException ignored) {
+ }
+ }
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("NCDriver exited with return value " + retval);
+ }
+ return (retval == 0);
+ } catch (Exception e) {
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.log(Level.SEVERE, "Configuration from CC broken", e);
+ }
+ return false;
+ }
+ }
+
+ private static boolean acceptConnection(InputStream is) {
+ // Simple on-wire protocol: magic cookie (string), CC address (string),
+ // port (string), as encoded on CC by ObjectOutputStream. If we see
+ // anything else or have any error, crap out and await a different
+ // connection.
+ // QQQ This should probably be changed to directly accept the full
+ // config file from the CC, rather than calling back to the CC's
+ // "config" webservice to retrieve it. Revisit when the CC is fully
+ // parsing and validating the master config file.
+ try {
+ ObjectInputStream ois = new ObjectInputStream(is);
+ String magic = ois.readUTF();
+ if (! MAGIC_COOKIE.equals(magic)) {
+ LOGGER.severe("Connection used incorrect magic cookie");
+ return false;
+ }
+ String iniString = ois.readUTF();
+ ini = new Ini(new StringReader(iniString));
+ return launchNCProcess();
+ } catch (Exception e) {
+ LOGGER.log(Level.SEVERE, "Error decoding connection from server", e);
+ }
+ return false;
+ }
+
+ public static void main(String[] args) throws Exception {
+ // Register a shutdown hook which will kill the NC if the NC Service is killed.
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ if (proc != null) {
+ proc.destroy();
+ }
+ }
+ });
+ config = new NCServiceConfig();
+ CmdLineParser cp = new CmdLineParser(config);
+ try {
+ cp.parseArgument(args);
+ } catch (Exception e) {
+ e.printStackTrace();
+ cp.printUsage(System.err);
+ System.exit(1);
+ }
+ config.loadConfigAndApplyDefaults();
+
+ // For now we implement a trivial listener which just
+ // accepts an IP/port combination from the CC. This could
+ // be made more advanced in several ways depending on whether
+ // we want to expand the functionality of this service.
+ // For now this gets the job done, without radically changing
+ // the NC itself so that Managix can continue to function.
+ InetAddress addr = config.address == null ? null : InetAddress.getByName(config.address);
+ int port = config.port;
+
+ // Loop forever - the NCService will always return to "waiting for CC" state
+ // when the child NC terminates for any reason.
+ while (true) {
+ ServerSocket listener = new ServerSocket(port, 5, addr);
+ try {
+ boolean launched = false;
+ while (!launched) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Waiting for connection from CC on " + addr + ":" + port);
+ }
+ Socket socket = listener.accept();
+ try {
+ // QQQ Because acceptConnection() doesn't return if the
+ // service is started appropriately, the socket remains
+ // open but non-responsive.
+ launched = acceptConnection(socket.getInputStream());
+ } finally {
+ socket.close();
+ }
+ }
+ } finally {
+ listener.close();
+ }
+ }
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/src/main/java/org/apache/hyracks/control/nc/service/NCServiceConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/src/main/java/org/apache/hyracks/control/nc/service/NCServiceConfig.java
new file mode 100644
index 0000000..af80b33
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/src/main/java/org/apache/hyracks/control/nc/service/NCServiceConfig.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.service;
+
+import org.apache.hyracks.control.common.controllers.IniUtils;
+import org.ini4j.Ini;
+import org.kohsuke.args4j.Option;
+
+import java.io.IOException;
+import java.net.InetAddress;
+
+/**
+ * Command-line arguments for NC Service.
+ */
+public class NCServiceConfig {
+
+ /**
+ * Normally one should only pass this argument. Other arguments are for debugging and test purposes.
+ * If an option is specified both in the config file and on the command line, the config file
+ * version will take precedence.
+ */
+ @Option(name = "-config-file", usage = "Local NC configuration file (default: none)", required = false)
+ public String configFile = null;
+
+ @Option(name = "-address", usage = "Address to listen on for connections from CC (default: localhost)", required = false)
+ public String address = InetAddress.getLoopbackAddress().getHostAddress();
+
+ @Option(name = "-port", usage = "Port to listen on for connections from CC (default: 9090)", required = false)
+ public int port = 9090;
+
+ @Option(name = "-command", usage = "NC command to run (default: 'hyracksnc' on PATH)", required = false)
+ public String command = "hyracksnc";
+
+ private Ini ini = null;
+
+ /**
+ * This method simply maps from the ini parameters to the NCServiceConfig's fields.
+ * It does not apply defaults or any logic.
+ */
+ private void loadINIFile() throws IOException {
+ ini = IniUtils.loadINIFile(configFile);
+ address = IniUtils.getString(ini, "ncservice", "address", address);
+ port = IniUtils.getInt(ini, "ncservice", "port", port);
+ }
+
+ /**
+ * Once all @Option fields have been loaded from command-line or otherwise
+ * specified programmatically, call this method to:
+ * 1. Load options from a config file (as specified by -config-file)
+ * 2. Set default values for certain derived values
+ */
+ public void loadConfigAndApplyDefaults() throws IOException {
+ if (configFile != null) {
+ loadINIFile();
+ }
+ // No defaults necessary beyond the static ones for this config
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/pom.xml b/hyracks-fullstack/hyracks/hyracks-control/pom.xml
index ddb4e89..bd8d739 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-control/pom.xml
@@ -46,5 +46,6 @@
<module>hyracks-control-common</module>
<module>hyracks-control-cc</module>
<module>hyracks-control-nc</module>
+ <module>hyracks-nc-service</module>
</modules>
</project>
diff --git a/hyracks-fullstack/hyracks/hyracks-dist/pom.xml b/hyracks-fullstack/hyracks/hyracks-dist/pom.xml
index 99f8eca..a7c7a15 100644
--- a/hyracks-fullstack/hyracks/hyracks-dist/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-dist/pom.xml
@@ -76,7 +76,7 @@
<phase>package</phase>
<configuration>
<target>
- <chmod file="target/appassembler/bin/*)" perm="755"/>
+ <chmod file="target/appassembler/bin/*" perm="755"/>
</target>
</configuration>
<goals>
diff --git a/hyracks-fullstack/hyracks/hyracks-server/pom.xml b/hyracks-fullstack/hyracks/hyracks-server/pom.xml
index facaae8..3bda1d3 100644
--- a/hyracks-fullstack/hyracks/hyracks-server/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-server/pom.xml
@@ -43,6 +43,30 @@
<build>
<plugins>
<plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <version>2.6</version>
+ <configuration>
+ <runOrder>alphabetical</runOrder>
+ <forkMode>pertest</forkMode>
+ <systemProperties>
+ <property>
+ <name>java.util.logging.config.file</name>
+ <value>src/test/resources/logging.properties</value>
+ </property>
+ </systemProperties>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>integration-test</goal>
+ <goal>verify</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>appassembler-maven-plugin</artifactId>
<version>1.3</version>
@@ -59,6 +83,10 @@
<name>hyracksnc</name>
</program>
<program>
+ <mainClass>org.apache.hyracks.control.nc.service.NCService</mainClass>
+ <name>hyracksncservice</name>
+ </program>
+ <program>
<mainClass>org.apache.hyracks.server.drivers.VirtualClusterDriver</mainClass>
<name>hyracks-virtual-cluster</name>
</program>
@@ -80,6 +108,24 @@
<skip>true</skip>
</configuration>
</plugin>
+ <plugin>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <version>1.6</version>
+ <executions>
+ <execution>
+ <id>process-test-classes</id>
+ <phase>package</phase>
+ <configuration>
+ <target>
+ <chmod file="target/appassembler/bin/*" perm="755" />
+ </target>
+ </configuration>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
<dependencies>
@@ -97,5 +143,18 @@
<type>jar</type>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-nc-service</artifactId>
+ <version>0.2.18-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>commons-httpclient</groupId>
+ <artifactId>commons-httpclient</artifactId>
+ <version>3.0.1</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/hyracks-fullstack/hyracks/hyracks-server/src/test/java/org/apache/hyracks/server/test/NCServiceIT.java b/hyracks-fullstack/hyracks/hyracks-server/src/test/java/org/apache/hyracks/server/test/NCServiceIT.java
new file mode 100644
index 0000000..7f431f6
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-server/src/test/java/org/apache/hyracks/server/test/NCServiceIT.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.server.test;
+
+import junit.framework.Assert;
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.HttpStatus;
+import org.apache.commons.httpclient.methods.GetMethod;
+import org.apache.commons.lang3.StringUtils;
+import org.json.JSONArray;
+import org.json.JSONObject;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Logger;
+
+public class NCServiceIT {
+
+ private static final String RESOURCE_DIR = StringUtils
+ .join(new String[]{System.getProperty("user.dir"), "src", "test", "resources", "NCServiceIT"},
+ File.separator);
+ private static final String APP_DIR = StringUtils
+ .join(new String[]{System.getProperty("user.dir"), "target", "appassembler", "bin"},
+ File.separator);
+ private static final Logger LOGGER = Logger.getLogger(NCServiceIT.class.getName());
+ private static List<Process> procs = new ArrayList<>();
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ // Start two NC Services - don't read their output as they don't terminate
+ procs.add(invoke(APP_DIR + File.separator + "hyracksncservice",
+ "-config-file", RESOURCE_DIR + File.separator + "nc-red.conf",
+ "-command", APP_DIR + File.separator + "hyracksnc"));
+ procs.add(invoke(APP_DIR + File.separator + "hyracksncservice",
+ "-config-file", RESOURCE_DIR + File.separator + "nc-blue.conf",
+ "-command", APP_DIR + File.separator + "hyracksnc"));
+ try {
+ Thread.sleep(2000);
+ }
+ catch (InterruptedException ignored) {
+ }
+
+ // Start CC
+ procs.add(invoke(APP_DIR + File.separator + "hyrackscc",
+ "-config-file", RESOURCE_DIR + File.separator + "cc.conf"));
+ try {
+ Thread.sleep(10000);
+ }
+ catch (InterruptedException ignored) {
+ }
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ for (Process p : procs) {
+ p.destroy();
+ p.waitFor();
+ }
+ }
+
+ private static String getHttp(String url) throws Exception {
+ HttpClient client = new HttpClient();
+ GetMethod get = new GetMethod(url);
+ int statusCode;
+ try {
+ statusCode = client.executeMethod(get);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw e;
+ }
+ String response = get.getResponseBodyAsString();
+ if (statusCode == HttpStatus.SC_OK) {
+ return response;
+ } else {
+ throw new Exception("HTTP error " + statusCode + ":\n" + response);
+ }
+ }
+
+ private static Process invoke(String... args) throws Exception {
+ ProcessBuilder pb = new ProcessBuilder(args);
+ pb.redirectErrorStream(true);
+ Process p = pb.start();
+ return p;
+ }
+
+ @Test
+ public void IsNodelistCorrect() throws Exception {
+ // Ping the nodelist HTTP API
+ String localhost = InetAddress.getLoopbackAddress().getHostAddress();
+ String response = getHttp("http://" + localhost + ":12345/rest/nodes");
+ JSONObject result = new JSONObject(response);
+ JSONArray nodes = result.getJSONArray("result");
+ int numNodes = nodes.length();
+ Assert.assertEquals("Wrong number of nodes!", numNodes, 2);
+ for (int i = 0; i < nodes.length(); i++) {
+ JSONObject node = nodes.getJSONObject(i);
+ String id = node.getString("node-id");
+ if (id.equals("red") || id.equals("blue")) {
+ continue;
+ }
+ Assert.fail("Unexpected node ID '" + id + "'!");
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ try {
+ setUp();
+ } catch (Exception e) {
+ e.printStackTrace();
+ LOGGER.severe("TEST CASE(S) FAILED");
+ } finally {
+ tearDown();
+ }
+ }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-server/src/test/resources/NCServiceIT/cc.conf b/hyracks-fullstack/hyracks/hyracks-server/src/test/resources/NCServiceIT/cc.conf
new file mode 100644
index 0000000..25ac530
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-server/src/test/resources/NCServiceIT/cc.conf
@@ -0,0 +1,11 @@
+[nc/red]
+address=127.0.0.1
+
+[nc/blue]
+address=127.0.0.1
+port=9091
+
+[cc]
+cluster.address = 127.0.0.1
+http.port = 12345
+
diff --git a/hyracks-fullstack/hyracks/hyracks-server/src/test/resources/NCServiceIT/nc-blue.conf b/hyracks-fullstack/hyracks/hyracks-server/src/test/resources/NCServiceIT/nc-blue.conf
new file mode 100644
index 0000000..d070b59
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-server/src/test/resources/NCServiceIT/nc-blue.conf
@@ -0,0 +1,3 @@
+[ncservice]
+address=127.0.0.1
+port=9091
diff --git a/hyracks-fullstack/hyracks/hyracks-server/src/test/resources/NCServiceIT/nc-red.conf b/hyracks-fullstack/hyracks/hyracks-server/src/test/resources/NCServiceIT/nc-red.conf
new file mode 100644
index 0000000..58a8f1d
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-server/src/test/resources/NCServiceIT/nc-red.conf
@@ -0,0 +1,4 @@
+[ncservice]
+address=127.0.0.1
+port=9090
+
diff --git a/hyracks-fullstack/hyracks/hyracks-server/src/test/resources/logging.properties b/hyracks-fullstack/hyracks/hyracks-server/src/test/resources/logging.properties
new file mode 100644
index 0000000..c888bb1
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-server/src/test/resources/logging.properties
@@ -0,0 +1,76 @@
+#/*
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+############################################################
+# Default Logging Configuration File
+#
+# You can use a different file by specifying a filename
+# with the java.util.logging.config.file system property.
+# For example java -Djava.util.logging.config.file=myfile
+############################################################
+
+############################################################
+# Global properties
+############################################################
+
+# "handlers" specifies a comma separated list of log Handler
+# classes. These handlers will be installed during VM startup.
+# Note that these classes must be on the system classpath.
+# By default we only configure a ConsoleHandler, which will only
+# show messages at the INFO and above levels.
+
+handlers= java.util.logging.ConsoleHandler
+
+# To also add the FileHandler, use the following line instead.
+
+# handlers= java.util.logging.FileHandler, java.util.logging.ConsoleHandler
+
+# Default global logging level.
+# This specifies which kinds of events are logged across
+# all loggers. For any given facility this global level
+# can be overriden by a facility specific level
+# Note that the ConsoleHandler also has a separate level
+# setting to limit messages printed to the console.
+
+.level= WARNING
+# .level= INFO
+# .level= FINE
+# .level = FINEST
+
+############################################################
+# Handler specific properties.
+# Describes specific configuration info for Handlers.
+############################################################
+
+# default file output is in user's home directory.
+
+# java.util.logging.FileHandler.pattern = %h/java%u.log
+# java.util.logging.FileHandler.limit = 50000
+# java.util.logging.FileHandler.count = 1
+# java.util.logging.FileHandler.formatter = java.util.logging.XMLFormatter
+
+# Limit the message that are printed on the console to FINE and above.
+
+java.util.logging.ConsoleHandler.level = FINE
+java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter
+
+
+############################################################
+# Facility specific properties.
+# Provides extra control for each logger.
+############################################################
+
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestNCApplicationContext.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestNCApplicationContext.java
index f619be0..c6a4430 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestNCApplicationContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestNCApplicationContext.java
@@ -21,6 +21,7 @@
import java.io.Serializable;
import java.util.concurrent.ThreadFactory;
+import org.apache.hyracks.api.application.IApplicationConfig;
import org.apache.hyracks.api.application.INCApplicationContext;
import org.apache.hyracks.api.application.IStateDumpHandler;
import org.apache.hyracks.api.context.IHyracksRootContext;
@@ -122,6 +123,11 @@
}
@Override
+ public IApplicationConfig getAppConfig() {
+ return null;
+ }
+
+ @Override
public ILifeCycleComponentManager getLifeCycleComponentManager() {
return lccm;
}