ASTERIXDB-1087: Make CC configurable from Asterix

Change-Id: I32b5e4b4242bdab16a664c9085bd8cd96fcc1142
Reviewed-on: https://asterix-gerrit.ics.uci.edu/648
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
Reviewed-by: Ian Maxon <imaxon@apache.org>
diff --git a/asterix-common/src/main/resources/schema/cluster.xsd b/asterix-common/src/main/resources/schema/cluster.xsd
index e0605f0..935d33f 100644
--- a/asterix-common/src/main/resources/schema/cluster.xsd
+++ b/asterix-common/src/main/resources/schema/cluster.xsd
@@ -49,6 +49,14 @@
 	<xs:element name="replication_factor" type="xs:integer" />
 	<xs:element name="auto_failover" type="xs:boolean" />
 	<xs:element name="replication_time_out" type="xs:integer" />
+    <xs:element name="heartbeat_period" type="xs:integer" />
+    <xs:element name="max_heartbeat_lapse_periods" type="xs:integer" />
+    <xs:element name="profile_dump_period" type="xs:integer" />
+    <xs:element name="default_max_job_attempts" type="xs:integer" />
+    <xs:element name="job_history_size" type="xs:integer" />
+    <xs:element name="result_time_to_live" type="xs:long" />
+    <xs:element name="result_sweep_threshold" type="xs:long" />
+    <xs:element name="cc_root" type="xs:string" />
 
 	<!-- definition of complex elements -->
 	<xs:element name="working_dir">
@@ -145,6 +153,14 @@
 				<xs:element ref="cl:master_node" />
 				<xs:element ref="cl:node" maxOccurs="unbounded" />
 				<xs:element ref="cl:substitute_nodes" />
+                <xs:element ref="cl:heartbeat_period" minOccurs="0" />
+                <xs:element ref="cl:max_heartbeat_lapse_periods" minOccurs="0" />
+                <xs:element ref="cl:profile_dump_period" minOccurs="0" />
+                <xs:element ref="cl:default_max_job_attempts" minOccurs="0" />
+                <xs:element ref="cl:job_history_size" minOccurs="0" />
+                <xs:element ref="cl:result_time_to_live" minOccurs="0" />
+                <xs:element ref="cl:result_sweep_threshold" minOccurs="0" />
+                <xs:element ref="cl:cc_root" minOccurs="0" />
 			</xs:sequence>
 		</xs:complexType>
 	</xs:element>
diff --git a/asterix-common/src/main/resources/schema/yarn_cluster.xsd b/asterix-common/src/main/resources/schema/yarn_cluster.xsd
index 8827985..463997c 100644
--- a/asterix-common/src/main/resources/schema/yarn_cluster.xsd
+++ b/asterix-common/src/main/resources/schema/yarn_cluster.xsd
@@ -83,7 +83,14 @@
     <xs:element
         name="nc_container_mem"
         type="xs:string" />
-	
+    <xs:element name="heartbeat_period" type="xs:integer" />
+    <xs:element name="max_heartbeat_lapse_periods" type="xs:integer" />
+    <xs:element name="profile_dump_period" type="xs:integer" />
+    <xs:element name="default_max_job_attempts" type="xs:integer" />
+    <xs:element name="job_history_size" type="xs:integer" />
+    <xs:element name="result_time_to_live" type="xs:long" />
+    <xs:element name="result_sweep_threshold" type="xs:long" />
+    <xs:element name="cc_root" type="xs:string" />
 
 	<!-- definition of complex elements -->
 
@@ -189,6 +196,14 @@
                     ref="cl:node"
                     maxOccurs="unbounded" />
                 <xs:element ref="cl:substitute_nodes" />
+                <xs:element ref="cl:heartbeat_period" minOccurs="0" />
+                <xs:element ref="cl:max_heartbeat_lapse_periods" minOccurs="0" />
+                <xs:element ref="cl:profile_dump_period" minOccurs="0" />
+                <xs:element ref="cl:default_max_job_attempts" minOccurs="0" />
+                <xs:element ref="cl:job_history_size" minOccurs="0" />
+                <xs:element ref="cl:result_time_to_live" minOccurs="0" />
+                <xs:element ref="cl:result_sweep_threshold" minOccurs="0" />
+                <xs:element ref="cl:cc_root" minOccurs="0" />
             </xs:sequence>
         </xs:complexType>
     </xs:element>
