YARN integration for AsterixDB
This is an initial version of YARN integration for AsterixDB.
- Uses static assignment of CC and NC nodes to NM locations
- Stores state locally on each NM, outside of HDFS
- "All or nothing" container allocation. We don't attempt to
move or reallocate containers that the RM may kill (yet).
- Retains feature parity with managix.
Change-Id: I49c849179d17fc7faa446b9be57a0695df6836ab
Reviewed-on: https://asterix-gerrit.ics.uci.edu/161
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <hubailmor@gmail.com>
diff --git a/asterix-yarn/src/main/assembly/binary-assembly.xml b/asterix-yarn/src/main/assembly/binary-assembly.xml
new file mode 100644
index 0000000..8457c66
--- /dev/null
+++ b/asterix-yarn/src/main/assembly/binary-assembly.xml
@@ -0,0 +1,101 @@
+<!--
+ ! Copyright 2009-2013 by The Regents of the University of California
+ ! Licensed under the Apache License, Version 2.0 (the "License");
+ ! you may not use this file except in compliance with the License.
+ ! you may obtain a copy of the License from
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing, software
+ ! distributed under the License is distributed on an "AS IS" BASIS,
+ ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ! See the License for the specific language governing permissions and
+ ! limitations under the License.
+ !-->
+<assembly>
+  <id>binary-assembly</id>
+  <formats>
+    <format>dir</format>
+    <format>zip</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <fileSets>
+    <!-- Launcher script: the only file in this assembly that must be executable. -->
+    <fileSet>
+      <directory>src/main/resources/scripts</directory>
+      <fileMode>0755</fileMode>
+      <includes>
+        <include>asterix</include>
+      </includes>
+      <outputDirectory>bin</outputDirectory>
+    </fileSet>
+    <!-- XML cluster configurations copied to configs/. Plain data, so not executable. -->
+    <fileSet>
+      <directory>src/main/resources/configs</directory>
+      <fileMode>0644</fileMode>
+      <includes>
+        <include>local.xml</include>
+        <include>my_awesome_cluster_desc.xml</include>
+      </includes>
+      <outputDirectory>configs</outputDirectory>
+    </fileSet>
+    <!-- Base AsterixDB configuration copied to conf/. -->
+    <fileSet>
+      <directory>src/main/resources/</directory>
+      <fileMode>0644</fileMode>
+      <includes>
+        <include>base-asterix-configuration.xml</include>
+      </includes>
+      <outputDirectory>conf</outputDirectory>
+    </fileSet>
+    <!-- Client log4j configuration copied to conf/. -->
+    <fileSet>
+      <directory>src/main/resources/configs</directory>
+      <fileMode>0644</fileMode>
+      <includes>
+        <include>asterix-client-log4j.properties</include>
+      </includes>
+      <outputDirectory>conf</outputDirectory>
+    </fileSet>
+  </fileSets>
+  <dependencySets>
+    <!-- Libraries copied into lib/ (presumably the client-side classpath; confirm against bin/asterix). -->
+    <dependencySet>
+      <includes>
+        <include>edu.uci.ics.asterix:asterix-yarn</include>
+        <include>edu.uci.ics.asterix:asterix-common</include>
+        <include>log4j:log4j</include>
+        <include>org.slf4j:slf4j-api</include>
+        <include>org.slf4j:slf4j-simple</include>
+        <include>commons-io:commons-io</include>
+        <include>commons-cli:commons-cli</include>
+        <include>commons-configuration:commons-configuration</include>
+        <include>commons-logging:commons-logging</include>
+        <include>commons-codec:commons-codec</include>
+        <include>commons-lang:commons-lang</include>
+        <include>org.apache.hadoop:hadoop-common</include>
+        <include>org.apache.hadoop:hadoop-hdfs</include>
+        <include>org.apache.hadoop:hadoop-auth</include>
+        <include>org.apache.hadoop:hadoop-yarn-client</include>
+        <include>org.apache.hadoop:hadoop-yarn-common</include>
+        <include>org.apache.hadoop:hadoop-yarn-api</include>
+        <include>org.apache.httpcomponents:httpcore</include>
+        <include>org.apache.httpcomponents:httpclient</include>
+        <include>commons-httpclient:commons-httpclient</include>
+        <include>com.google.guava:guava</include>
+        <include>com.google.protobuf:protobuf-java</include>
+      </includes>
+      <outputDirectory>lib</outputDirectory>
+    </dependencySet>
+    <!-- AsterixDB server distribution, copied as-is (not unpacked) into asterix/. -->
+    <dependencySet>
+      <outputDirectory>asterix</outputDirectory>
+      <includes>
+        <include>asterix-server*</include>
+      </includes>
+      <unpack>false</unpack>
+      <useTransitiveDependencies>false</useTransitiveDependencies>
+    </dependencySet>
+  </dependencySets>
+</assembly>
diff --git a/asterix-yarn/src/main/assembly/hyracks-assembly.xml b/asterix-yarn/src/main/assembly/hyracks-assembly.xml
new file mode 100644
index 0000000..ae362ca
--- /dev/null
+++ b/asterix-yarn/src/main/assembly/hyracks-assembly.xml
@@ -0,0 +1,37 @@
+<!--
+ ! Copyright 2009-2013 by The Regents of the University of California
+ ! Licensed under the Apache License, Version 2.0 (the "License");
+ ! you may not use this file except in compliance with the License.
+ ! you may obtain a copy of the License from
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing, software
+ ! distributed under the License is distributed on an "AS IS" BASIS,
+ ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ! See the License for the specific language governing permissions and
+ ! limitations under the License.
+ !-->
+<assembly>
+  <!-- NOTE(review): this id is identical to the one in binary-assembly.xml. If both descriptors are
+       run by the same assembly-plugin execution their outputs will presumably share a suffix/classifier
+       and collide; confirm this is intended before relying on it. -->
+  <id>binary-assembly</id>
+  <formats>
+    <format>zip</format>
+    <format>dir</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <fileSets>
+    <!-- appassembler's bin/ output, marked executable. -->
+    <fileSet>
+      <directory>target/appassembler/bin</directory>
+      <outputDirectory>bin</outputDirectory>
+      <fileMode>0755</fileMode>
+    </fileSet>
+    <!-- appassembler's lib/ output, default file mode. -->
+    <fileSet>
+      <directory>target/appassembler/lib</directory>
+      <outputDirectory>lib</outputDirectory>
+    </fileSet>
+    <!-- Project docs/ directory, copied verbatim. -->
+    <fileSet>
+      <directory>docs</directory>
+      <outputDirectory>docs</outputDirectory>
+    </fileSet>
+  </fileSets>
+</assembly>
diff --git a/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/AConstants.java b/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/AConstants.java
new file mode 100644
index 0000000..f8548db
--- /dev/null
+++ b/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/AConstants.java
@@ -0,0 +1,102 @@
+package edu.uci.ics.asterix.aoya;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Names of the environment variables shared by the Client and the Application Master.
+ * <p>
+ * The Application Master reads these keys from {@code System.getenv()}; presumably the Client sets them
+ * when submitting the AM container (the Client side is not shown here). The string values are part of that
+ * contract and must not change.
+ */
+@InterfaceAudience.LimitedPrivate(value = { "For simplicity between Client and AM." })
+@InterfaceStability.Unstable
+public class AConstants {
+    /**
+     * Environment key name pointing to the app master jar location.
+     * May be unset (or point at a directory) when running unpackaged in tests.
+     */
+    public static final String APPLICATIONMASTERJARLOCATION = "APPLICATIONMASTERJARLOCATION";
+
+    /**
+     * Environment key name denoting the file timestamp for the app master jar.
+     * Used to validate the local resource.
+     */
+    public static final String APPLICATIONMASTERJARTIMESTAMP = "APPLICATIONMASTERJARTIMESTAMP";
+
+    /**
+     * Environment key name denoting the file content length for the app master jar.
+     * Used to validate the local resource.
+     */
+    public static final String APPLICATIONMASTERJARLEN = "APPLICATIONMASTERJARLEN";
+    /**
+     * Environment key name pointing to the Asterix distributable archive.
+     * The Application Master localizes it as an ARCHIVE resource named {@code asterix-server.zip}.
+     */
+    public static final String TARLOCATION = "TARLOCATION";
+
+    /**
+     * Environment key name denoting the file timestamp for the Asterix archive.
+     * Used to validate the local resource.
+     */
+    public static final String TARTIMESTAMP = "TARTIMESTAMP";
+
+    /**
+     * Environment key name denoting the file content length for the Asterix archive.
+     * Used to validate the local resource.
+     */
+    public static final String TARLEN = "TARLEN";
+
+    /**
+     * Environment key name pointing to the Asterix cluster configuration file.
+     * The Application Master localizes it as {@code cluster-config.xml}.
+     */
+    public static final String CONFLOCATION = "CONFLOCATION";
+
+    /**
+     * Environment key name denoting the file timestamp for the Asterix config.
+     * Used to validate the local resource.
+     */
+
+    public static final String CONFTIMESTAMP = "CONFTIMESTAMP";
+
+    /**
+     * Environment key name denoting the file content length for the Asterix config.
+     * Used to validate the local resource.
+     */
+
+    public static final String CONFLEN = "CONFLEN";
+
+    /**
+     * Environment key name pointing to the Asterix parameters file.
+     */
+
+    public static final String PARAMLOCATION = "PARAMLOCATION";
+
+    /**
+     * Environment key name denoting the file timestamp for the Asterix parameters.
+     * Used to validate the local resource.
+     */
+
+    public static final String PARAMTIMESTAMP = "PARAMTIMESTAMP";
+
+    /**
+     * Environment key name denoting the file content length for the Asterix parameters.
+     * Used to validate the local resource.
+     */
+
+    public static final String PARAMLEN = "PARAMLEN";
+
+    /**
+     * Environment key name for a path suffix.
+     * NOTE(review): not read by the Application Master code in this change; confirm its consumer.
+     */
+    public static final String PATHSUFFIX = "PATHSUFFIX";
+
+    /**
+     * Environment key name for the instance's directory, relative to {@link #DFS_BASE}. The Application Master
+     * looks there for {@code asterix-configuration.xml} and an optional {@code library} directory.
+     */
+    public static final String INSTANCESTORE = "INSTANCESTORE";
+
+    /**
+     * Environment key name for the ResourceManager address. When set, the Application Master copies it into
+     * {@code yarn.resourcemanager.address}.
+     */
+    public static final String RMADDRESS = "RMADDRESS";
+
+    /**
+     * Environment key name for the ResourceManager scheduler address, copied into
+     * {@code yarn.resourcemanager.scheduler.address}.
+     */
+    public static final String RMSCHEDULERADDRESS = "RMSCHEDULERADDRESS";
+
+    /**
+     * Environment key name for the DFS base directory under which all configs and binaries lie.
+     */
+    public static final String DFS_BASE = "DFSBASE";
+
+    /**
+     * Environment key name for extra JVM options for Node Controllers. When unset, the Application Master falls
+     * back to an {@code -Xmx} based on its default NC memory.
+     */
+    public static final String NC_JAVA_OPTS = "NCJAVAOPTS";
+
+    /**
+     * Environment key name for extra JVM options for the Cluster Controller. When unset, the Application Master
+     * falls back to an {@code -Xmx} based on its default CC memory.
+     */
+    public static final String CC_JAVA_OPTS = "CCJAVAOPTS";
+
+    /**
+     * Environment key name for NC container memory.
+     * NOTE(review): the Application Master takes container memory from the cluster description, not from this key;
+     * confirm whether anything still reads it.
+     */
+    public static final String NC_CONTAINER_MEM = "NC_CONTAINER_MEM";
+
+    /**
+     * Environment key name for CC container memory.
+     * NOTE(review): the Application Master takes container memory from the cluster description, not from this key;
+     * confirm whether anything still reads it.
+     */
+    public static final String CC_CONTAINER_MEM = "CC_CONTAINER_MEM";
+
+}
diff --git a/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/AsterixApplicationMaster.java b/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/AsterixApplicationMaster.java
new file mode 100644
index 0000000..1539a34
--- /dev/null
+++ b/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/AsterixApplicationMaster.java
@@ -0,0 +1,1288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.aoya;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+
+import edu.uci.ics.asterix.common.config.GlobalConfig;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.event.schema.yarnCluster.Cluster;
+import edu.uci.ics.asterix.event.schema.yarnCluster.MasterNode;
+import edu.uci.ics.asterix.event.schema.yarnCluster.Node;
+import org.apache.log4j.ConsoleAppender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+
+public class AsterixApplicationMaster {
+
+    // Send log4j output at INFO and above to the console as soon as the class is loaded.
+    static {
+        Logger rootLogger = Logger.getRootLogger();
+        rootLogger.setLevel(Level.INFO);
+        rootLogger.addAppender(new ConsoleAppender(new PatternLayout("%-6r [%p] %c - %m%n")));
+    }
+    private static final Log LOG = LogFactory.getLog(AsterixApplicationMaster.class);
+    // File name of the cluster description parsed in init().
+    private static final String CLUSTER_DESC_PATH = "cluster-config.xml";
+    // File name under which the instance's AsterixDB configuration is localized.
+    private static final String ASTERIX_CONF_NAME = "asterix-configuration.xml";
+    // Local-resource name of the AsterixDB server distribution archive.
+    private static final String ASTERIX_ZIP_NAME = "asterix-server.zip";
+
+    // Default container sizes in MB (see requestResources).
+    private static final int CC_MEMORY_MBS_DEFAULT = 1024;
+    private static final int NC_MEMORY_MBS_DEFAULT = 1536;
+    // JVM options used when CC_JAVA_OPTS / NC_JAVA_OPTS are absent from the environment.
+    private static final String EXTERNAL_CC_JAVA_OPTS_DEFAULT = "-Xmx" + CC_MEMORY_MBS_DEFAULT + "m";
+    private static final String EXTERNAL_NC_JAVA_OPTS_DEFAULT = "-Xmx" + NC_MEMORY_MBS_DEFAULT + "m";
+    // Main classes of the processes this AM presumably launches in its containers.
+    private static final String OBLITERATOR_CLASSNAME = "edu.uci.ics.asterix.aoya.Deleter";
+    private static final String HDFS_BACKUP_CLASSNAME = "edu.uci.ics.asterix.aoya.HDFSBackup";
+    private static final String NC_CLASSNAME = "edu.uci.ics.hyracks.control.nc.NCDriver";
+    private static final String CC_CLASSNAME = "edu.uci.ics.hyracks.control.cc.CCDriver";
+    private static final String JAVA_HOME = System.getProperty("java.home");
+    // Set by requestResources() once every container request has been issued.
+    private boolean doneAllocating = false;
+
+    // Configuration
+    private Configuration conf;
+
+    // Handle to communicate with the Resource Manager
+    private AMRMClientAsync<ContainerRequest> resourceManager;
+
+    // Handle to communicate with the Node Manager
+    private NMClientAsync nmClientAsync;
+    // Listener that processes responses from the Node Manager
+    private NMCallbackHandler containerListener;
+    // Application Attempt Id (combination of attemptId and fail count)
+    private ApplicationAttemptId appAttemptID;
+
+    // TODO
+    // For status update for clients - yet to be implemented
+    // Hostname of the container
+    private String appMasterHostname = "";
+    // Port on which the app master listens for status updates from clients, chosen at random from the
+    // dynamic/private port range 49152-65535 (inclusive).
+    private int appMasterRpcPort = 49152 + new Random().nextInt(65535 - 49152 + 1);
+    // Tracking url to which app master publishes info for clients to monitor
+    private String appMasterTrackingUrl = "";
+
+    // Counter for completed containers (complete denotes successful or failed)
+    private AtomicInteger numCompletedContainers = new AtomicInteger();
+    // Allocated container count, so that we know how many containers the RM has allocated to us
+    private AtomicInteger numAllocatedContainers = new AtomicInteger();
+    // Count of failed containers
+    private AtomicInteger numFailedContainers = new AtomicInteger();
+    // Count of containers already requested from the RM.
+    // Needed as once requested, we should not request for containers again.
+    // Only request for more if the original requirement changes.
+    private AtomicInteger numRequestedContainers = new AtomicInteger();
+    // Tells us whether the Cluster Controller is up so we can safely start some Node Controllers
+    private AtomicBoolean ccUp = new AtomicBoolean();
+    private AtomicBoolean ccStarted = new AtomicBoolean();
+    // NCs requested from the RM; accessed under synchronized(pendingNCs)
+    private Queue<Node> pendingNCs = new ArrayDeque<Node>();
+
+    // DFS path to the AsterixDB distributable zip
+    private String asterixZipPath = "";
+    // Timestamp needed for creating a local resource
+    private long asterixZipTimestamp = 0;
+    // File length needed for local resource
+    private long asterixZipLen = 0;
+
+    // DFS path to the AsterixDB cluster description
+    private String asterixConfPath = "";
+    // Timestamp needed for creating a local resource
+    private long asterixConfTimestamp = 0;
+    // File length needed for local resource
+    private long asterixConfLen = 0;
+
+    // Instance directory, relative to dfsBasePath
+    private String instanceConfPath = "";
+
+    // Base dir under which all configs and binaries lie
+    private String dfsBasePath;
+
+    private int numTotalContainers = 0;
+
+    // Local resources to localize, keyed by their local name
+    private Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+
+    private Cluster clusterDesc = null;
+    private MasterNode cC = null;
+    private String ccJavaOpts = null;
+    private int ccMem = 0;
+    private String ncJavaOpts = null;
+    private int ncMem = 0;
+    private volatile boolean done;
+    private volatile boolean success;
+
+    // Mode flags set from the command line in setArgs()
+    private boolean obliterate = false;
+    private Path appMasterJar = null;
+    private boolean backup = false;
+    long backupTimestamp;
+    String snapName;
+    private boolean restore = false;
+    private boolean initial = false;
+
+    // Launch threads
+    private List<Thread> launchThreads = new CopyOnWriteArrayList<Thread>();
+
+    /**
+     * Entry point of the AsterixDB YARN Application Master.
+     * <p>
+     * Parses arguments, reads the container environment, initializes, and runs the master.
+     * Exit status:
+     * <ul>
+     * <li>0: on success, or when {@code init()} declines to run.</li>
+     * <li>1: when an exception escapes setup or execution.</li>
+     * <li>2: when {@code run()} reports failure.</li>
+     * </ul>
+     *
+     * @param args command-line arguments (see {@link #setArgs(String[])})
+     */
+    public static void main(String[] args) {
+        boolean succeeded = false;
+        try {
+            AsterixApplicationMaster master = new AsterixApplicationMaster();
+            LOG.info("Initializing ApplicationMaster");
+            CommandLine parsed = master.setArgs(args);
+            master.setEnvs(parsed);
+            if (!master.init()) {
+                System.exit(0);
+            }
+            succeeded = master.run();
+        } catch (Exception e) {
+            LOG.fatal("Error running ApplicationMaster", e);
+            System.exit(1);
+        }
+
+        if (!succeeded) {
+            LOG.info("Application Master failed. exiting");
+            System.exit(2);
+        }
+        LOG.info("Application Master completed successfully. exiting");
+        System.exit(0);
+    }
+
+    /**
+     * Logs (and echoes to stdout) every environment variable, then a recursive listing of the current
+     * working directory produced by {@code ls -alhLR}. Invoked for the {@code -debug} option.
+     * <p>
+     * Best effort: I/O failures are logged and otherwise ignored. If interrupted while waiting for {@code ls},
+     * the thread's interrupt status is restored.
+     */
+    private void dumpOutDebugInfo() {
+        LOG.info("Dump debug output");
+        Map<String, String> envs = System.getenv();
+        for (Map.Entry<String, String> env : envs.entrySet()) {
+            LOG.info("System env: key=" + env.getKey() + ", val=" + env.getValue());
+            System.out.println("System env: key=" + env.getKey() + ", val=" + env.getValue());
+        }
+
+        ProcessBuilder lsBuilder = new ProcessBuilder("ls", "-alhLR");
+        // Pass stderr through rather than leaving an unread pipe that could fill and stall ls.
+        lsBuilder.redirectError(ProcessBuilder.Redirect.INHERIT);
+        try {
+            Process pr = lsBuilder.start();
+            // Drain stdout BEFORE waiting: a recursive listing can exceed the pipe buffer, and calling
+            // waitFor() first would then block forever.
+            try (BufferedReader buf = new BufferedReader(new InputStreamReader(pr.getInputStream()))) {
+                String line;
+                while ((line = buf.readLine()) != null) {
+                    LOG.info("System CWD content: " + line);
+                    System.out.println("System CWD content: " + line);
+                }
+            }
+            pr.waitFor();
+        } catch (IOException e) {
+            LOG.info(e);
+        } catch (InterruptedException e) {
+            LOG.info(e);
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Creates an Application Master backed by a fresh {@link YarnConfiguration}. Additional Hadoop
+     * configuration resources are added later by {@link #setEnvs(CommandLine)}.
+     */
+    public AsterixApplicationMaster() {
+        this.conf = new YarnConfiguration();
+    }
+
+    /**
+     * Parses the Application Master's command line and records which mode was requested.
+     * <p>
+     * Options:
+     * <ul>
+     * <li>{@code -app_attempt_id <id>}</li>
+     * <li>{@code -priority <n>}</li>
+     * <li>{@code -debug}: dumps environment and working-directory listing.</li>
+     * <li>{@code -help}: prints usage; processing then continues.</li>
+     * <li>{@code -initial}</li>
+     * <li>{@code -obliterate}</li>
+     * <li>{@code -backup}: also records {@code backupTimestamp}.</li>
+     * <li>{@code -restore <snapshot>}</li>
+     * </ul>
+     *
+     * @param args raw program arguments
+     * @return the parsed command line, which {@code main} then hands to {@link #setEnvs(CommandLine)}
+     * @throws ParseException if the arguments cannot be parsed
+     */
+    public CommandLine setArgs(String[] args) throws ParseException {
+        Options options = new Options();
+        options.addOption("app_attempt_id", true, "App Attempt ID. Not to be used unless for testing purposes");
+        options.addOption("priority", true, "Application Priority. Default 0");
+        options.addOption("debug", false, "Dump out debug information");
+        options.addOption("help", false, "Print usage");
+        options.addOption("initial", false, "Initialize existing Asterix instance.");
+        options.addOption("obliterate", false, "Delete asterix instance completely.");
+        options.addOption("backup", false, "Back up AsterixDB instance");
+        options.addOption("restore", true, "Restore an AsterixDB instance");
+
+        CommandLine parsed = new GnuParser().parse(options, args);
+
+        // Informational flags act immediately.
+        if (parsed.hasOption("help")) {
+            printUsage(options);
+        }
+        if (parsed.hasOption("debug")) {
+            dumpOutDebugInfo();
+        }
+
+        // Mode flags only ever switch a mode on; they never clear one.
+        obliterate = obliterate || parsed.hasOption("obliterate");
+        initial = initial || parsed.hasOption("initial");
+        if (parsed.hasOption("backup")) {
+            backup = true;
+            backupTimestamp = System.currentTimeMillis();
+        }
+        if (parsed.hasOption("restore")) {
+            restore = true;
+            snapName = parsed.getOptionValue("restore");
+            LOG.info(snapName);
+        }
+        return parsed;
+    }
+
+ public void setEnvs(CommandLine cliParser) {
+ Map<String, String> envs = System.getenv();
+ if (envs.containsKey("HADOOP_CONF_DIR")) {
+ File hadoopConfDir = new File(envs.get("HADOOP_CONF_DIR"));
+ if (hadoopConfDir.isDirectory()) {
+ for (File config : hadoopConfDir.listFiles()) {
+ if (config.getName().matches("^.*(xml)$")) {
+ conf.addResource(new Path(config.getAbsolutePath()));
+ }
+ }
+ }
+ }
+ //the containerID might be in the arguments or the environment
+ if (!envs.containsKey(Environment.CONTAINER_ID.name())) {
+ if (cliParser.hasOption("app_attempt_id")) {
+ String appIdStr = cliParser.getOptionValue("app_attempt_id", "");
+ appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr);
+ } else {
+
+ throw new IllegalArgumentException(
+ "Environment is not set correctly- please check client submission settings");
+ }
+ } else {
+ ContainerId containerId = ConverterUtils.toContainerId(envs.get(Environment.CONTAINER_ID.name()));
+ appAttemptID = containerId.getApplicationAttemptId();
+ }
+
+ if (!envs.containsKey(ApplicationConstants.APP_SUBMIT_TIME_ENV)
+ || !envs.containsKey(Environment.NM_HOST.name()) || !envs.containsKey(Environment.NM_HTTP_PORT.name())
+ || !envs.containsKey(Environment.NM_PORT.name())) {
+ throw new IllegalArgumentException(
+ "Environment is not set correctly- please check client submission settings");
+ }
+ System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, envs.get("PWD") + File.separator + "bin" + File.separator
+ + ASTERIX_CONF_NAME);
+
+ LOG.info("Application master for app" + ", appId=" + appAttemptID.getApplicationId().getId()
+ + ", clustertimestamp=" + appAttemptID.getApplicationId().getClusterTimestamp() + ", attemptId="
+ + appAttemptID.getAttemptId());
+
+ asterixZipPath = envs.get(AConstants.TARLOCATION);
+ asterixZipTimestamp = Long.parseLong(envs.get(AConstants.TARTIMESTAMP));
+ asterixZipLen = Long.parseLong(envs.get(AConstants.TARLEN));
+
+ asterixConfPath = envs.get(AConstants.CONFLOCATION);
+ asterixConfTimestamp = Long.parseLong(envs.get(AConstants.CONFTIMESTAMP));
+ asterixConfLen = Long.parseLong(envs.get(AConstants.CONFLEN));
+
+ instanceConfPath = envs.get(AConstants.INSTANCESTORE);
+ //the only time this is null is during testing, when asterix-yarn isn't packaged in a JAR yet.
+ if(envs.get(AConstants.APPLICATIONMASTERJARLOCATION) != null
+ && !envs.get(AConstants.APPLICATIONMASTERJARLOCATION).endsWith(File.separator)){
+ appMasterJar = new Path(envs.get(AConstants.APPLICATIONMASTERJARLOCATION));
+ }
+ else{
+ appMasterJar = null;
+ }
+
+ dfsBasePath = envs.get(AConstants.DFS_BASE);
+ //If the NM has an odd environment where the proper hadoop XML configs dont get imported, we can end up not being able to talk to the RM
+ // this solves that!
+ //in a testing environment these can be null however.
+ if (envs.get(AConstants.RMADDRESS) != null) {
+ conf.set("yarn.resourcemanager.address", envs.get(AConstants.RMADDRESS));
+ LOG.info("RM Address: " + envs.get(AConstants.RMADDRESS));
+ }
+ if (envs.get(AConstants.RMADDRESS) != null) {
+ conf.set("yarn.resourcemanager.scheduler.address", envs.get(AConstants.RMSCHEDULERADDRESS));
+ }
+ ccJavaOpts = envs.get(AConstants.CC_JAVA_OPTS);
+ //set defaults if no special given options
+ if (ccJavaOpts == null) {
+ ccJavaOpts = EXTERNAL_CC_JAVA_OPTS_DEFAULT;
+ }
+ ncJavaOpts = envs.get(AConstants.NC_JAVA_OPTS);
+ if (ncJavaOpts == null) {
+ ncJavaOpts = EXTERNAL_NC_JAVA_OPTS_DEFAULT;
+ }
+
+ LOG.info("Path suffix: " + instanceConfPath);
+ }
+
+ public boolean init() throws ParseException, IOException, AsterixException, YarnException {
+ try {
+ localizeDFSResources();
+ clusterDesc = Utils.parseYarnClusterConfig(CLUSTER_DESC_PATH);
+ cC = clusterDesc.getMasterNode();
+ appMasterTrackingUrl = "http://" + cC.getClientIp() + ":" + cC.getClientPort() + Path.SEPARATOR;
+ distributeAsterixConfig();
+ //now let's read what's in there so we can set the JVM opts right
+ LOG.debug("config file loc: " + System.getProperty(GlobalConfig.CONFIG_FILE_PROPERTY));
+ } catch (FileNotFoundException | IllegalStateException e) {
+ LOG.error("Could not deserialize Cluster Config from disk- aborting!");
+ LOG.error(e);
+ throw e;
+ }
+
+ return true;
+ }
+
+    /**
+     * Registers this instance's {@code asterix-configuration.xml} as a private FILE local resource.
+     * <p>
+     * The file is read from {@code <dfsBasePath>/<instanceConfPath>/} and added to {@code localResources}
+     * under the same name.
+     *
+     * @throws IOException
+     *             if the file system cannot be accessed or the file's status cannot be read
+     */
+    private void distributeAsterixConfig() throws IOException {
+        FileSystem fs = FileSystem.get(conf);
+        // DFS paths always use '/', whatever the local platform's File.separator is.
+        String pathSuffix = instanceConfPath + Path.SEPARATOR + ASTERIX_CONF_NAME;
+        Path dst = new Path(dfsBasePath, pathSuffix);
+        FileStatus paramFileStatus = fs.getFileStatus(dst);
+
+        LocalResource asterixParamLoc = Records.newRecord(LocalResource.class);
+        asterixParamLoc.setType(LocalResourceType.FILE);
+        asterixParamLoc.setVisibility(LocalResourceVisibility.PRIVATE);
+        asterixParamLoc.setResource(ConverterUtils.getYarnUrlFromURI(dst.toUri()));
+        // Timestamp and length are taken from the DFS file; they are used to validate the local resource.
+        asterixParamLoc.setTimestamp(paramFileStatus.getModificationTime());
+        asterixParamLoc.setSize(paramFileStatus.getLen());
+        localResources.put(ASTERIX_CONF_NAME, asterixParamLoc);
+    }
+
+ /**
+ * @param c
+ * The cluster exception to attempt to alocate with the RM
+ * @throws YarnException
+ */
+ private void requestResources(Cluster c) throws YarnException, UnknownHostException {
+ //set memory
+ if (c.getCcContainerMem() != null) {
+ ccMem = Integer.parseInt(c.getCcContainerMem());
+ } else {
+ ccMem = CC_MEMORY_MBS_DEFAULT;
+ }
+ if (c.getNcContainerMem() != null) {
+ ncMem = Integer.parseInt(c.getNcContainerMem());
+ } else {
+ ncMem = CC_MEMORY_MBS_DEFAULT;
+ }
+ //request CC
+ int numNodes = 0;
+ ContainerRequest ccAsk = hostToRequest(cC.getClusterIp(), true);
+ resourceManager.addContainerRequest(ccAsk);
+ LOG.info("Asked for CC: " + Arrays.toString(ccAsk.getNodes().toArray()));
+ numNodes++;
+ //now we wait to be given the CC before starting the NCs...
+ //we will wait a minute.
+ int deathClock = 60;
+ while (ccUp.get() == false && deathClock > 0) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ex) {
+ LOG.debug(ex);
+ }
+ --deathClock;
+ }
+ if (deathClock == 0 && ccUp.get() == false) {
+ throw new YarnException("Couldn't allocate container for CC. Abort!");
+ }
+ LOG.info("Waiting for CC process to start");
+ //TODO: inspect for actual liveness instead of waiting.
+ // is there a good way to do this? maybe try opening a socket to it...
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException ex) {
+ LOG.debug(ex);
+ }
+ //request NCs
+ for (Node n : c.getNode()) {
+ resourceManager.addContainerRequest(hostToRequest(n.getClusterIp(), false));
+ LOG.info("Asked for NC: " + n.getClusterIp());
+ numNodes++;
+ synchronized(pendingNCs){
+ pendingNCs.add(n);
+ }
+ }
+ LOG.info("Requested all NCs and CCs. Wait for things to settle!");
+ numRequestedContainers.set(numNodes);
+ numTotalContainers = numNodes;
+ doneAllocating = true;
+
+ }
+
+    /**
+     * Builds a container request pinned to exactly one host.
+     * <p>
+     * The host is resolved with {@link InetAddress#getByName(String)}, and the resulting address's host name
+     * is the only acceptable node. No rack is given and locality relaxation is disabled. The request has
+     * priority 0 and {@code ccMem} or {@code ncMem} MB of memory; nothing else is set, since YARN doesn't
+     * honor other dimensions yet.
+     *
+     * @param host
+     *            the host name or IP address to request
+     * @param cc
+     *            whether the request is for the Cluster Controller
+     * @return a container request that is (hopefully) for the host we asked for
+     * @throws UnknownHostException
+     *             if {@code host} cannot be resolved
+     */
+    private ContainerRequest hostToRequest(String host, boolean cc) throws UnknownHostException {
+        InetAddress resolved = InetAddress.getByName(host);
+        String nodeName = resolved.getHostName();
+
+        Resource capability = Records.newRecord(Resource.class);
+        capability.setMemory(cc ? ccMem : ncMem);
+        Priority priority = Records.newRecord(Priority.class);
+        priority.setPriority(0);
+
+        LOG.info("IP addr: " + host + " resolved to " + nodeName);
+        ContainerRequest request = new ContainerRequest(capability, new String[] { nodeName }, null, priority, false);
+        LOG.info("Requested host ask: " + request.getNodes());
+        return request;
+    }
+
+    /**
+     * Determines whether a container was allocated on the host that should run the Cluster Controller.
+     * <p>
+     * Both the container's host and the CC's configured cluster IP are resolved, and their canonical host names
+     * are compared. If either cannot be resolved, a warning is logged and the container is treated as not being
+     * the CC.
+     *
+     * @param c
+     *            the container in question
+     * @return true if the container should have the CC process on it, false otherwise
+     */
+    boolean containerIsCC(Container c) {
+        String containerHost = c.getNodeId().getHost();
+        try {
+            InetAddress containerIp = InetAddress.getByName(containerHost);
+            LOG.info(containerIp.getCanonicalHostName());
+            InetAddress ccIp = InetAddress.getByName(cC.getClusterIp());
+            LOG.info(ccIp.getCanonicalHostName());
+            return containerIp.getCanonicalHostName().equals(ccIp.getCanonicalHostName());
+        } catch (UnknownHostException e) {
+            // Still best effort (not the CC), but no longer silent: an unresolvable host can stall CC startup.
+            LOG.warn("Could not resolve host while checking whether container on " + containerHost + " is the CC", e);
+            return false;
+        }
+    }
+
+    /**
+     * Finds the Node in the cluster description whose cluster IP resolves to the same address as this
+     * container's host.
+     *
+     * @param c
+     *            the container to resolve
+     * @param cl
+     *            the cluster description to search
+     * @return the node this container corresponds to
+     * @throws UnknownHostException
+     *             if the container's host or a node's IP cannot be resolved, or if no node in the description
+     *             matches the container
+     */
+    Node containerToNode(Container c, Cluster cl) throws UnknownHostException {
+        String containerHost = c.getNodeId().getHost();
+        InetAddress containerIp = InetAddress.getByName(containerHost);
+        LOG.info("Resolved Container IP: " + containerIp);
+        for (Node node : cl.getNode()) {
+            InetAddress nodeIp = InetAddress.getByName(node.getClusterIp());
+            LOG.info(nodeIp + "?=" + containerIp);
+            if (nodeIp.equals(containerIp)) {
+                return node;
+            }
+        }
+        // No node in the description lives on this container's host; this is bad.
+        throw new UnknownHostException("Could not resolve container " + containerHost + " to node");
+    }
+
+ /**
+ * Fills {@code localResources} with the DFS files that containers launched by this AM need. Nothing is
+ * copied here: each {@link LocalResource} only points at a file the client already placed in the DFS,
+ * and the NodeManager localizes it when a container starts.
+ * <p>
+ * For the offline tasks (obliterate, backup, restore) only the AM jar is registered, under the key
+ * {@code asterix-yarn.jar}. Otherwise the AsterixDB distribution archive, the cluster description
+ * ({@code cluster-config.xml}) and any external libraries found under
+ * {@code <dfsBasePath>/<instanceConfPath>/library/<dataverse>/} are registered.
+ *
+ * @throws IOException
+ * if a DFS call fails, a resource path is not a valid URI, or the library directory does not have
+ * the expected dataverse-directory / library-file layout
+ * @throws IllegalStateException
+ * if an offline task runs outside a MiniYARNCluster and no AM jar path is set
+ */
+ private void localizeDFSResources() throws IOException {
+ //if performing an 'offline' task, skip a lot of resource distribution
+ if (obliterate || backup || restore) {
+ // appMasterJar is a Path, so emptiness has to be tested on its string form (a Path never equals "")
+ if (appMasterJar == null || appMasterJar.toString().isEmpty()) {
+ //this can happen in a jUnit testing environment. we don't need to set it there.
+ if (!conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
+ throw new IllegalStateException("AM jar not provided in environment.");
+ }
+ return;
+ }
+ FileSystem fs = FileSystem.get(conf);
+ FileStatus appMasterJarStatus = fs.getFileStatus(appMasterJar);
+ localResources.put("asterix-yarn.jar", privateDfsResource(appMasterJar.toUri(), LocalResourceType.FILE,
+ appMasterJarStatus.getLen(), appMasterJarStatus.getModificationTime()));
+ LOG.info(localResources.values());
+ return;
+ }
+
+ //otherwise, distribute everything needed to start up asterix.
+ //ARCHIVE makes the NodeManager un-tar the asterix distribution when it localizes it.
+ URI asterixZipUri = parseResourceUri(asterixZipPath, "Error locating Asterix zip");
+ localResources.put(ASTERIX_ZIP_NAME,
+ privateDfsResource(asterixZipUri, LocalResourceType.ARCHIVE, asterixZipLen, asterixZipTimestamp));
+
+ //now let's do the same for the cluster description XML
+ //TODO: I could avoid localizing this everywhere by only calling this block on the metadata node.
+ URI asterixConfUri = parseResourceUri(asterixConfPath, "Error locating Asterix config");
+ localResources.put("cluster-config.xml",
+ privateDfsResource(asterixConfUri, LocalResourceType.FILE, asterixConfLen, asterixConfTimestamp));
+
+ //now add the libraries if there are any; expected layout is library/<dataverse>/<library file>
+ try {
+ FileSystem fs = FileSystem.get(conf);
+ // DFS paths always use '/', so build this one with Path.SEPARATOR rather than the local File.separator
+ Path p = new Path(dfsBasePath, instanceConfPath + Path.SEPARATOR + "library" + Path.SEPARATOR);
+ if (fs.exists(p)) {
+ for (FileStatus d : fs.listStatus(p)) {
+ if (!d.isDirectory())
+ throw new IOException("Library configuration directory structure is incorrect");
+ for (FileStatus l : fs.listStatus(d.getPath())) {
+ if (l.isDirectory())
+ throw new IOException("Library configuration directory structure is incorrect");
+ // key: library/<dataverse>/<library file name up to its first '.'>
+ localResources.put("library" + Path.SEPARATOR + d.getPath().getName() + Path.SEPARATOR
+ + l.getPath().getName().split("\\.")[0], privateDfsResource(l.getPath().toUri(),
+ LocalResourceType.ARCHIVE, l.getLen(), l.getModificationTime()));
+ LOG.info("Found library: " + l.getPath().toString());
+ LOG.info(l.getPath().getName());
+ }
+ }
+ }
+ } catch (FileNotFoundException e) {
+ LOG.info("No external libraries present");
+ //do nothing, it just means there aren't libraries. that is possible and ok
+ // it should be handled by the fs.exists(p) check though.
+ }
+ LOG.info(localResources.values());
+ }
+
+ /**
+ * Creates a PRIVATE-visibility {@link LocalResource} record that points at an existing DFS location.
+ *
+ * @param location
+ * DFS location of the file or archive
+ * @param type
+ * FILE to localize as-is, ARCHIVE to have the NodeManager unpack it
+ * @param size
+ * length of the DFS file in bytes
+ * @param timestamp
+ * modification time of the DFS file
+ * @return the populated record
+ */
+ private LocalResource privateDfsResource(URI location, LocalResourceType type, long size, long timestamp) {
+ LocalResource resource = Records.newRecord(LocalResource.class);
+ resource.setResource(ConverterUtils.getYarnUrlFromURI(location));
+ resource.setType(type);
+ resource.setVisibility(LocalResourceVisibility.PRIVATE);
+ resource.setSize(size);
+ resource.setTimestamp(timestamp);
+ return resource;
+ }
+
+ /**
+ * Parses a resource path taken from the environment into a URI.
+ *
+ * @param path
+ * the path string to parse
+ * @param description
+ * prefix of the error logged when the path is malformed
+ * @return the parsed URI
+ * @throws IOException
+ * wrapping the {@link URISyntaxException} if {@code path} is not a valid URI
+ */
+ private URI parseResourceUri(String path, String description) throws IOException {
+ try {
+ return new URI(path);
+ } catch (URISyntaxException e) {
+ LOG.error(description + " in env, path=" + path);
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * Prints command-line help for the ApplicationMaster.
+ *
+ * @param opts
+ * the options to describe in the help text
+ */
+ private void printUsage(Options opts) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("ApplicationMaster", opts);
+ }
+
+ /**
+ * Start the AM and request all necessary resources, then wait until an RM callback sets {@code done}
+ * and clean up via finish().
+ *
+ * @return True if the run fully succeeded, false otherwise.
+ * @throws YarnException
+ * if registering with the RM or requesting resources fails
+ * @throws IOException
+ * if communicating with the RM fails
+ */
+ public boolean run() throws YarnException, IOException {
+ LOG.info("Starting ApplicationMaster");
+
+ AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
+ resourceManager = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
+ resourceManager.init(conf);
+ resourceManager.start();
+
+ containerListener = new NMCallbackHandler();
+ nmClientAsync = new NMClientAsyncImpl(containerListener);
+ nmClientAsync.init(conf);
+ nmClientAsync.start();
+
+ // Register self with ResourceManager
+ // This will start heartbeating to the RM
+ try {
+ appMasterHostname = InetAddress.getLocalHost().toString();
+ } catch (java.net.UnknownHostException uhe) {
+ appMasterHostname = uhe.toString();
+ }
+ RegisterApplicationMasterResponse response = resourceManager.registerApplicationMaster(appMasterHostname,
+ appMasterRpcPort, appMasterTrackingUrl);
+
+ // Dump out information about cluster capability as seen by the
+ // resource manager
+ int maxMem = response.getMaximumResourceCapability().getMemory();
+ LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
+
+ try {
+ requestResources(clusterDesc);
+ } catch (YarnException e) {
+ LOG.error("Could not allocate resources properly:" + e.getMessage());
+ done = true;
+ // stop the already-started NM/RM clients and unregister from the RM before propagating;
+ // otherwise they keep running and the RM is never told this attempt has ended
+ finish();
+ throw e;
+ }
+ //now we just sit and listen for messages from the RM
+
+ // poll until an RM callback sets done; interruption is ignored, only done ends the wait
+ while (!done) {
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException ex) {
+ }
+ }
+ finish();
+ return success;
+ }
+
+ /**
+ * Clean up, whether or not we were successful.
+ */
+ private void finish() {
+ // Join all launched threads
+ // needed for when we time out
+ // and we need to release containers
+ for (Thread launchThread : launchThreads) {
+ try {
+ launchThread.join(10000);
+ } catch (InterruptedException e) {
+ LOG.info("Exception thrown in thread join: " + e.getMessage());
+ //from https://stackoverflow.com/questions/4812570/how-to-store-printstacktrace-into-a-string
+ StringWriter errors = new StringWriter();
+ e.printStackTrace(new PrintWriter(errors));
+ LOG.error(errors.toString());
+ }
+ }
+
+ // When the application completes, it should stop all running containers
+ LOG.info("Application completed. Stopping running containers");
+ nmClientAsync.stop();
+
+ // When the application completes, it should send a finish application
+ // signal to the RM
+ LOG.info("Application completed. Signalling finish to RM");
+
+ FinalApplicationStatus appStatus;
+ String appMessage = null;
+ success = true;
+ if (numFailedContainers.get() == 0 && numCompletedContainers.get() == numTotalContainers) {
+ appStatus = FinalApplicationStatus.SUCCEEDED;
+ } else {
+ appStatus = FinalApplicationStatus.FAILED;
+ appMessage = "Diagnostics." + ", total=" + numTotalContainers + ", completed="
+ + numCompletedContainers.get() + ", allocated=" + numAllocatedContainers.get() + ", failed="
+ + numFailedContainers.get();
+ success = false;
+ }
+ try {
+ resourceManager.unregisterApplicationMaster(appStatus, appMessage, null);
+ } catch (YarnException ex) {
+ LOG.error("Failed to unregister application", ex);
+ } catch (IOException e) {
+ LOG.error("Failed to unregister application", e);
+ }
+ done = true;
+ resourceManager.stop();
+ }
+
+ /**
+ * This handles the information that comes in from the RM while the AM
+ * is running.
+ */
+ private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
+ public void onContainersCompleted(List<ContainerStatus> completedContainers) {
+ LOG.info("Got response from RM for container ask, completedCnt=" + completedContainers.size());
+ for (ContainerStatus containerStatus : completedContainers) {
+ LOG.info("Got container status for containerID=" + containerStatus.getContainerId() + ", state="
+ + containerStatus.getState() + ", exitStatus=" + containerStatus.getExitStatus()
+ + ", diagnostics=" + containerStatus.getDiagnostics());
+
+ // non complete containers should not be here
+ if(containerStatus.getState() != ContainerState.COMPLETE){
+ throw new IllegalStateException("Non-completed container given as completed by RM.");
+ }
+
+ // increment counters for completed/failed containers
+ int exitStatus = containerStatus.getExitStatus();
+ if (0 != exitStatus) {
+ // container failed
+ numCompletedContainers.incrementAndGet();
+ numFailedContainers.incrementAndGet();
+ } else {
+ // nothing to do
+ // container completed successfully
+ numCompletedContainers.incrementAndGet();
+ LOG.info("Container completed successfully." + ", containerId=" + containerStatus.getContainerId());
+ }
+ }
+ //stop infinite looping of run()
+ if (numCompletedContainers.get() + numFailedContainers.get() == numAllocatedContainers.get()
+ && doneAllocating)
+ done = true;
+ }
+
+ public void onContainersAllocated(List<Container> allocatedContainers) {
+ LOG.info("Got response from RM for container ask, allocatedCnt=" + allocatedContainers.size());
+ numAllocatedContainers.addAndGet(allocatedContainers.size());
+ for (Container allocatedContainer : allocatedContainers) {
+ synchronized(pendingNCs){
+ try {
+ if (!pendingNCs.contains(containerToNode(allocatedContainer, clusterDesc)) && ccUp.get()) {
+ nmClientAsync.stopContainerAsync(allocatedContainer.getId(), allocatedContainer.getNodeId());
+ continue;
+ }
+ } catch(UnknownHostException ex){
+ LOG.error("Unknown host allocated for us by RM- this shouldn't happen.", ex);
+ }
+ }
+ LOG.info("Launching shell command on a new container." + ", containerId=" + allocatedContainer.getId()
+ + ", containerNode=" + allocatedContainer.getNodeId().getHost() + ":"
+ + allocatedContainer.getNodeId().getPort() + ", containerNodeURI="
+ + allocatedContainer.getNodeHttpAddress() + ", containerResourceMemory"
+ + allocatedContainer.getResource().getMemory());
+
+ LaunchAsterixContainer runnableLaunchContainer = new LaunchAsterixContainer(allocatedContainer,
+ containerListener);
+ Thread launchThread = new Thread(runnableLaunchContainer, "Asterix CC/NC");
+
+ // I want to know if this node is the CC, because it must start before the NCs.
+ LOG.info("Allocated: " + allocatedContainer.getNodeId().getHost());
+ LOG.info("CC : " + cC.getId());
+ synchronized(pendingNCs){
+ try {
+ if (ccUp.get()) {
+ pendingNCs.remove(containerToNode(allocatedContainer, clusterDesc));
+ }
+ } catch(UnknownHostException ex){
+ LOG.error("Unknown host allocated for us by RM- this shouldn't happen.", ex);
+ }
+ }
+
+ if (containerIsCC(allocatedContainer)) {
+ ccUp.set(true);
+ }
+ // launch and start the container on a separate thread to keep
+ // the main thread unblocked
+ // as all containers may not be allocated at one go.
+ launchThreads.add(launchThread);
+ launchThread.start();
+ }
+ }
+
+ /**
+ * Ask the processes on the container to gracefully exit.
+ */
+ public void onShutdownRequest() {
+ LOG.info("AM shutting down per request");
+ done = true;
+ }
+
+ public void onNodesUpdated(List<NodeReport> updatedNodes) {
+ //TODO: This will become important when we deal with what happens if an NC dies
+ }
+
+ public float getProgress() {
+ //return half way because progress is basically meaningless for us
+ if (!doneAllocating) {
+ return 0.0f;
+ }
+ return (float) 0.5;
+ }
+
+ public void onError(Throwable arg0) {
+ LOG.error("Fatal Error recieved by AM: " + arg0);
+ done = true;
+ }
+ }
+
+ private class NMCallbackHandler implements NMClientAsync.CallbackHandler {
+
+ private ConcurrentMap<ContainerId, Container> containers = new ConcurrentHashMap<ContainerId, Container>();
+
+ public void addContainer(ContainerId containerId, Container container) {
+ containers.putIfAbsent(containerId, container);
+ }
+
+ public void onContainerStopped(ContainerId containerId) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Succeeded to stop Container " + containerId);
+ }
+ containers.remove(containerId);
+ }
+
+ public void onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Container Status: id=" + containerId + ", status=" + containerStatus);
+ }
+ }
+
+ public void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer> allServiceResponse) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Succeeded to start Container " + containerId);
+ }
+ Container container = containers.get(containerId);
+ if (container != null) {
+ nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
+ }
+ }
+
+ public void onStartContainerError(ContainerId containerId, Throwable t) {
+ LOG.error("Failed to start Container " + containerId);
+ containers.remove(containerId);
+ }
+
+ public void onGetContainerStatusError(ContainerId containerId, Throwable t) {
+ LOG.error("Failed to query the status of Container " + containerId);
+ }
+
+ public void onStopContainerError(ContainerId containerId, Throwable t) {
+ LOG.error("Failed to stop Container " + containerId);
+ containers.remove(containerId);
+ }
+ }
+
+ /**
+ * Runnable that builds the {@link ContainerLaunchContext} (local resources, environment, command) for one
+ * allocated container and asks the NodeManager, through {@code nmClientAsync}, to start it. The command
+ * depends on the AM's mode: obliterate, backup, restore, or (by default) starting the CC or an NC.
+ */
+ private class LaunchAsterixContainer implements Runnable {
+
+ // Allocated container
+ final Container container;
+
+ final NMCallbackHandler containerListener;
+
+ /**
+ * @param lcontainer
+ * Allocated container
+ * @param containerListener
+ * Callback handler of the container
+ */
+ public LaunchAsterixContainer(Container lcontainer, NMCallbackHandler containerListener) {
+ this.container = lcontainer;
+ this.containerListener = containerListener;
+ }
+
+ /**
+ * Sets up the launch context (local resources, CLASSPATH and command) and dispatches the asynchronous
+ * start request to the NodeManager. If no command can be produced, the container is released back to
+ * the RM instead of being left allocated but unused.
+ */
+ public void run() {
+ LOG.info("Setting up container launch container for containerid=" + container.getId());
+ ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
+ // Set the local resources
+ ctx.setLocalResources(localResources);
+
+ //Set the env variables to be setup in the env where the container will be run
+ LOG.info("Set the environment for the node");
+ Map<String, String> env = new HashMap<String, String>();
+
+ // CLASSPATH: the $CLASSPATH expansion, "." and "*" of the working directory, the YARN application
+ // classpath, then ./log4j.properties
+ StringBuilder classPathEnv = new StringBuilder(Environment.CLASSPATH.$()).append(File.pathSeparatorChar)
+ .append("." + File.pathSeparatorChar + "*");
+ for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+ YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
+ classPathEnv.append(File.pathSeparatorChar);
+ classPathEnv.append(c.trim());
+ }
+ // a separator must come first; appending '.' directly would corrupt the last YARN classpath entry
+ classPathEnv.append(File.pathSeparatorChar).append("." + File.separator + "log4j.properties");
+
+ // add the runtime classpath needed for tests to work
+ if (conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
+ classPathEnv.append(System.getProperty("path.separator"));
+ classPathEnv.append(System.getProperty("java.class.path"));
+ env.put("HADOOP_CONF_DIR", System.getProperty("user.dir") + File.separator + "target" + File.separator);
+ }
+
+ env.put("CLASSPATH", classPathEnv.toString());
+
+ ctx.setEnvironment(env);
+ LOG.info(ctx.getEnvironment().toString());
+ List<String> startCmd;
+ if (obliterate) {
+ LOG.debug("AM in obliterate mode");
+ startCmd = produceObliterateCommand(container);
+ } else if (backup) {
+ startCmd = produceBackupCommand(container);
+ LOG.debug("AM in backup mode");
+ } else if (restore) {
+ startCmd = produceRestoreCommand(container);
+ LOG.debug("AM in restore mode");
+ } else {
+ startCmd = produceStartCmd(container);
+ }
+
+ if (startCmd == null || startCmd.isEmpty()) {
+ LOG.fatal("Could not map one or more NCs to NM container hosts- aborting!");
+ // hand the unused container back to the RM instead of leaving it allocated but never started
+ resourceManager.releaseAssignedContainer(container.getId());
+ return;
+ }
+
+ for (String s : startCmd) {
+ LOG.info("Command to execute: " + s);
+ }
+ ctx.setCommands(startCmd);
+ containerListener.addContainer(container.getId(), container);
+ //finally start the container
+ nmClientAsync.startContainerAsync(container, ctx);
+ }
+
+ /**
+ * Determines for a given container the command line that starts the Asterix process on it: the CC
+ * for the first CC container launched, otherwise the NC configured for the container's host.
+ *
+ * @param container
+ * The container to produce the commands for
+ * @return A one-element list with the command, or null if the host maps to no configured NC
+ */
+ private List<String> produceStartCmd(Container container) {
+ List<CharSequence> vargs = new ArrayList<CharSequence>();
+ vargs.add(JAVA_HOME + File.separator + "bin" + File.separator + "java");
+ vargs.add("-classpath " + '\'' + ASTERIX_ZIP_NAME + File.separator + "repo" + File.separator + "*\'");
+ vargs.add("-Dapp.repo=" + ASTERIX_ZIP_NAME + File.separator + "repo" + File.separator);
+ //first see if this node is the CC. compareAndSet makes the "not started yet" test and the claim one
+ //atomic step, since launcher threads run concurrently.
+ if (containerIsCC(container) && ccStarted.compareAndSet(false, true)) {
+ LOG.info("CC found on container" + container.getNodeId().getHost());
+ //get our java opts
+ vargs.add(ccJavaOpts);
+ vargs.add(CC_CLASSNAME);
+ vargs.add("-app-cc-main-class edu.uci.ics.asterix.hyracks.bootstrap.CCApplicationEntryPoint");
+ vargs.add("-cluster-net-ip-address " + cC.getClusterIp());
+ vargs.add("-client-net-ip-address " + cC.getClientIp());
+ } else {
+ //now we need to know what node we are on, so we can apply the correct properties
+ Node local;
+ try {
+ local = containerToNode(container, clusterDesc);
+ } catch (UnknownHostException e) {
+ LOG.error("Unable to find NC or CC configured for host: " + container.getId() + " " + e);
+ // without a node there is no main class to run; null makes run() abort for this container
+ return null;
+ }
+ LOG.info("Attempting to start NC on host " + local.getId());
+ String iodevice = local.getIodevices();
+ if (iodevice == null) {
+ iodevice = clusterDesc.getIodevices();
+ }
+ String storageSuffix = local.getStore() == null ? clusterDesc.getStore() : local.getStore();
+ String storagePath = iodevice + File.separator + storageSuffix;
+ vargs.add(ncJavaOpts);
+ vargs.add(NC_CLASSNAME);
+ vargs.add("-app-nc-main-class edu.uci.ics.asterix.hyracks.bootstrap.NCApplicationEntryPoint");
+ vargs.add("-node-id " + local.getId());
+ vargs.add("-cc-host " + cC.getClusterIp());
+ vargs.add("-iodevices " + storagePath);
+ vargs.add("-cluster-net-ip-address " + local.getClusterIp());
+ vargs.add("-data-ip-address " + local.getClusterIp());
+ vargs.add("-result-ip-address " + local.getClusterIp());
+ vargs.add("--");
+ if (initial) {
+ vargs.add("-initial-run ");
+ }
+ }
+
+ List<String> commands = toCommandLine(vargs);
+ LOG.info(Arrays.toString(commands.toArray()));
+ return commands;
+ }
+
+ /**
+ * Builds the command that deletes this node's storage directories and, on its first iodevice, its
+ * transaction logs.
+ *
+ * @return the command; a list holding "" for a CC container with no NC; null if a non-CC host maps to
+ * no NC
+ */
+ private List<String> produceObliterateCommand(Container container) {
+ //if this container has no NCs on it, nothing will be there to delete.
+ Node local;
+ try {
+ local = containerToNode(container, clusterDesc);
+ } catch (UnknownHostException e) {
+ //we expect this may happen for the CC if it isn't colocated with an NC. otherwise it is not supposed to happen.
+ if (!containerIsCC(container)) {
+ LOG.error("Unable to find NC configured for host: " + container.getId() + e);
+ return null;
+ }
+ return Arrays.asList("");
+ }
+ List<String> iodevices = iodevicesOf(local);
+
+ List<CharSequence> vargs = new ArrayList<CharSequence>();
+ vargs.add(JAVA_HOME + File.separator + "bin" + File.separator + "java");
+ vargs.add("-cp " + "*" + File.pathSeparatorChar + "log4j.properties");
+ vargs.add(OBLITERATOR_CLASSNAME);
+ for (int i = 0; i < iodevices.size(); i++) {
+ String s = iodevices.get(i);
+ vargs.add(s + File.separator + clusterDesc.getStore());
+ LOG.debug("Deleting from: " + s);
+ //logs only exist on 1st iodevice
+ if (i == 0) {
+ vargs.add(clusterDesc.getTxnLogDir() + "txnLogs" + File.separator);
+ LOG.debug("Deleting logs from: " + clusterDesc.getTxnLogDir());
+ }
+ }
+ return toCommandLine(vargs);
+ }
+
+ /**
+ * Builds the command that copies this node's storage directories (and, from the first iodevice, its
+ * transaction logs) into {@code <instanceConfPath>backups/<backupTimestamp>/}.
+ *
+ * @return the command; a list holding "" for a CC container with no NC; null if a non-CC host maps to
+ * no NC or the backup folder cannot be created
+ */
+ private List<String> produceBackupCommand(Container container) {
+ Node local;
+ try {
+ local = containerToNode(container, clusterDesc);
+ } catch (UnknownHostException e) {
+ //we expect this may happen for the CC if it isn't colocated with an NC. otherwise it is not supposed to happen.
+ if (!containerIsCC(container)) {
+ LOG.error("Unable to find NC configured for host: " + container.getId() + e);
+ return null;
+ }
+ return Arrays.asList("");
+ }
+ List<String> iodevices = iodevicesOf(local);
+
+ List<CharSequence> vargs = new ArrayList<CharSequence>();
+ vargs.add(JAVA_HOME + File.separator + "bin" + File.separator + "java");
+ vargs.add("-cp " + hdfsToolClasspath());
+ vargs.add(HDFS_BACKUP_CLASSNAME);
+ vargs.add("-backup");
+
+ String dstBase = instanceConfPath + "backups" + Path.SEPARATOR + backupTimestamp + Path.SEPARATOR
+ + local.getId();
+ try {
+ createBackupFolder(dstBase);
+ } catch (IOException e) {
+ //something very bad happened- return null to cause attempt to abort
+ LOG.error("Could not create backup folder " + dstBase, e);
+ return null;
+ }
+ for (int i = 0; i < iodevices.size(); i++) {
+ String s = iodevices.get(i);
+ String dst = backupDirFor(dstBase, s);
+ vargs.add(s + File.separator + clusterDesc.getStore() + "," + dst);
+ LOG.debug("Backing up from: " + s);
+ //logs only exist on 1st iodevice
+ if (i == 0) {
+ LOG.debug("Backing up logs from: " + clusterDesc.getTxnLogDir());
+ vargs.add(clusterDesc.getTxnLogDir() + "txnLogs" + File.separator + "," + dst);
+ }
+ }
+ LOG.debug("Backing up to: " + instanceConfPath + "backups" + Path.SEPARATOR + local.getId());
+ return toCommandLine(vargs);
+ }
+
+ private void createBackupFolder(String path) throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+ Path backupFolder = new Path(path);
+ fs.mkdirs(backupFolder);
+ }
+
+ /**
+ * Builds the command that copies backup snapshot {@code snapName} for this node back into its storage
+ * directories and, from the first iodevice's backup, its transaction logs.
+ *
+ * @return the command; a list holding "" for the CC container; null if the host maps to no NC
+ */
+ private List<String> produceRestoreCommand(Container container) {
+ if (containerIsCC(container)) {
+ List<String> blank = new ArrayList<String>();
+ blank.add("");
+ return blank;
+ }
+ Node local;
+ try {
+ local = containerToNode(container, clusterDesc);
+ } catch (UnknownHostException e) {
+ // the CC case returned above, so an unmapped host here is an error
+ LOG.error("Unable to find NC configured for host: " + container.getId() + e);
+ return null;
+ }
+ List<String> iodevices = iodevicesOf(local);
+
+ List<CharSequence> vargs = new ArrayList<CharSequence>();
+ vargs.add(JAVA_HOME + File.separator + "bin" + File.separator + "java");
+ vargs.add("-cp " + hdfsToolClasspath());
+ vargs.add(HDFS_BACKUP_CLASSNAME);
+ vargs.add("-restore");
+ String srcBase = instanceConfPath + "backups" + Path.SEPARATOR + Long.parseLong(snapName) + Path.SEPARATOR
+ + local.getId();
+ for (int i = 0; i < iodevices.size(); i++) {
+ String s = iodevices.get(i);
+ String storeDir = s + File.separator + clusterDesc.getStore();
+ String src = backupDirFor(srcBase, s);
+ try {
+ FileSystem fs = FileSystem.get(conf);
+ for (FileStatus b : fs.listStatus(new Path(src))) {
+ if (!b.getPath().toString().contains("txnLogs")
+ && !b.getPath().toString().contains(File.separator + "asterix_root_metadata")) {
+ vargs.add(b.getPath() + "," + storeDir);
+ }
+ }
+ } catch (IOException e) {
+ LOG.error("Could not stat backup directory in DFS", e);
+ }
+ // same <iodevice>/<store> destination as above and as the backup source (separator included)
+ vargs.add(src + "," + storeDir);
+ LOG.debug("Restoring from: " + s);
+ //logs only exist on 1st iodevice
+ if (i == 0) {
+ vargs.add(src + "txnLogs" + File.separator + "," + clusterDesc.getTxnLogDir() + File.separator);
+ LOG.debug("Restoring logs from: " + clusterDesc.getTxnLogDir());
+ }
+ }
+ LOG.debug("Restoring to: " + instanceConfPath + "backups" + Path.SEPARATOR + local.getId());
+ return toCommandLine(vargs);
+ }
+
+ /** Returns the node's comma-separated iodevices (or the cluster default when unset) as a list. */
+ private List<String> iodevicesOf(Node local) {
+ String iodevices = local.getIodevices() == null ? clusterDesc.getIodevices() : local.getIodevices();
+ return Arrays.asList(iodevices.split(",", -1));
+ }
+
+ /** Classpath for the HDFS backup/restore tool: ./*, the YARN application classpath, ./log4j.properties. */
+ private String hdfsToolClasspath() {
+ StringBuilder classPath = new StringBuilder("." + File.separator + "*");
+ for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+ YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
+ classPath.append(File.pathSeparatorChar).append(c.trim());
+ }
+ classPath.append(File.pathSeparatorChar).append("." + File.separator + "log4j.properties");
+ return classPath.toString();
+ }
+
+ /**
+ * Maps an iodevice to its DFS backup directory: {@code base} followed by the iodevice's '/'-separated
+ * components joined with '_', plus a trailing separator. Backup and restore must use the same
+ * naming, so the original indexOf-based joining is kept unchanged.
+ */
+ private String backupDirFor(String base, String iodevice) {
+ List<String> ioComponents = Arrays.asList(iodevice.split("\\/"));
+ StringBuilder dir = new StringBuilder().append(base);
+ for (String io : ioComponents) {
+ dir.append(io);
+ if (ioComponents.indexOf(io) != ioComponents.size() - 1) {
+ dir.append("_");
+ }
+ }
+ dir.append(Path.SEPARATOR);
+ return dir.toString();
+ }
+
+ /**
+ * Appends stdout/stderr redirects into the container log directory, then joins the arguments (each
+ * followed by one space) into a single shell command, returned as a one-element list.
+ */
+ private List<String> toCommandLine(List<CharSequence> vargs) {
+ vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + File.separator + "stdout");
+ vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + File.separator + "stderr");
+ StringBuilder command = new StringBuilder();
+ for (CharSequence arg : vargs) {
+ command.append(arg).append(" ");
+ }
+ List<String> commands = new ArrayList<String>();
+ commands.add(command.toString());
+ return commands;
+ }
+
+ }
+}
diff --git a/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/AsterixYARNClient.java b/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/AsterixYARNClient.java
new file mode 100644
index 0000000..63b85e6
--- /dev/null
+++ b/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/AsterixYARNClient.java
@@ -0,0 +1,1387 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.aoya;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+import java.util.regex.Pattern;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Marshaller;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.service.Service.STATE;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+
+import com.google.common.collect.ImmutableMap;
+
+import edu.uci.ics.asterix.common.configuration.AsterixConfiguration;
+import edu.uci.ics.asterix.common.configuration.Coredump;
+import edu.uci.ics.asterix.common.configuration.Store;
+import edu.uci.ics.asterix.common.configuration.TransactionLogDir;
+import edu.uci.ics.asterix.event.schema.yarnCluster.Cluster;
+import edu.uci.ics.asterix.event.schema.yarnCluster.Node;
+
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class AsterixYARNClient {
+
+    /**
+     * Actions the client can perform. Each constant carries the command-line verb (alias) that selects it.
+     * NOOP is the fallback used when no known verb was given; it is not registered in STRING_TO_MODE.
+     */
+    public static enum Mode {
+        INSTALL("install"),
+        START("start"),
+        STOP("stop"),
+        KILL("kill"),
+        DESTROY("destroy"),
+        ALTER("alter"),
+        LIBINSTALL("libinstall"),
+        DESCRIBE("describe"),
+        BACKUP("backup"),
+        LSBACKUP("lsbackups"),
+        RMBACKUP("rmbackup"),
+        RESTORE("restore"),
+        NOOP("");
+
+        /** The command-line verb that selects this mode. */
+        public final String alias;
+
+        Mode(String alias) {
+            this.alias = alias;
+        }
+
+        /**
+         * Looks up the mode for a command-line verb, ignoring case.
+         *
+         * @param a
+         *            the verb given on the command line; may be null
+         * @return the matching mode, or null if the verb is null or not a known alias
+         */
+        public static Mode fromAlias(String a) {
+            if (a == null) {
+                return null;
+            }
+            // Locale.ROOT: a default-locale lowercase turns 'I' into a dotless 'ı' under e.g. Turkish locales,
+            // which would make verbs such as "INSTALL" or "LIBINSTALL" unrecognizable.
+            return STRING_TO_MODE.get(a.toLowerCase(java.util.Locale.ROOT));
+        }
+    }
+
+ // Verb -> Mode lookup used by Mode.fromAlias. NOOP is intentionally not registered, so an unknown verb maps to null.
+ public static final Map<String, AsterixYARNClient.Mode> STRING_TO_MODE = ImmutableMap
+ .<String, AsterixYARNClient.Mode> builder().put(Mode.INSTALL.alias, Mode.INSTALL)
+ .put(Mode.START.alias, Mode.START).put(Mode.STOP.alias, Mode.STOP).put(Mode.KILL.alias, Mode.KILL)
+ .put(Mode.DESTROY.alias, Mode.DESTROY).put(Mode.ALTER.alias, Mode.ALTER)
+ .put(Mode.LIBINSTALL.alias, Mode.LIBINSTALL).put(Mode.DESCRIBE.alias, Mode.DESCRIBE)
+ .put(Mode.BACKUP.alias, Mode.BACKUP).put(Mode.LSBACKUP.alias, Mode.LSBACKUP)
+ .put(Mode.RMBACKUP.alias, Mode.RMBACKUP).put(Mode.RESTORE.alias, Mode.RESTORE).build();
+ private static final Log LOG = LogFactory.getLog(AsterixYARNClient.class);
+ // Directory, relative to the user's DFS home directory, that holds one sub-directory of state per instance.
+ public static final String CONF_DIR_REL = ".asterix" + File.separator;
+ // File name of the per-instance lock file; its presence in DFS marks a previous launch of the instance.
+ private static final String instanceLock = "instance";
+ // File names of the cluster description and of the AsterixDB parameters inside an instance's DFS directory.
+ public static final String CONFIG_DEFAULT_NAME = "cluster-config.xml";
+ public static final String PARAMS_DEFAULT_NAME = "asterix-configuration.xml";
+ // Local path (relative to the working directory) of the base AsterixDB parameters file.
+ private static String DEFAULT_PARAMETERS_PATH = "conf" + File.separator + "base-asterix-configuration.xml";
+ // Local path of the merged parameters file that is uploaded to DFS on install and alter.
+ private static String MERGED_PARAMETERS_PATH = "conf" + File.separator + PARAMS_DEFAULT_NAME;
+ // java.home of the JVM running this client.
+ private static final String JAVA_HOME = System.getProperty("java.home");
+ // Property keys for NC/CC JVM options and the CC REST port; presumably read from the AsterixDB
+ // configuration elsewhere in this file - TODO confirm.
+ public static final String NC_JAVA_OPTS_KEY = "nc.java.opts";
+ public static final String CC_JAVA_OPTS_KEY = "cc.java.opts";
+ public static final String CC_REST_PORT_KEY = "api.port";
+ // Action selected on the command line; stays NOOP until a known verb is parsed.
+ private Mode mode = Mode.NOOP;
+
+ // Hadoop/YARN configuration shared by the YARN client and every DFS access
+ private Configuration conf;
+ private YarnClient yarnClient;
+ // Application name reported to the ResourceManager (suffixed with ": <instance name>" in init)
+ private String appName = "";
+ // Application master priority
+ private int amPriority = 0;
+ // Queue the application master is submitted to
+ private int amPriorityPlaceholderDoNotUse_removed_ = 0;
+
+ /**
+ * @param args
+ * Command line arguments
+ */
+ public static void main(String[] args) {
+
+ try {
+ AsterixYARNClient client = new AsterixYARNClient();
+ try {
+ client.init(args);
+ AsterixYARNClient.execute(client);
+ } catch (ParseException | ApplicationNotFoundException e) {
+ LOG.fatal(e);
+ client.printUsage();
+ System.exit(-1);
+ }
+ } catch (Exception e) {
+ LOG.fatal("Error running client", e);
+ System.exit(1);
+ }
+ LOG.info("Command executed successfully.");
+ System.exit(0);
+ }
+
+ public static void execute(AsterixYARNClient client) throws IOException, YarnException {
+ YarnClientApplication app;
+ List<DFSResourceCoordinate> res;
+
+ System.out.println("JAVA HOME: " + JAVA_HOME);
+ switch (client.mode) {
+ case START:
+ startAction(client);
+ break;
+ case STOP:
+ try {
+ client.stopInstance();
+ } catch (ApplicationNotFoundException e) {
+ LOG.info(e);
+ System.out.println("Asterix instance by that name already exited or was never started");
+ client.deleteLockFile();
+ }
+ break;
+ case KILL:
+ if (client.isRunning() &&
+ Utils.confirmAction("Are you sure you want to kill this instance? In-progress tasks will be aborted")) {
+ try {
+ AsterixYARNClient.killApplication(client.getLockFile(), client.yarnClient);
+ } catch (ApplicationNotFoundException e) {
+ LOG.info(e);
+ System.out.println("Asterix instance by that name already exited or was never started");
+ client.deleteLockFile();
+ }
+ }
+ else if(!client.isRunning()){
+ System.out.println("Asterix instance by that name already exited or was never started");
+ client.deleteLockFile();
+ }
+ break;
+ case DESCRIBE:
+ Utils.listInstances(client.conf, CONF_DIR_REL);
+ break;
+ case INSTALL:
+ installAction(client);
+ break;
+ case LIBINSTALL:
+ client.installExtLibs();
+ break;
+ case ALTER:
+ client.writeAsterixConfig(Utils.parseYarnClusterConfig(client.asterixConf));
+ client.installAsterixConfig(true);
+ System.out.println("Configuration successfully modified");
+ break;
+ case DESTROY:
+ try {
+ if (client.force
+ || Utils.confirmAction("Are you really sure you want to obliterate this instance? This action cannot be undone!")) {
+ app = client.makeApplicationContext();
+ res = client.deployConfig();
+ res.addAll(client.distributeBinaries());
+ client.removeInstance(app, res);
+ }
+ } catch (YarnException | IOException e) {
+ LOG.error("Asterix failed to deploy on to cluster");
+ throw e;
+ }
+ break;
+ case BACKUP:
+ if (client.force || Utils.confirmAction("Performing a backup will stop a running instance.")) {
+ app = client.makeApplicationContext();
+ res = client.deployConfig();
+ res.addAll(client.distributeBinaries());
+ client.backupInstance(app, res);
+ }
+ break;
+ case LSBACKUP:
+ Utils.listBackups(client.conf, CONF_DIR_REL, client.instanceName);
+ break;
+ case RMBACKUP:
+ Utils.rmBackup(client.conf, CONF_DIR_REL, client.instanceName, Long.parseLong(client.snapName));
+ break;
+ case RESTORE:
+ if (client.force || Utils.confirmAction("Performing a restore will stop a running instance.")) {
+ app = client.makeApplicationContext();
+ res = client.deployConfig();
+ res.addAll(client.distributeBinaries());
+ client.restoreInstance(app, res);
+ }
+ break;
+ default:
+ LOG.fatal("Unknown mode. Known client modes are: start, stop, install, describe, kill, destroy, describe, backup, restore, lsbackup, rmbackup");
+ client.printUsage();
+ System.exit(-1);
+ }
+ }
+
+ private static void startAction(AsterixYARNClient client) throws YarnException {
+ YarnClientApplication app;
+ List<DFSResourceCoordinate> res;
+ ApplicationId appId;
+ try {
+ app = client.makeApplicationContext();
+ res = client.deployConfig();
+ res.addAll(client.distributeBinaries());
+ appId = client.deployAM(app, res, client.mode);
+ LOG.info("Asterix started up with Application ID: " + appId.toString());
+ if (Utils.waitForLiveness(appId, "Waiting for AsterixDB instance to resume ", client.yarnClient,
+ client.instanceName, client.conf, client.ccRestPort)) {
+ System.out.println("Asterix successfully deployed and is now running.");
+ } else {
+ LOG.fatal("AsterixDB appears to have failed to install and start");
+ throw new YarnException("AsterixDB appears to have failed to install and start");
+ }
+ } catch (IOException e) {
+ throw new YarnException(e);
+ }
+ }
+
+ private static void installAction(AsterixYARNClient client) throws YarnException {
+ YarnClientApplication app;
+ List<DFSResourceCoordinate> res;
+ ApplicationId appId;
+ try {
+ app = client.makeApplicationContext();
+ client.installConfig();
+ client.writeAsterixConfig(Utils.parseYarnClusterConfig(client.asterixConf));
+ client.installAsterixConfig(false);
+ res = client.deployConfig();
+ res.addAll(client.distributeBinaries());
+
+ appId = client.deployAM(app, res, client.mode);
+ LOG.info("Asterix started up with Application ID: " + appId.toString());
+ if (Utils.waitForLiveness(appId, "Waiting for new AsterixDB Instance to start ", client.yarnClient,
+ client.instanceName, client.conf, client.ccRestPort)) {
+ System.out.println("Asterix successfully deployed and is now running.");
+ } else {
+ LOG.fatal("AsterixDB appears to have failed to install and start");
+ throw new YarnException("AsterixDB appears to have failed to install and start");
+ }
+ } catch (IOException e) {
+ LOG.fatal("Asterix failed to deploy on to cluster");
+ throw new YarnException(e);
+ }
+ }
+
+ public AsterixYARNClient(Configuration conf) throws Exception {
+
+ this.conf = conf;
+ yarnClient = YarnClient.createYarnClient();
+ //If the HDFS jars aren't on the classpath this won't be set
+ if (conf.get("fs.hdfs.impl", null) == conf.get("fs.file.impl", null)) { //only would happen if both are null
+ conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
+ conf.set("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem");
+ }
+ yarnClient.init(conf);
+ opts = parseConf(conf);
+ }
+
+ private static Options parseConf(Configuration conf) {
+ Options opts = new Options();
+ opts.addOption(new Option("appname", true, "Application Name. Default value - Asterix"));
+ opts.addOption(new Option("priority", true, "Application Priority. Default 0"));
+ opts.addOption(new Option("queue", true, "RM Queue in which this application is to be submitted"));
+ opts.addOption(new Option("master_memory", true,
+ "Amount of memory in MB to be requested to run the application master"));
+ opts.addOption(new Option("log_properties", true, "log4j.properties file"));
+ opts.addOption(new Option("n", "name", true, "Asterix instance name (required)"));
+ opts.addOption(new Option("zip", "asterixZip", true,
+ "zip file with AsterixDB inside- if in non-default location"));
+ opts.addOption(new Option("bc", "baseConfig", true,
+ "base Asterix parameters configuration file if not in default position"));
+ opts.addOption(new Option("c", "asterixConf", true, "Asterix cluster config (required on install)"));
+ opts.addOption(new Option("l", "externalLibs", true, "Libraries to deploy along with Asterix instance"));
+ opts.addOption(new Option("ld", "libDataverse", true, "Dataverse to deploy external libraries to"));
+ opts.addOption(new Option("r", "refresh", false,
+ "If starting an existing instance, this will replace them with the local copy on startup"));
+ opts.addOption(new Option("appId", true, "ApplicationID to monitor if running client in status monitor mode"));
+ opts.addOption(new Option("masterLibsDir", true, "Directory that contains the JARs needed to run the AM"));
+ opts.addOption(new Option("s", "snapshot", true,
+ "Backup timestamp for arguments requiring a specific backup (rm, restore)"));
+ opts.addOption(new Option("v", "debug", false, "Dump out debug information"));
+ opts.addOption(new Option("help", false, "Print usage"));
+ opts.addOption(new Option("f", "force", false,
+ "Execute this command as fully as possible, disregarding any caution"));
+ return opts;
+ }
+
+ /**
+ */
+ public AsterixYARNClient() throws Exception {
+ this(new YarnConfiguration());
+ }
+
+ /**
+ * Helper function to print out usage
+ */
+ private void printUsage() {
+ new HelpFormatter().printHelp("Asterix YARN client. Usage: asterix [options] [mode]", opts);
+ }
+
+ /**
+ * Initialize the client's arguments and parameters before execution.
+ *
+ * @param args
+ * - Standard command-line arguments.
+ * @throws ParseException
+ */
+ public void init(String[] args) throws ParseException {
+
+ CommandLine cliParser = new GnuParser().parse(opts, args);
+ if (cliParser.hasOption("help")) {
+ printUsage();
+ return;
+ }
+ //initialize most things
+ debugFlag = cliParser.hasOption("debug");
+ force = cliParser.hasOption("force");
+ baseConfig = cliParser.getOptionValue("baseConfig");
+ extLibs = cliParser.getOptionValue("externalLibs");
+ libDataverse = cliParser.getOptionValue("libDataverse");
+
+ appName = cliParser.getOptionValue("appname", "AsterixDB");
+ amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
+ amQueue = cliParser.getOptionValue("queue", "default");
+ amMemory = Integer.parseInt(cliParser.getOptionValue("master_memory", "10"));
+
+ instanceName = cliParser.getOptionValue("name");
+ instanceFolder = instanceName + '/';
+ appName = appName + ": " + instanceName;
+
+ asterixConf = cliParser.getOptionValue("asterixConf");
+
+ log4jPropFile = cliParser.getOptionValue("log_properties", "");
+
+ //see if the given argument values are sane in general
+ checkConfSanity(args, cliParser);
+
+ //intialize the mode, see if it is a valid one.
+ initMode(args, cliParser);
+
+ //now check the validity of the arguments given the mode
+ checkModeSanity(args, cliParser);
+
+ //if we are going to refresh the binaries, find that out
+ refresh = cliParser.hasOption("refresh");
+ //same goes for snapshot restoration/removal
+ snapName = cliParser.getOptionValue("snapshot");
+
+ if (!cliParser.hasOption("asterixZip")
+ && (mode == Mode.INSTALL || mode == Mode.ALTER || mode == Mode.DESTROY || mode == Mode.BACKUP)) {
+
+ asterixZip = cliParser.getOptionValue("asterixZip", getAsterixDistributableLocation().getAbsolutePath());
+ } else {
+ asterixZip = cliParser.getOptionValue("asterixZip");
+ }
+
+ }
+
+ /**
+ * Cursory sanity checks for argument sanity, without considering the mode of the client
+ *
+ * @param args
+ * @param cliParser
+ * The parsed arguments.
+ * @throws ParseException
+ */
+ private void checkConfSanity(String[] args, CommandLine cliParser) throws ParseException {
+ String message = null;
+
+ //Sanity check for no args
+ if (args.length == 0) {
+ message = "No args specified for client to initialize";
+ }
+ //AM memory should be a sane value
+ else if (amMemory < 0) {
+ message = "Invalid memory specified for application master, exiting." + " Specified memory=" + amMemory;
+ }
+ //we're good!
+ else {
+ return;
+ }
+ //default:
+ throw new ParseException(message);
+
+ }
+
+ /**
+ * Initialize the mode of the client from the arguments.
+ *
+ * @param args
+ * @param cliParser
+ * @throws ParseException
+ */
+ private void initMode(String[] args, CommandLine cliParser) throws ParseException {
+ @SuppressWarnings("unchecked")
+ List<String> clientVerb = cliParser.getArgList();
+ String message = null;
+ //Now check if there is a mode
+ if (clientVerb == null || clientVerb.size() < 1) {
+ message = "You must specify an action.";
+ }
+ //But there can only be one mode...
+ else if (clientVerb.size() > 1) {
+ message = "Trailing arguments, or too many arguments. Only one action may be performed at a time.";
+ }
+ if (message != null) {
+ throw new ParseException(message);
+ }
+ //Now we can initialize the mode and check it against parameters
+ mode = Mode.fromAlias(clientVerb.get(0));
+ if (mode == null) {
+ mode = Mode.NOOP;
+ }
+ }
+
+ /**
+ * Determine if the command line arguments are sufficient for the requested client mode.
+ *
+ * @param args
+ * The command line arguments.
+ * @param cliParser
+ * Parsed command line arguments.
+ * @throws ParseException
+ */
+
+ private void checkModeSanity(String[] args, CommandLine cliParser) throws ParseException {
+
+ String message = null;
+ //The only time you can use the client without specifiying an instance, is to list all of the instances it sees.
+ if (!cliParser.hasOption("name") && mode != Mode.DESCRIBE) {
+ message = "You must give a name for the instance to be acted upon";
+ } else if (mode == Mode.INSTALL && !cliParser.hasOption("asterixConf")) {
+ message = "No Configuration XML given. Please specify a config for cluster installation";
+ } else if (mode != Mode.START && cliParser.hasOption("refresh")) {
+ message = "Cannot specify refresh in any mode besides start, mode is: " + mode;
+ } else if (cliParser.hasOption("snapshot") && !(mode == Mode.RESTORE || mode == Mode.RMBACKUP)) {
+ message = "Cannot specify a snapshot to restore in any mode besides restore or rmbackup, mode is: " + mode;
+ } else if ((mode == Mode.ALTER || mode == Mode.INSTALL) && baseConfig == null
+ && !(new File(DEFAULT_PARAMETERS_PATH).exists())) {
+ message = "Default asterix parameters file is not in the default location, and no custom location is specified";
+ }
+ //nothing is wrong, so exit
+ else {
+ return;
+ }
+ //otherwise, something is bad.
+ throw new ParseException(message);
+
+ }
+
+ /**
+ * Find the distributable asterix bundle, be it in the default location or in a user-specified location.
+ *
+ * @return
+ */
+ private File getAsterixDistributableLocation() {
+ //Look in the PWD for the "asterix" folder
+ File tarDir = new File("asterix");
+ if (!tarDir.exists()) {
+ throw new IllegalArgumentException(
+ "Default directory structure not in use- please specify an asterix zip and base config file to distribute");
+ }
+ FileFilter tarFilter = new WildcardFileFilter("asterix-server*.zip");
+ File[] tarFiles = tarDir.listFiles(tarFilter);
+ if (tarFiles.length != 1) {
+ throw new IllegalArgumentException(
+ "There is more than one canonically named asterix distributable in the default directory. Please leave only one there.");
+ }
+ return tarFiles[0];
+ }
+
+ /**
+ * Initialize and register the application attempt with the YARN ResourceManager.
+ *
+ * @return
+ * @throws IOException
+ * @throws YarnException
+ */
+ public YarnClientApplication makeApplicationContext() throws IOException, YarnException {
+
+ //first check to see if an instance already exists.
+ FileSystem fs = FileSystem.get(conf);
+ Path lock = new Path(fs.getHomeDirectory(), CONF_DIR_REL + instanceFolder + instanceLock);
+ LOG.info("Running Deployment");
+ yarnClient.start();
+ if (fs.exists(lock)) {
+ ApplicationId lockAppId = getLockFile();
+ try {
+ ApplicationReport previousAppReport = yarnClient.getApplicationReport(lockAppId);
+ YarnApplicationState prevStatus = previousAppReport.getYarnApplicationState();
+ if (!(prevStatus == YarnApplicationState.FAILED || prevStatus == YarnApplicationState.KILLED || prevStatus == YarnApplicationState.FINISHED)
+ && mode != Mode.DESTROY && mode != Mode.BACKUP && mode != Mode.RESTORE) {
+ throw new IllegalStateException("Instance is already running in: " + lockAppId);
+ } else if (mode != Mode.DESTROY && mode != Mode.BACKUP && mode != Mode.RESTORE) {
+ //stale lock file
+ LOG.warn("Stale lockfile detected. Instance attempt " + lockAppId + " may have exited abnormally");
+ deleteLockFile();
+ }
+ } catch (YarnException e) {
+ LOG.warn("Stale lockfile detected, but the RM has no record of this application's last run. This is normal if the cluster was restarted.");
+ deleteLockFile();
+ }
+ }
+
+ // Get a new application id
+ YarnClientApplication app = yarnClient.createApplication();
+ GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
+ int maxMem = appResponse.getMaximumResourceCapability().getMemory();
+ LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
+
+ // A resource ask cannot exceed the max.
+ if (amMemory > maxMem) {
+ LOG.info("AM memory specified above max threshold of cluster. Using max value." + ", specified=" + amMemory
+ + ", max=" + maxMem);
+ amMemory = maxMem;
+ }
+
+ // set the application name
+ ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
+ appContext.setApplicationName(appName);
+
+ return app;
+ }
+
+ /**
+ * Upload the Asterix cluster description on to the DFS. This will persist the state of the instance.
+ *
+ * @return
+ * @throws YarnException
+ * @throws IOException
+ */
+ private List<DFSResourceCoordinate> deployConfig() throws YarnException, IOException {
+
+ FileSystem fs = FileSystem.get(conf);
+ List<DFSResourceCoordinate> resources = new ArrayList<DFSResourceCoordinate>(2);
+
+ String pathSuffix = CONF_DIR_REL + instanceFolder + CONFIG_DEFAULT_NAME;
+ Path dstConf = new Path(fs.getHomeDirectory(), pathSuffix);
+ FileStatus destStatus;
+ try {
+ destStatus = fs.getFileStatus(dstConf);
+ } catch (IOException e) {
+ throw new YarnException("Asterix instance by that name does not appear to exist in DFS");
+ }
+ LocalResource asterixConfLoc = Records.newRecord(LocalResource.class);
+ asterixConfLoc.setType(LocalResourceType.FILE);
+ asterixConfLoc.setVisibility(LocalResourceVisibility.PRIVATE);
+ asterixConfLoc.setResource(ConverterUtils.getYarnUrlFromPath(dstConf));
+ asterixConfLoc.setTimestamp(destStatus.getModificationTime());
+
+ DFSResourceCoordinate conf = new DFSResourceCoordinate();
+ conf.envs.put(dstConf.toUri().toString(), AConstants.CONFLOCATION);
+ conf.envs.put(Long.toString(asterixConfLoc.getSize()), AConstants.CONFLEN);
+ conf.envs.put(Long.toString(asterixConfLoc.getTimestamp()), AConstants.CONFTIMESTAMP);
+ conf.name = CONFIG_DEFAULT_NAME;
+ conf.res = asterixConfLoc;
+ resources.add(conf);
+
+ return resources;
+
+ }
+
+ /**
+ * Install the current Asterix parameters to the DFS. This can be modified via alter.
+ *
+ * @throws YarnException
+ * @throws IOException
+ */
+ private void installConfig() throws YarnException, IOException {
+ FileSystem fs = FileSystem.get(conf);
+ String pathSuffix = CONF_DIR_REL + instanceFolder + CONFIG_DEFAULT_NAME;
+ Path dstConf = new Path(fs.getHomeDirectory(), pathSuffix);
+ try {
+ fs.getFileStatus(dstConf);
+ if (mode == Mode.INSTALL) {
+ throw new IllegalStateException("Instance with this name already exists.");
+ }
+ } catch (FileNotFoundException e) {
+ if (mode == Mode.START) {
+ throw new IllegalStateException("Instance does not exist for this user", e);
+ }
+ }
+ if (mode == Mode.INSTALL) {
+ Path src = new Path(asterixConf);
+ fs.copyFromLocalFile(false, true, src, dstConf);
+ }
+
+ }
+
+ /**
+ * Upload External libraries and functions to HDFS for an instance to use when started
+ * @throws IllegalStateException
+ * @throws IOException
+ */
+
+ private void installExtLibs() throws IllegalStateException, IOException {
+ FileSystem fs = FileSystem.get(conf);
+ if (!instanceExists()) {
+ throw new IllegalStateException("No instance by name " + instanceName + " found.");
+ }
+ if (isRunning()) {
+ throw new IllegalStateException("Instance " + instanceName
+ + " is running. Please stop it before installing any libraries.");
+ }
+ String libPathSuffix = CONF_DIR_REL + instanceFolder + "library" + Path.SEPARATOR + libDataverse
+ + Path.SEPARATOR;
+ Path src = new Path(extLibs);
+ String fullLibPath = libPathSuffix + src.getName();
+ Path libFilePath = new Path(fs.getHomeDirectory(), fullLibPath);
+ LOG.info("Copying Asterix external library to DFS");
+ fs.copyFromLocalFile(false, true, src, libFilePath);
+ }
+
+ /**
+ * Finds the minimal classes and JARs needed to start the AM only.
+ * @return Resources the AM needs to start on the initial container.
+ * @throws IllegalStateException
+ * @throws IOException
+ */
+ private List<DFSResourceCoordinate> installAmLibs() throws IllegalStateException, IOException {
+ List<DFSResourceCoordinate> resources = new ArrayList<DFSResourceCoordinate>(2);
+ FileSystem fs = FileSystem.get(conf);
+ String fullLibPath = CONF_DIR_REL + instanceFolder + "am_jars" + Path.SEPARATOR;
+ String[] cp = System.getProperty("java.class.path").split(System.getProperty("path.separator"));
+ String asterixJarPattern = "^(asterix).*(jar)$"; //starts with asterix,ends with jar
+ String commonsJarPattern = "^(commons).*(jar)$";
+ String surefireJarPattern = "^(surefire).*(jar)$"; //for maven tests
+ String jUnitTestPattern = "^(asterix-yarn" + File.separator + "target)$";
+
+ LOG.info(File.separator);
+ for (String j : cp) {
+ String[] pathComponents = j.split(Pattern.quote(File.separator));
+ LOG.info(j);
+ LOG.info(pathComponents[pathComponents.length - 1]);
+ if (pathComponents[pathComponents.length - 1].matches(asterixJarPattern)
+ || pathComponents[pathComponents.length - 1].matches(commonsJarPattern)
+ || pathComponents[pathComponents.length - 1].matches(surefireJarPattern)
+ || pathComponents[pathComponents.length - 1].matches(jUnitTestPattern)) {
+ LOG.info("Loading JAR/classpath: " + j);
+ File f = new File(j);
+ Path dst = new Path(fs.getHomeDirectory(), fullLibPath + f.getName());
+ if (!fs.exists(dst) || refresh) {
+ fs.copyFromLocalFile(false, true, new Path(f.getAbsolutePath()), dst);
+ }
+ FileStatus dstSt = fs.getFileStatus(dst);
+ LocalResource amLib = Records.newRecord(LocalResource.class);
+ amLib.setType(LocalResourceType.FILE);
+ amLib.setVisibility(LocalResourceVisibility.PRIVATE);
+ amLib.setResource(ConverterUtils.getYarnUrlFromPath(dst));
+ amLib.setTimestamp(dstSt.getModificationTime());
+ amLib.setSize(dstSt.getLen());
+ DFSResourceCoordinate amLibCoord = new DFSResourceCoordinate();
+ amLibCoord.res = amLib;
+ amLibCoord.name = f.getName();
+ if (f.getName().contains("asterix-yarn") || f.getName().contains("surefire")) {
+ amLibCoord.envs.put(dst.toUri().toString(), AConstants.APPLICATIONMASTERJARLOCATION);
+ amLibCoord.envs.put(Long.toString(dstSt.getLen()), AConstants.APPLICATIONMASTERJARLEN);
+ amLibCoord.envs.put(Long.toString(dstSt.getModificationTime()),
+ AConstants.APPLICATIONMASTERJARTIMESTAMP);
+ }
+ resources.add(amLibCoord);
+ }
+
+ }
+ if (resources.size() == 0) {
+ throw new IOException("Required JARs are missing. Please check your directory structure");
+ }
+ return resources;
+ }
+
+ /**
+ * Uploads a AsterixDB cluster configuration to HDFS for the AM to use.
+ * @param overwrite Overwrite existing configurations by the same name.
+ * @throws IllegalStateException
+ * @throws IOException
+ */
+ private void installAsterixConfig(boolean overwrite) throws IllegalStateException, IOException {
+ FileSystem fs = FileSystem.get(conf);
+ File srcfile = new File(MERGED_PARAMETERS_PATH);
+ Path src = new Path(srcfile.getCanonicalPath());
+ String pathSuffix = CONF_DIR_REL + instanceFolder + File.separator + PARAMS_DEFAULT_NAME;
+ Path dst = new Path(fs.getHomeDirectory(), pathSuffix);
+ if (fs.exists(dst) && !overwrite) {
+
+ throw new IllegalStateException(
+ "Instance exists. Please delete an existing instance before trying to overwrite");
+ }
+ fs.copyFromLocalFile(false, true, src, dst);
+ }
+
+    /**
+     * Uploads the binary resources needed by the ApplicationMaster to the DFS.
+     * <p>
+     * The AsterixDB server archive is copied to {@code CONF_DIR_REL + instanceFolder + "asterix-server.zip"} under the
+     * user's DFS home directory. It is re-uploaded when {@code refresh} is set, otherwise only if it is missing. An
+     * optional log4j properties file is uploaded as well, and the AM libraries from {@code installAmLibs()} are
+     * appended.
+     *
+     * @return the resources to localize for the AM container, each with the environment entries that describe it
+     * @throws IOException
+     *             if a DFS operation fails
+     * @throws YarnException
+     *             propagated from {@code installAmLibs()}
+     */
+    public List<DFSResourceCoordinate> distributeBinaries() throws IOException, YarnException {
+        List<DFSResourceCoordinate> resources = new ArrayList<DFSResourceCoordinate>(2);
+        FileSystem fs = FileSystem.get(conf);
+
+        // Upload the AsterixDB server archive. It is kept zipped and localized as an ARCHIVE resource.
+        Path dst = new Path(fs.getHomeDirectory(), CONF_DIR_REL + instanceFolder + "asterix-server.zip");
+        if (refresh && fs.exists(dst)) {
+            fs.delete(dst, false);
+        }
+        if (!fs.exists(dst)) {
+            LOG.info("Copying Asterix distributable to DFS");
+            fs.copyFromLocalFile(false, true, new Path(asterixZip), dst);
+        }
+        FileStatus destStatus = fs.getFileStatus(dst);
+        LocalResource asterixTarLoc = Records.newRecord(LocalResource.class);
+        asterixTarLoc.setType(LocalResourceType.ARCHIVE);
+        asterixTarLoc.setVisibility(LocalResourceVisibility.PRIVATE);
+        asterixTarLoc.setResource(ConverterUtils.getYarnUrlFromPath(dst));
+        asterixTarLoc.setTimestamp(destStatus.getModificationTime());
+        // Record the real length so the resource and the TARLEN entry below describe the uploaded file.
+        asterixTarLoc.setSize(destStatus.getLen());
+
+        // Publish the archive coordinates so the AM can describe the same resource to its containers.
+        // NOTE: envs is keyed by the value, mapping to the environment variable name; deployAM inverts it.
+        DFSResourceCoordinate tar = new DFSResourceCoordinate();
+        tar.envs.put(dst.toUri().toString(), AConstants.TARLOCATION);
+        tar.envs.put(Long.toString(asterixTarLoc.getSize()), AConstants.TARLEN);
+        tar.envs.put(Long.toString(asterixTarLoc.getTimestamp()), AConstants.TARTIMESTAMP);
+        tar.res = asterixTarLoc;
+        tar.name = "asterix-server.zip";
+        resources.add(tar);
+
+        // Upload the log4j properties file, if one was given.
+        if (log4jPropFile != null && !log4jPropFile.isEmpty()) {
+            Path log4jDst = new Path(fs.getHomeDirectory(), "log4j.props");
+            fs.copyFromLocalFile(false, true, new Path(log4jPropFile), log4jDst);
+            FileStatus log4jFileStatus = fs.getFileStatus(log4jDst);
+            LocalResource log4jRsrc = Records.newRecord(LocalResource.class);
+            log4jRsrc.setType(LocalResourceType.FILE);
+            log4jRsrc.setVisibility(LocalResourceVisibility.PRIVATE);
+            log4jRsrc.setResource(ConverterUtils.getYarnUrlFromURI(log4jDst.toUri()));
+            log4jRsrc.setTimestamp(log4jFileStatus.getModificationTime());
+            log4jRsrc.setSize(log4jFileStatus.getLen());
+            // Populate the new coordinate itself; the server archive entry above must stay untouched.
+            DFSResourceCoordinate l4j = new DFSResourceCoordinate();
+            l4j.res = log4jRsrc;
+            l4j.name = "log4j.properties";
+            resources.add(l4j);
+        }
+
+        resources.addAll(installAmLibs());
+        return resources;
+    }
+
+    /**
+     * Submits the request to start the AsterixApplicationMaster to the YARN ResourceManager.
+     * <p>
+     * For {@code Mode.INSTALL} and {@code Mode.START}, the instance lock file is checked <em>before</em> submission.
+     * After submission it is written, containing the new application ID. A second launch of a locked instance is
+     * therefore refused before anything is sent to YARN.
+     *
+     * @param app
+     *            The application attempt handle.
+     * @param resources
+     *            Resources to be distributed as part of the container launch
+     * @param mode
+     *            The mode of the ApplicationMaster
+     * @return The application ID of the new Asterix instance.
+     * @throws IOException
+     *             if a DFS operation fails
+     * @throws YarnException
+     *             if the submission to the ResourceManager fails
+     * @throws IllegalStateException
+     *             if the lock file already exists for INSTALL or START
+     */
+    public ApplicationId deployAM(YarnClientApplication app, List<DFSResourceCoordinate> resources, Mode mode)
+            throws IOException, YarnException {
+
+        // Set up the container launch context for the application master
+        ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
+        ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
+        FileSystem fs = FileSystem.get(conf);
+
+        // Check the instance lock before submitting, so a failure here cannot leave an orphaned application behind.
+        boolean takesLock = mode == Mode.INSTALL || mode == Mode.START;
+        Path lock = new Path(fs.getHomeDirectory(), CONF_DIR_REL + instanceFolder + instanceLock);
+        if (takesLock && fs.exists(lock)) {
+            throw new IllegalStateException("Somehow, this instance has been launched twice. ");
+        }
+
+        // Set local resource info into app master container launch context
+        Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+        for (DFSResourceCoordinate res : resources) {
+            localResources.put(res.name, res.res);
+        }
+        amContainer.setLocalResources(localResources);
+
+        // Set the env variables to be setup in the env where the application master will be run
+        LOG.info("Set the environment for the application master");
+        Map<String, String> env = new HashMap<String, String>();
+        // Using the env info, the application master will create the correct local resources for the containers it
+        // launches. Each coordinate's envs map is keyed by value (value -> variable name), so invert it here.
+        for (DFSResourceCoordinate res : resources) {
+            if (res.envs == null) { // some entries may not have environment variables.
+                continue;
+            }
+            for (Map.Entry<String, String> e : res.envs.entrySet()) {
+                env.put(e.getValue(), e.getKey());
+            }
+        }
+        // this is needed for when the RM address isn't known from the environment of the AM
+        env.put(AConstants.RMADDRESS, conf.get("yarn.resourcemanager.address"));
+        env.put(AConstants.RMSCHEDULERADDRESS, conf.get("yarn.resourcemanager.scheduler.address"));
+        // add miscellaneous environment variables.
+        env.put(AConstants.INSTANCESTORE, CONF_DIR_REL + instanceFolder);
+        env.put(AConstants.DFS_BASE, fs.getHomeDirectory().toUri().toString());
+        env.put(AConstants.CC_JAVA_OPTS, ccJavaOpts);
+        env.put(AConstants.NC_JAVA_OPTS, ncJavaOpts);
+
+        // Build the AM classpath: the localized jars in ".", the YARN application classpath, and the log4j file.
+        StringBuilder classPathEnv = new StringBuilder("").append("./*");
+        for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+                YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
+            classPathEnv.append(File.pathSeparatorChar);
+            classPathEnv.append(c.trim());
+        }
+        classPathEnv.append(File.pathSeparatorChar).append("." + File.separator + "log4j.properties");
+
+        // add the runtime classpath needed for tests to work
+        if (conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
+            LOG.info("In YARN MiniCluster");
+            classPathEnv.append(System.getProperty("path.separator"));
+            classPathEnv.append(System.getProperty("java.class.path"));
+            env.put("HADOOP_CONF_DIR", System.getProperty("user.dir") + File.separator + "target" + File.separator);
+        }
+        LOG.info("AM Classpath:" + classPathEnv.toString());
+        env.put("CLASSPATH", classPathEnv.toString());
+
+        amContainer.setEnvironment(env);
+
+        // Set the necessary command to execute the application master
+        Vector<CharSequence> vargs = new Vector<CharSequence>(30);
+
+        // Set java executable command
+        LOG.info("Setting up app master command");
+        vargs.add(JAVA_HOME + File.separator + "bin" + File.separator + "java");
+        // Set class name
+        vargs.add(appMasterMainClass);
+        // Set params for Application Master
+        if (debugFlag) {
+            vargs.add("-debug");
+        }
+        if (mode == Mode.DESTROY) {
+            vargs.add("-obliterate");
+        } else if (mode == Mode.BACKUP) {
+            vargs.add("-backup");
+        } else if (mode == Mode.RESTORE) {
+            vargs.add("-restore " + snapName);
+        } else if (mode == Mode.INSTALL) {
+            vargs.add("-initial ");
+        }
+        if (refresh) {
+            vargs.add("-refresh");
+        }
+        vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + File.separator + "AppMaster.stdout");
+        vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + File.separator + "AppMaster.stderr");
+        // Get final commmand
+        StringBuilder command = new StringBuilder();
+        for (CharSequence str : vargs) {
+            command.append(str).append(" ");
+        }
+
+        LOG.info("Completed setting up app master command " + command.toString());
+        List<String> commands = new ArrayList<String>();
+        commands.add(command.toString());
+        amContainer.setCommands(commands);
+
+        // Set up resource type requirements. For now, only memory is supported so we set memory requirements
+        Resource capability = Records.newRecord(Resource.class);
+        capability.setMemory(amMemory);
+        appContext.setResource(capability);
+
+        appContext.setAMContainerSpec(amContainer);
+
+        // Set the priority for the application master
+        Priority pri = Records.newRecord(Priority.class);
+        // TODO - what is the range for priority? how to decide?
+        pri.setPriority(amPriority);
+        appContext.setPriority(pri);
+
+        // Set the queue to which this application is to be submitted in the RM
+        appContext.setQueue(amQueue);
+
+        // Submit the application. Either it is accepted, or an exception signals the failure.
+        LOG.info("Submitting application to ASM");
+        yarnClient.submitApplication(appContext);
+        ApplicationId appId = app.getApplicationSubmissionContext().getApplicationId();
+
+        // Now write the instance lock, recording which application owns this instance.
+        if (takesLock) {
+            BufferedWriter out = new BufferedWriter(new OutputStreamWriter(fs.create(lock, true)));
+            try {
+                out.write(appId.toString());
+            } finally {
+                out.close();
+            }
+        }
+        return appId;
+    }
+
+ /**
+ * Asks YARN to kill a given application by appId
+ * @param appId The application to kill.
+ * @param yarnClient The YARN client object that is connected to the RM.
+ * @throws YarnException
+ * @throws IOException
+ */
+
+ public static void killApplication(ApplicationId appId, YarnClient yarnClient) throws YarnException, IOException {
+ if (appId == null) {
+ throw new YarnException("No Application given to kill");
+ }
+ if (yarnClient.isInState(STATE.INITED)) {
+ yarnClient.start();
+ }
+ YarnApplicationState st;
+ ApplicationReport rep = yarnClient.getApplicationReport(appId);
+ st = rep.getYarnApplicationState();
+ if (st == YarnApplicationState.FINISHED || st == YarnApplicationState.KILLED
+ || st == YarnApplicationState.FAILED) {
+ LOG.info("Application " + appId + " already exited.");
+ return;
+ }
+ LOG.info("Killing applicaiton with ID: " + appId);
+ yarnClient.killApplication(appId);
+
+ }
+
+    /**
+     * Gracefully stops the instance if its lock file shows it as running. If it is not running, this only verifies
+     * that an instance configuration with this name exists in the DFS.
+     *
+     * @throws IOException
+     *             if a DFS operation fails
+     * @throws YarnException
+     *             if no instance with this name is configured, or stopping fails (IOExceptions from the stop are
+     *             wrapped)
+     */
+    private void stopInstanceIfRunning() throws IOException, YarnException {
+        FileSystem fs = FileSystem.get(conf);
+        Path dstConf = new Path(fs.getHomeDirectory(), CONF_DIR_REL + instanceFolder + CONFIG_DEFAULT_NAME);
+        if (!isRunning()) {
+            if (!fs.exists(dstConf)) {
+                throw new YarnException("No instance configured with that name exists");
+            }
+            return;
+        }
+        // The instance is up: shut it down.
+        try {
+            this.stopInstance();
+        } catch (IOException e) {
+            throw new YarnException(e);
+        }
+    }
+
+    /**
+     * Starts a YARN job that deletes the local AsterixDB resources of an existing instance on the cluster nodes, then
+     * removes the instance's directory from the DFS. A running instance is stopped first.
+     * <p>
+     * If the deletion job fails to start, the DFS directory is removed only when {@code force} is set, and a
+     * {@link YarnException} is thrown either way. If the job does not complete, the DFS directory is kept unless
+     * {@code force} is set.
+     *
+     * @param app
+     *            The Client connection
+     * @param resources
+     *            AM resources
+     * @throws IOException
+     *             if a DFS operation fails
+     * @throws YarnException
+     *             if the instance is not configured, or the deletion job fails to start
+     */
+    private void removeInstance(YarnClientApplication app, List<DFSResourceCoordinate> resources) throws IOException,
+            YarnException {
+        FileSystem fs = FileSystem.get(conf);
+        // Resolve against the DFS home directory, like every other instance path in this client.
+        Path instanceDir = new Path(fs.getHomeDirectory(), CONF_DIR_REL + instanceFolder);
+        // if the instance is up, fix that
+        stopInstanceIfRunning();
+        // now try deleting all of the on-disk artifacts on the cluster
+        ApplicationId deleter = deployAM(app, resources, Mode.DESTROY);
+        boolean deleteStarted = Utils.waitForApplication(deleter, yarnClient, "Waiting for deletion to start",
+                ccRestPort);
+        if (!deleteStarted) {
+            if (force) {
+                LOG.error("Forcing deletion of HDFS resources");
+                fs.delete(instanceDir, true);
+            }
+            LOG.fatal("Deletion of on-disk persistent resources on individual nodes failed.");
+            throw new YarnException("Deletion of on-disk persistent resources on individual nodes failed to start");
+        }
+        boolean deleted = waitForCompletion(deleter, "Deletion in progress");
+        if (!deleted && !force) {
+            LOG.fatal("Cleanup of on-disk persistent resources failed.");
+            return;
+        }
+        fs.delete(instanceDir, true);
+        System.out.println("Deletion of instance succeeded.");
+    }
+
+    /**
+     * Starts a YARN job that copies all data-containing resources of the AsterixDB instance to HDFS. A running
+     * instance is stopped first.
+     *
+     * @param app
+     *            the client application handle used to submit the backup job
+     * @param resources
+     *            AM resources
+     * @throws IOException
+     *             if a DFS operation fails
+     * @throws YarnException
+     *             if the instance is not configured, or the backup job fails to start
+     */
+    private void backupInstance(YarnClientApplication app, List<DFSResourceCoordinate> resources) throws IOException,
+            YarnException {
+        stopInstanceIfRunning();
+        ApplicationId backerUpper = deployAM(app, resources, Mode.BACKUP);
+        boolean backupStart = Utils.waitForApplication(backerUpper, yarnClient, "Waiting for backup " + backerUpper
+                + " to start", ccRestPort);
+        if (!backupStart) {
+            LOG.fatal("Backup failed to start");
+            throw new YarnException("Backup failed to start");
+        }
+        boolean complete = waitForCompletion(backerUpper, "Backup in progress");
+        if (!complete) {
+            LOG.fatal("Backup failed- timeout waiting for completion");
+            return;
+        }
+        System.out.println("Backup of instance succeeded.");
+    }
+
+    /**
+     * Starts a YARN job that copies a snapshot taken by backupInstance back onto the nodes, restoring the state of an
+     * existing AsterixDB instance. A running instance is stopped first.
+     *
+     * @param app
+     *            the client application handle used to submit the restore job
+     * @param resources
+     *            AM resources
+     * @throws IOException
+     *             if a DFS operation fails
+     * @throws YarnException
+     *             if the instance is not configured, or the restore job fails to start
+     */
+    private void restoreInstance(YarnClientApplication app, List<DFSResourceCoordinate> resources) throws IOException,
+            YarnException {
+        stopInstanceIfRunning();
+        ApplicationId restorer = deployAM(app, resources, Mode.RESTORE);
+        if (!Utils.waitForApplication(restorer, yarnClient, "Waiting for restore to start", ccRestPort)) {
+            LOG.fatal("Restore failed to start");
+            throw new YarnException();
+        }
+        if (waitForCompletion(restorer, "Restore in progress")) {
+            System.out.println("Restoration of instance succeeded.");
+        } else {
+            LOG.fatal("Restore failed- timeout waiting for completion");
+        }
+    }
+
+    /**
+     * Stops the instance and removes its lock file so that it can be restarted.
+     * <p>
+     * A graceful shutdown is requested through the CC's REST port, and the method then waits for the application to
+     * finish. If it does not finish and {@code force} is set, the application is killed through YARN. The lock file
+     * is deleted only once the instance is known to be gone.
+     *
+     * @throws IOException
+     *             if a DFS or RM operation fails
+     * @throws YarnException
+     *             if no lock file exists for the instance, or a YARN call fails
+     */
+    private void stopInstance() throws IOException, YarnException {
+        ApplicationId appId = getLockFile();
+        // get CC rest API port if it is nonstandard
+        readConfigParams(locateConfig());
+        if (yarnClient.isInState(STATE.INITED)) {
+            yarnClient.start();
+        }
+        System.out.println("Stopping instance " + instanceName);
+        if (!isRunning()) {
+            LOG.fatal("AsterixDB instance by that name is stopped already");
+            return;
+        }
+        try {
+            Utils.sendShutdownCall(Utils.getCCHostname(instanceName, conf), ccRestPort);
+        } catch (IOException e) {
+            LOG.error("Error while trying to issue safe shutdown:", e);
+        }
+        // now make sure it is actually gone and not "stuck"
+        boolean stopped = waitForCompletion(appId, "Waiting for AsterixDB to shut down");
+        if (!stopped && force) {
+            LOG.warn("Instance failed to stop gracefully, now killing it");
+            try {
+                AsterixYARNClient.killApplication(appId, yarnClient);
+            } catch (YarnException e1) {
+                LOG.fatal("Could not stop nor kill instance gracefully.", e1);
+                return;
+            }
+            stopped = true;
+        }
+        if (stopped) {
+            deleteLockFile();
+        }
+    }
+
+    /**
+     * Deletes this instance's lock file from the DFS, if it exists. Does nothing when no instance name is set.
+     *
+     * @throws IOException
+     *             if a DFS operation fails
+     */
+    private void deleteLockFile() throws IOException {
+        // Compare contents, not references: "== \"\"" would miss an empty name built at runtime.
+        if (instanceName == null || instanceName.isEmpty()) {
+            return;
+        }
+        FileSystem fs = FileSystem.get(conf);
+        Path lockPath = new Path(fs.getHomeDirectory(), CONF_DIR_REL + instanceName + '/' + instanceLock);
+        if (fs.exists(lockPath)) {
+            fs.delete(lockPath, false);
+        }
+    }
+
+    /**
+     * @return true if this instance's configuration file exists in the DFS
+     * @throws IOException
+     *             if the DFS cannot be queried
+     */
+    private boolean instanceExists() throws IOException {
+        FileSystem fs = FileSystem.get(conf);
+        return fs.exists(new Path(fs.getHomeDirectory(), CONF_DIR_REL + instanceFolder + CONFIG_DEFAULT_NAME));
+    }
+
+    /**
+     * @return true if the instance is configured (its configuration file exists in the DFS) and its lock file exists
+     * @throws IOException
+     *             if the DFS cannot be queried
+     */
+    private boolean isRunning() throws IOException {
+        FileSystem fs = FileSystem.get(conf);
+        Path home = fs.getHomeDirectory();
+        Path dstConf = new Path(home, CONF_DIR_REL + instanceFolder + CONFIG_DEFAULT_NAME);
+        if (!fs.exists(dstConf)) {
+            return false;
+        }
+        return fs.exists(new Path(home, CONF_DIR_REL + instanceFolder + instanceLock));
+    }
+
+    /**
+     * Reads the application ID recorded in this instance's lock file.
+     *
+     * @return the ID of the application recorded as running this instance
+     * @throws IllegalStateException
+     *             if no instance name was given
+     * @throws YarnException
+     *             if the lock file does not exist (the instance does not appear to be running)
+     * @throws IOException
+     *             if the lock file cannot be read or is empty
+     */
+    private ApplicationId getLockFile() throws IOException, YarnException {
+        if (instanceFolder == null || instanceFolder.isEmpty()) {
+            throw new IllegalStateException("Instance name not given.");
+        }
+        FileSystem fs = FileSystem.get(conf);
+        Path lockPath = new Path(fs.getHomeDirectory(), CONF_DIR_REL + instanceFolder + instanceLock);
+        if (!fs.exists(lockPath)) {
+            throw new YarnException("Instance appears to not be running. If you know it is, try using kill");
+        }
+        return readLockFile(fs, lockPath);
+    }
+
+    /**
+     * Reads the application ID recorded in the lock file of the named instance.
+     *
+     * @param instanceName
+     *            the instance whose lock file to read
+     * @param conf
+     *            Hadoop configuration used to reach the DFS
+     * @return the recorded application ID, or {@code null} if the instance has no lock file (it is stopped)
+     * @throws IllegalStateException
+     *             if {@code instanceName} is null or empty
+     * @throws IOException
+     *             if the lock file cannot be read or is empty
+     */
+    public static ApplicationId getLockFile(String instanceName, Configuration conf) throws IOException {
+        if (instanceName == null || instanceName.isEmpty()) {
+            throw new IllegalStateException("Instance name not given.");
+        }
+        FileSystem fs = FileSystem.get(conf);
+        Path lockPath = new Path(fs.getHomeDirectory(), CONF_DIR_REL + instanceName + '/' + instanceLock);
+        if (!fs.exists(lockPath)) {
+            return null;
+        }
+        return readLockFile(fs, lockPath);
+    }
+
+    /**
+     * Parses the first line of a lock file as an application ID. The stream is closed even if reading fails.
+     */
+    private static ApplicationId readLockFile(FileSystem fs, Path lockPath) throws IOException {
+        BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(lockPath)));
+        String lockAppId;
+        try {
+            lockAppId = br.readLine();
+        } finally {
+            br.close();
+        }
+        if (lockAppId == null || lockAppId.trim().isEmpty()) {
+            throw new IOException("Lock file " + lockPath + " is empty");
+        }
+        return ConverterUtils.toApplicationId(lockAppId.trim());
+    }
+
+    /**
+     * Loads the AsterixDB parameters file.
+     * <p>
+     * When a base configuration ({@code baseConfig}) was supplied, it is loaded, and {@code MERGED_PARAMETERS_PATH} is
+     * redirected to {@code PARAMS_DEFAULT_NAME} in that file's directory. Otherwise the parameters at
+     * {@code DEFAULT_PARAMETERS_PATH} are loaded.
+     *
+     * @return the loaded configuration
+     * @throws FileNotFoundException
+     *             presumably when the parameters file is missing (from {@code Utils.loadAsterixConfig}) - confirm
+     * @throws IOException
+     *             if the parameters file cannot be read
+     */
+    private AsterixConfiguration locateConfig() throws FileNotFoundException, IOException {
+        if (baseConfig == null) {
+            return Utils.loadAsterixConfig(DEFAULT_PARAMETERS_PATH);
+        }
+        AsterixConfiguration configuration = Utils.loadAsterixConfig(baseConfig);
+        // Resolve to an absolute file first: a bare file name has no parent and would otherwise cause an NPE.
+        File baseDir = new File(baseConfig).getAbsoluteFile().getParentFile();
+        MERGED_PARAMETERS_PATH = baseDir.getAbsolutePath() + File.separator + PARAMS_DEFAULT_NAME;
+        return configuration;
+    }
+
+    /**
+     * Copies the client-relevant settings from an AsterixDB configuration into this client's fields. These are the CC
+     * JVM options, the NC JVM options and the CC REST port. Property names are matched case-insensitively; other
+     * properties are ignored.
+     *
+     * @param configuration
+     *            the parsed AsterixDB parameters (the base config from the distribution, or the user-supplied one)
+     * @throws NumberFormatException
+     *             if the CC REST port property is not an integer
+     */
+    private void readConfigParams(AsterixConfiguration configuration) {
+        for (edu.uci.ics.asterix.common.configuration.Property property : configuration.getProperty()) {
+            String key = property.getName();
+            String value = property.getValue();
+            if (key.equalsIgnoreCase(CC_JAVA_OPTS_KEY)) {
+                ccJavaOpts = value;
+            } else if (key.equalsIgnoreCase(NC_JAVA_OPTS_KEY)) {
+                ncJavaOpts = value;
+            } else if (key.equalsIgnoreCase(CC_REST_PORT_KEY)) {
+                ccRestPort = Integer.parseInt(value);
+            }
+        }
+    }
+
+    /**
+     * Splices per-node information from the YARN cluster description into the AsterixDB configuration parameters and
+     * writes the result to {@code MERGED_PARAMETERS_PATH}.
+     * <p>
+     * Sets the version, the instance name and the metadata node. For every node it also sets the store directory, the
+     * coredump directory ({@code <logDir>/coredump/}) and the transaction log directory ({@code <txnLogDir>/txnLogs/}).
+     * Node-level values take precedence over cluster-wide ones.
+     *
+     * @param cluster
+     *            the parsed YARN cluster description
+     * @throws FileNotFoundException
+     *             if the merged parameters file cannot be created
+     * @throws IOException
+     *             if writing fails, or marshalling fails (wrapped JAXBException)
+     * @throws IllegalStateException
+     *             if a node has no transaction log directory at either node or cluster level
+     */
+    private void writeAsterixConfig(Cluster cluster) throws FileNotFoundException, IOException {
+        String metadataNodeId = Utils.getMetadataNode(cluster).getId();
+
+        AsterixConfiguration configuration = locateConfig();
+        readConfigParams(configuration);
+        configuration.setVersion(Utils.getAsterixVersionFromClasspath());
+        configuration.setInstanceName(instanceName);
+
+        List<Store> stores = new ArrayList<Store>();
+        List<Coredump> coredump = new ArrayList<Coredump>();
+        List<TransactionLogDir> txnLogDirs = new ArrayList<TransactionLogDir>();
+        for (Node node : cluster.getNode()) {
+            // Node-level settings override the cluster-wide defaults.
+            String storeDir = node.getStore() == null ? cluster.getStore() : node.getStore();
+            stores.add(new Store(node.getId(), storeDir));
+
+            String coredumpDir = node.getLogDir() == null ? cluster.getLogDir() : node.getLogDir();
+            coredump.add(new Coredump(node.getId(), withTrailingSeparator(coredumpDir) + "coredump" + File.separator));
+
+            String txnLogDir = node.getTxnLogDir() == null ? cluster.getTxnLogDir() : node.getTxnLogDir();
+            if (txnLogDir == null || txnLogDir.isEmpty()) {
+                throw new IllegalStateException("No transaction log directory configured for node " + node.getId());
+            }
+            txnLogDirs.add(new TransactionLogDir(node.getId(), withTrailingSeparator(txnLogDir) + "txnLogs"
+                    + File.separator));
+        }
+        configuration.setStore(stores);
+        configuration.setMetadataNode(metadataNodeId);
+        configuration.setCoredump(coredump);
+        configuration.setTransactionLogDir(txnLogDirs);
+
+        FileOutputStream os = new FileOutputStream(MERGED_PARAMETERS_PATH);
+        try {
+            JAXBContext ctx = JAXBContext.newInstance(AsterixConfiguration.class);
+            Marshaller marshaller = ctx.createMarshaller();
+            marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);
+            marshaller.marshal(configuration, os);
+        } catch (JAXBException e) {
+            throw new IOException(e);
+        } finally {
+            os.close();
+        }
+    }
+
+    /**
+     * Appends {@link File#separator} to {@code dir} unless it already ends with one. {@code null} and empty strings
+     * are returned unchanged.
+     */
+    private static String withTrailingSeparator(String dir) {
+        if (dir == null || dir.isEmpty() || dir.endsWith(File.separator)) {
+            return dir;
+        }
+        return dir + File.separator;
+    }
+
+    /**
+     * Delegates to {@code Utils.waitForApplication} with this client's YARN client and CC REST port.
+     *
+     * @param appId
+     *            the application to wait for
+     * @param message
+     *            status text passed through to {@code Utils.waitForApplication}
+     * @return the result of {@code Utils.waitForApplication}
+     */
+    private boolean waitForCompletion(ApplicationId appId, String message) throws YarnException, IOException {
+        boolean result = Utils.waitForApplication(appId, yarnClient, message, ccRestPort);
+        return result;
+    }
+
+    /**
+     * A resource to localize for the AM container. It holds the resource's link name, its YARN LocalResource, and the
+     * environment entries that describe it to the AM.
+     * <p>
+     * NOTE: {@code envs} maps <em>value -> environment variable name</em>; deployAM inverts it when building the
+     * environment.
+     */
+    private class DFSResourceCoordinate {
+        String name;
+        LocalResource res;
+        Map<String, String> envs = new HashMap<String, String>(3);
+
+        public DFSResourceCoordinate() {
+            // Fields are filled in by the code that builds the coordinate.
+        }
+    }
+}
diff --git a/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/Deleter.java b/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/Deleter.java
new file mode 100644
index 0000000..55a9d0b
--- /dev/null
+++ b/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/Deleter.java
@@ -0,0 +1,31 @@
+package edu.uci.ics.asterix.aoya;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+
+/**
+ * Command-line entry point that deletes every path given as an argument. Directories are deleted recursively;
+ * paths that do not exist are logged and skipped.
+ */
+public class Deleter {
+    private static final Log LOG = LogFactory.getLog(Deleter.class);
+
+    /**
+     * @param args
+     *            the paths to delete
+     * @throws IOException
+     *             if an existing path cannot be deleted
+     */
+    public static void main(String[] args) throws IOException {
+        LogManager.getRootLogger().setLevel(Level.DEBUG);
+
+        LOG.info("Obliterator args: " + Arrays.toString(args));
+        for (String arg : args) {
+            File f = new File(arg);
+            if (f.exists()) {
+                LOG.info("Deleting: " + f.getPath());
+                // forceDelete accepts plain files as well as directories; deleteDirectory rejects non-directories.
+                FileUtils.forceDelete(f);
+            } else {
+                LOG.error("Could not find file to delete: " + f.getPath());
+            }
+        }
+    }
+}
diff --git a/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/HDFSBackup.java b/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/HDFSBackup.java
new file mode 100644
index 0000000..13c9a6e
--- /dev/null
+++ b/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/HDFSBackup.java
@@ -0,0 +1,94 @@
+package edu.uci.ics.asterix.aoya;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+public class HDFSBackup {
+ Configuration conf = new YarnConfiguration();
+ private static final Log LOG = LogFactory.getLog(AsterixApplicationMaster.class);
+ boolean restore = false;
+ boolean backup = false;
+
+ public static void main(String[] args) throws ParseException, IllegalArgumentException, IOException {
+
+ HDFSBackup back = new HDFSBackup();
+ Map<String, String> envs = System.getenv();
+ if(envs.containsKey("HADOOP_CONF_DIR")){
+ File hadoopConfDir = new File(envs.get("HADOOP_CONF_DIR"));
+ if(hadoopConfDir.isDirectory()){
+ for(File config: hadoopConfDir.listFiles()){
+ if(config.getName().matches("^.*(xml)$")){
+ back.conf.addResource(new Path(config.getAbsolutePath()));
+ }
+ }
+ }
+ }
+ Options opts = new Options();
+ opts.addOption("restore", false, "");
+ opts.addOption("backup", false, "");
+ CommandLine cliParser = new GnuParser().parse(opts, args);
+ if (cliParser.hasOption("restore")) {
+ back.restore = true;
+ }
+ if (cliParser.hasOption("backup")) {
+ back.backup = true;
+ }
+ @SuppressWarnings("unchecked")
+ List<String> pairs = (List<String>) cliParser.getArgList();
+
+ List<Path[]> sources = new ArrayList<Path[]>(10);
+ for (String p : pairs) {
+ String[] s = p.split(",");
+ sources.add(new Path[] { new Path(s[0]), new Path(s[1]) });
+ }
+
+ try {
+ if (back.backup) {
+ back.performBackup(sources);
+ }
+ if (back.restore) {
+ back.performRestore(sources);
+ }
+ } catch (IOException e) {
+ back.LOG.fatal("Backup/restoration unsuccessful: " + e.getMessage());
+ throw e;
+ }
+ }
+
+ private void performBackup(List<Path[]> paths) throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+ for (Path[] p : paths) {
+ LOG.info("Backing up " + p[0] + " to " + p[1] + ".");
+ fs.copyFromLocalFile(p[0], p[1]);
+ }
+ }
+
+ private void performRestore(List<Path[]> paths) throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+ for (Path[] p : paths) {
+ LOG.info("Restoring " + p[0] + " to " + p[1] + ".");
+ File f = new File(p[1].toString() + File.separator + p[0].getName());
+ LOG.info(f.getAbsolutePath());
+ if (f.exists()) {
+ FileUtils.deleteDirectory(f);
+ }
+ LOG.info(f.exists());
+ fs.copyToLocalFile(false, p[0], p[1], true);
+ }
+ }
+}
diff --git a/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/Utils.java b/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/Utils.java
new file mode 100644
index 0000000..4eb8bc3
--- /dev/null
+++ b/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/Utils.java
@@ -0,0 +1,462 @@
+package edu.uci.ics.asterix.aoya;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.ConnectException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Scanner;
+import java.util.regex.Pattern;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Marshaller;
+import javax.xml.bind.Unmarshaller;
+
+import org.apache.commons.httpclient.*;
+import org.apache.commons.httpclient.methods.GetMethod;
+import org.apache.commons.httpclient.methods.PostMethod;
+import org.apache.commons.httpclient.params.HttpMethodParams;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+import edu.uci.ics.asterix.common.configuration.AsterixConfiguration;
+import edu.uci.ics.asterix.event.schema.yarnCluster.Cluster;
+import edu.uci.ics.asterix.event.schema.yarnCluster.Node;
+
+public class Utils {
+
+    /** Static helpers only; not instantiable. */
+    private Utils() {
+    }
+
+    /** Relative DFS directory of the AsterixDB instance configurations; same value as in AsterixYARNClient. */
+    private static final String CONF_DIR_REL = AsterixYARNClient.CONF_DIR_REL;
+
+ public static String hostFromContainerID(String containerID) {
+ return containerID.split("_")[4];
+ }
+
+    /**
+     * Gets the metadata node from an AsterixDB cluster description.
+     * <p>
+     * If the description names a metadata node, the node with that ID is returned. Otherwise the first node in the
+     * description is chosen.
+     *
+     * @param cluster
+     *            The cluster description in question.
+     * @return the metadata node; {@code null} if the named node is not in the description, or if the description
+     *         has no nodes
+     */
+    public static Node getMetadataNode(Cluster cluster) {
+        if (cluster.getMetadataNode() == null) {
+            // None configured: pick the first node (index 0, so single-node clusters work too).
+            return cluster.getNode().isEmpty() ? null : cluster.getNode().get(0);
+        }
+        for (Node node : cluster.getNode()) {
+            if (node.getId().equals(cluster.getMetadataNode())) {
+                return node;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Sends a "poison pill" message to an AsterixDB instance so that it shuts down safely. It then verifies that the
+     * instance no longer accepts connections.
+     *
+     * @param host
+     *            The host to shut down.
+     * @param port
+     *            The CC REST port.
+     * @throws IOException
+     *             if the shutdown request fails, or if the instance still accepts connections afterwards
+     */
+    public static void sendShutdownCall(String host, int port) throws IOException {
+        final String url = "http://" + host + ":" + port + "/admin/shutdown";
+        PostMethod shutdown = new PostMethod(url);
+        try {
+            executeHTTPCall(shutdown);
+        } catch (NoHttpResponseException e) {
+            // do nothing... this is expected: the server may drop the connection as it goes down
+        } finally {
+            shutdown.releaseConnection();
+        }
+        // Now test that the instance is really down, or throw an exception. Use a fresh method object rather than
+        // re-executing the first one.
+        PostMethod probe = new PostMethod(url);
+        try {
+            executeHTTPCall(probe);
+        } catch (ConnectException e) {
+            return;
+        } finally {
+            probe.releaseConnection();
+        }
+        throw new IOException("Instance did not shut down cleanly.");
+    }
+
+    /**
+     * Simple test via the AsterixDB HTTP query API to determine whether an instance is truly live. It queries the
+     * Metadata dataset and returns true if the server answers with HTTP 200 and a non-empty body.
+     *
+     * @param host
+     *            The host to run the query against
+     * @param port
+     *            The port of the query API
+     * @return True if the instance is OK; false if it refuses the connection or returns a bad or empty response.
+     * @throws IOException
+     *             on HTTP errors other than a refused connection
+     */
+    public static boolean probeLiveness(String host, int port) throws IOException {
+        final String url = "http://" + host + ":" + port + "/query";
+        final String test = "for $x in dataset Metadata.Dataset return $x;";
+        GetMethod method = new GetMethod(url);
+        method.setQueryString(new NameValuePair[] { new NameValuePair("query", test) });
+        try {
+            InputStream response;
+            try {
+                response = executeHTTPCall(method);
+            } catch (ConnectException e) {
+                return false;
+            }
+            if (response == null || method.getStatusCode() != HttpStatus.SC_OK) {
+                return false;
+            }
+            BufferedReader br = new BufferedReader(new InputStreamReader(response));
+            return br.readLine() != null;
+        } finally {
+            // Always give the connection back, whatever the outcome.
+            method.releaseConnection();
+        }
+    }
+
+    /**
+     * Executes {@code method} on a fresh HttpClient with retries disabled and returns the response body stream, which
+     * may be null. The method's connection is not released here.
+     */
+    private static InputStream executeHTTPCall(HttpMethod method) throws HttpException, IOException {
+        HttpMethodRetryHandler neverRetry = new HttpMethodRetryHandler() {
+            @Override
+            public boolean retryMethod(final HttpMethod m, final IOException exception, int executionCount) {
+                return false; // fail fast so connection errors reach the caller
+            }
+        };
+        HttpClient client = new HttpClient();
+        client.getParams().setParameter(HttpMethodParams.RETRY_HANDLER, neverRetry);
+        client.executeMethod(method);
+        return method.getResponseBodyAsStream();
+    }
+
+ //**
+
+    /**
+     * Returns a three-character progress indicator with a single '.' at position {@code iter % 3}: ".  ", " . " and
+     * "  ." for iter = 0, 1, 2. A negative {@code iter} throws ArrayIndexOutOfBoundsException.
+     */
+    public static String makeDots(int iter) {
+        char[] cells = "   ".toCharArray();
+        cells[iter % 3] = '.';
+        return String.valueOf(cells);
+    }
+
+    /**
+     * Prints {@code warning} and asks the user on standard input to confirm. The prompt repeats until the answer is
+     * exactly "yes" or "no".
+     *
+     * @param warning
+     *            text shown before the question
+     * @return true for "yes"; false for "no", or if standard input ends without an answer
+     */
+    public static boolean confirmAction(String warning) {
+        System.out.println(warning);
+        System.out.print("Are you sure you want to do this? (yes/no): ");
+        // Deliberately not closed: closing the Scanner closes System.in, which would also break re-prompting.
+        Scanner in = new Scanner(System.in);
+        while (in.hasNextLine()) {
+            String input = in.nextLine();
+            if ("yes".equals(input)) {
+                return true;
+            } else if ("no".equals(input)) {
+                return false;
+            }
+            System.out.println("Please type yes or no");
+        }
+        // End of input without an answer: treat it as "no".
+        return false;
+    }
+
+    /**
+     * Prints each AsterixDB instance deployed under {@code confDirRel} in the DFS home directory. Each instance is
+     * shown as running (with its application ID, taken from the lock file) or stopped.
+     *
+     * @param conf
+     *            Hadoop configuration object
+     * @param confDirRel
+     *            Relative AsterixDB configuration path for DFS
+     * @throws IOException
+     *             if the DFS cannot be read
+     */
+    public static void listInstances(Configuration conf, String confDirRel) throws IOException {
+        FileSystem fs = FileSystem.get(conf);
+        Path instanceFolder = new Path(fs.getHomeDirectory(), confDirRel);
+        if (!fs.exists(instanceFolder)) {
+            System.out.println("No running or stopped AsterixDB instances exist in this cluster.");
+            return;
+        }
+        FileStatus[] instances = fs.listStatus(instanceFolder);
+        if (instances.length == 0) {
+            System.out.println("No running or stopped AsterixDB instances exist in this cluster");
+            return;
+        }
+        System.out.println("Existing AsterixDB instances: ");
+        for (FileStatus instance : instances) {
+            String name = instance.getPath().getName();
+            ApplicationId runningApp = AsterixYARNClient.getLockFile(name, conf);
+            String status = runningApp == null ? "is stopped" : "is running with Application ID: "
+                    + runningApp.toString();
+            System.out.println("Instance " + name + " " + status);
+        }
+    }
+
+    /**
+     * Prints the names of an instance's backup snapshots stored in the DFS.
+     *
+     * @param conf
+     *            YARN configuration
+     * @param confDirRel
+     *            Relative config path
+     * @param instance
+     *            Instance name
+     * @throws IOException
+     *             if the backups cannot be listed
+     */
+    public static void listBackups(Configuration conf, String confDirRel, String instance) throws IOException {
+        List<String> backups = getBackups(conf, confDirRel, instance);
+        if (backups.isEmpty()) {
+            System.out.println("No backups found for instance " + instance + ".");
+            return;
+        }
+        System.out.println("Backups for instance " + instance + ": ");
+        for (String name : backups) {
+            System.out.println("Backup: " + name);
+        }
+    }
+    /**
+     * Returns the names of the snapshot entries in {@code <home>/<confDirRel>/<instance>/backups}.
+     *
+     * @param conf
+     *            Hadoop configuration used to reach the DFS
+     * @param confDirRel
+     *            Relative config path
+     * @param instance
+     *            Instance name
+     * @return the snapshot names; empty if the instance has no backups directory
+     * @throws IOException
+     *             if the DFS cannot be read
+     */
+    public static List<String> getBackups(Configuration conf, String confDirRel, String instance) throws IOException {
+        FileSystem fs = FileSystem.get(conf);
+        Path backupFolder = new Path(fs.getHomeDirectory(), confDirRel + "/" + instance + "/" + "backups");
+        List<String> backupNames = new ArrayList<String>();
+        // listStatus fails on a missing directory; having no folder just means there are no backups yet.
+        if (!fs.exists(backupFolder)) {
+            return backupNames;
+        }
+        for (FileStatus f : fs.listStatus(backupFolder)) {
+            backupNames.add(f.getPath().getName());
+        }
+        return backupNames;
+    }
+
+    /**
+     * Removes the backup snapshot named by {@code timestamp} from the DFS.
+     * <p>
+     * Entries in the backups directory whose names are not numeric are ignored. A missing backups directory is
+     * treated as "no backups".
+     *
+     * @param conf
+     *            DFS Configuration
+     * @param confDirRel
+     *            Configuration relative directory
+     * @param instance
+     *            The asterix instance name
+     * @param timestamp
+     *            The snapshot timestamp (ID)
+     * @throws IOException
+     *             if the DFS cannot be read or modified
+     */
+    public static void rmBackup(Configuration conf, String confDirRel, String instance, long timestamp)
+            throws IOException {
+        FileSystem fs = FileSystem.get(conf);
+        Path backupFolder = new Path(fs.getHomeDirectory(), confDirRel + "/" + instance + "/" + "backups");
+        FileStatus[] backups = fs.exists(backupFolder) ? fs.listStatus(backupFolder) : new FileStatus[0];
+        if (backups.length != 0) {
+            System.out.println("Backups for instance " + instance + ": ");
+        } else {
+            System.out.println("No backups found for instance " + instance + ".");
+        }
+        for (FileStatus f : backups) {
+            long fileTs;
+            try {
+                fileTs = Long.parseLong(f.getPath().getName());
+            } catch (NumberFormatException e) {
+                continue; // not a snapshot entry; ignore it
+            }
+            if (fileTs == timestamp) {
+                System.out.println("Deleting backup " + timestamp);
+                if (!fs.delete(f.getPath(), true)) {
+                    System.out.println("Backup could not be deleted");
+                }
+                return;
+            }
+        }
+        System.out.println("No backup found with specified timestamp");
+    }
+
+ /**
+ * Simply parses out the YARN cluster config and instantiates it into a nice object.
+ *
+ * @return The object representing the configuration
+ * @throws FileNotFoundException
+ * @throws JAXBException
+ */
+ public static Cluster parseYarnClusterConfig(String path) throws YarnException {
+ try {
+ File f = new File(path);
+ JAXBContext configCtx = JAXBContext.newInstance(Cluster.class);
+ Unmarshaller unmarshaller = configCtx.createUnmarshaller();
+ Cluster cl = (Cluster) unmarshaller.unmarshal(f);
+ return cl;
+ } catch (JAXBException e) {
+ throw new YarnException(e);
+ }
+ }
+
+    /** Marshals {@code cl} as XML into the local file at {@code path}; JAXB errors are rethrown as YarnException. */
+    public static void writeYarnClusterConfig(String path, Cluster cl) throws YarnException {
+        final File target = new File(path);
+        try {
+            Marshaller writer = JAXBContext.newInstance(Cluster.class).createMarshaller();
+            writer.marshal(cl, target);
+        } catch (JAXBException jaxbError) {
+            throw new YarnException(jaxbError);
+        }
+    }
+
+    /**
+     * Looks in the current class path for AsterixDB jars and takes the version from the name of the first usable
+     * match: asterix-yarn-0.8.7-SNAPSHOT.jar yields "0.8.7-SNAPSHOT", asterix-app-0.8.7.jar yields "0.8.7".
+     *
+     * @return The version found, as a string, or null if no matching jar is on the class path.
+     */
+    public static String getAsterixVersionFromClasspath() {
+        String[] cp = System.getProperty("java.class.path").split(System.getProperty("path.separator"));
+        String asterixJarPattern = "^(asterix).*(jar)$"; //starts with asterix,ends with jar
+        for (String j : cp) {
+            //split on the platform separator, quoted so a Windows backslash is taken literally
+            String[] pathComponents = j.split(Pattern.quote(File.separator));
+            String jarName = pathComponents.length == 0 ? "" : pathComponents[pathComponents.length - 1];
+            //expected naming: <prefix>-<module>-<version>[-<qualifier>].jar
+            String[] byDash = jarName.split("-");
+            if (!jarName.matches(asterixJarPattern) || byDash.length < 3) {
+                continue; //not an AsterixDB jar, or no version component to read
+            }
+            //strip only the trailing ".jar": splitting on every '.' would truncate "0.8.7" to "0"
+            String version = stripJarSuffix(byDash[2]);
+            return byDash.length == 4 ? version + '-' + stripJarSuffix(byDash[3]) : version; //e.g. SNAPSHOT
+        }
+        return null;
+    }
+
+    /** Removes a trailing ".jar" from a file-name fragment, if present. */
+    private static String stripJarSuffix(String s) {
+        return s.endsWith(".jar") ? s.substring(0, s.length() - ".jar".length()) : s;
+    }
+
+    /**
+     * Waits until {@code appId} is RUNNING and, if {@code probe}, until the instance's CC (host read from its
+     * cluster-config.xml in the DFS) answers on {@code port}; prints {@code message} plus dots if {@code print}.
+     * Returns false if the app ends FAILED/FINISHED/KILLED or time runs out; IOExceptions become YarnException.
+     */
+    public static boolean waitForLiveness(ApplicationId appId, boolean probe, boolean print, String message,
+            YarnClient yarnClient, String instanceName, Configuration conf, int port) throws YarnException {
+        YarnApplicationState st;
+        try {
+            st = yarnClient.getApplicationReport(appId).getYarnApplicationState();
+        } catch (IOException e) {
+            throw new YarnException(e);
+        }
+        boolean live = false;
+        for (int i = 0; i < 120 && !live; i++) {
+            if (st != YarnApplicationState.RUNNING) {
+                try {
+                    st = yarnClient.getApplicationReport(appId).getYarnApplicationState();
+                    if (print) {
+                        System.out.print(message + Utils.makeDots(i) + "\r");
+                    }
+                    Thread.sleep(1000);
+                } catch (InterruptedException e1) {
+                    Thread.currentThread().interrupt(); // flag stays set, so later sleeps return immediately
+                } catch (IOException e1) {
+                    throw new YarnException(e1);
+                }
+                if (st == YarnApplicationState.FAILED || st == YarnApplicationState.FINISHED
+                        || st == YarnApplicationState.KILLED) {
+                    return false;
+                }
+                if (st != YarnApplicationState.RUNNING) {
+                    continue; // not RUNNING yet: keep polling rather than probing or reporting success early
+                }
+            }
+            live = !probe || awaitCC(getCCHostname(instanceName, conf), port, print, message, i);
+        }
+        if (print) {
+            System.out.println("");
+        }
+        return live;
+    }
+
+    /** Probes the CC at host:port once per second, up to 60 times; returns true as soon as it answers. */
+    private static boolean awaitCC(String host, int port, boolean print, String msg, int dots) throws YarnException {
+        try {
+            for (int j = 0; j < 60; j++) {
+                if (Utils.probeLiveness(host, port)) {
+                    return true;
+                }
+                if (print) {
+                    System.out.print(msg + Utils.makeDots(dots) + "\r");
+                }
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e2) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+        } catch (IOException e1) {
+            throw new YarnException(e1);
+        }
+        return false;
+    }
+
+    public static boolean waitForLiveness(ApplicationId appId, String message, YarnClient yarnClient,
+            String instanceName, Configuration conf, int port) throws YarnException, IOException {
+        return waitForLiveness(appId, /*probe*/ true, /*print*/ true, message, yarnClient, instanceName, conf, port);
+    } // convenience overload: CC liveness probe enabled, progress printed
+
+    public static boolean waitForApplication(ApplicationId appId, YarnClient yarnClient, String message, int port)
+            throws YarnException, IOException {
+        return waitForLiveness(appId, /*probe*/ false, /*print*/ true, message, yarnClient, "", null, port);
+    } // no CC probe (so no instance name or conf is needed); progress is printed
+
+ public static boolean waitForApplication(ApplicationId appId, YarnClient yarnClient, int port) throws YarnException,
+ IOException, JAXBException {
+ return waitForLiveness(appId, false, false, "", yarnClient, "", null, port);
+ }
+
+    /**
+     * Reads the instance's cluster-config.xml from the DFS and returns the master (CC) node's client IP.
+     * The XML is unmarshalled straight from the DFS stream, closed afterwards, instead of via a local temp file.
+     */
+    public static String getCCHostname(String instanceName, Configuration conf) throws YarnException {
+        try {
+            FileSystem fs = FileSystem.get(conf);
+            String pathSuffix = CONF_DIR_REL + instanceName + "/" + "cluster-config.xml";
+            Path dstConf = new Path(fs.getHomeDirectory(), pathSuffix);
+            Unmarshaller unm = JAXBContext.newInstance(Cluster.class).createUnmarshaller();
+            try (java.io.InputStream in = fs.open(dstConf)) {
+                Cluster cl = (Cluster) unm.unmarshal(in);
+                return cl.getMasterNode().getClientIp();
+            }
+        } catch (IOException | JAXBException e) {
+            throw new YarnException(e);
+        }
+    }
+
+ public static AsterixConfiguration loadAsterixConfig(String path) throws IOException {
+ File f = new File(path);
+ try {
+ JAXBContext configCtx = JAXBContext.newInstance(AsterixConfiguration.class);
+ Unmarshaller unmarshaller = configCtx.createUnmarshaller();
+ AsterixConfiguration conf = (AsterixConfiguration) unmarshaller.unmarshal(f);
+ return conf;
+ } catch (JAXBException e) {
+ throw new IOException(e);
+ }
+ }
+
+}
diff --git a/asterix-yarn/src/main/resources/base-asterix-configuration.xml b/asterix-yarn/src/main/resources/base-asterix-configuration.xml
new file mode 100644
index 0000000..76c00db
--- /dev/null
+++ b/asterix-yarn/src/main/resources/base-asterix-configuration.xml
@@ -0,0 +1,246 @@
+<!--
+ ! Copyright 2009-2013 by The Regents of the University of California
+ ! Licensed under the Apache License, Version 2.0 (the "License");
+ ! you may not use this file except in compliance with the License.
+ ! you may obtain a copy of the License from
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing, software
+ ! distributed under the License is distributed on an "AS IS" BASIS,
+ ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ! See the License for the specific language governing permissions and
+ ! limitations under the License.
+ !-->
+<asterixConfiguration xmlns="asterixconf">
+
+ <property>
+ <name>nc.java.opts</name>
+ <value>-Xmx1536m</value>
+ <description>JVM parameters for each Node Controller (NC)</description>
+ </property>
+
+ <property>
+ <name>cc.java.opts</name>
+ <value>-Xmx1024m</value>
+ <description>JVM parameters for the Cluster Controller (CC)
+ </description>
+ </property>
+
+ <property>
+ <name>max.wait.active.cluster</name>
+ <value>60</value>
+ <description>Maximum wait (in seconds) for a cluster to be ACTIVE (all nodes are available)
+ before a submitted query/statement can be executed. (Default = 60 seconds)
+ </description>
+ </property>
+
+ <property>
+ <name>storage.buffercache.pagesize</name>
+ <value>131072</value>
+ <description>The page size in bytes for pages in the buffer cache.
+ (Default = "131072" // 128KB)
+ </description>
+ </property>
+
+ <property>
+ <name>storage.buffercache.size</name>
+ <value>536870912</value>
+ <description>The size of memory allocated to the disk buffer cache.
+ The value should be a multiple of the buffer cache page size. (Default
+ = "536870912" // 512MB)
+ </description>
+ </property>
+
+ <property>
+ <name>storage.buffercache.maxopenfiles</name>
+ <value>214748364</value>
+ <description>The maximum number of open files in the buffer cache.
+ (Default = "214748364")
+ </description>
+ </property>
+
+ <property>
+ <name>storage.memorycomponent.pagesize</name>
+ <value>131072</value>
+ <description>The page size in bytes for pages allocated to memory
+ components. (Default = "131072" // 128KB)
+ </description>
+ </property>
+
+ <property>
+ <name>storage.memorycomponent.numpages</name>
+ <value>256</value>
+ <description>The number of pages to allocate for a memory component.
+ (Default = 256)
+ </description>
+ </property>
+
+ <property>
+ <name>storage.metadata.memorycomponent.numpages</name>
+ <value>64</value>
+ <description>The number of pages to allocate for a metadata memory component.
+ (Default = 64)
+ </description>
+ </property>
+
+ <property>
+ <name>storage.memorycomponent.numcomponents</name>
+ <value>2</value>
+ <description>The number of memory components to be used per lsm index.
+ (Default = 2)
+ </description>
+ </property>
+
+ <property>
+ <name>storage.memorycomponent.globalbudget</name>
+ <value>536870912</value>
+ <description>The total size of memory in bytes that the sum of all
+ open memory
+ components cannot exceed. (Default = "536870912" // 512MB)
+ </description>
+ </property>
+
+ <property>
+ <name>storage.lsm.bloomfilter.falsepositiverate</name>
+ <value>0.01</value>
+ <description>The maximum acceptable false positive rate for bloom
+ filters associated with LSM indexes. (Default = "0.01" // 1%)
+ </description>
+ </property>
+
+ <property>
+ <name>txn.log.buffer.numpages</name>
+ <value>8</value>
+ <description>The number of in-memory log buffer pages. (Default = "8")
+ </description>
+ </property>
+
+ <property>
+ <name>txn.log.buffer.pagesize</name>
+ <value>524288</value>
+ <description>The size of pages in the in-memory log buffer. (Default =
+ "524288" // 512KB)
+ </description>
+ </property>
+
+ <property>
+ <name>txn.log.partitionsize</name>
+ <value>2147483648</value>
+ <description>The maximum size of a log file partition allowed before
+ rotating the log to the next partition. (Default = "2147483648" //
+ 2GB)
+ </description>
+ </property>
+
+ <property>
+ <name>txn.log.checkpoint.lsnthreshold</name>
+ <value>67108864</value>
+ <description>The size of the window that the maximum LSN is allowed to
+ be ahead of the checkpoint LSN by. (Default = "67108864" // 64MB)
+ </description>
+ </property>
+
+ <property>
+ <name>txn.log.checkpoint.pollfrequency</name>
+ <value>120</value>
+ <description>The time in seconds that the checkpoint thread
+ waits between polls. (Default = "120" // 120s)
+ </description>
+ </property>
+
+ <property>
+ <name>txn.log.checkpoint.history</name>
+ <value>0</value>
+ <description>The number of old log partition files to keep before
+ discarding. (Default = "0")
+ </description>
+ </property>
+
+ <property>
+ <name>txn.lock.escalationthreshold</name>
+ <value>1000</value>
+ <description>The number of entity level locks that need to be acquired
+ before the locks are coalesced and escalated into a dataset level
+ lock. (Default = "1000")
+ </description>
+ </property>
+
+ <property>
+ <name>txn.lock.shrinktimer</name>
+ <value>5000</value>
+ <description>The time in milliseconds to wait before deallocating
+ unused lock manager memory. (Default = "5000" // 5s)
+ </description>
+ </property>
+
+ <property>
+ <name>txn.lock.timeout.waitthreshold</name>
+ <value>60000</value>
+ <description>The time in milliseconds to wait before labeling a
+ transaction which has been waiting for a lock timed-out. (Default =
+ "60000" // 60s)
+ </description>
+ </property>
+
+ <property>
+ <name>txn.lock.timeout.sweepthreshold</name>
+ <value>10000</value>
+ <description>The time in milliseconds the timeout thread waits between
+ sweeps to detect timed-out transactions. (Default = "10000" // 10s)
+ </description>
+ </property>
+
+ <property>
+ <name>compiler.sortmemory</name>
+ <value>33554432</value>
+ <description>The amount of memory in bytes given to sort operations.
+ (Default = "33554432" // 32mb)
+ </description>
+ </property>
+
+ <property>
+ <name>compiler.joinmemory</name>
+ <value>33554432</value>
+ <description>The amount of memory in bytes given to join operations.
+ (Default = "33554432" // 32mb)
+ </description>
+ </property>
+
+ <property>
+ <name>compiler.framesize</name>
+ <value>131072</value>
+ <description>The Hyracks frame size that the compiler configures per
+ job. (Default = "131072" // 128KB)
+ </description>
+ </property>
+
+ <property>
+ <name>web.port</name>
+ <value>19001</value>
+ <description>The port for the ASTERIX web interface. (Default = 19001)
+ </description>
+ </property>
+
+ <property>
+ <name>api.port</name>
+ <value>19002</value>
+ <description>The port for the ASTERIX API server. (Default = 19002)
+ </description>
+ </property>
+
+ <property>
+ <name>log.level</name>
+ <value>INFO</value>
+ <description>The minimum log level to be displayed. (Default = INFO)
+ </description>
+ </property>
+
+ <property>
+ <name>plot.activate</name>
+ <value>false</value>
+ <description>Enables plotting of the Algebricks plan to the tmp folder. (Default = false)
+ </description>
+ </property>
+
+</asterixConfiguration>
diff --git a/asterix-yarn/src/main/resources/configs/asterix-client-log4j.properties b/asterix-yarn/src/main/resources/configs/asterix-client-log4j.properties
new file mode 100644
index 0000000..4b936b5
--- /dev/null
+++ b/asterix-yarn/src/main/resources/configs/asterix-client-log4j.properties
@@ -0,0 +1,23 @@
+#/*
+# Copyright 2009-2013 by The Regents of the University of California
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# you may obtain a copy of the License from
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#*/
+log4j.rootLogger=info, A1
+
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+# Print the log level and the message only (this pattern includes no timestamp)
+log4j.appender.A1.layout.ConversionPattern=%-p: %m%n
+
+log4j.logger.edu.uci.ics.asterix.event.management=info
+log4j.logger.org.apache.zookeeper=info
diff --git a/asterix-yarn/src/main/resources/configs/base-asterix-configuration.xml b/asterix-yarn/src/main/resources/configs/base-asterix-configuration.xml
new file mode 100644
index 0000000..76c00db
--- /dev/null
+++ b/asterix-yarn/src/main/resources/configs/base-asterix-configuration.xml
@@ -0,0 +1,246 @@
+<!--
+ ! Copyright 2009-2013 by The Regents of the University of California
+ ! Licensed under the Apache License, Version 2.0 (the "License");
+ ! you may not use this file except in compliance with the License.
+ ! you may obtain a copy of the License from
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing, software
+ ! distributed under the License is distributed on an "AS IS" BASIS,
+ ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ! See the License for the specific language governing permissions and
+ ! limitations under the License.
+ !-->
+<asterixConfiguration xmlns="asterixconf">
+
+ <property>
+ <name>nc.java.opts</name>
+ <value>-Xmx1536m</value>
+ <description>JVM parameters for each Node Controller (NC)</description>
+ </property>
+
+ <property>
+ <name>cc.java.opts</name>
+ <value>-Xmx1024m</value>
+ <description>JVM parameters for the Cluster Controller (CC)
+ </description>
+ </property>
+
+ <property>
+ <name>max.wait.active.cluster</name>
+ <value>60</value>
+ <description>Maximum wait (in seconds) for a cluster to be ACTIVE (all nodes are available)
+ before a submitted query/statement can be executed. (Default = 60 seconds)
+ </description>
+ </property>
+
+ <property>
+ <name>storage.buffercache.pagesize</name>
+ <value>131072</value>
+ <description>The page size in bytes for pages in the buffer cache.
+ (Default = "131072" // 128KB)
+ </description>
+ </property>
+
+ <property>
+ <name>storage.buffercache.size</name>
+ <value>536870912</value>
+ <description>The size of memory allocated to the disk buffer cache.
+ The value should be a multiple of the buffer cache page size. (Default
+ = "536870912" // 512MB)
+ </description>
+ </property>
+
+ <property>
+ <name>storage.buffercache.maxopenfiles</name>
+ <value>214748364</value>
+ <description>The maximum number of open files in the buffer cache.
+ (Default = "214748364")
+ </description>
+ </property>
+
+ <property>
+ <name>storage.memorycomponent.pagesize</name>
+ <value>131072</value>
+ <description>The page size in bytes for pages allocated to memory
+ components. (Default = "131072" // 128KB)
+ </description>
+ </property>
+
+ <property>
+ <name>storage.memorycomponent.numpages</name>
+ <value>256</value>
+ <description>The number of pages to allocate for a memory component.
+ (Default = 256)
+ </description>
+ </property>
+
+ <property>
+ <name>storage.metadata.memorycomponent.numpages</name>
+ <value>64</value>
+ <description>The number of pages to allocate for a metadata memory component.
+ (Default = 64)
+ </description>
+ </property>
+
+ <property>
+ <name>storage.memorycomponent.numcomponents</name>
+ <value>2</value>
+ <description>The number of memory components to be used per lsm index.
+ (Default = 2)
+ </description>
+ </property>
+
+ <property>
+ <name>storage.memorycomponent.globalbudget</name>
+ <value>536870912</value>
+ <description>The total size of memory in bytes that the sum of all
+ open memory
+ components cannot exceed. (Default = "536870912" // 512MB)
+ </description>
+ </property>
+
+ <property>
+ <name>storage.lsm.bloomfilter.falsepositiverate</name>
+ <value>0.01</value>
+ <description>The maximum acceptable false positive rate for bloom
+ filters associated with LSM indexes. (Default = "0.01" // 1%)
+ </description>
+ </property>
+
+ <property>
+ <name>txn.log.buffer.numpages</name>
+ <value>8</value>
+ <description>The number of in-memory log buffer pages. (Default = "8")
+ </description>
+ </property>
+
+ <property>
+ <name>txn.log.buffer.pagesize</name>
+ <value>524288</value>
+ <description>The size of pages in the in-memory log buffer. (Default =
+ "524288" // 512KB)
+ </description>
+ </property>
+
+ <property>
+ <name>txn.log.partitionsize</name>
+ <value>2147483648</value>
+ <description>The maximum size of a log file partition allowed before
+ rotating the log to the next partition. (Default = "2147483648" //
+ 2GB)
+ </description>
+ </property>
+
+ <property>
+ <name>txn.log.checkpoint.lsnthreshold</name>
+ <value>67108864</value>
+ <description>The size of the window that the maximum LSN is allowed to
+ be ahead of the checkpoint LSN by. (Default = "67108864" // 64MB)
+ </description>
+ </property>
+
+ <property>
+ <name>txn.log.checkpoint.pollfrequency</name>
+ <value>120</value>
+ <description>The time in seconds that the checkpoint thread
+ waits between polls. (Default = "120" // 120s)
+ </description>
+ </property>
+
+ <property>
+ <name>txn.log.checkpoint.history</name>
+ <value>0</value>
+ <description>The number of old log partition files to keep before
+ discarding. (Default = "0")
+ </description>
+ </property>
+
+ <property>
+ <name>txn.lock.escalationthreshold</name>
+ <value>1000</value>
+ <description>The number of entity level locks that need to be acquired
+ before the locks are coalesced and escalated into a dataset level
+ lock. (Default = "1000")
+ </description>
+ </property>
+
+ <property>
+ <name>txn.lock.shrinktimer</name>
+ <value>5000</value>
+ <description>The time in milliseconds to wait before deallocating
+ unused lock manager memory. (Default = "5000" // 5s)
+ </description>
+ </property>
+
+ <property>
+ <name>txn.lock.timeout.waitthreshold</name>
+ <value>60000</value>
+ <description>The time in milliseconds to wait before labeling a
+ transaction which has been waiting for a lock timed-out. (Default =
+ "60000" // 60s)
+ </description>
+ </property>
+
+ <property>
+ <name>txn.lock.timeout.sweepthreshold</name>
+ <value>10000</value>
+ <description>The time in milliseconds the timeout thread waits between
+ sweeps to detect timed-out transactions. (Default = "10000" // 10s)
+ </description>
+ </property>
+
+ <property>
+ <name>compiler.sortmemory</name>
+ <value>33554432</value>
+ <description>The amount of memory in bytes given to sort operations.
+ (Default = "33554432" // 32mb)
+ </description>
+ </property>
+
+ <property>
+ <name>compiler.joinmemory</name>
+ <value>33554432</value>
+ <description>The amount of memory in bytes given to join operations.
+ (Default = "33554432" // 32mb)
+ </description>
+ </property>
+
+ <property>
+ <name>compiler.framesize</name>
+ <value>131072</value>
+ <description>The Hyracks frame size that the compiler configures per
+ job. (Default = "131072" // 128KB)
+ </description>
+ </property>
+
+ <property>
+ <name>web.port</name>
+ <value>19001</value>
+ <description>The port for the ASTERIX web interface. (Default = 19001)
+ </description>
+ </property>
+
+ <property>
+ <name>api.port</name>
+ <value>19002</value>
+ <description>The port for the ASTERIX API server. (Default = 19002)
+ </description>
+ </property>
+
+ <property>
+ <name>log.level</name>
+ <value>INFO</value>
+ <description>The minimum log level to be displayed. (Default = INFO)
+ </description>
+ </property>
+
+ <property>
+ <name>plot.activate</name>
+ <value>false</value>
+ <description>Enables plotting of the Algebricks plan to the tmp folder. (Default = false)
+ </description>
+ </property>
+
+</asterixConfiguration>
diff --git a/asterix-yarn/src/main/resources/configs/local.xml b/asterix-yarn/src/main/resources/configs/local.xml
new file mode 100644
index 0000000..582254c
--- /dev/null
+++ b/asterix-yarn/src/main/resources/configs/local.xml
@@ -0,0 +1,31 @@
+ <cluster xmlns="yarn_cluster">
+
+ <!-- Name of the cluster -->
+ <name>local</name>
+
+ <log_dir>/tmp/</log_dir>
+ <txn_log_dir>/tmp/</txn_log_dir>
+
+ <!-- Mount point of an iodevice. Use a comma separated list for a machine that
+ has multiple iodevices (disks).
+ This property can be overridden for a node by redefining it at the node level. -->
+ <iodevices>/tmp</iodevices>
+
+ <!-- Path on each iodevice where Asterix will store its data -->
+ <store>storage</store>
+
+ <!-- IP addresses and ports of the master (CC) machine -->
+ <master_node>
+ <id>cc</id>
+ <client_ip>localhost</client_ip>
+ <cluster_ip>localhost</cluster_ip>
+ <client_port>1098</client_port>
+ <cluster_port>1099</cluster_port>
+ <http_port>8888</http_port>
+ </master_node>
+ <node>
+ <id>nc1</id>
+ <cluster_ip>127.0.0.1</cluster_ip>
+ </node>
+ <metadata_node>nc1</metadata_node>
+</cluster>
diff --git a/asterix-yarn/src/main/resources/configs/my_awesome_cluster_desc.xml b/asterix-yarn/src/main/resources/configs/my_awesome_cluster_desc.xml
new file mode 100644
index 0000000..749bc6f
--- /dev/null
+++ b/asterix-yarn/src/main/resources/configs/my_awesome_cluster_desc.xml
@@ -0,0 +1,23 @@
+ <cluster xmlns="yarn_cluster">
+ <name>my_awesome_instance</name>
+ <txn_log_dir>/home/yarn/</txn_log_dir>
+ <iodevices>/home/yarn/</iodevices>
+ <store>asterix-data</store>
+ <master_node>
+ <id>cc</id>
+ <client_ip>10.10.0.2</client_ip>
+ <cluster_ip>10.10.0.2</cluster_ip>
+ <client_port>1098</client_port>
+ <cluster_port>1099</cluster_port>
+ <http_port>8888</http_port>
+ </master_node>
+ <node>
+ <id>nc1</id>
+ <cluster_ip>10.10.0.3</cluster_ip>
+ </node>
+ <node>
+ <id>nc2</id>
+ <cluster_ip>10.10.0.4</cluster_ip>
+ </node>
+ <metadata_node>nc1</metadata_node>
+ </cluster>
diff --git a/asterix-yarn/src/main/resources/scripts/asterix b/asterix-yarn/src/main/resources/scripts/asterix
new file mode 100644
index 0000000..c2fc20f
--- /dev/null
+++ b/asterix-yarn/src/main/resources/scripts/asterix
@@ -0,0 +1,25 @@
+#!/usr/bin/env bash
+if [ -z $ASTERIX_HOME ]
+ then
+ pushd $(dirname $0) >/dev/null
+ cd ..
+ export ASTERIX_HOME=$(pwd)
+ popd >/dev/null
+fi
+
+for jar in `ls $ASTERIX_HOME/lib/*.jar`
+ do
+ if [ -z $ASTERIX_CLASSPATH ]
+ then
+ ASTERIX_CLASSPATH=$jar
+ else
+ ASTERIX_CLASSPATH=$ASTERIX_CLASSPATH:$jar
+ fi
+done
+
+ASTERIX_CLASSPATH=$ASTERIX_CLASSPATH:
+ASTERIX_CLASSPATH=$ASTERIX_CLASSPATH:$YARN_CONF_DIR:$HADOOP_CONF_DIR:$HADOOP_CONF_PATH
+pushd $(dirname $0) > /dev/null
+cd $ASTERIX_HOME
+java $JAVA_OPTS -cp $ASTERIX_CLASSPATH -Dlog4j.configuration=file://$ASTERIX_HOME/conf/asterix-client-log4j.properties edu.uci.ics.asterix.aoya.AsterixYARNClient $@
+popd > /dev/null
diff --git a/asterix-yarn/src/main/resources/scripts/asterix.cmd b/asterix-yarn/src/main/resources/scripts/asterix.cmd
new file mode 100644
index 0000000..a85e411
--- /dev/null
+++ b/asterix-yarn/src/main/resources/scripts/asterix.cmd
@@ -0,0 +1,103 @@
+@echo off
+@rem Licensed to the Apache Software Foundation (ASF) under one or more
+@rem contributor license agreements. See the NOTICE file distributed with
+@rem this work for additional information regarding copyright ownership.
+@rem The ASF licenses this file to You under the Apache License, Version 2.0
+@rem (the "License"); you may not use this file except in compliance with
+@rem the License. You may obtain a copy of the License at
+@rem
+@rem http://www.apache.org/licenses/LICENSE-2.0
+@rem
+@rem Unless required by applicable law or agreed to in writing, software
+@rem distributed under the License is distributed on an "AS IS" BASIS,
+@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+@rem See the License for the specific language governing permissions and
+@rem limitations under the License.
+@rem Windows launcher for the AsterixDB YARN client (class edu.uci.ics.asterix.aoya.AsterixYARNClient).
+setlocal enabledelayedexpansion
+@rem HADOOP_BIN_PATH defaults to the directory that contains this script.
+if not defined HADOOP_BIN_PATH (
+ set HADOOP_BIN_PATH=%~dp0
+)
+@rem Drop a trailing backslash from HADOOP_BIN_PATH, if present.
+if "%HADOOP_BIN_PATH:~-1%" == "\" (
+ set HADOOP_BIN_PATH=%HADOOP_BIN_PATH:~0,-1%
+)
+@rem NOTE(review): HADOOP_LIBEXEC_DIR is set below but nothing later in this script reads it.
+set DEFAULT_LIBEXEC_DIR=%HADOOP_BIN_PATH%\..\libexec
+if not defined HADOOP_LIBEXEC_DIR (
+ set HADOOP_LIBEXEC_DIR=%DEFAULT_LIBEXEC_DIR%
+)
+
+:main
+
+ set CLASS=edu.uci.ics.asterix.aoya.AsterixYARNClient
+
+ @rem JAVA_HEAP_MAX is only set below from YARN_HEAPSIZE; hadoop-config.cmd is not called by this script
+
+ if defined YARN_HEAPSIZE (
+ @rem echo run with Java heapsize %YARN_HEAPSIZE%
+ set JAVA_HEAP_MAX=-Xmx%YARN_HEAPSIZE%m
+ )
+
+ @rem CLASSPATH initially contains HADOOP_CONF_DIR & YARN_CONF_DIR
+ if not defined HADOOP_CONF_DIR (
+ echo No HADOOP_CONF_DIR set.
+ echo Please set it in the environment.
+ goto :eof
+ )
+ @rem AsterixDB jars come from the lib directory next to this script's directory.
+ set CLASSPATH=%HADOOP_CONF_DIR%;%YARN_CONF_DIR%;%CLASSPATH%;%~dp0..\lib\*
+
+ @rem For developers: append any Hadoop YARN build-output class directories found under HADOOP_YARN_HOME.
+ if exist %HADOOP_YARN_HOME%\yarn-api\target\classes (
+ set CLASSPATH=%CLASSPATH%;%HADOOP_YARN_HOME%\yarn-api\target\classes
+ )
+ @rem NOTE(review): CLASSPATH is expanded before each block is parsed, so a ")" in it (e.g. "Program Files (x86)") likely breaks these blocks.
+ if exist %HADOOP_YARN_HOME%\yarn-common\target\classes (
+ set CLASSPATH=%CLASSPATH%;%HADOOP_YARN_HOME%\yarn-common\target\classes
+ )
+
+ if exist %HADOOP_YARN_HOME%\yarn-mapreduce\target\classes (
+ set CLASSPATH=%CLASSPATH%;%HADOOP_YARN_HOME%\yarn-mapreduce\target\classes
+ )
+
+ if exist %HADOOP_YARN_HOME%\yarn-master-worker\target\classes (
+ set CLASSPATH=%CLASSPATH%;%HADOOP_YARN_HOME%\yarn-master-worker\target\classes
+ )
+
+ if exist %HADOOP_YARN_HOME%\yarn-server\yarn-server-nodemanager\target\classes (
+ set CLASSPATH=%CLASSPATH%;%HADOOP_YARN_HOME%\yarn-server\yarn-server-nodemanager\target\classes
+ )
+
+ if exist %HADOOP_YARN_HOME%\yarn-server\yarn-server-common\target\classes (
+ set CLASSPATH=%CLASSPATH%;%HADOOP_YARN_HOME%\yarn-server\yarn-server-common\target\classes
+ )
+
+ if exist %HADOOP_YARN_HOME%\yarn-server\yarn-server-resourcemanager\target\classes (
+ set CLASSPATH=%CLASSPATH%;%HADOOP_YARN_HOME%\yarn-server\yarn-server-resourcemanager\target\classes
+ )
+
+ if exist %HADOOP_YARN_HOME%\build\test\classes (
+ set CLASSPATH=%CLASSPATH%;%HADOOP_YARN_HOME%\build\test\classes
+ )
+
+ if exist %HADOOP_YARN_HOME%\build\tools (
+ set CLASSPATH=%CLASSPATH%;%HADOOP_YARN_HOME%\build\tools
+ )
+ @rem NOTE(review): YARN_DIR and YARN_LIB_JARS_DIR are not set by this script; they must come from the environment.
+ set CLASSPATH=%CLASSPATH%;%HADOOP_YARN_HOME%\%YARN_DIR%\*
+ set CLASSPATH=%CLASSPATH%;%HADOOP_YARN_HOME%\%YARN_LIB_JARS_DIR%\*
+
+
+ @rem Add JAVA_LIBRARY_PATH (if set) to YARN_OPTS, then launch the client forwarding all script arguments.
+ if defined JAVA_LIBRARY_PATH (
+ set YARN_OPTS=%YARN_OPTS% -Djava.library.path=%JAVA_LIBRARY_PATH%
+ )
+ @rem NOTE(review): CLASSPATH is passed unquoted, so a path containing spaces would split the java arguments.
+ set java_arguments=%JAVA_HEAP_MAX% %YARN_OPTS% -classpath %CLASSPATH% %CLASS%
+ call java %java_arguments% %*
+
+goto :eof
+@rem NOTE(review): the endlocal below is never reached after goto :eof; the setlocal scope ends with the script anyway.
+endlocal