ASTERIXDB-1087: Make CC configurable from Asterix
Change-Id: I32b5e4b4242bdab16a664c9085bd8cd96fcc1142
Reviewed-on: https://asterix-gerrit.ics.uci.edu/648
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
Reviewed-by: Ian Maxon <imaxon@apache.org>
diff --git a/asterix-common/src/main/resources/schema/cluster.xsd b/asterix-common/src/main/resources/schema/cluster.xsd
index e0605f0..935d33f 100644
--- a/asterix-common/src/main/resources/schema/cluster.xsd
+++ b/asterix-common/src/main/resources/schema/cluster.xsd
@@ -49,6 +49,14 @@
<xs:element name="replication_factor" type="xs:integer" />
<xs:element name="auto_failover" type="xs:boolean" />
<xs:element name="replication_time_out" type="xs:integer" />
+ <xs:element name="heartbeat_period" type="xs:integer" />
+ <xs:element name="max_heartbeat_lapse_periods" type="xs:integer" />
+ <xs:element name="profile_dump_period" type="xs:integer" />
+ <xs:element name="default_max_job_attempts" type="xs:integer" />
+ <xs:element name="job_history_size" type="xs:integer" />
+ <xs:element name="result_time_to_live" type="xs:long" />
+ <xs:element name="result_sweep_threshold" type="xs:long" />
+ <xs:element name="cc_root" type="xs:string" />
<!-- definition of complex elements -->
<xs:element name="working_dir">
@@ -145,6 +153,14 @@
<xs:element ref="cl:master_node" />
<xs:element ref="cl:node" maxOccurs="unbounded" />
<xs:element ref="cl:substitute_nodes" />
+ <xs:element ref="cl:heartbeat_period" minOccurs="0" />
+ <xs:element ref="cl:max_heartbeat_lapse_periods" minOccurs="0" />
+ <xs:element ref="cl:profile_dump_period" minOccurs="0" />
+ <xs:element ref="cl:default_max_job_attempts" minOccurs="0" />
+ <xs:element ref="cl:job_history_size" minOccurs="0" />
+ <xs:element ref="cl:result_time_to_live" minOccurs="0" />
+ <xs:element ref="cl:result_sweep_threshold" minOccurs="0" />
+ <xs:element ref="cl:cc_root" minOccurs="0" />
</xs:sequence>
</xs:complexType>
</xs:element>
diff --git a/asterix-common/src/main/resources/schema/yarn_cluster.xsd b/asterix-common/src/main/resources/schema/yarn_cluster.xsd
index 8827985..463997c 100644
--- a/asterix-common/src/main/resources/schema/yarn_cluster.xsd
+++ b/asterix-common/src/main/resources/schema/yarn_cluster.xsd
@@ -83,7 +83,14 @@
<xs:element
name="nc_container_mem"
type="xs:string" />
-
+ <xs:element name="heartbeat_period" type="xs:integer" />
+ <xs:element name="max_heartbeat_lapse_periods" type="xs:integer" />
+ <xs:element name="profile_dump_period" type="xs:integer" />
+ <xs:element name="default_max_job_attempts" type="xs:integer" />
+ <xs:element name="job_history_size" type="xs:integer" />
+ <xs:element name="result_time_to_live" type="xs:long" />
+ <xs:element name="result_sweep_threshold" type="xs:long" />
+ <xs:element name="cc_root" type="xs:string" />
<!-- definition of complex elements -->
@@ -189,6 +196,14 @@
ref="cl:node"
maxOccurs="unbounded" />
<xs:element ref="cl:substitute_nodes" />
+ <xs:element ref="cl:heartbeat_period" minOccurs="0" />
+ <xs:element ref="cl:max_heartbeat_lapse_periods" minOccurs="0" />
+ <xs:element ref="cl:profile_dump_period" minOccurs="0" />
+ <xs:element ref="cl:default_max_job_attempts" minOccurs="0" />
+ <xs:element ref="cl:job_history_size" minOccurs="0" />
+ <xs:element ref="cl:result_time_to_live" minOccurs="0" />
+ <xs:element ref="cl:result_sweep_threshold" minOccurs="0" />
+ <xs:element ref="cl:cc_root" minOccurs="0" />
</xs:sequence>
</xs:complexType>
</xs:element>
diff --git a/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java b/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
index 4bd5098..ecbafa7 100644
--- a/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
+++ b/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
@@ -147,6 +147,38 @@
clusterProperties.add(new Property("CLUSTER_NET_PORT", "" + clusterNetPort));
clusterProperties.add(new Property("HTTP_PORT", "" + httpPort));
+ //pass Cluster optional parameters
+ if (cluster.getHeartbeatPeriod() != null) {
+ clusterProperties
+ .add(new Property("HEARTBEAT_PERIOD", String.valueOf(cluster.getHeartbeatPeriod().intValue())));
+ }
+ if (cluster.getMaxHeartbeatLapsePeriods() != null) {
+ clusterProperties.add(new Property("MAX_HEARTBEAT_LAPSE_PERIODS",
+ String.valueOf(cluster.getMaxHeartbeatLapsePeriods().intValue())));
+ }
+ if (cluster.getProfileDumpPeriod() != null) {
+ clusterProperties.add(
+ new Property("PROFILE_DUMP_PERIOD", String.valueOf(cluster.getProfileDumpPeriod().intValue())));
+ }
+ if (cluster.getDefaultMaxJobAttempts() != null) {
+ clusterProperties.add(new Property("DEFAULT_MAX_JOB_ATTEMPTS",
+ String.valueOf(cluster.getDefaultMaxJobAttempts().intValue())));
+ }
+ if (cluster.getJobHistorySize() != null) {
+ clusterProperties
+ .add(new Property("JOB_HISTORY_SIZE", String.valueOf(cluster.getJobHistorySize().intValue())));
+ }
+ if (cluster.getResultTimeToLive() != null) {
+ clusterProperties.add(
+ new Property("RESULT_TIME_TO_LIVE", String.valueOf(cluster.getResultTimeToLive().longValue())));
+ }
+ if (cluster.getResultSweepThreshold() != null) {
+ clusterProperties.add(new Property("RESULT_SWEEP_THRESHOLD",
+ String.valueOf(cluster.getResultSweepThreshold().longValue())));
+ }
+ if (cluster.getCcRoot() != null) {
+ clusterProperties.add(new Property("CC_ROOT", cluster.getCcRoot()));
+ }
cluster.setEnv(new Env(clusterProperties));
}
diff --git a/asterix-events/src/main/resources/events/cc_start/cc_start.sh b/asterix-events/src/main/resources/events/cc_start/cc_start.sh
index 24d3432..b7481e9 100644
--- a/asterix-events/src/main/resources/events/cc_start/cc_start.sh
+++ b/asterix-events/src/main/resources/events/cc_start/cc_start.sh
@@ -21,5 +21,50 @@
then
mkdir -p $LOG_DIR
fi
+ccArgs='-client-net-ip-address '$CLIENT_NET_IP
+ccArgs=$ccArgs' -client-net-port '$CLIENT_NET_PORT
+ccArgs=$ccArgs' -cluster-net-ip-address '$CLUSTER_NET_IP
+ccArgs=$ccArgs' -cluster-net-port '$CLUSTER_NET_PORT
+ccArgs=$ccArgs' -http-port '$HTTP_PORT
+
+if [ ! -z $HEARTBEAT_PERIOD ]
+then
+ccArgs=$ccArgs' -heartbeat-period '$HEARTBEAT_PERIOD
+fi
+
+if [ ! -z $MAX_HEARTBEAT_LAPSE_PERIODS ]
+then
+ccArgs=$ccArgs' -max-heartbeat-lapse-periods '$MAX_HEARTBEAT_LAPSE_PERIODS
+fi
+
+if [ ! -z $PROFILE_DUMP_PERIOD ]
+then
+ccArgs=$ccArgs' -profile-dump-period '$PROFILE_DUMP_PERIOD
+fi
+
+if [ ! -z $DEFAULT_MAX_JOB_ATTEMPTS ]
+then
+ccArgs=$ccArgs' -default-max-job-attempts '$DEFAULT_MAX_JOB_ATTEMPTS
+fi
+
+if [ ! -z $JOB_HISTORY_SIZE ]
+then
+ccArgs=$ccArgs' -job-history-size '$JOB_HISTORY_SIZE
+fi
+
+if [ ! -z $RESULT_TIME_TO_LIVE ]
+then
+ccArgs=$ccArgs' "-result-time-to-live '$RESULT_TIME_TO_LIVE
+fi
+
+if [ ! -z $RESULT_SWEEP_THRESHOLD ]
+then
+ccArgs=$ccArgs' -result-sweep-threshold '$RESULT_SWEEP_THRESHOLD
+fi
+
+if [ ! -z $CC_ROOT ]
+then
+ccArgs=$ccArgs' -cc-root '$CC_ROOT
+fi
cd $WORKING_DIR
-$ASTERIX_HOME/bin/asterixcc -client-net-ip-address $CLIENT_NET_IP -client-net-port $CLIENT_NET_PORT -cluster-net-ip-address $CLUSTER_NET_IP -cluster-net-port $CLUSTER_NET_PORT -http-port $HTTP_PORT &> $LOG_DIR/cc.log
+$ASTERIX_HOME/bin/asterixcc echo $ccArgs &> $LOG_DIR/cc.log
\ No newline at end of file
diff --git a/asterix-installer/src/main/resources/clusters/local/local.xml b/asterix-installer/src/main/resources/clusters/local/local.xml
index 57bf114..20f697f 100644
--- a/asterix-installer/src/main/resources/clusters/local/local.xml
+++ b/asterix-installer/src/main/resources/clusters/local/local.xml
@@ -26,6 +26,20 @@
<dir>/tmp/asterix-installer</dir>
<NFS>true</NFS>
</working_dir>
+ <!-- Sets the time duration between two heartbeats from each node controller in milliseconds (default: 10000) -->
+ <heartbeat_period>10000</heartbeat_period>
+ <!-- Sets the maximum number of missed heartbeats before a node is marked as dead (default: 5) -->
+ <max_heartbeat_lapse_periods>5</max_heartbeat_lapse_periods>
+ <!-- Sets the time duration between two profile dumps from each node controller in milliseconds. 0 to disable. (default: 0) -->
+ <profile_dump_period>0</profile_dump_period>
+ <!-- Sets the default number of job attempts allowed if not specified in the job specification. (default: 5) -->
+ <default_max_job_attempts>5</default_max_job_attempts>
+ <!-- Limits the number of historical jobs remembered by the system to the specified value. (default: 10) -->
+ <job_history_size>10</job_history_size>
+ <!-- Limits the amount of time results for asynchronous jobs should be retained by the system in milliseconds. (default: 24 hours) -->
+ <result_time_to_live>86400000</result_time_to_live>
+ <!-- The duration within which an instance of the result cleanup should be invoked in milliseconds. (default: 1 minute) -->
+ <result_sweep_threshold>60000</result_sweep_threshold>
<master_node>
<id>master</id>
<client_ip>127.0.0.1</client_ip>
diff --git a/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixApplicationMaster.java b/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixApplicationMaster.java
index 22534e0..00d9018 100644
--- a/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixApplicationMaster.java
+++ b/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixApplicationMaster.java
@@ -47,6 +47,7 @@
import org.apache.asterix.common.config.GlobalConfig;
import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.event.schema.cluster.Property;
import org.apache.asterix.event.schema.yarnCluster.Cluster;
import org.apache.asterix.event.schema.yarnCluster.MasterNode;
import org.apache.asterix.event.schema.yarnCluster.Node;
@@ -93,13 +94,12 @@
public class AsterixApplicationMaster {
- static
- {
+ static {
Logger rootLogger = Logger.getRootLogger();
rootLogger.setLevel(Level.INFO);
- rootLogger.addAppender(new ConsoleAppender(
- new PatternLayout("%-6r [%p] %c - %m%n")));
+ rootLogger.addAppender(new ConsoleAppender(new PatternLayout("%-6r [%p] %c - %m%n")));
}
+
private static final Log LOG = LogFactory.getLog(AsterixApplicationMaster.class);
private static final String CLUSTER_DESC_PATH = "cluster-config.xml";
private static final String ASTERIX_CONF_NAME = "asterix-configuration.xml";
@@ -134,7 +134,7 @@
// Hostname of the container
private String appMasterHostname = "";
// Port on which the app master listens for status updates from clients
- private int appMasterRpcPort = new Random().nextInt(65535-49152);
+ private int appMasterRpcPort = new Random().nextInt(65535 - 49152);
// Tracking url to which app master publishes info for clients to monitor
private String appMasterTrackingUrl = "";
@@ -284,7 +284,7 @@
if (cliParser.hasOption("obliterate")) {
obliterate = true;
}
- if(cliParser.hasOption("initial")){
+ if (cliParser.hasOption("initial")) {
initial = true;
}
@@ -327,14 +327,14 @@
appAttemptID = containerId.getApplicationAttemptId();
}
- if (!envs.containsKey(ApplicationConstants.APP_SUBMIT_TIME_ENV)
- || !envs.containsKey(Environment.NM_HOST.name()) || !envs.containsKey(Environment.NM_HTTP_PORT.name())
+ if (!envs.containsKey(ApplicationConstants.APP_SUBMIT_TIME_ENV) || !envs.containsKey(Environment.NM_HOST.name())
+ || !envs.containsKey(Environment.NM_HTTP_PORT.name())
|| !envs.containsKey(Environment.NM_PORT.name())) {
throw new IllegalArgumentException(
"Environment is not set correctly- please check client submission settings");
}
- System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, envs.get("PWD") + File.separator + "bin" + File.separator
- + ASTERIX_CONF_NAME);
+ System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY,
+ envs.get("PWD") + File.separator + "bin" + File.separator + ASTERIX_CONF_NAME);
LOG.info("Application master for app" + ", appId=" + appAttemptID.getApplicationId().getId()
+ ", clustertimestamp=" + appAttemptID.getApplicationId().getClusterTimestamp() + ", attemptId="
@@ -350,11 +350,10 @@
instanceConfPath = envs.get(AConstants.INSTANCESTORE);
//the only time this is null is during testing, when asterix-yarn isn't packaged in a JAR yet.
- if(envs.get(AConstants.APPLICATIONMASTERJARLOCATION) != null
- && !envs.get(AConstants.APPLICATIONMASTERJARLOCATION).endsWith(File.separator)){
+ if (envs.get(AConstants.APPLICATIONMASTERJARLOCATION) != null
+ && !envs.get(AConstants.APPLICATIONMASTERJARLOCATION).endsWith(File.separator)) {
appMasterJar = new Path(envs.get(AConstants.APPLICATIONMASTERJARLOCATION));
- }
- else{
+ } else {
appMasterJar = null;
}
@@ -473,7 +472,7 @@
resourceManager.addContainerRequest(hostToRequest(n.getClusterIp(), false));
LOG.info("Asked for NC: " + n.getClusterIp());
numNodes++;
- synchronized(pendingNCs){
+ synchronized (pendingNCs) {
pendingNCs.add(n);
}
}
@@ -776,7 +775,7 @@
+ ", diagnostics=" + containerStatus.getDiagnostics());
// non complete containers should not be here
- if(containerStatus.getState() != ContainerState.COMPLETE){
+ if (containerStatus.getState() != ContainerState.COMPLETE) {
throw new IllegalStateException("Non-completed container given as completed by RM.");
}
@@ -803,13 +802,14 @@
LOG.info("Got response from RM for container ask, allocatedCnt=" + allocatedContainers.size());
numAllocatedContainers.addAndGet(allocatedContainers.size());
for (Container allocatedContainer : allocatedContainers) {
- synchronized(pendingNCs){
+ synchronized (pendingNCs) {
try {
if (!pendingNCs.contains(containerToNode(allocatedContainer, clusterDesc)) && ccUp.get()) {
- nmClientAsync.stopContainerAsync(allocatedContainer.getId(), allocatedContainer.getNodeId());
+ nmClientAsync.stopContainerAsync(allocatedContainer.getId(),
+ allocatedContainer.getNodeId());
continue;
}
- } catch(UnknownHostException ex){
+ } catch (UnknownHostException ex) {
LOG.error("Unknown host allocated for us by RM- this shouldn't happen.", ex);
}
}
@@ -826,12 +826,12 @@
// I want to know if this node is the CC, because it must start before the NCs.
LOG.info("Allocated: " + allocatedContainer.getNodeId().getHost());
LOG.info("CC : " + cC.getId());
- synchronized(pendingNCs){
+ synchronized (pendingNCs) {
try {
if (ccUp.get()) {
pendingNCs.remove(containerToNode(allocatedContainer, clusterDesc));
}
- } catch(UnknownHostException ex){
+ } catch (UnknownHostException ex) {
LOG.error("Unknown host allocated for us by RM- this shouldn't happen.", ex);
}
}
@@ -1035,6 +1035,34 @@
vargs.add("-app-cc-main-class org.apache.asterix.hyracks.bootstrap.CCApplicationEntryPoint");
vargs.add("-cluster-net-ip-address " + cC.getClusterIp());
vargs.add("-client-net-ip-address " + cC.getClientIp());
+ //pass CC optional parameters
+ if (clusterDesc.getHeartbeatPeriod() != null) {
+ vargs.add("-heartbeat-period " + String.valueOf(clusterDesc.getHeartbeatPeriod().intValue()));
+ }
+ if (clusterDesc.getMaxHeartbeatLapsePeriods() != null) {
+ vargs.add("-max-heartbeat-lapse-periods "
+ + String.valueOf(clusterDesc.getMaxHeartbeatLapsePeriods().intValue()));
+ }
+ if (clusterDesc.getProfileDumpPeriod() != null) {
+ vargs.add("-profile-dump-period " + String.valueOf(clusterDesc.getProfileDumpPeriod().intValue()));
+ }
+ if (clusterDesc.getDefaultMaxJobAttempts() != null) {
+ vargs.add("-default-max-job-attempts "
+ + String.valueOf(clusterDesc.getDefaultMaxJobAttempts().intValue()));
+ }
+ if (clusterDesc.getJobHistorySize() != null) {
+ vargs.add("-job-history-size " + String.valueOf(clusterDesc.getJobHistorySize().intValue()));
+ }
+ if (clusterDesc.getResultTimeToLive() != null) {
+ vargs.add("-result-time-to-live " + String.valueOf(clusterDesc.getResultTimeToLive().intValue()));
+ }
+ if (clusterDesc.getResultSweepThreshold() != null) {
+ vargs.add("-result-sweep-threshold "
+ + String.valueOf(clusterDesc.getResultSweepThreshold().intValue()));
+ }
+ if (clusterDesc.getCcRoot() != null) {
+ vargs.add("-cc-root " + clusterDesc.getCcRoot());
+ }
ccStarted.set(true);
} else {
@@ -1058,7 +1086,7 @@
vargs.add("-data-ip-address " + local.getClusterIp());
vargs.add("-result-ip-address " + local.getClusterIp());
vargs.add("--");
- if(initial){
+ if (initial) {
vargs.add("-initial-run ");
}
} catch (UnknownHostException e) {
@@ -1096,8 +1124,7 @@
if (!containerIsCC(container)) {
LOG.error("Unable to find NC configured for host: " + container.getId() + e);
return null;
- }
- else {
+ } else {
return Arrays.asList("");
}
}
@@ -1143,7 +1170,7 @@
if (!containerIsCC(container)) {
LOG.error("Unable to find NC configured for host: " + container.getId() + e);
return null;
- }else {
+ } else {
return Arrays.asList("");
}
}