diff --git a/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java b/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
index 4bd5098..ecbafa7 100644
--- a/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
+++ b/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
@@ -147,6 +147,38 @@
         clusterProperties.add(new Property("CLUSTER_NET_PORT", "" + clusterNetPort));
         clusterProperties.add(new Property("HTTP_PORT", "" + httpPort));
 
+        //pass Cluster optional parameters
+        if (cluster.getHeartbeatPeriod() != null) {
+            clusterProperties
+                    .add(new Property("HEARTBEAT_PERIOD", String.valueOf(cluster.getHeartbeatPeriod().intValue())));
+        }
+        if (cluster.getMaxHeartbeatLapsePeriods() != null) {
+            clusterProperties.add(new Property("MAX_HEARTBEAT_LAPSE_PERIODS",
+                    String.valueOf(cluster.getMaxHeartbeatLapsePeriods().intValue())));
+        }
+        if (cluster.getProfileDumpPeriod() != null) {
+            clusterProperties.add(
+                    new Property("PROFILE_DUMP_PERIOD", String.valueOf(cluster.getProfileDumpPeriod().intValue())));
+        }
+        if (cluster.getDefaultMaxJobAttempts() != null) {
+            clusterProperties.add(new Property("DEFAULT_MAX_JOB_ATTEMPTS",
+                    String.valueOf(cluster.getDefaultMaxJobAttempts().intValue())));
+        }
+        if (cluster.getJobHistorySize() != null) {
+            clusterProperties
+                    .add(new Property("JOB_HISTORY_SIZE", String.valueOf(cluster.getJobHistorySize().intValue())));
+        }
+        if (cluster.getResultTimeToLive() != null) {
+            clusterProperties.add(
+                    new Property("RESULT_TIME_TO_LIVE", String.valueOf(cluster.getResultTimeToLive().longValue())));
+        }
+        if (cluster.getResultSweepThreshold() != null) {
+            clusterProperties.add(new Property("RESULT_SWEEP_THRESHOLD",
+                    String.valueOf(cluster.getResultSweepThreshold().longValue())));
+        }
+        if (cluster.getCcRoot() != null) {
+            clusterProperties.add(new Property("CC_ROOT", cluster.getCcRoot()));
+        }
         cluster.setEnv(new Env(clusterProperties));
     }
 
diff --git a/asterix-events/src/main/resources/events/cc_start/cc_start.sh b/asterix-events/src/main/resources/events/cc_start/cc_start.sh
index 24d3432..b7481e9 100644
--- a/asterix-events/src/main/resources/events/cc_start/cc_start.sh
+++ b/asterix-events/src/main/resources/events/cc_start/cc_start.sh
@@ -21,5 +21,50 @@
 then 
   mkdir -p $LOG_DIR
 fi
+ccArgs='-client-net-ip-address '$CLIENT_NET_IP
+ccArgs=$ccArgs' -client-net-port '$CLIENT_NET_PORT
+ccArgs=$ccArgs' -cluster-net-ip-address '$CLUSTER_NET_IP
+ccArgs=$ccArgs' -cluster-net-port '$CLUSTER_NET_PORT
+ccArgs=$ccArgs' -http-port '$HTTP_PORT
+
+if [ ! -z $HEARTBEAT_PERIOD ]
+then
+ccArgs=$ccArgs' -heartbeat-period '$HEARTBEAT_PERIOD
+fi
+
+if [ ! -z $MAX_HEARTBEAT_LAPSE_PERIODS ]
+then
+ccArgs=$ccArgs' -max-heartbeat-lapse-periods '$MAX_HEARTBEAT_LAPSE_PERIODS
+fi
+
+if [ ! -z $PROFILE_DUMP_PERIOD ]
+then
+ccArgs=$ccArgs' -profile-dump-period '$PROFILE_DUMP_PERIOD
+fi
+
+if [ ! -z $DEFAULT_MAX_JOB_ATTEMPTS ]
+then
+ccArgs=$ccArgs' -default-max-job-attempts '$DEFAULT_MAX_JOB_ATTEMPTS
+fi
+
+if [ ! -z $JOB_HISTORY_SIZE ]
+then
+ccArgs=$ccArgs' -job-history-size '$JOB_HISTORY_SIZE
+fi
+
+if [ ! -z $RESULT_TIME_TO_LIVE ]
+then
+ccArgs=$ccArgs' "-result-time-to-live '$RESULT_TIME_TO_LIVE
+fi
+
+if [ ! -z $RESULT_SWEEP_THRESHOLD ]
+then
+ccArgs=$ccArgs' -result-sweep-threshold '$RESULT_SWEEP_THRESHOLD
+fi
+
+if [ ! -z $CC_ROOT ]
+then
+ccArgs=$ccArgs' -cc-root '$CC_ROOT
+fi
 cd $WORKING_DIR
-$ASTERIX_HOME/bin/asterixcc -client-net-ip-address $CLIENT_NET_IP -client-net-port $CLIENT_NET_PORT -cluster-net-ip-address $CLUSTER_NET_IP -cluster-net-port $CLUSTER_NET_PORT -http-port $HTTP_PORT &> $LOG_DIR/cc.log
+$ASTERIX_HOME/bin/asterixcc echo $ccArgs &> $LOG_DIR/cc.log
\ No newline at end of file
diff --git a/asterix-installer/src/main/resources/clusters/local/local.xml b/asterix-installer/src/main/resources/clusters/local/local.xml
index 57bf114..20f697f 100644
--- a/asterix-installer/src/main/resources/clusters/local/local.xml
+++ b/asterix-installer/src/main/resources/clusters/local/local.xml
@@ -26,6 +26,20 @@
         <dir>/tmp/asterix-installer</dir>
         <NFS>true</NFS>
     </working_dir>
+    <!-- Sets the time duration between two heartbeats from each node controller in milliseconds (default: 10000) -->
+    <heartbeat_period>10000</heartbeat_period>
+    <!-- Sets the maximum number of missed heartbeats before a node is marked as dead (default: 5) -->
+    <max_heartbeat_lapse_periods>5</max_heartbeat_lapse_periods>
+    <!-- Sets the time duration between two profile dumps from each node controller in milliseconds. 0 to disable. (default: 0) -->
+    <profile_dump_period>0</profile_dump_period>
+    <!-- Sets the default number of job attempts allowed if not specified in the job specification. (default: 5) -->
+    <default_max_job_attempts>5</default_max_job_attempts>
+    <!-- Limits the number of historical jobs remembered by the system to the specified value. (default: 10) -->
+    <job_history_size>10</job_history_size>
+    <!-- Limits the amount of time results for asynchronous jobs should be retained by the system in milliseconds. (default: 24 hours) -->
+    <result_time_to_live>86400000</result_time_to_live>
+    <!-- The duration within which an instance of the result cleanup should be invoked in milliseconds. (default: 1 minute) -->
+    <result_sweep_threshold>60000</result_sweep_threshold>
     <master_node>
         <id>master</id>
         <client_ip>127.0.0.1</client_ip>
diff --git a/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixApplicationMaster.java b/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixApplicationMaster.java
index 22534e0..00d9018 100644
--- a/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixApplicationMaster.java
+++ b/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixApplicationMaster.java
@@ -47,6 +47,7 @@
 
 import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.event.schema.cluster.Property;
 import org.apache.asterix.event.schema.yarnCluster.Cluster;
 import org.apache.asterix.event.schema.yarnCluster.MasterNode;
 import org.apache.asterix.event.schema.yarnCluster.Node;
@@ -93,13 +94,12 @@
 
 public class AsterixApplicationMaster {
 
-    static
-    {
+    static {
         Logger rootLogger = Logger.getRootLogger();
         rootLogger.setLevel(Level.INFO);
-        rootLogger.addAppender(new ConsoleAppender(
-                new PatternLayout("%-6r [%p] %c - %m%n")));
+        rootLogger.addAppender(new ConsoleAppender(new PatternLayout("%-6r [%p] %c - %m%n")));
     }
+
     private static final Log LOG = LogFactory.getLog(AsterixApplicationMaster.class);
     private static final String CLUSTER_DESC_PATH = "cluster-config.xml";
     private static final String ASTERIX_CONF_NAME = "asterix-configuration.xml";
@@ -134,7 +134,7 @@
     // Hostname of the container
     private String appMasterHostname = "";
     // Port on which the app master listens for status updates from clients
-    private int appMasterRpcPort = new Random().nextInt(65535-49152);
+    private int appMasterRpcPort = new Random().nextInt(65535 - 49152);
     // Tracking url to which app master publishes info for clients to monitor
     private String appMasterTrackingUrl = "";
 
@@ -284,7 +284,7 @@
         if (cliParser.hasOption("obliterate")) {
             obliterate = true;
         }
-        if(cliParser.hasOption("initial")){
+        if (cliParser.hasOption("initial")) {
             initial = true;
         }
 
@@ -327,14 +327,14 @@
             appAttemptID = containerId.getApplicationAttemptId();
         }
 
-        if (!envs.containsKey(ApplicationConstants.APP_SUBMIT_TIME_ENV)
-                || !envs.containsKey(Environment.NM_HOST.name()) || !envs.containsKey(Environment.NM_HTTP_PORT.name())
+        if (!envs.containsKey(ApplicationConstants.APP_SUBMIT_TIME_ENV) || !envs.containsKey(Environment.NM_HOST.name())
+                || !envs.containsKey(Environment.NM_HTTP_PORT.name())
                 || !envs.containsKey(Environment.NM_PORT.name())) {
             throw new IllegalArgumentException(
                     "Environment is not set correctly- please check client submission settings");
         }
-        System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, envs.get("PWD") + File.separator + "bin" + File.separator
-                + ASTERIX_CONF_NAME);
+        System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY,
+                envs.get("PWD") + File.separator + "bin" + File.separator + ASTERIX_CONF_NAME);
 
         LOG.info("Application master for app" + ", appId=" + appAttemptID.getApplicationId().getId()
                 + ", clustertimestamp=" + appAttemptID.getApplicationId().getClusterTimestamp() + ", attemptId="
@@ -350,11 +350,10 @@
 
         instanceConfPath = envs.get(AConstants.INSTANCESTORE);
         //the only time this is null is during testing, when asterix-yarn isn't packaged in a JAR yet.
-        if(envs.get(AConstants.APPLICATIONMASTERJARLOCATION) != null
-                && !envs.get(AConstants.APPLICATIONMASTERJARLOCATION).endsWith(File.separator)){
+        if (envs.get(AConstants.APPLICATIONMASTERJARLOCATION) != null
+                && !envs.get(AConstants.APPLICATIONMASTERJARLOCATION).endsWith(File.separator)) {
             appMasterJar = new Path(envs.get(AConstants.APPLICATIONMASTERJARLOCATION));
-        }
-        else{
+        } else {
             appMasterJar = null;
         }
 
@@ -473,7 +472,7 @@
             resourceManager.addContainerRequest(hostToRequest(n.getClusterIp(), false));
             LOG.info("Asked for NC: " + n.getClusterIp());
             numNodes++;
-            synchronized(pendingNCs){
+            synchronized (pendingNCs) {
                 pendingNCs.add(n);
             }
         }
@@ -776,7 +775,7 @@
                         + ", diagnostics=" + containerStatus.getDiagnostics());
 
                 // non complete containers should not be here
-                if(containerStatus.getState() != ContainerState.COMPLETE){
+                if (containerStatus.getState() != ContainerState.COMPLETE) {
                     throw new IllegalStateException("Non-completed container given as completed by RM.");
                 }
 
@@ -803,13 +802,14 @@
             LOG.info("Got response from RM for container ask, allocatedCnt=" + allocatedContainers.size());
             numAllocatedContainers.addAndGet(allocatedContainers.size());
             for (Container allocatedContainer : allocatedContainers) {
-                synchronized(pendingNCs){
+                synchronized (pendingNCs) {
                     try {
                         if (!pendingNCs.contains(containerToNode(allocatedContainer, clusterDesc)) && ccUp.get()) {
-                            nmClientAsync.stopContainerAsync(allocatedContainer.getId(), allocatedContainer.getNodeId());
+                            nmClientAsync.stopContainerAsync(allocatedContainer.getId(),
+                                    allocatedContainer.getNodeId());
                             continue;
                         }
-                    } catch(UnknownHostException ex){
+                    } catch (UnknownHostException ex) {
                         LOG.error("Unknown host allocated for us by RM- this shouldn't happen.", ex);
                     }
                 }
@@ -826,12 +826,12 @@
                 // I want to know if this node is the CC, because it must start before the NCs.
                 LOG.info("Allocated: " + allocatedContainer.getNodeId().getHost());
                 LOG.info("CC : " + cC.getId());
-                synchronized(pendingNCs){
+                synchronized (pendingNCs) {
                     try {
                         if (ccUp.get()) {
                             pendingNCs.remove(containerToNode(allocatedContainer, clusterDesc));
                         }
-                    } catch(UnknownHostException ex){
+                    } catch (UnknownHostException ex) {
                         LOG.error("Unknown host allocated for us by RM- this shouldn't happen.", ex);
                     }
                 }
@@ -1035,6 +1035,34 @@
                 vargs.add("-app-cc-main-class org.apache.asterix.hyracks.bootstrap.CCApplicationEntryPoint");
                 vargs.add("-cluster-net-ip-address " + cC.getClusterIp());
                 vargs.add("-client-net-ip-address " + cC.getClientIp());
+                //pass CC optional parameters
+                if (clusterDesc.getHeartbeatPeriod() != null) {
+                    vargs.add("-heartbeat-period " + String.valueOf(clusterDesc.getHeartbeatPeriod().intValue()));
+                }
+                if (clusterDesc.getMaxHeartbeatLapsePeriods() != null) {
+                    vargs.add("-max-heartbeat-lapse-periods "
+                            + String.valueOf(clusterDesc.getMaxHeartbeatLapsePeriods().intValue()));
+                }
+                if (clusterDesc.getProfileDumpPeriod() != null) {
+                    vargs.add("-profile-dump-period " + String.valueOf(clusterDesc.getProfileDumpPeriod().intValue()));
+                }
+                if (clusterDesc.getDefaultMaxJobAttempts() != null) {
+                    vargs.add("-default-max-job-attempts "
+                            + String.valueOf(clusterDesc.getDefaultMaxJobAttempts().intValue()));
+                }
+                if (clusterDesc.getJobHistorySize() != null) {
+                    vargs.add("-job-history-size " + String.valueOf(clusterDesc.getJobHistorySize().intValue()));
+                }
+                if (clusterDesc.getResultTimeToLive() != null) {
+                    vargs.add("-result-time-to-live " + String.valueOf(clusterDesc.getResultTimeToLive().intValue()));
+                }
+                if (clusterDesc.getResultSweepThreshold() != null) {
+                    vargs.add("-result-sweep-threshold "
+                            + String.valueOf(clusterDesc.getResultSweepThreshold().intValue()));
+                }
+                if (clusterDesc.getCcRoot() != null) {
+                    vargs.add("-cc-root " + clusterDesc.getCcRoot());
+                }
                 ccStarted.set(true);
 
             } else {
@@ -1058,7 +1086,7 @@
                     vargs.add("-data-ip-address " + local.getClusterIp());
                     vargs.add("-result-ip-address " + local.getClusterIp());
                     vargs.add("--");
-                    if(initial){
+                    if (initial) {
                         vargs.add("-initial-run ");
                     }
                 } catch (UnknownHostException e) {
@@ -1096,8 +1124,7 @@
                 if (!containerIsCC(container)) {
                     LOG.error("Unable to find NC configured for host: " + container.getId() + e);
                     return null;
-                }
-                else {
+                } else {
                     return Arrays.asList("");
                 }
             }
@@ -1143,7 +1170,7 @@
                 if (!containerIsCC(container)) {
                     LOG.error("Unable to find NC configured for host: " + container.getId() + e);
                     return null;
-                }else {
+                } else {
                     return Arrays.asList("");
                 }
             }