YARN integration for AsterixDB

This is an initial version of YARN integration for AsterixDB.
- Uses static assignment of CC and NC nodes to NM locations
- Stores state locally on each NM, outside of HDFS
- "All or nothing" container allocation. We don't attempt to
  move or reallocate containers the RM may kill (yet).
- Retains feature parity with managix.

Change-Id: I49c849179d17fc7faa446b9be57a0695df6836ab
Reviewed-on: https://asterix-gerrit.ics.uci.edu/161
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <hubailmor@gmail.com>
diff --git a/asterix-yarn/src/main/assembly/binary-assembly.xml b/asterix-yarn/src/main/assembly/binary-assembly.xml
new file mode 100644
index 0000000..8457c66
--- /dev/null
+++ b/asterix-yarn/src/main/assembly/binary-assembly.xml
@@ -0,0 +1,101 @@
+<!--
+ ! Copyright 2009-2013 by The Regents of the University of California
+ ! Licensed under the Apache License, Version 2.0 (the "License");
+ ! you may not use this file except in compliance with the License.
+ ! you may obtain a copy of the License from
+ ! 
+ !     http://www.apache.org/licenses/LICENSE-2.0
+ ! 
+ ! Unless required by applicable law or agreed to in writing, software
+ ! distributed under the License is distributed on an "AS IS" BASIS,
+ ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ! See the License for the specific language governing permissions and
+ ! limitations under the License.
+ !-->
+<assembly>
+    <id>binary-assembly</id>
+    <formats>
+        <format>dir</format>
+        <format>zip</format>
+    </formats>
+    <includeBaseDirectory>false</includeBaseDirectory>
+    <fileSets>
+        <fileSet> <!-- the asterix script: the only file set that needs the executable bit -->
+            <directory>src/main/resources/scripts</directory>
+            <fileMode>0755</fileMode>
+            <includes>
+                <include>asterix</include>
+            </includes>
+            <outputDirectory>bin</outputDirectory>
+        </fileSet>
+        <fileSet> <!-- configuration data from here on: packaged 0644, not executable -->
+            <directory>src/main/resources/configs</directory>
+            <fileMode>0644</fileMode>
+            <includes>
+                <include>local.xml</include>
+            </includes>
+            <outputDirectory>configs</outputDirectory>
+        </fileSet>
+        <fileSet>
+            <directory>src/main/resources/</directory>
+            <fileMode>0644</fileMode>
+            <includes>
+                <include>base-asterix-configuration.xml</include>
+            </includes>
+            <outputDirectory>conf</outputDirectory>
+        </fileSet>
+        <fileSet>
+            <directory>src/main/resources/configs</directory>
+            <fileMode>0644</fileMode>
+            <includes>
+                <include>my_awesome_cluster_desc.xml</include>
+            </includes>
+            <outputDirectory>configs</outputDirectory>
+        </fileSet>
+        <fileSet>
+            <directory>src/main/resources/configs</directory>
+            <fileMode>0644</fileMode>
+            <includes>
+                <include>asterix-client-log4j.properties</include>
+            </includes>
+            <outputDirectory>conf</outputDirectory>
+        </fileSet>
+    </fileSets>
+    <dependencySets>
+        <dependencySet> <!-- client-side libraries, copied into lib/ -->
+            <includes>
+                <include>edu.uci.ics.asterix:asterix-yarn</include>
+                <include>edu.uci.ics.asterix:asterix-common</include>
+                <include>log4j:log4j</include>
+                <include>org.slf4j:slf4j-api</include>
+                <include>org.slf4j:slf4j-simple</include>
+                <include>commons-io:commons-io</include>
+                <include>commons-cli:commons-cli</include>
+                <include>commons-configuration:commons-configuration</include>
+                <include>commons-logging:commons-logging</include>
+                <include>commons-codec:commons-codec</include>
+                <include>commons-lang:commons-lang</include>
+                <include>org.apache.hadoop:hadoop-common</include>
+                <include>org.apache.hadoop:hadoop-hdfs</include>
+                <include>org.apache.hadoop:hadoop-auth</include>
+                <include>org.apache.hadoop:hadoop-yarn-client</include>
+                <include>org.apache.hadoop:hadoop-yarn-common</include>
+                <include>org.apache.hadoop:hadoop-yarn-api</include>
+                <include>org.apache.httpcomponents:httpcore</include>
+                <include>org.apache.httpcomponents:httpclient</include>
+                <include>commons-httpclient:commons-httpclient</include>
+                <include>com.google.guava:guava</include>
+                <include>com.google.protobuf:protobuf-java</include>
+            </includes>
+            <outputDirectory>lib</outputDirectory>
+        </dependencySet>
+        <dependencySet> <!-- the asterix-server artifact, shipped as-is (not unpacked) in asterix/ -->
+            <outputDirectory>asterix</outputDirectory>
+            <includes>
+                <include>asterix-server*</include>
+            </includes>
+            <unpack>false</unpack>
+            <useTransitiveDependencies>false</useTransitiveDependencies>
+        </dependencySet>
+    </dependencySets>
+</assembly>
diff --git a/asterix-yarn/src/main/assembly/hyracks-assembly.xml b/asterix-yarn/src/main/assembly/hyracks-assembly.xml
new file mode 100644
index 0000000..ae362ca
--- /dev/null
+++ b/asterix-yarn/src/main/assembly/hyracks-assembly.xml
@@ -0,0 +1,37 @@
+<!--
+ ! Copyright 2009-2013 by The Regents of the University of California
+ ! Licensed under the Apache License, Version 2.0 (the "License");
+ ! you may not use this file except in compliance with the License.
+ ! you may obtain a copy of the License from
+ ! 
+ !     http://www.apache.org/licenses/LICENSE-2.0
+ ! 
+ ! Unless required by applicable law or agreed to in writing, software
+ ! distributed under the License is distributed on an "AS IS" BASIS,
+ ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ! See the License for the specific language governing permissions and
+ ! limitations under the License.
+ !-->
+<assembly>
+  <id>binary-assembly</id> <!-- NOTE(review): same id as binary-assembly.xml; confirm this is intended -->
+  <formats>
+    <format>zip</format>
+    <format>dir</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory> <!-- contents land at the archive root -->
+  <fileSets>
+    <fileSet> <!-- presumably the output of the appassembler plugin; verify against the POM -->
+      <directory>target/appassembler/bin</directory>
+      <outputDirectory>bin</outputDirectory>
+      <fileMode>0755</fileMode> <!-- everything under bin/ is packaged executable -->
+    </fileSet>
+    <fileSet> <!-- libraries are copied with the assembly plugin's default file mode -->
+      <directory>target/appassembler/lib</directory>
+      <outputDirectory>lib</outputDirectory>
+    </fileSet>
+    <fileSet> <!-- module-relative docs directory, copied unchanged -->
+      <directory>docs</directory>
+      <outputDirectory>docs</outputDirectory>
+    </fileSet>
+  </fileSets>
+</assembly>
diff --git a/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/AConstants.java b/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/AConstants.java
new file mode 100644
index 0000000..f8548db
--- /dev/null
+++ b/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/AConstants.java
@@ -0,0 +1,102 @@
+package edu.uci.ics.asterix.aoya;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Constants used in both Client and Application Master
+ */
+@InterfaceAudience.LimitedPrivate(value = { "For simplicity between Client and AM." })
+@InterfaceStability.Unstable
+public class AConstants {
+    /**
+     * Environment key name pointing to the app master jar location
+     */
+    public static final String APPLICATIONMASTERJARLOCATION = "APPLICATIONMASTERJARLOCATION";
+
+    /**
+     * Environment key name denoting the file timestamp for the app master jar.
+     * Used to validate the local resource.
+     */
+    public static final String APPLICATIONMASTERJARTIMESTAMP = "APPLICATIONMASTERJARTIMESTAMP";
+
+    /**
+     * Environment key name denoting the file content length for the app master jar.
+     * Used to validate the local resource.
+     */
+    public static final String APPLICATIONMASTERJARLEN = "APPLICATIONMASTERJARLEN";
+    /**
+     * Environment key name pointing to the Asterix distributable archive (the application master reads it as its zip path)
+     */
+    public static final String TARLOCATION = "TARLOCATION";
+
+    /**
+     * Environment key name denoting the file timestamp for the Asterix tar.
+     * Used to validate the local resource.
+     */
+    public static final String TARTIMESTAMP = "TARTIMESTAMP";
+
+    /**
+     * Environment key name denoting the file content length for the Asterix tar.
+     * Used to validate the local resource.
+     */
+    public static final String TARLEN = "TARLEN";
+
+    /**
+     * Environment key name pointing to the Asterix cluster configuration file
+     */
+    public static final String CONFLOCATION = "CONFLOCATION";
+
+    /**
+     * Environment key name denoting the file timestamp for the Asterix config.
+     * Used to validate the local resource.
+     */
+
+    public static final String CONFTIMESTAMP = "CONFTIMESTAMP";
+
+    /**
+     * Environment key name denoting the file content length for the Asterix config.
+     * Used to validate the local resource.
+     */
+
+    public static final String CONFLEN = "CONFLEN";
+
+    /**
+     * Environment key name pointing to the Asterix parameters file
+     */
+
+    public static final String PARAMLOCATION = "PARAMLOCATION";
+
+    /**
+     * Environment key name denoting the file timestamp for the Asterix parameters.
+     * Used to validate the local resource.
+     */
+
+    public static final String PARAMTIMESTAMP = "PARAMTIMESTAMP";
+
+    /**
+     * Environment key name denoting the file content length for the Asterix parameters.
+     * Used to validate the local resource.
+     */
+
+    public static final String PARAMLEN = "PARAMLEN";
+    /** NOTE(review): not read by the visible application master code; presumably a DFS path suffix - confirm. */
+    public static final String PATHSUFFIX = "PATHSUFFIX";
+    /** Environment key name for the instance's configuration directory, resolved against the DFS base path. */
+    public static final String INSTANCESTORE = "INSTANCESTORE";
+    /** Environment key name carrying the value the application master sets as yarn.resourcemanager.address. */
+    public static final String RMADDRESS = "RMADDRESS";
+    /** Environment key name carrying the value set as yarn.resourcemanager.scheduler.address. */
+    public static final String RMSCHEDULERADDRESS = "RMSCHEDULERADDRESS";
+    /** Environment key name for the DFS base directory under which configs and binaries lie. */
+    public static final String DFS_BASE = "DFSBASE";
+    /** Environment key name for the NC JVM options; the application master applies a default -Xmx when unset. */
+    public static final String NC_JAVA_OPTS = "NCJAVAOPTS";
+    /** Environment key name for the CC JVM options; the application master applies a default -Xmx when unset. */
+    public static final String CC_JAVA_OPTS = "CCJAVAOPTS";
+    /** NOTE(review): presumably the NC container memory size; not read by the visible code - confirm. */
+    public static final String NC_CONTAINER_MEM = "NC_CONTAINER_MEM";
+    /** NOTE(review): presumably the CC container memory size; not read by the visible code - confirm. */
+    public static final String CC_CONTAINER_MEM = "CC_CONTAINER_MEM";
+
+}
diff --git a/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/AsterixApplicationMaster.java b/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/AsterixApplicationMaster.java
new file mode 100644
index 0000000..1539a34
--- /dev/null
+++ b/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/AsterixApplicationMaster.java
@@ -0,0 +1,1288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.aoya;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+
+import edu.uci.ics.asterix.common.config.GlobalConfig;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.event.schema.yarnCluster.Cluster;
+import edu.uci.ics.asterix.event.schema.yarnCluster.MasterNode;
+import edu.uci.ics.asterix.event.schema.yarnCluster.Node;
+import org.apache.log4j.ConsoleAppender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+
+public class AsterixApplicationMaster {
+
+    static
+    {
+        Logger rootLogger = Logger.getRootLogger();
+        rootLogger.setLevel(Level.INFO);
+        rootLogger.addAppender(new ConsoleAppender(
+                new PatternLayout("%-6r [%p] %c - %m%n")));
+    }
+    private static final Log LOG = LogFactory.getLog(AsterixApplicationMaster.class);
+    private static final String CLUSTER_DESC_PATH = "cluster-config.xml"; // parsed by init()
+    private static final String ASTERIX_CONF_NAME = "asterix-configuration.xml"; // config file name and local resource key
+    private static final String ASTERIX_ZIP_NAME = "asterix-server.zip";
+    // Default container memory in MB, and the default -Xmx JVM options derived from it
+    private static final int CC_MEMORY_MBS_DEFAULT = 1024;
+    private static final int NC_MEMORY_MBS_DEFAULT = 1536;
+    private static final String EXTERNAL_CC_JAVA_OPTS_DEFAULT = "-Xmx" + CC_MEMORY_MBS_DEFAULT + "m";
+    private static final String EXTERNAL_NC_JAVA_OPTS_DEFAULT = "-Xmx" + NC_MEMORY_MBS_DEFAULT + "m";
+    private static final String OBLITERATOR_CLASSNAME = "edu.uci.ics.asterix.aoya.Deleter";
+    private static final String HDFS_BACKUP_CLASSNAME = "edu.uci.ics.asterix.aoya.HDFSBackup";
+    private static final String NC_CLASSNAME = "edu.uci.ics.hyracks.control.nc.NCDriver";
+    private static final String CC_CLASSNAME = "edu.uci.ics.hyracks.control.cc.CCDriver";
+    private static final String JAVA_HOME = System.getProperty("java.home"); // java.home of the JVM running this class
+    private boolean doneAllocating = false; // set to true at the end of requestResources()
+
+    // Configuration
+    private Configuration conf;
+
+    // Handle to communicate with the Resource Manager
+    private AMRMClientAsync<ContainerRequest> resourceManager;
+
+    // Handle to communicate with the Node Manager
+    private NMClientAsync nmClientAsync;
+    // Listener that processes the responses from the Node Manager
+    private NMCallbackHandler containerListener;
+    // Application Attempt Id (combination of attemptId and fail count)
+    private ApplicationAttemptId appAttemptID;
+
+    // TODO: status updates for clients are not implemented yet; the fields below are placeholders.
+    // Hostname of the container
+    private String appMasterHostname = "";
+    // Port on which the app master listens for status updates from clients: drawn at random from
+    // the dynamic range 49152-65535 (without the base offset the value fell in 0-16382).
+    private int appMasterRpcPort = 49152 + new Random().nextInt(65535 - 49152 + 1);
+    // Tracking url to which app master publishes info for clients to monitor
+    private String appMasterTrackingUrl = "";
+
+    // Counter for completed containers (complete denotes successful or failed)
+    private AtomicInteger numCompletedContainers = new AtomicInteger();
+    // Allocated container count, so that we know how many containers the RM
+    // has allocated to us
+    private AtomicInteger numAllocatedContainers = new AtomicInteger();
+    // Count of failed containers
+    private AtomicInteger numFailedContainers = new AtomicInteger();
+    // Count of containers already requested from the RM
+    // Needed as once requested, we should not request for containers again.
+    // Only request for more if the original requirement changes.
+    private AtomicInteger numRequestedContainers = new AtomicInteger();
+    //Tells us whether the Cluster Controller is up so we can safely start some Node Controllers
+    private AtomicBoolean ccUp = new AtomicBoolean();
+    private AtomicBoolean ccStarted = new AtomicBoolean();
+    private Queue<Node> pendingNCs = new ArrayDeque<Node>(); // requestResources() adds under synchronized(pendingNCs)
+
+    //HDFS path to AsterixDB distributable zip
+    private String asterixZipPath = "";
+    // Timestamp needed for creating a local resource
+    private long asterixZipTimestamp = 0;
+    // File length needed for local resource
+    private long asterixZipLen = 0;
+
+    //HDFS path to AsterixDB cluster description
+    private String asterixConfPath = "";
+    // Timestamp needed for creating a local resource
+    private long asterixConfTimestamp = 0;
+    // File length needed for local resource
+    private long asterixConfLen = 0;
+
+    private String instanceConfPath = ""; // read from the INSTANCESTORE environment variable
+
+    //base dir under which all configs and binaries lie
+    private String dfsBasePath;
+
+    private int numTotalContainers = 0; // set by requestResources() to the number of CC + NC requests
+
+    // Local resources, keyed by resource name
+    private Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+
+    private Cluster clusterDesc = null; // parsed from CLUSTER_DESC_PATH in init()
+    private MasterNode cC = null; // master node of clusterDesc
+    private String ccJavaOpts = null; // from CC_JAVA_OPTS, or the default -Xmx option
+    private int ccMem = 0; // CC container memory, set in requestResources()
+    private String ncJavaOpts = null; // from NC_JAVA_OPTS, or the default -Xmx option
+    private int ncMem = 0; // NC container memory, set in requestResources()
+    private volatile boolean done;
+    private volatile boolean success;
+
+    private boolean obliterate = false; // -obliterate was given
+    private Path appMasterJar = null; // from APPLICATIONMASTERJARLOCATION; null if unset or ending in a separator
+    private boolean backup = false; // -backup was given
+    long backupTimestamp; // System.currentTimeMillis() taken when -backup is parsed
+    String snapName; // value of the -restore option
+    private boolean restore = false; // -restore was given
+    private boolean initial = false; // -initial was given
+
+    // Launch threads
+    private List<Thread> launchThreads = new CopyOnWriteArrayList<Thread>();
+
+    /**
+     * Process entry point for the application master.
+     * Exit status: 0 on success or when init() declines to run, 1 when an exception
+     * escapes setup or run(), 2 when run() reports failure.
+     */
+    public static void main(String[] args) {
+        boolean succeeded = false;
+        try {
+            AsterixApplicationMaster master = new AsterixApplicationMaster();
+            LOG.info("Initializing ApplicationMaster");
+            CommandLine parsedArgs = master.setArgs(args);
+            master.setEnvs(parsedArgs);
+            if (!master.init()) {
+                System.exit(0);
+            }
+            succeeded = master.run();
+        } catch (Exception e) {
+            LOG.fatal("Error running ApplicationMaster", e);
+            System.exit(1);
+        }
+        // Report the outcome through the log and the process exit status.
+        LOG.info(succeeded ? "Application Master completed successfully. exiting"
+                : "Application Master failed. exiting");
+        System.exit(succeeded ? 0 : 2);
+    }
+
+    /**
+     * Logs, and echoes to stdout, every environment variable and a recursive listing
+     * ("ls -alhLR") of the current working directory.
+     */
+    private void dumpOutDebugInfo() {
+        LOG.info("Dump debug output");
+        for (Map.Entry<String, String> env : System.getenv().entrySet()) {
+            LOG.info("System env: key=" + env.getKey() + ", val=" + env.getValue());
+            System.out.println("System env: key=" + env.getKey() + ", val=" + env.getValue());
+        }
+        try {
+            Process pr = Runtime.getRuntime().exec("ls -alhLR");
+            // Drain stdout BEFORE waitFor(): a long listing would otherwise fill the pipe
+            // buffer, blocking the child on write while this thread blocks in waitFor().
+            try (BufferedReader buf = new BufferedReader(new InputStreamReader(pr.getInputStream()))) {
+                String line;
+                while ((line = buf.readLine()) != null) {
+                    LOG.info("System CWD content: " + line);
+                    System.out.println("System CWD content: " + line);
+                }
+            }
+            pr.waitFor();
+        } catch (IOException e) {
+            LOG.info(e);
+        } catch (InterruptedException e) {
+            LOG.info(e);
+            Thread.currentThread().interrupt(); // preserve the interrupt for callers
+        }
+    }
+
+    /** Creates an application master backed by a fresh {@link YarnConfiguration}. */
+    public AsterixApplicationMaster() {
+        // Set up the configuration and RPC
+        this.conf = new YarnConfiguration();
+    }
+
+    /**
+     * Parses the application master's command line and records the requested mode flags.
+     *
+     * @param args raw command-line arguments
+     * @return the parsed command line, which main() hands on to setEnvs()
+     * @throws ParseException if the arguments do not match the declared options
+     */
+    public CommandLine setArgs(String[] args) throws ParseException {
+        Options options = new Options();
+        options.addOption("app_attempt_id", true, "App Attempt ID. Not to be used unless for testing purposes");
+        options.addOption("priority", true, "Application Priority. Default 0");
+        options.addOption("debug", false, "Dump out debug information");
+        options.addOption("help", false, "Print usage");
+        options.addOption("initial", false, "Initialize existing Asterix instance.");
+        options.addOption("obliterate", false, "Delete asterix instance completely.");
+        options.addOption("backup", false, "Back up AsterixDB instance");
+        options.addOption("restore", true, "Restore an AsterixDB instance");
+
+        CommandLine parsed = new GnuParser().parse(options, args);
+        if (parsed.hasOption("help")) {
+            printUsage(options);
+        }
+        if (parsed.hasOption("debug")) {
+            dumpOutDebugInfo();
+        }
+        // Mode flags are only ever switched on here; an absent option leaves its flag untouched.
+        obliterate |= parsed.hasOption("obliterate");
+        initial |= parsed.hasOption("initial");
+        if (parsed.hasOption("backup")) {
+            backup = true;
+            backupTimestamp = System.currentTimeMillis();
+        }
+        if (parsed.hasOption("restore")) {
+            restore = true;
+            snapName = parsed.getOptionValue("restore");
+            LOG.info(snapName);
+        }
+        return parsed;
+    }
+
+    public void setEnvs(CommandLine cliParser) {
+        Map<String, String> envs = System.getenv();
+        if (envs.containsKey("HADOOP_CONF_DIR")) {
+            File hadoopConfDir = new File(envs.get("HADOOP_CONF_DIR"));
+            if (hadoopConfDir.isDirectory()) {
+                for (File config : hadoopConfDir.listFiles()) {
+                    if (config.getName().matches("^.*(xml)$")) {
+                        conf.addResource(new Path(config.getAbsolutePath()));
+                    }
+                }
+            }
+        }
+        //the containerID might be in the arguments or the environment
+        if (!envs.containsKey(Environment.CONTAINER_ID.name())) {
+            if (cliParser.hasOption("app_attempt_id")) {
+                String appIdStr = cliParser.getOptionValue("app_attempt_id", "");
+                appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr);
+            } else {
+
+                throw new IllegalArgumentException(
+                        "Environment is not set correctly- please check client submission settings");
+            }
+        } else {
+            ContainerId containerId = ConverterUtils.toContainerId(envs.get(Environment.CONTAINER_ID.name()));
+            appAttemptID = containerId.getApplicationAttemptId();
+        }
+
+        if (!envs.containsKey(ApplicationConstants.APP_SUBMIT_TIME_ENV)
+                || !envs.containsKey(Environment.NM_HOST.name()) || !envs.containsKey(Environment.NM_HTTP_PORT.name())
+                || !envs.containsKey(Environment.NM_PORT.name())) {
+            throw new IllegalArgumentException(
+                    "Environment is not set correctly- please check client submission settings");
+        }
+        System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, envs.get("PWD") + File.separator + "bin" + File.separator
+                + ASTERIX_CONF_NAME);
+
+        LOG.info("Application master for app" + ", appId=" + appAttemptID.getApplicationId().getId()
+                + ", clustertimestamp=" + appAttemptID.getApplicationId().getClusterTimestamp() + ", attemptId="
+                + appAttemptID.getAttemptId());
+
+        asterixZipPath = envs.get(AConstants.TARLOCATION);
+        asterixZipTimestamp = Long.parseLong(envs.get(AConstants.TARTIMESTAMP));
+        asterixZipLen = Long.parseLong(envs.get(AConstants.TARLEN));
+
+        asterixConfPath = envs.get(AConstants.CONFLOCATION);
+        asterixConfTimestamp = Long.parseLong(envs.get(AConstants.CONFTIMESTAMP));
+        asterixConfLen = Long.parseLong(envs.get(AConstants.CONFLEN));
+
+        instanceConfPath = envs.get(AConstants.INSTANCESTORE);
+        //the only time this is null is during testing, when asterix-yarn isn't packaged in a JAR yet.
+        if(envs.get(AConstants.APPLICATIONMASTERJARLOCATION) != null
+                && !envs.get(AConstants.APPLICATIONMASTERJARLOCATION).endsWith(File.separator)){
+            appMasterJar = new Path(envs.get(AConstants.APPLICATIONMASTERJARLOCATION));
+        }
+        else{
+            appMasterJar = null;
+        }
+
+        dfsBasePath = envs.get(AConstants.DFS_BASE);
+        //If the NM has an odd environment where the proper hadoop XML configs dont get imported, we can end up not being able to talk to the RM
+        // this solves that!
+        //in a testing environment these can be null however. 
+        if (envs.get(AConstants.RMADDRESS) != null) {
+            conf.set("yarn.resourcemanager.address", envs.get(AConstants.RMADDRESS));
+            LOG.info("RM Address: " + envs.get(AConstants.RMADDRESS));
+        }
+        if (envs.get(AConstants.RMADDRESS) != null) {
+            conf.set("yarn.resourcemanager.scheduler.address", envs.get(AConstants.RMSCHEDULERADDRESS));
+        }
+        ccJavaOpts = envs.get(AConstants.CC_JAVA_OPTS);
+        //set defaults if no special given options
+        if (ccJavaOpts == null) {
+            ccJavaOpts = EXTERNAL_CC_JAVA_OPTS_DEFAULT;
+        }
+        ncJavaOpts = envs.get(AConstants.NC_JAVA_OPTS);
+        if (ncJavaOpts == null) {
+            ncJavaOpts = EXTERNAL_NC_JAVA_OPTS_DEFAULT;
+        }
+
+        LOG.info("Path suffix: " + instanceConfPath);
+    }
+
+    /** Localizes resources and loads the cluster description; returns true unless an exception is thrown. */
+    public boolean init() throws ParseException, IOException, AsterixException, YarnException {
+        try {
+            localizeDFSResources();
+            this.clusterDesc = Utils.parseYarnClusterConfig(CLUSTER_DESC_PATH);
+            this.cC = this.clusterDesc.getMasterNode();
+            this.appMasterTrackingUrl = "http://" + cC.getClientIp() + ":" + cC.getClientPort() + Path.SEPARATOR;
+            distributeAsterixConfig();
+            // Trace the Asterix config location that setEnvs() stored as a system property.
+            LOG.debug("config file loc: " + System.getProperty(GlobalConfig.CONFIG_FILE_PROPERTY));
+            return true;
+        } catch (FileNotFoundException | IllegalStateException e) {
+            LOG.error("Could not deserialize Cluster Config from disk- aborting!");
+            LOG.error(e);
+            throw e;
+        }
+    }
+
+    /**
+     * Registers the instance's Asterix configuration file as a local resource, keyed by
+     * its file name, using the length and modification time reported by the file system.
+     *
+     * @throws IOException
+     *             if the file system or the file's status cannot be obtained
+     */
+    private void distributeAsterixConfig() throws IOException {
+        FileSystem fs = FileSystem.get(conf);
+        // <dfsBasePath>/<instanceConfPath>/asterix-configuration.xml
+        Path confFile = new Path(dfsBasePath, instanceConfPath + File.separator + ASTERIX_CONF_NAME);
+        FileStatus confStatus = fs.getFileStatus(confFile);
+
+        // Describe the file as a PRIVATE FILE resource, with its timestamp and size.
+        LocalResource confResource = Records.newRecord(LocalResource.class);
+        confResource.setType(LocalResourceType.FILE);
+        confResource.setVisibility(LocalResourceVisibility.PRIVATE);
+        confResource.setResource(ConverterUtils.getYarnUrlFromURI(confFile.toUri()));
+        confResource.setTimestamp(confStatus.getModificationTime());
+        confResource.setSize(confStatus.getLen());
+        localResources.put(ASTERIX_CONF_NAME, confResource);
+    }
+
+    /**
+     * Requests the CC container, waits for the CC to be reported up, then requests one
+     * container per NC listed in the cluster description.
+     *
+     * @param c
+     *            The cluster description to attempt to allocate with the RM
+     * @throws YarnException
+     *             if the CC is not reported up within roughly 60 seconds
+     * @throws UnknownHostException
+     *             if a host named in the description cannot be resolved
+     */
+    private void requestResources(Cluster c) throws YarnException, UnknownHostException {
+        //set memory, using the per-role default when the description gives none
+        String ccContainerMem = c.getCcContainerMem();
+        ccMem = ccContainerMem != null ? Integer.parseInt(ccContainerMem) : CC_MEMORY_MBS_DEFAULT;
+        // NCs fall back to the NC default; the CC default was used here by mistake before
+        String ncContainerMem = c.getNcContainerMem();
+        ncMem = ncContainerMem != null ? Integer.parseInt(ncContainerMem) : NC_MEMORY_MBS_DEFAULT;
+        //request CC
+        int numNodes = 0;
+        ContainerRequest ccAsk = hostToRequest(cC.getClusterIp(), true);
+        resourceManager.addContainerRequest(ccAsk);
+        LOG.info("Asked for CC: " + Arrays.toString(ccAsk.getNodes().toArray()));
+        numNodes++;
+        //now we wait to be given the CC before starting the NCs...
+        //we will wait a minute.
+        int deathClock = 60;
+        while (ccUp.get() == false && deathClock > 0) {
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException ex) {
+                LOG.debug(ex);
+            }
+            --deathClock;
+        }
+        if (deathClock == 0 && ccUp.get() == false) {
+            throw new YarnException("Couldn't allocate container for CC. Abort!");
+        }
+        LOG.info("Waiting for CC process to start");
+        //TODO: inspect for actual liveness instead of waiting.
+        // is there a good way to do this? maybe try opening a socket to it...
+        try {
+            Thread.sleep(10000);
+        } catch (InterruptedException ex) {
+            LOG.debug(ex);
+        }
+        //request NCs
+        for (Node n : c.getNode()) {
+            resourceManager.addContainerRequest(hostToRequest(n.getClusterIp(), false));
+            LOG.info("Asked for NC: " + n.getClusterIp());
+            numNodes++;
+            synchronized(pendingNCs){
+                pendingNCs.add(n);
+            }
+        }
+        LOG.info("Requested all NCs and CCs. Wait for things to settle!");
+        numRequestedContainers.set(numNodes);
+        numTotalContainers = numNodes;
+        doneAllocating = true;
+    }
+
+    /**
+     * Builds a container request pinned to a single host.
+     *
+     * @param host
+     *            The host to request
+     * @param cc
+     *            Whether or not the host is the CC; selects the CC or NC memory size
+     * @return A container request that is (hopefully) for the host we asked for.
+     * @throws UnknownHostException
+     *             if the host cannot be resolved
+     */
+    private ContainerRequest hostToRequest(String host, boolean cc) throws UnknownHostException {
+        InetAddress resolved = InetAddress.getByName(host);
+        Priority priority = Records.newRecord(Priority.class);
+        priority.setPriority(0);
+
+        Resource capability = Records.newRecord(Resource.class);
+        capability.setMemory(cc ? ccMem : ncMem);
+        //we dont set anything else because we don't care about that and yarn doesn't honor it yet
+
+        //TODO this is silly
+        String[] hosts = { resolved.getHostName() };
+        LOG.info("IP addr: " + host + " resolved to " + resolved.getHostName());
+        // no rack list (null); the trailing false is the relaxLocality argument
+        ContainerRequest request = new ContainerRequest(capability, hosts, null, priority, false);
+        LOG.info("Requested host ask: " + request.getNodes());
+        return request;
+    }
+
+    /**
+     * Determines whether or not a container is the one on which the CC should reside, by comparing
+     * the canonical host name of the container's node with that of the configured CC cluster IP.
+     * 
+     * @param c
+     *            The container in question
+     * @return True if the container should have the CC process on it, false otherwise. False is also
+     *         returned when either host cannot be resolved.
+     */
+    boolean containerIsCC(Container c) {
+        String containerHost = c.getNodeId().getHost();
+        try {
+            String containerName = InetAddress.getByName(containerHost).getCanonicalHostName();
+            LOG.info(containerName);
+            String ccName = InetAddress.getByName(cC.getClusterIp()).getCanonicalHostName();
+            LOG.info(ccName);
+            return containerName.equals(ccName);
+        } catch (UnknownHostException e) {
+            // Treat an unresolvable host as "not the CC", but leave a trace: a DNS problem here
+            // would otherwise be indistinguishable from a plain mismatch.
+            LOG.warn("Could not resolve host while checking whether container on " + containerHost
+                    + " is the CC", e);
+            return false;
+        }
+    }
+
+    /**
+     * Attempts to find the Node in the Cluster Description that matches this container. The match
+     * is made on resolved addresses: the container's node host and each node's cluster IP are both
+     * resolved through {@link InetAddress} and compared.
+     * 
+     * @param c
+     *            The container to resolve
+     * @param cl
+     *            The cluster description whose nodes are searched
+     * @return The node this container corresponds to
+     * @throws java.net.UnknownHostException
+     *             if the container isn't present in the description, or if the container host or
+     *             any inspected node address cannot be resolved
+     */
+    Node containerToNode(Container c, Cluster cl) throws UnknownHostException {
+        String containerHost = c.getNodeId().getHost();
+        InetAddress containerIp = InetAddress.getByName(containerHost);
+        LOG.info("Resolved Container IP: " + containerIp);
+        for (Node node : cl.getNode()) {
+            InetAddress nodeIp = InetAddress.getByName(node.getClusterIp());
+            LOG.info(nodeIp + "?=" + containerIp);
+            if (nodeIp.equals(containerIp)) {
+                return node;
+            }
+        }
+        //if we find nothing, this is bad...
+        throw new java.net.UnknownHostException("Could not resolve container " + containerHost + " to node");
+    }
+
+    /**
+     * Registers, in {@code localResources}, the DFS files that every launched container needs
+     * localized. Nothing is copied here; the entries only point at resources that already exist.
+     * <ul>
+     * <li>For the offline tasks (obliterate, backup, restore) only the AM jar is registered, under
+     * the name "asterix-yarn.jar".</li>
+     * <li>Otherwise the Asterix distribution archive, the cluster configuration XML and any external
+     * library archives found under the instance's "library" directory are registered.</li>
+     * </ul>
+     * 
+     * @throws IOException
+     *             if a resource location is not a valid URI, the library directory layout is not
+     *             dataverse-directory/library-file, or the filesystem cannot be queried
+     * @throws IllegalStateException
+     *             if an offline task is requested without an AM jar outside of a mini YARN cluster
+     */
+    private void localizeDFSResources() throws IOException {
+        //if performing an 'offline' task, skip a lot of resource distribution
+        if (obliterate || backup || restore) {
+            if (appMasterJar == null || ("").equals(appMasterJar)) {
+                //this can happen in a jUnit testing environment. we don't need to set it there. 
+                if (!conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
+                    throw new IllegalStateException("AM jar not provided in environment.");
+                } else {
+                    return;
+                }
+            }
+            FileSystem fs = FileSystem.get(conf);
+            FileStatus appMasterJarStatus = fs.getFileStatus(appMasterJar);
+            LocalResource obliteratorJar = Records.newRecord(LocalResource.class);
+            obliteratorJar.setType(LocalResourceType.FILE);
+            obliteratorJar.setVisibility(LocalResourceVisibility.PRIVATE);
+            obliteratorJar.setResource(ConverterUtils.getYarnUrlFromPath(appMasterJar));
+            obliteratorJar.setTimestamp(appMasterJarStatus.getModificationTime());
+            obliteratorJar.setSize(appMasterJarStatus.getLen());
+            localResources.put("asterix-yarn.jar", obliteratorJar);
+            LOG.info(localResources.values());
+            return;
+        }
+        //otherwise, distribute everything to start up asterix
+
+        LocalResource asterixZip = Records.newRecord(LocalResource.class);
+
+        //registered as an archive so that the asterix distribution gets unpacked on the node
+        asterixZip.setType(LocalResourceType.ARCHIVE);
+
+        asterixZip.setVisibility(LocalResourceVisibility.PRIVATE);
+        try {
+            asterixZip.setResource(ConverterUtils.getYarnUrlFromURI(new URI(asterixZipPath)));
+
+        } catch (URISyntaxException e) {
+            LOG.error("Error locating Asterix zip" + " in env, path=" + asterixZipPath);
+            throw new IOException(e);
+        }
+
+        asterixZip.setTimestamp(asterixZipTimestamp);
+        asterixZip.setSize(asterixZipLen);
+        localResources.put(ASTERIX_ZIP_NAME, asterixZip);
+
+        //now let's do the same for the cluster description XML
+        LocalResource asterixConf = Records.newRecord(LocalResource.class);
+        asterixConf.setType(LocalResourceType.FILE);
+
+        asterixConf.setVisibility(LocalResourceVisibility.PRIVATE);
+        try {
+            asterixConf.setResource(ConverterUtils.getYarnUrlFromURI(new URI(asterixConfPath)));
+        } catch (URISyntaxException e) {
+            LOG.error("Error locating Asterix config" + " in env, path=" + asterixConfPath);
+            throw new IOException(e);
+        }
+        //TODO: I could avoid localizing this everywhere by only calling this block on the metadata node. 
+        asterixConf.setTimestamp(asterixConfTimestamp);
+        asterixConf.setSize(asterixConfLen);
+        localResources.put("cluster-config.xml", asterixConf);
+        //now add the libraries if there are any
+        try {
+            FileSystem fs = FileSystem.get(conf);
+            // This is a DFS path, so use the Hadoop separator throughout rather than the local
+            // filesystem's File.separator.
+            Path p = new Path(dfsBasePath, instanceConfPath + Path.SEPARATOR + "library" + Path.SEPARATOR);
+            if (fs.exists(p)) {
+                // expected layout: library/<dataverse directory>/<library archive file>
+                FileStatus[] dataverses = fs.listStatus(p);
+                for (FileStatus d : dataverses) {
+                    if (!d.isDirectory())
+                        throw new IOException("Library configuration directory structure is incorrect");
+                    FileStatus[] libraries = fs.listStatus(d.getPath());
+                    for (FileStatus l : libraries) {
+                        if (l.isDirectory())
+                            throw new IOException("Library configuration directory structure is incorrect");
+                        LocalResource lr = Records.newRecord(LocalResource.class);
+                        lr.setResource(ConverterUtils.getYarnUrlFromURI(l.getPath().toUri()));
+                        lr.setSize(l.getLen());
+                        lr.setTimestamp(l.getModificationTime());
+                        lr.setType(LocalResourceType.ARCHIVE);
+                        lr.setVisibility(LocalResourceVisibility.PRIVATE);
+                        // the resource key is library/<dataverse>/<file name up to its first '.'>
+                        localResources.put("library" + Path.SEPARATOR + d.getPath().getName() + Path.SEPARATOR
+                                + l.getPath().getName().split("\\.")[0], lr);
+                        LOG.info("Found library: " + l.getPath().toString());
+                        LOG.info(l.getPath().getName());
+                    }
+                }
+            }
+        } catch (FileNotFoundException e) {
+            LOG.info("No external libraries present");
+            //do nothing, it just means there aren't libraries. that is possible and ok
+            // it should be handled by the fs.exists(p) check though.
+        }
+        LOG.info(localResources.values());
+
+    }
+
+    /**
+     * Prints the command line help for the ApplicationMaster.
+     * 
+     * @param opts
+     *            The options to describe
+     */
+    private void printUsage(Options opts) {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp("ApplicationMaster", opts);
+    }
+
+    /**
+     * Runs the AM: starts the RM and NM clients, registers with the ResourceManager, requests the
+     * containers for the cluster and then blocks until the {@code done} flag is raised, after which
+     * {@link #finish()} is invoked.
+     * 
+     * @return True if the run fully succeeded, false otherwise.
+     * @throws YarnException
+     *             if registration fails or the resources cannot be requested; in the latter case
+     *             {@code done} is set before the exception is rethrown
+     * @throws IOException
+     */
+    public boolean run() throws YarnException, IOException {
+        LOG.info("Starting ApplicationMaster");
+
+        // Asynchronous client towards the RM (1000 is the interval argument of the factory method)
+        resourceManager = AMRMClientAsync.createAMRMClientAsync(1000, new RMCallbackHandler());
+        resourceManager.init(conf);
+        resourceManager.start();
+
+        // Asynchronous client towards the NMs
+        containerListener = new NMCallbackHandler();
+        nmClientAsync = new NMClientAsyncImpl(containerListener);
+        nmClientAsync.init(conf);
+        nmClientAsync.start();
+
+        // Work out the name to register under; if the local host cannot be resolved the
+        // exception text is used in its place.
+        String registeredName;
+        try {
+            registeredName = InetAddress.getLocalHost().toString();
+        } catch (java.net.UnknownHostException uhe) {
+            registeredName = uhe.toString();
+        }
+        appMasterHostname = registeredName;
+
+        // Register self with ResourceManager
+        // This will start heartbeating to the RM
+        RegisterApplicationMasterResponse registration = resourceManager.registerApplicationMaster(
+                appMasterHostname, appMasterRpcPort, appMasterTrackingUrl);
+
+        // Dump out information about cluster capability as seen by the
+        // resource manager
+        LOG.info("Max mem capabililty of resources in this cluster "
+                + registration.getMaximumResourceCapability().getMemory());
+
+        try {
+            requestResources(clusterDesc);
+        } catch (YarnException e) {
+            LOG.error("Could not allocate resources properly:" + e.getMessage());
+            done = true;
+            throw e;
+        }
+
+        // From here on the callbacks do the work; poll until one of them raises 'done'.
+        while (!done) {
+            try {
+                Thread.sleep(200);
+            } catch (InterruptedException ex) {
+                // an interrupt only cuts this nap short; the loop condition decides when to stop
+            }
+        }
+        finish();
+        return success;
+    }
+
+    /**
+     * Clean up, whether or not we were successful: waits (up to 10 seconds each) for the launch
+     * threads, stops the NM client, reports the final status to the RM and stops the RM client.
+     * The run counts as successful only if no container failed and every requested container
+     * completed; the outcome is stored in {@code success}.
+     */
+    private void finish() {
+        // Join all launched threads
+        // needed for when we time out
+        // and we need to release containers
+        for (Thread launchThread : launchThreads) {
+            try {
+                launchThread.join(10000);
+            } catch (InterruptedException e) {
+                // hand the exception to the logger so it renders the stack trace itself
+                LOG.error("Exception thrown in thread join: " + e.getMessage(), e);
+            }
+        }
+
+        // When the application completes, it should stop all running containers
+        LOG.info("Application completed. Stopping running containers");
+        nmClientAsync.stop();
+
+        // When the application completes, it should send a finish application
+        // signal to the RM
+        LOG.info("Application completed. Signalling finish to RM");
+
+        FinalApplicationStatus appStatus;
+        String appMessage = null;
+        success = numFailedContainers.get() == 0 && numCompletedContainers.get() == numTotalContainers;
+        if (success) {
+            appStatus = FinalApplicationStatus.SUCCEEDED;
+        } else {
+            appStatus = FinalApplicationStatus.FAILED;
+            appMessage = "Diagnostics." + ", total=" + numTotalContainers + ", completed="
+                    + numCompletedContainers.get() + ", allocated=" + numAllocatedContainers.get() + ", failed="
+                    + numFailedContainers.get();
+        }
+        try {
+            resourceManager.unregisterApplicationMaster(appStatus, appMessage, null);
+        } catch (YarnException ex) {
+            LOG.error("Failed to unregister application", ex);
+        } catch (IOException e) {
+            LOG.error("Failed to unregister application", e);
+        }
+        done = true;
+        resourceManager.stop();
+    }
+
+    /**
+     * This handles the information that comes in from the RM while the AM
+     * is running: it keeps the container counters up to date, launches a thread for each
+     * allocated container, and raises the {@code done} flag that ends the wait loop in run().
+     */
+    private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
+        /**
+         * Records completed containers and raises {@code done} once allocation has finished and
+         * every allocated container has completed.
+         * 
+         * @param completedContainers
+         *            Statuses reported by the RM; each must be in the COMPLETE state
+         * @throws IllegalStateException
+         *             if a reported container is not in the COMPLETE state
+         */
+        public void onContainersCompleted(List<ContainerStatus> completedContainers) {
+            LOG.info("Got response from RM for container ask, completedCnt=" + completedContainers.size());
+            for (ContainerStatus containerStatus : completedContainers) {
+                LOG.info("Got container status for containerID=" + containerStatus.getContainerId() + ", state="
+                        + containerStatus.getState() + ", exitStatus=" + containerStatus.getExitStatus()
+                        + ", diagnostics=" + containerStatus.getDiagnostics());
+
+                // non complete containers should not be here
+                if (containerStatus.getState() != ContainerState.COMPLETE) {
+                    throw new IllegalStateException("Non-completed container given as completed by RM.");
+                }
+
+                // Every reported container counts as completed; a non-zero exit status
+                // additionally counts as failed.
+                numCompletedContainers.incrementAndGet();
+                if (0 != containerStatus.getExitStatus()) {
+                    numFailedContainers.incrementAndGet();
+                } else {
+                    LOG.info("Container completed successfully." + ", containerId=" + containerStatus.getContainerId());
+                }
+            }
+            //stop infinite looping of run()
+            // Failed containers are already included in the completed count, so only that counter
+            // is compared: adding the failed count on top would count each failure twice and make
+            // the AM stop too early, or never.
+            if (doneAllocating && numCompletedContainers.get() == numAllocatedContainers.get()) {
+                done = true;
+            }
+        }
+
+        /**
+         * Starts a launch thread for each newly allocated container. Once the CC container has
+         * been seen, a container whose node is not among the pending NCs is stopped instead.
+         * 
+         * @param allocatedContainers
+         *            Containers handed to us by the RM
+         */
+        public void onContainersAllocated(List<Container> allocatedContainers) {
+            LOG.info("Got response from RM for container ask, allocatedCnt=" + allocatedContainers.size());
+            numAllocatedContainers.addAndGet(allocatedContainers.size());
+            for (Container allocatedContainer : allocatedContainers) {
+                synchronized (pendingNCs) {
+                    try {
+                        // after the CC is up, anything not waiting in pendingNCs is surplus
+                        if (!pendingNCs.contains(containerToNode(allocatedContainer, clusterDesc)) && ccUp.get()) {
+                            nmClientAsync.stopContainerAsync(allocatedContainer.getId(), allocatedContainer.getNodeId());
+                            continue;
+                        }
+                    } catch (UnknownHostException ex) {
+                        LOG.error("Unknown host allocated for us by RM- this shouldn't happen.", ex);
+                    }
+                }
+                LOG.info("Launching shell command on a new container." + ", containerId=" + allocatedContainer.getId()
+                        + ", containerNode=" + allocatedContainer.getNodeId().getHost() + ":"
+                        + allocatedContainer.getNodeId().getPort() + ", containerNodeURI="
+                        + allocatedContainer.getNodeHttpAddress() + ", containerResourceMemory"
+                        + allocatedContainer.getResource().getMemory());
+
+                LaunchAsterixContainer runnableLaunchContainer = new LaunchAsterixContainer(allocatedContainer,
+                        containerListener);
+                Thread launchThread = new Thread(runnableLaunchContainer, "Asterix CC/NC");
+
+                // I want to know if this node is the CC, because it must start before the NCs. 
+                LOG.info("Allocated: " + allocatedContainer.getNodeId().getHost());
+                LOG.info("CC : " + cC.getId());
+                synchronized (pendingNCs) {
+                    try {
+                        // with the CC already up this container serves an NC: it is no longer pending
+                        if (ccUp.get()) {
+                            pendingNCs.remove(containerToNode(allocatedContainer, clusterDesc));
+                        }
+                    } catch (UnknownHostException ex) {
+                        LOG.error("Unknown host allocated for us by RM- this shouldn't happen.", ex);
+                    }
+                }
+
+                if (containerIsCC(allocatedContainer)) {
+                    ccUp.set(true);
+                }
+                // launch and start the container on a separate thread to keep
+                // the main thread unblocked
+                // as all containers may not be allocated at one go.
+                launchThreads.add(launchThread);
+                launchThread.start();
+            }
+        }
+
+        /**
+         * Raises the {@code done} flag so that the AM leaves its wait loop and cleans up.
+         */
+        public void onShutdownRequest() {
+            LOG.info("AM shutting down per request");
+            done = true;
+        }
+
+        /**
+         * Node updates are currently ignored.
+         */
+        public void onNodesUpdated(List<NodeReport> updatedNodes) {
+            //TODO: This will become important when we deal with what happens if an NC dies
+        }
+
+        /**
+         * @return 0 while containers are still being requested, 0.5 afterwards.
+         */
+        public float getProgress() {
+            //return half way because progress is basically meaningless for us
+            if (!doneAllocating) {
+                return 0.0f;
+            }
+            return 0.5f;
+        }
+
+        /**
+         * Logs the error, including its stack trace, and raises the {@code done} flag.
+         */
+        public void onError(Throwable arg0) {
+            LOG.error("Fatal Error recieved by AM: " + arg0, arg0);
+            done = true;
+        }
+    }
+
+    private class NMCallbackHandler implements NMClientAsync.CallbackHandler {
+
+        private ConcurrentMap<ContainerId, Container> containers = new ConcurrentHashMap<ContainerId, Container>();
+
+        public void addContainer(ContainerId containerId, Container container) {
+            containers.putIfAbsent(containerId, container);
+        }
+
+        public void onContainerStopped(ContainerId containerId) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Succeeded to stop Container " + containerId);
+            }
+            containers.remove(containerId);
+        }
+
+        public void onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Container Status: id=" + containerId + ", status=" + containerStatus);
+            }
+        }
+
+        public void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer> allServiceResponse) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Succeeded to start Container " + containerId);
+            }
+            Container container = containers.get(containerId);
+            if (container != null) {
+                nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
+            }
+        }
+
+        public void onStartContainerError(ContainerId containerId, Throwable t) {
+            LOG.error("Failed to start Container " + containerId);
+            containers.remove(containerId);
+        }
+
+        public void onGetContainerStatusError(ContainerId containerId, Throwable t) {
+            LOG.error("Failed to query the status of Container " + containerId);
+        }
+
+        public void onStopContainerError(ContainerId containerId, Throwable t) {
+            LOG.error("Failed to stop Container " + containerId);
+            containers.remove(containerId);
+        }
+    }
+
+    /**
+     * Thread to connect to the {@link ContainerManagementProtocol} and launch the container
+     * that will execute the shell command.
+     */
+    private class LaunchAsterixContainer implements Runnable {
+
+        // Allocated container
+        final Container container;
+
+        final NMCallbackHandler containerListener;
+
+        /**
+         * Creates a launcher bound to one allocated container.
+         * 
+         * @param lcontainer
+         *            The allocated container this launcher will start
+         * @param containerListener
+         *            The NM callback handler the container is registered with at launch
+         */
+        public LaunchAsterixContainer(Container lcontainer, NMCallbackHandler containerListener) {
+            this.containerListener = containerListener;
+            this.container = lcontainer;
+        }
+
+        /**
+         * Builds the launch context for the container (local resources, CLASSPATH environment and
+         * the command for the current mode: obliterate, backup, restore or regular start) and asks
+         * the NM client to start the container. If no command could be produced the container is
+         * not started.
+         */
+        public void run() {
+            LOG.info("Setting up container launch container for containerid=" + container.getId());
+            ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
+            // Set the local resources
+            ctx.setLocalResources(localResources);
+
+            //Set the env variables to be setup in the env where the container's process will be run
+            LOG.info("Set the environment for the node");
+            Map<String, String> env = new HashMap<String, String>();
+
+            // Add AppMaster.jar location to classpath
+            // At some point we should not be required to add
+            // the hadoop specific classpaths to the env.
+            // It should be provided out of the box.
+            // For now setting all required classpaths including
+            // the classpath to "." for the application jar
+            StringBuilder classPathEnv = new StringBuilder(Environment.CLASSPATH.$()).append(File.pathSeparatorChar)
+                    .append("." + File.pathSeparatorChar + "*");
+            for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+                    YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
+                classPathEnv.append(File.pathSeparatorChar);
+                classPathEnv.append(c.trim());
+            }
+            // The log4j entry needs its own separator; without one the "." would be glued onto
+            // the last YARN classpath entry and corrupt it. Same form as in produceBackupCommand.
+            classPathEnv.append(File.pathSeparatorChar).append("." + File.separator + "log4j.properties");
+
+            // add the runtime classpath needed for tests to work
+            if (conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
+                classPathEnv.append(System.getProperty("path.separator"));
+                classPathEnv.append(System.getProperty("java.class.path"));
+                env.put("HADOOP_CONF_DIR", System.getProperty("user.dir") + File.separator + "target" + File.separator);
+            }
+
+            env.put("CLASSPATH", classPathEnv.toString());
+
+            ctx.setEnvironment(env);
+            LOG.info(ctx.getEnvironment().toString());
+            List<String> startCmd = null;
+            if (obliterate) {
+                LOG.debug("AM in obliterate mode");
+                startCmd = produceObliterateCommand(container);
+            } else if (backup) {
+                startCmd = produceBackupCommand(container);
+                LOG.debug("AM in backup mode");
+            } else if (restore) {
+                startCmd = produceRestoreCommand(container);
+                LOG.debug("AM in restore mode");
+            } else {
+                startCmd = produceStartCmd(container);
+            }
+
+            // a null or empty command list means the container could not be mapped to a node
+            if (startCmd == null || startCmd.size() == 0) {
+                LOG.fatal("Could not map one or more NCs to NM container hosts- aborting!");
+                return;
+            }
+
+            for (String s : startCmd) {
+                LOG.info("Command to execute: " + s);
+            }
+            ctx.setCommands(startCmd);
+            // register with the listener before starting so its callbacks can find the container
+            containerListener.addContainer(container.getId(), container);
+            //finally start the container!?
+            nmClientAsync.startContainerAsync(container, ctx);
+        }
+
+        /**
+         * Determines for a given container what the necessary command line
+         * arguments are to start the Asterix processes on that instance
+         * 
+         * @param container
+         *            The container to produce the commands for
+         * @return A list of the commands that should be executed
+         */
+        private List<String> produceStartCmd(Container container) {
+            List<String> commands = new ArrayList<String>();
+            // Set the necessary command to execute on the allocated container
+            List<CharSequence> vargs = new ArrayList<CharSequence>(5);
+
+            vargs.add(JAVA_HOME + File.separator + "bin" + File.separator + "java");
+            vargs.add("-classpath " + '\'' + ASTERIX_ZIP_NAME + File.separator + "repo" + File.separator + "*\'");
+            vargs.add("-Dapp.repo=" + ASTERIX_ZIP_NAME + File.separator + "repo" + File.separator);
+            //first see if this node is the CC
+            if (containerIsCC(container) && (ccStarted.get() == false)) {
+                LOG.info("CC found on container" + container.getNodeId().getHost());
+                //get our java opts
+                vargs.add(ccJavaOpts);
+                vargs.add(CC_CLASSNAME);
+                vargs.add("-app-cc-main-class edu.uci.ics.asterix.hyracks.bootstrap.CCApplicationEntryPoint");
+                vargs.add("-cluster-net-ip-address " + cC.getClusterIp());
+                vargs.add("-client-net-ip-address " + cC.getClientIp());
+                ccStarted.set(true);
+
+            } else {
+                //now we need to know what node we are on, so we can apply the correct properties
+
+                Node local;
+                try {
+                    local = containerToNode(container, clusterDesc);
+                    LOG.info("Attempting to start NC on host " + local.getId());
+                    String iodevice = local.getIodevices();
+                    if (iodevice == null) {
+                        iodevice = clusterDesc.getIodevices();
+                    }
+                    String storageSuffix = local.getStore() == null ? clusterDesc.getStore() : local.getStore();
+                    String storagePath = iodevice + File.separator + storageSuffix;
+                    vargs.add(ncJavaOpts);
+                    vargs.add(NC_CLASSNAME);
+                    vargs.add("-app-nc-main-class edu.uci.ics.asterix.hyracks.bootstrap.NCApplicationEntryPoint");
+                    vargs.add("-node-id " + local.getId());
+                    vargs.add("-cc-host " + cC.getClusterIp());
+                    vargs.add("-iodevices " + storagePath);
+                    vargs.add("-cluster-net-ip-address " + local.getClusterIp());
+                    vargs.add("-data-ip-address " + local.getClusterIp());
+                    vargs.add("-result-ip-address " + local.getClusterIp());
+                    vargs.add("--");
+                    if(initial){
+                        vargs.add("-initial-run ");
+                    }
+                } catch (UnknownHostException e) {
+                    LOG.error("Unable to find NC or CC configured for host: " + container.getId() + " " + e);
+                }
+            }
+
+            // Add log redirect params
+            vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + File.separator + "stdout");
+            vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + File.separator + "stderr");
+
+            // Get final commmand
+            StringBuilder command = new StringBuilder();
+            for (CharSequence str : vargs) {
+                command.append(str).append(" ");
+            }
+            commands.add(command.toString());
+            LOG.error(Arrays.toString(commands.toArray()));
+            return commands;
+        }
+
+        /**
+         * Produces the command that runs the obliterator class against the storage directories
+         * of the node matching this container. The transaction log directory is added alongside
+         * the first iodevice.
+         * 
+         * @param container
+         *            The container to produce the command for
+         * @return A single-element list holding the command; a list holding one empty string if
+         *         the container is on the CC host with no node configured there; null if the
+         *         container matches neither a node nor the CC
+         */
+        private List<String> produceObliterateCommand(Container container) {
+            //if this container has no NCs on it, nothing will be there to delete. 
+            List<String> devices;
+            try {
+                Node matched = containerToNode(container, clusterDesc);
+                // the node's own iodevices take precedence over the cluster-wide ones
+                String deviceList = matched.getIodevices();
+                if (deviceList == null) {
+                    deviceList = clusterDesc.getIodevices();
+                }
+                devices = Arrays.asList(deviceList.split(",", -1));
+            } catch (UnknownHostException e) {
+                //we expect this may happen for the CC if it isn't colocated with an NC. otherwise it is not supposed to happen.
+                if (containerIsCC(container)) {
+                    return Arrays.asList("");
+                }
+                LOG.error("Unable to find NC configured for host: " + container.getId() + e);
+                return null;
+            }
+
+            String classPath = "*" + File.pathSeparatorChar + "log4j.properties";
+
+            List<CharSequence> args = new ArrayList<CharSequence>(5);
+            args.add(JAVA_HOME + File.separator + "bin" + File.separator + "java");
+            args.add("-cp " + classPath);
+            args.add(OBLITERATOR_CLASSNAME);
+            for (String device : devices) {
+                args.add(device + File.separator + clusterDesc.getStore());
+                LOG.debug("Deleting from: " + device);
+                //logs only exist on 1st iodevice
+                if (devices.get(0).equals(device)) {
+                    args.add(clusterDesc.getTxnLogDir() + "txnLogs" + File.separator);
+                    LOG.debug("Deleting logs from: " + clusterDesc.getTxnLogDir());
+                }
+            }
+            args.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + File.separator + "stdout");
+            args.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + File.separator + "stderr");
+
+            // every argument, including the last, is followed by a single space
+            StringBuilder joined = new StringBuilder();
+            for (CharSequence arg : args) {
+                joined.append(arg).append(" ");
+            }
+            List<String> result = new ArrayList<String>();
+            result.add(joined.toString());
+            return result;
+        }
+
+        /**
+         * Produces the command that runs the HDFS backup class for the node matching this
+         * container. Each iodevice's store directory is paired with a destination below
+         * {@code <instanceConfPath>backups/<backupTimestamp>/<node id>}; the transaction log
+         * directory is paired with the destination of the first iodevice. The destination base
+         * folder is created before the command is returned.
+         * 
+         * @param container
+         *            The container to produce the command for
+         * @return A single-element list holding the command; a list holding one empty string if
+         *         the container is on the CC host with no node configured there; null if the
+         *         container matches neither a node nor the CC, or if the backup folder could not
+         *         be created
+         */
+        private List<String> produceBackupCommand(Container container) {
+            Node local = null;
+            List<String> iodevices = null;
+            try {
+                local = containerToNode(container, clusterDesc);
+                if (local.getIodevices() == null) {
+                    iodevices = Arrays.asList(clusterDesc.getIodevices().split(",", -1));
+                } else {
+                    iodevices = Arrays.asList(local.getIodevices().split(",", -1));
+                }
+            } catch (UnknownHostException e) {
+                //we expect this may happen for the CC if it isn't colocated with an NC. otherwise it is not supposed to happen.
+                if (!containerIsCC(container)) {
+                    LOG.error("Unable to find NC configured for host: " + container.getId() + e);
+                    return null;
+                } else {
+                    return Arrays.asList("");
+                }
+            }
+            StringBuilder classPathEnv = new StringBuilder("").append("." + File.separator + "*");
+            for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+                    YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
+                classPathEnv.append(File.pathSeparatorChar);
+                classPathEnv.append(c.trim());
+            }
+            classPathEnv.append(File.pathSeparatorChar).append("." + File.separator + "log4j.properties");
+
+            List<String> commands = new ArrayList<String>();
+            Vector<CharSequence> vargs = new Vector<CharSequence>(5);
+            vargs.add(JAVA_HOME + File.separator + "bin" + File.separator + "java");
+            vargs.add("-cp " + classPathEnv.toString());
+            vargs.add(HDFS_BACKUP_CLASSNAME);
+            vargs.add("-backup");
+
+            String dstBase = instanceConfPath + "backups" + Path.SEPARATOR + backupTimestamp + Path.SEPARATOR
+                    + local.getId();
+            try {
+                createBackupFolder(dstBase);
+            } catch (IOException e) {
+                //something very bad happened- return null to cause attempt to abort
+                LOG.error("Unable to create backup folder " + dstBase, e);
+                return null;
+            }
+            for (String s : iodevices) {
+                // Flatten the iodevice path into the destination name: its '/'-separated
+                // components are appended to dstBase joined by '_'.
+                // NOTE(review): indexOf returns the first match, so a path with a repeated
+                // component also gets a trailing '_'. Left as is - the restore side presumably
+                // derives the same name; confirm before changing.
+                List<String> ioComponents = Arrays.asList(s.split("\\/"));
+                StringBuilder dst = new StringBuilder().append(dstBase);
+                for (String io : ioComponents) {
+                    dst.append(io);
+                    if (ioComponents.indexOf(io) != ioComponents.size() - 1) {
+                        dst.append("_");
+                    }
+                }
+                dst.append(Path.SEPARATOR);
+                // each argument is a "source,destination" pair
+                vargs.add(s + File.separator + clusterDesc.getStore() + "," + dst);
+                LOG.debug("Backing up from: " + s);
+                //logs only exist on 1st iodevice
+                if (iodevices.indexOf(s) == 0) {
+                    LOG.debug("Backing up logs from: " + clusterDesc.getTxnLogDir());
+                    vargs.add(clusterDesc.getTxnLogDir() + "txnLogs" + File.separator + "," + dst);
+                }
+            }
+            // report the real destination base, including the backup timestamp
+            LOG.debug("Backing up to: " + dstBase);
+
+            vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + File.separator + "stdout");
+            vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + File.separator + "stderr");
+            StringBuilder command = new StringBuilder();
+            for (CharSequence str : vargs) {
+                command.append(str).append(" ");
+            }
+            commands.add(command.toString());
+            return commands;
+        }
+
+        /**
+         * Creates the given folder, including any missing parents, on the file system named by the configuration.
+         *
+         * @param path
+         *            The folder to create.
+         * @throws IOException
+         *             If the file system cannot be reached or reports that the folder could not be created.
+         */
+        private void createBackupFolder(String path) throws IOException {
+            FileSystem fs = FileSystem.get(conf);
+            Path backupFolder = new Path(path);
+            //mkdirs signals some failures through its return value only, so it must not be ignored
+            if (!fs.mkdirs(backupFolder)) {
+                throw new IOException("Could not create backup folder: " + backupFolder);
+            }
+        }
+
+        /**
+         * Builds the shell command that runs the HDFS backup utility in restore mode for the node hosted in the
+         * given container. The backup is read from {@code <instanceConfPath>backups/<snapName>/}, using the same
+         * per-iodevice folder naming that produceBackupCommand writes.
+         *
+         * @param container
+         *            The container the restore will be launched in.
+         * @return A single-element list holding the command line; a list holding one empty string if the container
+         *         is the CC; or null if no node is configured for the container.
+         */
+        private List<String> produceRestoreCommand(Container container) {
+            if (containerIsCC(container)) {
+                List<String> blank = new ArrayList<String>();
+                blank.add("");
+                return blank;
+            }
+            Node local = null;
+            List<String> iodevices = null;
+            try {
+                local = containerToNode(container, clusterDesc);
+                //a node that declares no iodevices of its own falls back to the cluster-wide list
+                String deviceList = local.getIodevices() == null ? clusterDesc.getIodevices() : local.getIodevices();
+                iodevices = Arrays.asList(deviceList.split(",", -1));
+            } catch (UnknownHostException e) {
+                //the CC was already handled above, so a container without a configured node is always an error here
+                LOG.error("Unable to find NC configured for host: " + container.getId(), e);
+                return null;
+            }
+            StringBuilder classPathEnv = new StringBuilder("." + File.separator + "*");
+            for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+                    YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
+                classPathEnv.append(File.pathSeparatorChar);
+                classPathEnv.append(c.trim());
+            }
+            classPathEnv.append(File.pathSeparatorChar).append("." + File.separator + "log4j.properties");
+
+            List<CharSequence> vargs = new ArrayList<CharSequence>();
+            vargs.add(JAVA_HOME + File.separator + "bin" + File.separator + "java");
+            vargs.add("-cp " + classPathEnv.toString());
+            vargs.add(HDFS_BACKUP_CLASSNAME);
+            vargs.add("-restore");
+            String srcBase = instanceConfPath + "backups" + Path.SEPARATOR + Long.parseLong(snapName) + Path.SEPARATOR
+                    + local.getId();
+            for (int device = 0; device < iodevices.size(); device++) {
+                String s = iodevices.get(device);
+                List<String> ioComponents = Arrays.asList(s.split("\\/"));
+                StringBuilder src = new StringBuilder().append(srcBase);
+                //NOTE: this must produce exactly the folder name produceBackupCommand wrote, including its
+                //indexOf-based placement of '_', otherwise existing backups can no longer be located.
+                for (String io : ioComponents) {
+                    src.append(io);
+                    if (ioComponents.indexOf(io) != ioComponents.size() - 1) {
+                        src.append("_");
+                    }
+                }
+                src.append(Path.SEPARATOR);
+                try {
+                    FileSystem fs = FileSystem.get(conf);
+                    FileStatus[] backups = fs.listStatus(new Path(src.toString()));
+                    for (FileStatus b : backups) {
+                        //transaction logs are restored separately below; the root metadata entry is skipped
+                        if (!b.getPath().toString().contains("txnLogs")
+                                && !b.getPath().toString().contains(File.separator + "asterix_root_metadata")) {
+                            vargs.add(b.getPath() + "," + s + File.separator + clusterDesc.getStore());
+                        }
+                    }
+                } catch (IOException e) {
+                    //best effort: keep building the command, but leave a trace of why the listing is missing
+                    LOG.error("Could not stat backup directory in DFS", e);
+                }
+                //NOTE(review): unlike the entries above there is no File.separator between the iodevice and the
+                //store name here - confirm whether that is intended before changing it.
+                vargs.add(src + "," + s + clusterDesc.getStore());
+                LOG.debug("Restoring from: " + s);
+                //logs only exist on 1st iodevice; test the position so a repeated entry cannot add them twice
+                if (device == 0) {
+                    vargs.add(src + "txnLogs" + File.separator + "," + clusterDesc.getTxnLogDir() + File.separator);
+
+                    LOG.debug("Restoring logs from: " + clusterDesc.getTxnLogDir());
+                }
+            }
+            LOG.debug("Restoring to: " + instanceConfPath + "backups" + Path.SEPARATOR + local.getId());
+
+            //stdout and stderr of the utility go to the container's log directory
+            vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + File.separator + "stdout");
+            vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + File.separator + "stderr");
+            StringBuilder command = new StringBuilder();
+            for (CharSequence str : vargs) {
+                command.append(str).append(" ");
+            }
+            List<String> commands = new ArrayList<String>();
+            commands.add(command.toString());
+            return commands;
+        }
+
+    }
+}
diff --git a/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/AsterixYARNClient.java b/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/AsterixYARNClient.java
new file mode 100644
index 0000000..63b85e6
--- /dev/null
+++ b/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/AsterixYARNClient.java
@@ -0,0 +1,1387 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.aoya;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+import java.util.regex.Pattern;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Marshaller;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.service.Service.STATE;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+
+import com.google.common.collect.ImmutableMap;
+
+import edu.uci.ics.asterix.common.configuration.AsterixConfiguration;
+import edu.uci.ics.asterix.common.configuration.Coredump;
+import edu.uci.ics.asterix.common.configuration.Store;
+import edu.uci.ics.asterix.common.configuration.TransactionLogDir;
+import edu.uci.ics.asterix.event.schema.yarnCluster.Cluster;
+import edu.uci.ics.asterix.event.schema.yarnCluster.Node;
+
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class AsterixYARNClient {
+
+    /**
+     * The actions the client can perform. Each constant carries the command-line verb ("alias") that selects it.
+     * NOOP has an empty alias and stands for "no recognized action".
+     */
+    public static enum Mode {
+        INSTALL("install"),
+        START("start"),
+        STOP("stop"),
+        KILL("kill"),
+        DESTROY("destroy"),
+        ALTER("alter"),
+        LIBINSTALL("libinstall"),
+        DESCRIBE("describe"),
+        BACKUP("backup"),
+        LSBACKUP("lsbackups"),
+        RMBACKUP("rmbackup"),
+        RESTORE("restore"),
+        NOOP("");
+
+        /** The command-line verb that selects this mode. */
+        public final String alias;
+
+        Mode(String alias) {
+            this.alias = alias;
+        }
+
+        /**
+         * Looks up a mode by its command-line verb, ignoring case.
+         *
+         * @param a
+         *            The verb as typed by the user; may be null.
+         * @return The matching mode, or null if the verb is null or not registered in STRING_TO_MODE.
+         */
+        public static Mode fromAlias(String a) {
+            if (a == null) {
+                return null;
+            }
+            //Locale.ROOT keeps the lookup independent of the JVM's default locale (e.g. the Turkish dotless i)
+            return STRING_TO_MODE.get(a.toLowerCase(java.util.Locale.ROOT));
+        }
+    }
+
+    // Lookup table from command-line verb to Mode, used by Mode.fromAlias.
+    // NOOP is deliberately not registered, so an unknown verb maps to null.
+    public static final Map<String, AsterixYARNClient.Mode> STRING_TO_MODE = ImmutableMap
+            .<String, AsterixYARNClient.Mode> builder().put(Mode.INSTALL.alias, Mode.INSTALL)
+            .put(Mode.START.alias, Mode.START).put(Mode.STOP.alias, Mode.STOP).put(Mode.KILL.alias, Mode.KILL)
+            .put(Mode.DESTROY.alias, Mode.DESTROY).put(Mode.ALTER.alias, Mode.ALTER)
+            .put(Mode.LIBINSTALL.alias, Mode.LIBINSTALL).put(Mode.DESCRIBE.alias, Mode.DESCRIBE)
+            .put(Mode.BACKUP.alias, Mode.BACKUP).put(Mode.LSBACKUP.alias, Mode.LSBACKUP)
+            .put(Mode.RMBACKUP.alias, Mode.RMBACKUP).put(Mode.RESTORE.alias, Mode.RESTORE).build();
+    private static final Log LOG = LogFactory.getLog(AsterixYARNClient.class);
+    // Directory holding all instance state, resolved against the file system's home directory
+    public static final String CONF_DIR_REL = ".asterix" + File.separator;
+    // Name of the per-instance lock file kept inside the instance folder
+    private static final String instanceLock = "instance";
+    public static final String CONFIG_DEFAULT_NAME = "cluster-config.xml";
+    public static final String PARAMS_DEFAULT_NAME = "asterix-configuration.xml";
+    // Local path of the base asterix parameters file, relative to the working directory
+    private static String DEFAULT_PARAMETERS_PATH = "conf" + File.separator + "base-asterix-configuration.xml";
+    private static String MERGED_PARAMETERS_PATH = "conf" + File.separator + PARAMS_DEFAULT_NAME;
+    // Home directory of the JVM running this client (the "java.home" system property)
+    private static final String JAVA_HOME = System.getProperty("java.home");
+    public static final String NC_JAVA_OPTS_KEY = "nc.java.opts";
+    public static final String CC_JAVA_OPTS_KEY = "cc.java.opts";
+    public static final String CC_REST_PORT_KEY = "api.port";
+    // Action to perform; stays NOOP until initMode recognizes a verb
+    private Mode mode = Mode.NOOP;
+
+    // Hadoop Configuration
+    private Configuration conf;
+    private YarnClient yarnClient;
+    // Application master specific info to register a new Application with
+    // RM/ASM
+    private String appName = "";
+    // App master priority
+    private int amPriority = 0;
+    // Queue for App master
+    private String amQueue = "";
+    // Amount of memory to request for running the App Master.
+    // NOTE: init() overwrites this, using "10" when master_memory is not given.
+    private int amMemory = 1000;
+
+    // Main class to invoke application master
+    private final String appMasterMainClass = "edu.uci.ics.asterix.aoya.AsterixApplicationMaster";
+
+    //instance name
+    private String instanceName = "";
+    //location of distributable AsterixDB zip
+    private String asterixZip = "";
+    // Location of cluster configuration
+    private String asterixConf = "";
+    // Location of optional external libraries
+    private String extLibs = "";
+
+    // Instance name followed by '/', appended to CONF_DIR_REL to locate the instance's files
+    private String instanceFolder = "";
+
+    // log4j.properties file
+    // if available, add to local resources and set into classpath
+    private String log4jPropFile = "";
+
+    // Debug flag
+    boolean debugFlag = false;
+    // Set by the "refresh" option, which is only accepted in start mode
+    private boolean refresh = false;
+    // Set by the "force" option; skips the interactive confirmation prompts
+    private boolean force = false;
+
+    // Command line options
+    private Options opts;
+    private String libDataverse;
+    // Value of the "snapshot" option: the backup timestamp for restore/rmbackup
+    private String snapName = "";
+    private String baseConfig = ".";
+    private String ccJavaOpts = "";
+    private String ncJavaOpts = "";
+
+    //Ports
+    // CC REST port handed to the liveness check after start/install
+    private int ccRestPort = 19002;
+
+    /**
+     * @param args
+     *            Command line arguments
+     */
+    public static void main(String[] args) {
+
+        try {
+            AsterixYARNClient client = new AsterixYARNClient();
+            try {
+                client.init(args);
+                AsterixYARNClient.execute(client);
+            } catch (ParseException | ApplicationNotFoundException e) {
+                LOG.fatal(e);
+                client.printUsage();
+                System.exit(-1);
+            }
+        } catch (Exception e) {
+            LOG.fatal("Error running client", e);
+            System.exit(1);
+        }
+        LOG.info("Command executed successfully.");
+        System.exit(0);
+    }
+
+    public static void execute(AsterixYARNClient client) throws IOException, YarnException {
+        YarnClientApplication app;
+        List<DFSResourceCoordinate> res;
+
+        System.out.println("JAVA HOME: " + JAVA_HOME);
+        switch (client.mode) {
+            case START:
+                startAction(client);
+                break;
+            case STOP:
+                try {
+                    client.stopInstance();
+                } catch (ApplicationNotFoundException e) {
+                    LOG.info(e);
+                    System.out.println("Asterix instance by that name already exited or was never started");
+                    client.deleteLockFile();
+                }
+                break;
+            case KILL:
+                if (client.isRunning() &&
+                    Utils.confirmAction("Are you sure you want to kill this instance? In-progress tasks will be aborted")) {
+                    try {
+                        AsterixYARNClient.killApplication(client.getLockFile(), client.yarnClient);
+                    } catch (ApplicationNotFoundException e) {
+                        LOG.info(e);
+                        System.out.println("Asterix instance by that name already exited or was never started");
+                        client.deleteLockFile();
+                    }
+                }
+                else if(!client.isRunning()){
+                    System.out.println("Asterix instance by that name already exited or was never started");
+                    client.deleteLockFile();
+                }
+                break;
+            case DESCRIBE:
+                Utils.listInstances(client.conf, CONF_DIR_REL);
+                break;
+            case INSTALL:
+                installAction(client);
+                break;
+            case LIBINSTALL:
+                client.installExtLibs();
+                break;
+            case ALTER:
+                client.writeAsterixConfig(Utils.parseYarnClusterConfig(client.asterixConf));
+                client.installAsterixConfig(true);
+                System.out.println("Configuration successfully modified");
+                break;
+            case DESTROY:
+                try {
+                    if (client.force
+                            || Utils.confirmAction("Are you really sure you want to obliterate this instance? This action cannot be undone!")) {
+                        app = client.makeApplicationContext();
+                        res = client.deployConfig();
+                        res.addAll(client.distributeBinaries());
+                        client.removeInstance(app, res);
+                    }
+                } catch (YarnException | IOException e) {
+                    LOG.error("Asterix failed to deploy on to cluster");
+                    throw e;
+                }
+                break;
+            case BACKUP:
+                if (client.force || Utils.confirmAction("Performing a backup will stop a running instance.")) {
+                    app = client.makeApplicationContext();
+                    res = client.deployConfig();
+                    res.addAll(client.distributeBinaries());
+                    client.backupInstance(app, res);
+                }
+                break;
+            case LSBACKUP:
+                Utils.listBackups(client.conf, CONF_DIR_REL, client.instanceName);
+                break;
+            case RMBACKUP:
+                Utils.rmBackup(client.conf, CONF_DIR_REL, client.instanceName, Long.parseLong(client.snapName));
+                break;
+            case RESTORE:
+                if (client.force || Utils.confirmAction("Performing a restore will stop a running instance.")) {
+                    app = client.makeApplicationContext();
+                    res = client.deployConfig();
+                    res.addAll(client.distributeBinaries());
+                    client.restoreInstance(app, res);
+                }
+                break;
+            default:
+                LOG.fatal("Unknown mode. Known client modes are: start, stop, install, describe, kill, destroy, describe, backup, restore, lsbackup, rmbackup");
+                client.printUsage();
+                System.exit(-1);
+        }
+    }
+
+    private static void startAction(AsterixYARNClient client) throws YarnException {
+        YarnClientApplication app;
+        List<DFSResourceCoordinate> res;
+        ApplicationId appId;
+        try {
+            app = client.makeApplicationContext();
+            res = client.deployConfig();
+            res.addAll(client.distributeBinaries());
+            appId = client.deployAM(app, res, client.mode);
+            LOG.info("Asterix started up with Application ID: " + appId.toString());
+            if (Utils.waitForLiveness(appId, "Waiting for AsterixDB instance to resume ", client.yarnClient,
+                    client.instanceName, client.conf, client.ccRestPort)) {
+                System.out.println("Asterix successfully deployed and is now running.");
+            } else {
+                LOG.fatal("AsterixDB appears to have failed to install and start");
+                throw new YarnException("AsterixDB appears to have failed to install and start");
+            }
+        } catch (IOException e) {
+            throw new YarnException(e);
+        }
+    }
+
+    private static void installAction(AsterixYARNClient client) throws YarnException {
+        YarnClientApplication app;
+        List<DFSResourceCoordinate> res;
+        ApplicationId appId;
+        try {
+            app = client.makeApplicationContext();
+            client.installConfig();
+            client.writeAsterixConfig(Utils.parseYarnClusterConfig(client.asterixConf));
+            client.installAsterixConfig(false);
+            res = client.deployConfig();
+            res.addAll(client.distributeBinaries());
+
+            appId = client.deployAM(app, res, client.mode);
+            LOG.info("Asterix started up with Application ID: " + appId.toString());
+            if (Utils.waitForLiveness(appId, "Waiting for new AsterixDB Instance to start ", client.yarnClient,
+                    client.instanceName, client.conf, client.ccRestPort)) {
+                System.out.println("Asterix successfully deployed and is now running.");
+            } else {
+                LOG.fatal("AsterixDB appears to have failed to install and start");
+                throw new YarnException("AsterixDB appears to have failed to install and start");
+            }
+        } catch (IOException e) {
+            LOG.fatal("Asterix failed to deploy on to cluster");
+            throw new YarnException(e);
+        }
+    }
+
+    /**
+     * Creates a client bound to the given Hadoop configuration and prepares the YARN client and the set of
+     * accepted command-line options.
+     * <p>
+     * The configuration is modified in place: default implementations for the "hdfs" and "file" schemes are
+     * filled in when they are missing.
+     *
+     * @param conf
+     *            The Hadoop configuration to use.
+     * @throws Exception
+     */
+    public AsterixYARNClient(Configuration conf) throws Exception {
+
+        this.conf = conf;
+        yarnClient = YarnClient.createYarnClient();
+        //If the HDFS jars aren't on the classpath these won't be set.
+        //Each key is checked on its own so that a single missing entry is filled in as well.
+        if (conf.get("fs.hdfs.impl", null) == null) {
+            conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
+        }
+        if (conf.get("fs.file.impl", null) == null) {
+            conf.set("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem");
+        }
+        yarnClient.init(conf);
+        opts = parseConf(conf);
+    }
+
+    private static Options parseConf(Configuration conf) {
+        Options opts = new Options();
+        opts.addOption(new Option("appname", true, "Application Name. Default value - Asterix"));
+        opts.addOption(new Option("priority", true, "Application Priority. Default 0"));
+        opts.addOption(new Option("queue", true, "RM Queue in which this application is to be submitted"));
+        opts.addOption(new Option("master_memory", true,
+                "Amount of memory in MB to be requested to run the application master"));
+        opts.addOption(new Option("log_properties", true, "log4j.properties file"));
+        opts.addOption(new Option("n", "name", true, "Asterix instance name (required)"));
+        opts.addOption(new Option("zip", "asterixZip", true,
+                "zip file with AsterixDB inside- if in non-default location"));
+        opts.addOption(new Option("bc", "baseConfig", true,
+                "base Asterix parameters configuration file if not in default position"));
+        opts.addOption(new Option("c", "asterixConf", true, "Asterix cluster config (required on install)"));
+        opts.addOption(new Option("l", "externalLibs", true, "Libraries to deploy along with Asterix instance"));
+        opts.addOption(new Option("ld", "libDataverse", true, "Dataverse to deploy external libraries to"));
+        opts.addOption(new Option("r", "refresh", false,
+                "If starting an existing instance, this will replace them with the local copy on startup"));
+        opts.addOption(new Option("appId", true, "ApplicationID to monitor if running client in status monitor mode"));
+        opts.addOption(new Option("masterLibsDir", true, "Directory that contains the JARs needed to run the AM"));
+        opts.addOption(new Option("s", "snapshot", true,
+                "Backup timestamp for arguments requiring a specific backup (rm, restore)"));
+        opts.addOption(new Option("v", "debug", false, "Dump out debug information"));
+        opts.addOption(new Option("help", false, "Print usage"));
+        opts.addOption(new Option("f", "force", false,
+                "Execute this command as fully as possible, disregarding any caution"));
+        return opts;
+    }
+
+    /**
+     * Creates a client backed by a freshly constructed {@link YarnConfiguration}.
+     *
+     * @throws Exception
+     *             Propagated from {@link #AsterixYARNClient(Configuration)}.
+     */
+    public AsterixYARNClient() throws Exception {
+        this(new YarnConfiguration());
+    }
+
+    /**
+     * Helper function to print out usage
+     */
+    private void printUsage() {
+        new HelpFormatter().printHelp("Asterix YARN client. Usage: asterix [options] [mode]", opts);
+    }
+
+    /**
+     * Parses the command line and populates the client's fields from it, validating the arguments first in
+     * general and then against the requested mode. If "help" is given, usage is printed and nothing else is
+     * initialized.
+     *
+     * @param args
+     *            - Standard command-line arguments.
+     * @throws ParseException
+     *             If the arguments cannot be parsed or fail one of the sanity checks.
+     */
+    public void init(String[] args) throws ParseException {
+
+        final CommandLine cmd = new GnuParser().parse(opts, args);
+        if (cmd.hasOption("help")) {
+            printUsage();
+            return;
+        }
+
+        //plain flags and paths
+        debugFlag = cmd.hasOption("debug");
+        force = cmd.hasOption("force");
+        baseConfig = cmd.getOptionValue("baseConfig");
+        extLibs = cmd.getOptionValue("externalLibs");
+        libDataverse = cmd.getOptionValue("libDataverse");
+
+        //application master settings, with their defaults
+        appName = cmd.getOptionValue("appname", "AsterixDB");
+        amPriority = Integer.parseInt(cmd.getOptionValue("priority", "0"));
+        amQueue = cmd.getOptionValue("queue", "default");
+        amMemory = Integer.parseInt(cmd.getOptionValue("master_memory", "10"));
+
+        //everything derived from the instance name
+        instanceName = cmd.getOptionValue("name");
+        instanceFolder = instanceName + '/';
+        appName = appName + ": " + instanceName;
+
+        asterixConf = cmd.getOptionValue("asterixConf");
+
+        log4jPropFile = cmd.getOptionValue("log_properties", "");
+
+        //validate in three steps: the arguments in general, the mode itself, then the arguments for that mode
+        checkConfSanity(args, cmd);
+        initMode(args, cmd);
+        checkModeSanity(args, cmd);
+
+        //only meaningful once the mode has been validated
+        refresh = cmd.hasOption("refresh");
+        snapName = cmd.getOptionValue("snapshot");
+
+        //these modes fall back to the distributable in the default location when no zip is named
+        final boolean defaultZipApplies = mode == Mode.INSTALL || mode == Mode.ALTER || mode == Mode.DESTROY
+                || mode == Mode.BACKUP;
+        if (defaultZipApplies && !cmd.hasOption("asterixZip")) {
+            asterixZip = getAsterixDistributableLocation().getAbsolutePath();
+        } else {
+            asterixZip = cmd.getOptionValue("asterixZip");
+        }
+
+    }
+
+    /**
+     * Mode-independent argument checks: at least one argument must be present and the application master
+     * memory must not be negative.
+     *
+     * @param args
+     *            The raw command line arguments.
+     * @param cliParser
+     *            The parsed arguments.
+     * @throws ParseException
+     *             If one of the checks fails.
+     */
+    private void checkConfSanity(String[] args, CommandLine cliParser) throws ParseException {
+        //Sanity check for no args
+        if (args.length == 0) {
+            throw new ParseException("No args specified for client to initialize");
+        }
+        //AM memory should be a sane value
+        if (amMemory < 0) {
+            throw new ParseException("Invalid memory specified for application master, exiting."
+                    + " Specified memory=" + amMemory);
+        }
+    }
+
+    /**
+     * Initialize the mode of the client from the arguments.
+     * 
+     * @param args
+     * @param cliParser
+     * @throws ParseException
+     */
+    private void initMode(String[] args, CommandLine cliParser) throws ParseException {
+        @SuppressWarnings("unchecked")
+        List<String> clientVerb = cliParser.getArgList();
+        String message = null;
+        //Now check if there is a mode
+        if (clientVerb == null || clientVerb.size() < 1) {
+            message = "You must specify an action.";
+        }
+        //But there can only be one mode...
+        else if (clientVerb.size() > 1) {
+            message = "Trailing arguments, or too many arguments. Only one action may be performed at a time.";
+        }
+        if (message != null) {
+            throw new ParseException(message);
+        }
+        //Now we can initialize the mode and check it against parameters
+        mode = Mode.fromAlias(clientVerb.get(0));
+        if (mode == null) {
+            mode = Mode.NOOP;
+        }
+    }
+
+    /**
+     * Determine if the command line arguments are sufficient for, and compatible with, the requested client
+     * mode. Only the first violated rule is reported.
+     * 
+     * @param args
+     *            The command line arguments.
+     * @param cliParser
+     *            Parsed command line arguments.
+     * @throws ParseException
+     *             If an argument is missing or not allowed for the mode.
+     */
+
+    private void checkModeSanity(String[] args, CommandLine cliParser) throws ParseException {
+
+        String message = null;
+        //The only time you can use the client without specifying an instance, is to list all of the instances it sees.
+        if (!cliParser.hasOption("name") && mode != Mode.DESCRIBE) {
+            message = "You must give a name for the instance to be acted upon";
+        } else if (mode == Mode.INSTALL && !cliParser.hasOption("asterixConf")) {
+            message = "No Configuration XML given. Please specify a config for cluster installation";
+        } else if (mode != Mode.START && cliParser.hasOption("refresh")) {
+            message = "Cannot specify refresh in any mode besides start, mode is: " + mode;
+        } else if (cliParser.hasOption("snapshot") && !(mode == Mode.RESTORE || mode == Mode.RMBACKUP)) {
+            message = "Cannot specify a snapshot to restore in any mode besides restore or rmbackup, mode is: " + mode;
+        } else if (mode == Mode.RMBACKUP && !cliParser.hasOption("snapshot")) {
+            //rmbackup parses the snapshot as a number; without it the client would die on a NumberFormatException
+            //NOTE(review): restore presumably needs a snapshot as well - confirm and extend this check if so
+            message = "You must specify the snapshot (backup timestamp) to remove";
+        } else if ((mode == Mode.ALTER || mode == Mode.INSTALL) && baseConfig == null
+                && !(new File(DEFAULT_PARAMETERS_PATH).exists())) {
+            message = "Default asterix parameters file is not in the default location, and no custom location is specified";
+        }
+        //nothing is wrong, so exit
+        else {
+            return;
+        }
+        //otherwise, something is bad.
+        throw new ParseException(message);
+
+    }
+
+    /**
+     * Find the distributable asterix bundle in the default location: the single file matching
+     * "asterix-server*.zip" inside the "asterix" folder of the working directory.
+     * 
+     * @return The bundle.
+     * @throws IllegalArgumentException
+     *             If the folder is missing or cannot be listed, or if it does not hold exactly one bundle.
+     */
+    private File getAsterixDistributableLocation() {
+        //Look in the PWD for the "asterix" folder
+        File tarDir = new File("asterix");
+        if (!tarDir.exists()) {
+            throw new IllegalArgumentException(
+                    "Default directory structure not in use- please specify an asterix zip and base config file to distribute");
+        }
+        FileFilter tarFilter = new WildcardFileFilter("asterix-server*.zip");
+        File[] tarFiles = tarDir.listFiles(tarFilter);
+        //listFiles yields null when the path is not a directory or cannot be read
+        if (tarFiles == null) {
+            throw new IllegalArgumentException(
+                    "The default asterix directory could not be listed. Please specify an asterix zip to distribute.");
+        }
+        if (tarFiles.length == 0) {
+            throw new IllegalArgumentException(
+                    "There is no canonically named asterix distributable in the default directory. Please specify an asterix zip to distribute.");
+        }
+        if (tarFiles.length > 1) {
+            throw new IllegalArgumentException(
+                    "There is more than one canonically named asterix distributable in the default directory. Please leave only one there.");
+        }
+        return tarFiles[0];
+    }
+
+    /**
+     * Starts the YARN client, deals with any lock file left by a previous run of this instance, and requests a
+     * new application from the YARN ResourceManager.
+     * 
+     * @return The handle of the newly created (not yet submitted) application.
+     * @throws IOException
+     * @throws YarnException
+     * @throws IllegalStateException
+     *             if the lock file refers to an application that has not exited and the current mode is not
+     *             destroy, backup or restore.
+     */
+    public YarnClientApplication makeApplicationContext() throws IOException, YarnException {
+
+        //first check to see if an instance already exists.
+        FileSystem fs = FileSystem.get(conf);
+        Path lock = new Path(fs.getHomeDirectory(), CONF_DIR_REL + instanceFolder + instanceLock);
+        LOG.info("Running Deployment");
+        yarnClient.start();
+        if (fs.exists(lock)) {
+            ApplicationId lockAppId = getLockFile();
+            try {
+                YarnApplicationState prevStatus = yarnClient.getApplicationReport(lockAppId)
+                        .getYarnApplicationState();
+                boolean previousRunExited = prevStatus == YarnApplicationState.FAILED
+                        || prevStatus == YarnApplicationState.KILLED || prevStatus == YarnApplicationState.FINISHED;
+                //destroy, backup and restore leave the lock alone here, whatever the previous run's state
+                boolean lockIsLeftAlone = mode == Mode.DESTROY || mode == Mode.BACKUP || mode == Mode.RESTORE;
+                if (!lockIsLeftAlone) {
+                    if (!previousRunExited) {
+                        throw new IllegalStateException("Instance is already running in: " + lockAppId);
+                    }
+                    //stale lock file
+                    LOG.warn("Stale lockfile detected. Instance attempt " + lockAppId + " may have exited abnormally");
+                    deleteLockFile();
+                }
+            } catch (YarnException e) {
+                LOG.warn("Stale lockfile detected, but the RM has no record of this application's last run. This is normal if the cluster was restarted.");
+                deleteLockFile();
+            }
+        }
+
+        // Get a new application id
+        YarnClientApplication app = yarnClient.createApplication();
+        int maxMem = app.getNewApplicationResponse().getMaximumResourceCapability().getMemory();
+        LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
+
+        // A resource ask cannot exceed the max, so clamp the AM request to it.
+        if (amMemory > maxMem) {
+            LOG.info("AM memory specified above max threshold of cluster. Using max value." + ", specified=" + amMemory
+                    + ", max=" + maxMem);
+            amMemory = maxMem;
+        }
+
+        // set the application name
+        app.getApplicationSubmissionContext().setApplicationName(appName);
+
+        return app;
+    }
+
+    /**
+     * Describes the cluster description file that is already stored in the DFS for this instance as a YARN local
+     * resource. Nothing is uploaded here; the file must have been installed beforehand.
+     * 
+     * @return A list holding the single resource coordinate of the stored cluster description.
+     * @throws YarnException
+     *             if the status of the stored file cannot be read from the DFS.
+     * @throws IOException
+     */
+    private List<DFSResourceCoordinate> deployConfig() throws YarnException, IOException {
+
+        FileSystem fs = FileSystem.get(conf);
+        List<DFSResourceCoordinate> resources = new ArrayList<DFSResourceCoordinate>(2);
+
+        String pathSuffix = CONF_DIR_REL + instanceFolder + CONFIG_DEFAULT_NAME;
+        Path dstConf = new Path(fs.getHomeDirectory(), pathSuffix);
+        FileStatus destStatus;
+        try {
+            destStatus = fs.getFileStatus(dstConf);
+        } catch (IOException e) {
+            //keep the cause attached so that a DFS failure is not mistaken for a missing instance
+            throw new YarnException("Asterix instance by that name does not appear to exist in DFS", e);
+        }
+        LocalResource asterixConfLoc = Records.newRecord(LocalResource.class);
+        asterixConfLoc.setType(LocalResourceType.FILE);
+        asterixConfLoc.setVisibility(LocalResourceVisibility.PRIVATE);
+        asterixConfLoc.setResource(ConverterUtils.getYarnUrlFromPath(dstConf));
+        asterixConfLoc.setTimestamp(destStatus.getModificationTime());
+        //the size has to be set explicitly, otherwise the length exported below is not the file's length
+        asterixConfLoc.setSize(destStatus.getLen());
+
+        //named confCoord so that it does not shadow the Hadoop configuration field "conf"
+        DFSResourceCoordinate confCoord = new DFSResourceCoordinate();
+        //envs is keyed by the value and maps to the variable name; deployAM reverses the pairs
+        confCoord.envs.put(dstConf.toUri().toString(), AConstants.CONFLOCATION);
+        confCoord.envs.put(Long.toString(asterixConfLoc.getSize()), AConstants.CONFLEN);
+        confCoord.envs.put(Long.toString(asterixConfLoc.getTimestamp()), AConstants.CONFTIMESTAMP);
+        confCoord.name = CONFIG_DEFAULT_NAME;
+        confCoord.res = asterixConfLoc;
+        resources.add(confCoord);
+
+        return resources;
+
+    }
+
+    /**
+     * Install the current Asterix parameters to the DFS. This can be modified via alter.
+     * 
+     * @throws YarnException
+     * @throws IOException
+     */
+    private void installConfig() throws YarnException, IOException {
+        FileSystem fs = FileSystem.get(conf);
+        String pathSuffix = CONF_DIR_REL + instanceFolder + CONFIG_DEFAULT_NAME;
+        Path dstConf = new Path(fs.getHomeDirectory(), pathSuffix);
+        try {
+            fs.getFileStatus(dstConf);
+            if (mode == Mode.INSTALL) {
+                throw new IllegalStateException("Instance with this name already exists.");
+            }
+        } catch (FileNotFoundException e) {
+            if (mode == Mode.START) {
+                throw new IllegalStateException("Instance does not exist for this user", e);
+            }
+        }
+        if (mode == Mode.INSTALL) {
+            Path src = new Path(asterixConf);
+            fs.copyFromLocalFile(false, true, src, dstConf);
+        }
+
+    }
+
+    /**
+     * Copies the external library named by <code>extLibs</code> into the instance's <code>library</code> folder
+     * in the DFS, under the dataverse given by <code>libDataverse</code>.
+     * @throws IllegalStateException If the instance does not exist, or is currently running.
+     * @throws IOException
+     */
+
+    private void installExtLibs() throws IllegalStateException, IOException {
+        FileSystem fs = FileSystem.get(conf);
+        if (!instanceExists()) {
+            throw new IllegalStateException("No instance by name " + instanceName + " found.");
+        }
+        if (isRunning()) {
+            throw new IllegalStateException("Instance " + instanceName
+                    + " is running. Please stop it before installing any libraries.");
+        }
+        Path src = new Path(extLibs);
+        //the library keeps its local file name inside <instance>/library/<dataverse>/
+        String libFolder = CONF_DIR_REL + instanceFolder + "library" + Path.SEPARATOR + libDataverse
+                + Path.SEPARATOR;
+        Path libFilePath = new Path(fs.getHomeDirectory(), libFolder + src.getName());
+        LOG.info("Copying Asterix external library to DFS");
+        fs.copyFromLocalFile(false, true, src, libFilePath);
+    }
+
+    /**
+     * Finds the JARs on the client's classpath whose file names start with <code>asterix</code>,
+     * <code>commons</code> or <code>surefire</code>, uploads them to the instance's <code>am_jars</code> folder
+     * in the DFS (unless already present and no refresh was requested) and describes them as local resources.
+     * @return Resources the AM needs to start on the initial container.
+     * @throws IllegalStateException
+     * @throws IOException If no matching JAR was found on the classpath, or a DFS operation fails.
+     */
+    private List<DFSResourceCoordinate> installAmLibs() throws IllegalStateException, IOException {
+        List<DFSResourceCoordinate> resources = new ArrayList<DFSResourceCoordinate>(2);
+        FileSystem fs = FileSystem.get(conf);
+        String fullLibPath = CONF_DIR_REL + instanceFolder + "am_jars" + Path.SEPARATOR;
+        //the separator is quoted because split() takes a regular expression
+        String[] cp = System.getProperty("java.class.path").split(Pattern.quote(File.pathSeparator));
+        String asterixJarPattern = "^(asterix).*(jar)$"; //starts with asterix,ends with jar
+        String commonsJarPattern = "^(commons).*(jar)$";
+        String surefireJarPattern = "^(surefire).*(jar)$"; //for maven tests
+        //NOTE(review): a further pattern for "asterix-yarn<separator>target" used to be tested here. It was matched
+        //against a single path component, which never contains a separator, so it could not match and was dropped.
+
+        for (String j : cp) {
+            File f = new File(j);
+            //getName() is the last path component; unlike indexing into a split path it also copes with
+            //entries that are empty or consist only of separators
+            String fileName = f.getName();
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Classpath entry: " + j + " (" + fileName + ")");
+            }
+            boolean wanted = fileName.matches(asterixJarPattern) || fileName.matches(commonsJarPattern)
+                    || fileName.matches(surefireJarPattern);
+            if (!wanted) {
+                continue;
+            }
+            LOG.info("Loading JAR/classpath: " + j);
+            Path dst = new Path(fs.getHomeDirectory(), fullLibPath + fileName);
+            if (!fs.exists(dst) || refresh) {
+                fs.copyFromLocalFile(false, true, new Path(f.getAbsolutePath()), dst);
+            }
+            FileStatus dstSt = fs.getFileStatus(dst);
+            LocalResource amLib = Records.newRecord(LocalResource.class);
+            amLib.setType(LocalResourceType.FILE);
+            amLib.setVisibility(LocalResourceVisibility.PRIVATE);
+            amLib.setResource(ConverterUtils.getYarnUrlFromPath(dst));
+            amLib.setTimestamp(dstSt.getModificationTime());
+            amLib.setSize(dstSt.getLen());
+            DFSResourceCoordinate amLibCoord = new DFSResourceCoordinate();
+            amLibCoord.res = amLib;
+            amLibCoord.name = fileName;
+            //only the JAR holding the AM itself (or surefire, under maven tests) is advertised through the environment
+            if (fileName.contains("asterix-yarn") || fileName.contains("surefire")) {
+                //envs is keyed by the value and maps to the variable name; deployAM reverses the pairs
+                amLibCoord.envs.put(dst.toUri().toString(), AConstants.APPLICATIONMASTERJARLOCATION);
+                amLibCoord.envs.put(Long.toString(dstSt.getLen()), AConstants.APPLICATIONMASTERJARLEN);
+                amLibCoord.envs.put(Long.toString(dstSt.getModificationTime()),
+                        AConstants.APPLICATIONMASTERJARTIMESTAMP);
+            }
+            resources.add(amLibCoord);
+        }
+        if (resources.size() == 0) {
+            throw new IOException("Required JARs are missing. Please check your directory structure");
+        }
+        return resources;
+    }
+
+    /**
+     * Copies the local file at <code>MERGED_PARAMETERS_PATH</code> into the instance's folder in the DFS under
+     * the name <code>PARAMS_DEFAULT_NAME</code>, for the AM to use.
+     * @param overwrite Replace a file of the same name that is already stored for this instance.
+     * @throws IllegalStateException If the file is already stored and overwrite is false.
+     * @throws IOException
+     */
+    private void installAsterixConfig(boolean overwrite) throws IllegalStateException, IOException {
+        FileSystem fs = FileSystem.get(conf);
+        Path src = new Path(new File(MERGED_PARAMETERS_PATH).getCanonicalPath());
+        //NOTE(review): other DFS paths in this class append directly to instanceFolder; the extra
+        //File.separator here looks redundant - confirm against how instanceFolder is built
+        Path dst = new Path(fs.getHomeDirectory(), CONF_DIR_REL + instanceFolder + File.separator
+                + PARAMS_DEFAULT_NAME);
+        boolean alreadyStored = fs.exists(dst);
+        if (alreadyStored && !overwrite) {
+            throw new IllegalStateException(
+                    "Instance exists. Please delete an existing instance before trying to overwrite");
+        }
+        fs.copyFromLocalFile(false, true, src, dst);
+    }
+
+    /**
+     * Uploads the binary resources the AM needs to the DFS and describes them as YARN local resources: the
+     * Asterix distributable zip, an optional log4j properties file, and the AM's own JARs.
+     * @return The resource coordinates of everything that was made available.
+     * @throws IOException
+     * @throws YarnException
+     */
+    public List<DFSResourceCoordinate> distributeBinaries() throws IOException, YarnException {
+
+        List<DFSResourceCoordinate> resources = new ArrayList<DFSResourceCoordinate>(2);
+        FileSystem fs = FileSystem.get(conf);
+
+        // Add the asterix zip to the DFS for easy distribution.
+        // It is registered as an archive, so YARN unpacks it on the nodes.
+        Path dst = new Path(fs.getHomeDirectory(), CONF_DIR_REL + instanceFolder + "asterix-server.zip");
+        if (refresh && fs.exists(dst)) {
+            //a refresh forces a fresh copy of the distributable
+            fs.delete(dst, false);
+        }
+        if (!fs.exists(dst)) {
+            Path src = new Path(asterixZip);
+            LOG.info("Copying Asterix distributable to DFS");
+            fs.copyFromLocalFile(false, true, src, dst);
+        }
+        FileStatus destStatus = fs.getFileStatus(dst);
+        LocalResource asterixTarLoc = Records.newRecord(LocalResource.class);
+        asterixTarLoc.setType(LocalResourceType.ARCHIVE);
+        asterixTarLoc.setVisibility(LocalResourceVisibility.PRIVATE);
+        asterixTarLoc.setResource(ConverterUtils.getYarnUrlFromPath(dst));
+        asterixTarLoc.setTimestamp(destStatus.getModificationTime());
+        //the size has to be set explicitly, otherwise the length exported below is not the file's length
+        asterixTarLoc.setSize(destStatus.getLen());
+
+        // adding info so we can add the zip to the App master container path
+        DFSResourceCoordinate tar = new DFSResourceCoordinate();
+        //envs is keyed by the value and maps to the variable name; deployAM reverses the pairs
+        tar.envs.put(dst.toUri().toString(), AConstants.TARLOCATION);
+        tar.envs.put(Long.toString(asterixTarLoc.getSize()), AConstants.TARLEN);
+        tar.envs.put(Long.toString(asterixTarLoc.getTimestamp()), AConstants.TARTIMESTAMP);
+        tar.res = asterixTarLoc;
+        tar.name = "asterix-server.zip";
+        resources.add(tar);
+
+        // Set the log4j properties if needed
+        if (log4jPropFile != null && !log4jPropFile.isEmpty()) {
+            Path log4jSrc = new Path(log4jPropFile);
+            Path log4jDst = new Path(fs.getHomeDirectory(), "log4j.props");
+            fs.copyFromLocalFile(false, true, log4jSrc, log4jDst);
+            FileStatus log4jFileStatus = fs.getFileStatus(log4jDst);
+            LocalResource log4jRsrc = Records.newRecord(LocalResource.class);
+            log4jRsrc.setType(LocalResourceType.FILE);
+            log4jRsrc.setVisibility(LocalResourceVisibility.PRIVATE);
+            log4jRsrc.setResource(ConverterUtils.getYarnUrlFromURI(log4jDst.toUri()));
+            log4jRsrc.setTimestamp(log4jFileStatus.getModificationTime());
+            log4jRsrc.setSize(log4jFileStatus.getLen());
+            //fill in the log4j coordinate itself; writing to "tar" here would replace the zip resource
+            //that is already in the list and leave this coordinate without a name or resource
+            DFSResourceCoordinate l4j = new DFSResourceCoordinate();
+            l4j.res = log4jRsrc;
+            l4j.name = "log4j.properties";
+            resources.add(l4j);
+        }
+
+        resources.addAll(installAmLibs());
+        return resources;
+    }
+
+    /**
+     * Submits the request to start the AsterixApplicationMaster to the YARN ResourceManager.
+     * Builds the AM container's local resources, environment, classpath and launch command, submits the
+     * application, and for install and start writes the instance lock file holding the application id.
+     * 
+     * @param app
+     *            The application attempt handle.
+     * @param resources
+     *            Resources to be distributed as part of the container launch
+     * @param mode
+     *            The mode of the ApplicationMaster
+     * @return The application ID of the new Asterix instance.
+     * @throws IOException
+     * @throws YarnException
+     * @throws IllegalStateException
+     *             if, for install or start, the lock file already exists once the application has been submitted.
+     */
+
+    public ApplicationId deployAM(YarnClientApplication app, List<DFSResourceCoordinate> resources, Mode mode)
+            throws IOException, YarnException {
+
+        // Set up the container launch context for the application master
+        ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
+
+        ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
+
+        // Set local resource info into app master container launch context
+        Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+        for (DFSResourceCoordinate res : resources) {
+            localResources.put(res.name, res.res);
+        }
+        amContainer.setLocalResources(localResources);
+        // Set the env variables to be setup in the env where the application
+        // master will be run
+        LOG.info("Set the environment for the application master");
+        Map<String, String> env = new HashMap<String, String>();
+
+        // using the env info, the application master will create the correct
+        // local resource for the
+        // eventual containers that will be launched to execute the shell
+        // scripts
+        for (DFSResourceCoordinate res : resources) {
+            if (res.envs == null) { //some entries may not have environment variables.
+                continue;
+            }
+            // the coordinates store their pairs as (value -> variable name), so they are reversed here
+            for (Map.Entry<String, String> e : res.envs.entrySet()) {
+                env.put(e.getValue(), e.getKey());
+            }
+        }
+        //this is needed for when the RM address isn't known from the environment of the AM
+        env.put(AConstants.RMADDRESS, conf.get("yarn.resourcemanager.address"));
+        env.put(AConstants.RMSCHEDULERADDRESS, conf.get("yarn.resourcemanager.scheduler.address"));
+        ///add miscellaneous environment variables.
+        env.put(AConstants.INSTANCESTORE, CONF_DIR_REL + instanceFolder);
+        env.put(AConstants.DFS_BASE, FileSystem.get(conf).getHomeDirectory().toUri().toString());
+        env.put(AConstants.CC_JAVA_OPTS, ccJavaOpts);
+        env.put(AConstants.NC_JAVA_OPTS, ncJavaOpts);
+
+        // Add AppMaster.jar location to classpath
+        // At some point we should not be required to add
+        // the hadoop specific classpaths to the env.
+        // It should be provided out of the box.
+        // For now setting all required classpaths including
+        // the classpath to "." for the application jar
+        StringBuilder classPathEnv = new StringBuilder("").append("./*");
+        for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+                YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
+            classPathEnv.append(File.pathSeparatorChar);
+            classPathEnv.append(c.trim());
+        }
+        classPathEnv.append(File.pathSeparatorChar).append("." + File.separator + "log4j.properties");
+
+        // add the runtime classpath needed for tests to work
+        if (conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
+            LOG.info("In YARN MiniCluster");
+            classPathEnv.append(System.getProperty("path.separator"));
+            classPathEnv.append(System.getProperty("java.class.path"));
+            env.put("HADOOP_CONF_DIR", System.getProperty("user.dir") + File.separator + "target" + File.separator);
+        }
+        LOG.info("AM Classpath:" + classPathEnv.toString());
+        env.put("CLASSPATH", classPathEnv.toString());
+
+        amContainer.setEnvironment(env);
+
+        // Set the necessary command to execute the application master
+        Vector<CharSequence> vargs = new Vector<CharSequence>(30);
+
+        // Set java executable command
+        LOG.info("Setting up app master command");
+        vargs.add(JAVA_HOME + File.separator + "bin" + File.separator + "java");
+        // Set class name
+        vargs.add(appMasterMainClass);
+        //Set params for Application Master
+        if (debugFlag) {
+            vargs.add("-debug");
+        }
+        // "mode" here is this method's parameter; the mode-specific flag tells the AM which job to run
+        if (mode == Mode.DESTROY) {
+            vargs.add("-obliterate");
+        }
+        else if (mode == Mode.BACKUP) {
+            vargs.add("-backup");
+        }
+        else if (mode == Mode.RESTORE) {
+            vargs.add("-restore " + snapName);
+        }
+        else if( mode == Mode.INSTALL){
+            vargs.add("-initial ");
+        }
+        if (refresh) {
+            vargs.add("-refresh");
+        }
+        //vargs.add("/bin/ls -alh asterix-server.zip/repo");
+        // redirect the AM's stdout and stderr into the container log directory
+        vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + File.separator + "AppMaster.stdout");
+        vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + File.separator + "AppMaster.stderr");
+        // Get final command: all arguments joined by single spaces
+        StringBuilder command = new StringBuilder();
+        for (CharSequence str : vargs) {
+            command.append(str).append(" ");
+        }
+
+        LOG.info("Completed setting up app master command " + command.toString());
+        List<String> commands = new ArrayList<String>();
+        commands.add(command.toString());
+        amContainer.setCommands(commands);
+
+        // Set up resource type requirements
+        // For now, only memory is supported so we set memory requirements
+        Resource capability = Records.newRecord(Resource.class);
+        capability.setMemory(amMemory);
+        appContext.setResource(capability);
+
+        // Service data is a binary blob that can be passed to the application
+        // Not needed in this scenario
+        // amContainer.setServiceData(serviceData);
+
+        // The following are not required for launching an application master
+        // amContainer.setContainerId(containerId);
+
+        appContext.setAMContainerSpec(amContainer);
+
+        // Set the priority for the application master
+        Priority pri = Records.newRecord(Priority.class);
+        // TODO - what is the range for priority? how to decide?
+        pri.setPriority(amPriority);
+        appContext.setPriority(pri);
+
+        // Set the queue to which this application is to be submitted in the RM
+        appContext.setQueue(amQueue);
+
+        // Submit the application to the applications manager
+        // SubmitApplicationResponse submitResp =
+        // applicationsManager.submitApplication(appRequest);
+        // Ignore the response as either a valid response object is returned on
+        // success
+        // or an exception thrown to denote some form of a failure
+        LOG.info("Submitting application to ASM");
+
+        yarnClient.submitApplication(appContext);
+
+        //now write the instance lock
+        if (mode == Mode.INSTALL || mode == Mode.START) {
+            FileSystem fs = FileSystem.get(conf);
+            Path lock = new Path(fs.getHomeDirectory(), CONF_DIR_REL + instanceFolder + instanceLock);
+            // NOTE(review): this check runs after submitApplication, so the exception below is thrown
+            // with the application already submitted
+            if (fs.exists(lock)) {
+                throw new IllegalStateException("Somehow, this instance has been launched twice. ");
+            }
+            BufferedWriter out = new BufferedWriter(new OutputStreamWriter(fs.create(lock, true)));
+            try {
+                out.write(app.getApplicationSubmissionContext().getApplicationId().toString());
+                // NOTE(review): the writer is closed here and again in the finally block
+                out.close();
+            } finally {
+                out.close();
+            }
+        }
+        return app.getApplicationSubmissionContext().getApplicationId();
+
+    }
+
+    /**
+     * Asks YARN to kill a given application by appId
+     * @param appId The application to kill.
+     * @param yarnClient The YARN client object that is connected to the RM.
+     * @throws YarnException
+     * @throws IOException
+     */
+
+    public static void killApplication(ApplicationId appId, YarnClient yarnClient) throws YarnException, IOException {
+        if (appId == null) {
+            throw new YarnException("No Application given to kill");
+        }
+        if (yarnClient.isInState(STATE.INITED)) {
+            yarnClient.start();
+        }
+        YarnApplicationState st;
+        ApplicationReport rep = yarnClient.getApplicationReport(appId);
+        st = rep.getYarnApplicationState();
+        if (st == YarnApplicationState.FINISHED || st == YarnApplicationState.KILLED
+                || st == YarnApplicationState.FAILED) {
+            LOG.info("Application " + appId + " already exited.");
+            return;
+        }
+        LOG.info("Killing applicaiton with ID: " + appId);
+        yarnClient.killApplication(appId);
+
+    }
+
+    /**
+     * Stops the AsterixDB instance if it is running. If it is not running, only verifies that an instance with
+     * this name has been configured.
+     * @throws IOException
+     * @throws YarnException If stopping fails with an I/O error, or no instance with this name is configured.
+     */
+    private void stopInstanceIfRunning()
+            throws IOException, YarnException {
+        FileSystem fs = FileSystem.get(conf);
+        Path dstConf = new Path(fs.getHomeDirectory(), CONF_DIR_REL + instanceFolder + CONFIG_DEFAULT_NAME);
+        if (!isRunning()) {
+            //nothing to stop, but the instance still has to exist
+            if (!fs.exists(dstConf)) {
+                throw new YarnException("No instance configured with that name exists");
+            }
+            return;
+        }
+        //the instance is up, fix that
+        try {
+            this.stopInstance();
+        } catch (IOException e) {
+            throw new YarnException(e);
+        }
+    }
+
+    /**
+     * Stops the instance if needed, runs a YARN job that deletes the local AsterixDB resources of an extant
+     * instance, and then removes the instance's folder from the DFS.
+     * @param app The Client connection
+     * @param resources AM resources
+     * @throws IOException
+     * @throws YarnException If the deletion job fails to start.
+     */
+
+    private void removeInstance(YarnClientApplication app, List<DFSResourceCoordinate> resources) throws IOException,
+            YarnException {
+        FileSystem fs = FileSystem.get(conf);
+        //resolved against the home directory, like every other access to the instance folder in this class
+        Path instanceDir = new Path(fs.getHomeDirectory(), CONF_DIR_REL + instanceFolder);
+        //if the instance is up, fix that
+        stopInstanceIfRunning();
+        //now try deleting all of the on-disk artifacts on the cluster
+        ApplicationId deleter = deployAM(app, resources, Mode.DESTROY);
+        boolean delete_start = Utils.waitForApplication(deleter, yarnClient, "Waiting for deletion to start", ccRestPort);
+        if (!delete_start) {
+            if (force) {
+                //the node-local state could not be cleaned, but a forced removal still drops the DFS state
+                LOG.error("Forcing deletion of HDFS resources");
+                fs.delete(instanceDir, true);
+            }
+            LOG.fatal("Cleanup of on-disk persistent resources on individual nodes failed.");
+            throw new YarnException("Deletion job " + deleter + " for instance " + instanceName
+                    + " failed to start");
+        }
+        boolean deleted = waitForCompletion(deleter, "Deletion in progress");
+        if (!(deleted || force)) {
+            LOG.fatal("Cleanup of on-disk persistent resources failed.");
+            return;
+        } else {
+            fs.delete(instanceDir, true);
+        }
+        System.out.println("Deletion of instance succeeded.");
+
+    }
+
+    /**
+     * Stops the instance if needed and starts a YARN job in backup mode, then waits for it to complete.
+     * @param app The Client connection
+     * @param resources AM resources
+     * @throws IOException
+     * @throws YarnException If the backup job fails to start.
+     */
+
+    private void backupInstance(YarnClientApplication app, List<DFSResourceCoordinate> resources) throws IOException,
+            YarnException {
+        stopInstanceIfRunning();
+        ApplicationId backerUpper = deployAM(app, resources, Mode.BACKUP);
+        //note the leading space in " to start": the id used to run straight into the next word
+        boolean backupStart = Utils.waitForApplication(backerUpper, yarnClient, "Waiting for backup "
+                + backerUpper.toString() + " to start", ccRestPort);
+        if (!backupStart) {
+            LOG.fatal("Backup failed to start");
+            throw new YarnException("Backup job " + backerUpper + " failed to start");
+        }
+        boolean complete = waitForCompletion(backerUpper, "Backup in progress");
+        if (!complete) {
+            LOG.fatal("Backup failed- timeout waiting for completion");
+            return;
+        }
+        System.out.println("Backup of instance succeeded.");
+    }
+
+    /**
+     * Stops the instance if needed and starts a YARN job in restore mode, then waits for it to complete.
+     * @param app The Client connection
+     * @param resources AM resources
+     * @throws IOException
+     * @throws YarnException If the restore job fails to start.
+     */
+
+    private void restoreInstance(YarnClientApplication app, List<DFSResourceCoordinate> resources) throws IOException,
+            YarnException {
+        stopInstanceIfRunning();
+        ApplicationId restorer = deployAM(app, resources, Mode.RESTORE);
+        if (!Utils.waitForApplication(restorer, yarnClient, "Waiting for restore to start", ccRestPort)) {
+            LOG.fatal("Restore failed to start");
+            throw new YarnException();
+        }
+        if (waitForCompletion(restorer, "Restore in progress")) {
+            System.out.println("Restoration of instance succeeded.");
+        } else {
+            LOG.fatal("Restore failed- timeout waiting for completion");
+        }
+    }
+
+    /**
+     * Asks the instance's CC to shut down, waits for the application to complete, and removes the lock file so
+     * that the instance can be started again. When the wait fails and force is set, the application is killed.
+     * 
+     * @throws IOException
+     * @throws YarnException
+     */
+
+    private void stopInstance() throws IOException, YarnException {
+        ApplicationId appId = getLockFile();
+        //get CC rest API port if it is nonstandard
+        AsterixConfiguration asterixConfig = locateConfig();
+        readConfigParams(asterixConfig);
+        if (yarnClient.isInState(STATE.INITED)) {
+            yarnClient.start();
+        }
+        System.out.println("Stopping instance " + instanceName);
+        if (!isRunning()) {
+            LOG.fatal("AsterixDB instance by that name is stopped already");
+            return;
+        }
+        try {
+            Utils.sendShutdownCall(Utils.getCCHostname(instanceName, conf), ccRestPort);
+        } catch (IOException e) {
+            //a failed shutdown call is not fatal: the wait below decides what happens next
+            LOG.error("Error while trying to issue safe shutdown:", e);
+        }
+        //now make sure it is actually gone and not "stuck"
+        boolean completed = waitForCompletion(appId, "Waiting for AsterixDB to shut down");
+        if (!completed && force) {
+            LOG.warn("Instance failed to stop gracefully, now killing it");
+            try {
+                AsterixYARNClient.killApplication(appId, yarnClient);
+            } catch (YarnException e1) {
+                LOG.fatal("Could not stop nor kill instance gracefully.",e1);
+                return;
+            }
+            completed = true;
+        }
+        if (completed) {
+            deleteLockFile();
+        }
+    }
+
+    /**
+     * Removes this instance's lock file from the DFS if it is present. Does nothing when no instance name is set.
+     * 
+     * @throws IOException
+     */
+    private void deleteLockFile() throws IOException {
+        //compare by content: == on strings only tests whether both are the same object
+        if (instanceName == null || instanceName.isEmpty()) {
+            return;
+        }
+        FileSystem fs = FileSystem.get(conf);
+        Path lockPath = new Path(fs.getHomeDirectory(), CONF_DIR_REL + instanceName + '/' + instanceLock);
+        if (fs.exists(lockPath)) {
+            fs.delete(lockPath, false);
+        }
+    }
+
+    /**
+     * Tells whether this instance's configuration file is present in the DFS.
+     * 
+     * @return true if the file exists.
+     * @throws IOException
+     */
+    private boolean instanceExists() throws IOException {
+        FileSystem fs = FileSystem.get(conf);
+        Path dstConf = new Path(fs.getHomeDirectory(), CONF_DIR_REL + instanceFolder + CONFIG_DEFAULT_NAME);
+        return fs.exists(dstConf);
+    }
+
+    /**
+     * Tells whether the instance counts as running: both its configuration file and its lock file are present
+     * in the DFS. The state of the YARN application itself is not consulted here.
+     * 
+     * @return true if both files exist.
+     * @throws IOException
+     */
+    private boolean isRunning() throws IOException {
+        FileSystem fs = FileSystem.get(conf);
+        Path dstConf = new Path(fs.getHomeDirectory(), CONF_DIR_REL + instanceFolder + CONFIG_DEFAULT_NAME);
+        if (!fs.exists(dstConf)) {
+            return false;
+        }
+        return fs.exists(new Path(fs.getHomeDirectory(), CONF_DIR_REL + instanceFolder + instanceLock));
+    }
+
+    /**
+     * Reads the application id stored in this instance's lock file.
+     * 
+     * @return The application id found on the first line of the lock file.
+     * @throws IOException
+     * @throws YarnException
+     *             if the lock file does not exist or is empty.
+     * @throws IllegalStateException
+     *             if no instance folder is set.
+     */
+    private ApplicationId getLockFile() throws IOException, YarnException {
+        //compare by content: == on strings only tests whether both are the same object
+        if (instanceFolder == null || instanceFolder.isEmpty()) {
+            throw new IllegalStateException("Instance name not given.");
+        }
+        FileSystem fs = FileSystem.get(conf);
+        Path lockPath = new Path(fs.getHomeDirectory(), CONF_DIR_REL + instanceFolder + instanceLock);
+        if (!fs.exists(lockPath)) {
+            throw new YarnException("Instance appears to not be running. If you know it is, try using kill");
+        }
+        BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(lockPath)));
+        String lockAppId;
+        try {
+            lockAppId = br.readLine();
+        } finally {
+            //release the DFS stream even when reading fails
+            br.close();
+        }
+        if (lockAppId == null) {
+            throw new YarnException("Instance lock file " + lockPath + " is empty");
+        }
+        return ConverterUtils.toApplicationId(lockAppId);
+    }
+
+    /**
+     * Reads the application id stored in the lock file of the named instance.
+     * 
+     * @param instanceName
+     *            The name of the instance whose lock file is read.
+     * @param conf
+     *            The Hadoop configuration used to reach the DFS.
+     * @return The application id found on the first line of the lock file, or null if there is no lock file.
+     * @throws IOException
+     *             if the DFS cannot be read, or the lock file is empty.
+     * @throws IllegalStateException
+     *             if no instance name is given.
+     */
+    public static ApplicationId getLockFile(String instanceName, Configuration conf) throws IOException {
+        //compare by content: == on strings only tests whether both are the same object
+        if (instanceName == null || instanceName.isEmpty()) {
+            throw new IllegalStateException("Instance name not given.");
+        }
+        FileSystem fs = FileSystem.get(conf);
+        Path lockPath = new Path(fs.getHomeDirectory(), CONF_DIR_REL + instanceName + '/' + instanceLock);
+        if (!fs.exists(lockPath)) {
+            return null;
+        }
+        BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(lockPath)));
+        String lockAppId;
+        try {
+            lockAppId = br.readLine();
+        } finally {
+            //release the DFS stream even when reading fails
+            br.close();
+        }
+        if (lockAppId == null) {
+            throw new IOException("Instance lock file " + lockPath + " is empty");
+        }
+        return ConverterUtils.toApplicationId(lockAppId);
+    }
+
+    /**
+     * Loads the Asterix parameters file: the user-supplied base configuration if one was given, otherwise the
+     * one at the default location. With a user-supplied file, <code>MERGED_PARAMETERS_PATH</code> is redirected
+     * to <code>PARAMS_DEFAULT_NAME</code> in that file's directory.
+     * @return The loaded configuration.
+     * @throws FileNotFoundException
+     * @throws IOException
+     */
+    private AsterixConfiguration locateConfig() throws FileNotFoundException, IOException{
+        if (baseConfig == null) {
+            return Utils.loadAsterixConfig(DEFAULT_PARAMETERS_PATH);
+        }
+        AsterixConfiguration configuration = Utils.loadAsterixConfig(baseConfig);
+        //make the path absolute before asking for the parent: a bare file name such as "params.xml" has no
+        //parent of its own, and getParentFile() would return null
+        File configDir = new File(baseConfig).getAbsoluteFile().getParentFile();
+        if (configDir != null) {
+            MERGED_PARAMETERS_PATH = configDir.getAbsolutePath() + File.separator + PARAMS_DEFAULT_NAME;
+        }
+        return configuration;
+    }
+
+    /**
+     * Copies the CC JVM options, NC JVM options and CC REST port out of the
+     * given configuration's property list into the matching fields.
+     */
+    private void readConfigParams(AsterixConfiguration configuration) {
+        //this is the "base" config that is inside the zip, we start here
+        for (edu.uci.ics.asterix.common.configuration.Property prop : configuration.getProperty()) {
+            String key = prop.getName();
+            if (key.equalsIgnoreCase(CC_JAVA_OPTS_KEY)) {
+                ccJavaOpts = prop.getValue();
+                continue;
+            }
+            if (key.equalsIgnoreCase(NC_JAVA_OPTS_KEY)) {
+                ncJavaOpts = prop.getValue();
+                continue;
+            }
+            if (key.equalsIgnoreCase(CC_REST_PORT_KEY)) {
+                ccRestPort = Integer.parseInt(prop.getValue());
+            }
+        }
+    }
+
+    /**
+     * Retrieves necessary information from the cluster configuration, splices it into the Asterix
+     * configuration parameters and writes the merged result to MERGED_PARAMETERS_PATH.
+     *
+     * @param cluster
+     *            cluster description supplying the node IDs, store, log and transaction log directories
+     * @throws FileNotFoundException
+     *             if the merged parameters file cannot be opened for writing
+     * @throws IOException
+     *             if the base configuration cannot be loaded or the merged one cannot be marshalled
+     */
+    private void writeAsterixConfig(Cluster cluster) throws FileNotFoundException, IOException {
+        String metadataNodeId = Utils.getMetadataNode(cluster).getId();
+
+        AsterixConfiguration configuration = locateConfig();
+        readConfigParams(configuration);
+
+        configuration.setVersion(Utils.getAsterixVersionFromClasspath());
+        configuration.setInstanceName(instanceName);
+
+        List<Store> stores = new ArrayList<Store>();
+        List<Coredump> coredump = new ArrayList<Coredump>();
+        List<TransactionLogDir> txnLogDirs = new ArrayList<TransactionLogDir>();
+        for (Node node : cluster.getNode()) {
+            //a node-level setting wins over the cluster-wide one
+            String storeDir = node.getStore() == null ? cluster.getStore() : node.getStore();
+            stores.add(new Store(node.getId(), storeDir));
+
+            String coredumpDir = node.getLogDir() == null ? cluster.getLogDir() : node.getLogDir();
+            coredump.add(new Coredump(node.getId(), coredumpDir + "coredump" + File.separator));
+
+            String txnLogDir = node.getTxnLogDir() == null ? cluster.getTxnLogDir() : node.getTxnLogDir();
+            //if the string doesn't have a trailing separator add one; the previous check was
+            //inverted and produced either "dirtxnLogs/" or "dir//txnLogs/"
+            String txnLogBase = txnLogDir.endsWith(File.separator) ? txnLogDir : txnLogDir + File.separator;
+            txnLogDirs.add(new TransactionLogDir(node.getId(), txnLogBase + "txnLogs" + File.separator));
+        }
+        configuration.setStore(stores);
+        configuration.setMetadataNode(metadataNodeId);
+        configuration.setCoredump(coredump);
+        configuration.setTransactionLogDir(txnLogDirs);
+
+        try (FileOutputStream os = new FileOutputStream(MERGED_PARAMETERS_PATH)) {
+            JAXBContext ctx = JAXBContext.newInstance(AsterixConfiguration.class);
+            Marshaller marshaller = ctx.createMarshaller();
+            marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);
+            marshaller.marshal(configuration, os);
+        } catch (JAXBException e) {
+            throw new IOException(e);
+        }
+    }
+
+    /**
+     * Delegates to Utils.waitForApplication, passing along this client's yarnClient and ccRestPort fields.
+     *
+     * @param appId
+     *            the application to wait on
+     * @param message
+     *            text handed through to the wait routine
+     * @return whatever Utils.waitForApplication returns
+     */
+    private boolean waitForCompletion(ApplicationId appId, String message) throws YarnException, IOException {
+        return Utils.waitForApplication(appId, yarnClient, message, ccRestPort);
+    }
+
+    /**
+     * Plain holder grouping a name, a {@link LocalResource} and a string-to-string map.
+     */
+    private class DFSResourceCoordinate {
+        String name;
+        LocalResource res;
+        //created up front so entries can be added without a null check
+        Map<String, String> envs = new HashMap<String, String>(3);
+
+        public DFSResourceCoordinate() {
+        }
+    }
+}
diff --git a/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/Deleter.java b/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/Deleter.java
new file mode 100644
index 0000000..55a9d0b
--- /dev/null
+++ b/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/Deleter.java
@@ -0,0 +1,31 @@
+package edu.uci.ics.asterix.aoya;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+
+/**
+ * Command-line helper that recursively deletes every local path passed as an argument.
+ * Paths that do not exist are logged as errors and skipped.
+ */
+public class Deleter {
+    private static final Log LOG = LogFactory.getLog(Deleter.class);
+
+    /**
+     * @param args
+     *            local paths (files or directories) to delete
+     * @throws IOException
+     *             if a path exists but cannot be deleted
+     */
+    public static void main(String[] args) throws IOException {
+
+        LogManager.getRootLogger().setLevel(Level.DEBUG);
+
+        LOG.info("Obliterator args: " + Arrays.toString(args));
+        for (String arg : args) {
+            File f = new File(arg);
+            if (!f.exists()) {
+                LOG.error("Could not find file to delete: " + f.getPath());
+                continue;
+            }
+            LOG.info("Deleting: " + f.getPath());
+            //forceDelete removes plain files as well as whole directory trees;
+            //deleteDirectory refuses anything that is not a directory
+            FileUtils.forceDelete(f);
+        }
+    }
+}
diff --git a/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/HDFSBackup.java b/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/HDFSBackup.java
new file mode 100644
index 0000000..13c9a6e
--- /dev/null
+++ b/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/HDFSBackup.java
@@ -0,0 +1,94 @@
+package edu.uci.ics.asterix.aoya;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+/**
+ * Command-line tool that copies paths between the local filesystem and the DFS.
+ * Each positional argument is a "source,destination" pair. With -backup the source is
+ * local and the destination is on the DFS; with -restore it is the other way around.
+ */
+public class HDFSBackup {
+    Configuration conf = new YarnConfiguration();
+    //log under this class's own name rather than the application master's
+    private static final Log LOG = LogFactory.getLog(HDFSBackup.class);
+    boolean restore = false;
+    boolean backup = false;
+
+    /**
+     * @param args
+     *            -backup and/or -restore, followed by "source,destination" pairs
+     * @throws ParseException
+     *             if the options cannot be parsed
+     * @throws IllegalArgumentException
+     *             if a positional argument is not a "source,destination" pair
+     * @throws IOException
+     *             if a copy fails
+     */
+    public static void main(String[] args) throws ParseException, IllegalArgumentException, IOException {
+
+        HDFSBackup back = new HDFSBackup();
+        Map<String, String> envs = System.getenv();
+        if (envs.containsKey("HADOOP_CONF_DIR")) {
+            File hadoopConfDir = new File(envs.get("HADOOP_CONF_DIR"));
+            //listFiles() returns null when the directory cannot be read
+            File[] configs = hadoopConfDir.isDirectory() ? hadoopConfDir.listFiles() : null;
+            if (configs != null) {
+                for (File config : configs) {
+                    if (config.getName().matches("^.*(xml)$")) {
+                        back.conf.addResource(new Path(config.getAbsolutePath()));
+                    }
+                }
+            }
+        }
+        Options opts = new Options();
+        opts.addOption("restore", false, "");
+        opts.addOption("backup", false, "");
+        CommandLine cliParser = new GnuParser().parse(opts, args);
+        if (cliParser.hasOption("restore")) {
+            back.restore = true;
+        }
+        if (cliParser.hasOption("backup")) {
+            back.backup = true;
+        }
+        @SuppressWarnings("unchecked")
+        List<String> pairs = (List<String>) cliParser.getArgList();
+
+        List<Path[]> sources = new ArrayList<Path[]>(10);
+        for (String p : pairs) {
+            String[] s = p.split(",");
+            if (s.length < 2) {
+                //fail with a readable message instead of an ArrayIndexOutOfBoundsException
+                throw new IllegalArgumentException("Expected <source>,<destination> but got: " + p);
+            }
+            sources.add(new Path[] { new Path(s[0]), new Path(s[1]) });
+        }
+
+        try {
+            if (back.backup) {
+                back.performBackup(sources);
+            }
+            if (back.restore) {
+                back.performRestore(sources);
+            }
+        } catch (IOException e) {
+            LOG.fatal("Backup/restoration unsuccessful: " + e.getMessage());
+            throw e;
+        }
+    }
+
+    /**
+     * Copies each pair's first path from the local filesystem to its second path on the DFS.
+     */
+    private void performBackup(List<Path[]> paths) throws IOException {
+        FileSystem fs = FileSystem.get(conf);
+        for (Path[] p : paths) {
+            LOG.info("Backing up " + p[0] + " to " + p[1] + ".");
+            fs.copyFromLocalFile(p[0], p[1]);
+        }
+    }
+
+    /**
+     * Copies each pair's first path from the DFS into its second path on the local filesystem,
+     * deleting any local directory of the same name that is already there.
+     */
+    private void performRestore(List<Path[]> paths) throws IOException {
+        FileSystem fs = FileSystem.get(conf);
+        for (Path[] p : paths) {
+            LOG.info("Restoring " + p[0] + " to " + p[1] + ".");
+            File f = new File(p[1].toString() + File.separator + p[0].getName());
+            LOG.info(f.getAbsolutePath());
+            if (f.exists()) {
+                FileUtils.deleteDirectory(f);
+            }
+            LOG.info(f.exists());
+            fs.copyToLocalFile(false, p[0], p[1], true);
+        }
+    }
+}
diff --git a/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/Utils.java b/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/Utils.java
new file mode 100644
index 0000000..4eb8bc3
--- /dev/null
+++ b/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/Utils.java
@@ -0,0 +1,462 @@
+package edu.uci.ics.asterix.aoya;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.ConnectException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Scanner;
+import java.util.regex.Pattern;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Marshaller;
+import javax.xml.bind.Unmarshaller;
+
+import org.apache.commons.httpclient.*;
+import org.apache.commons.httpclient.methods.GetMethod;
+import org.apache.commons.httpclient.methods.PostMethod;
+import org.apache.commons.httpclient.params.HttpMethodParams;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+import edu.uci.ics.asterix.common.configuration.AsterixConfiguration;
+import edu.uci.ics.asterix.event.schema.yarnCluster.Cluster;
+import edu.uci.ics.asterix.event.schema.yarnCluster.Node;
+
+public class Utils {
+
+    //static helpers only; the private constructor prevents instantiation from outside
+    private Utils() {
+
+    }
+
+    private static final String CONF_DIR_REL = AsterixYARNClient.CONF_DIR_REL;
+
+    /**
+     * Returns the fifth underscore-separated field of the given identifier.
+     * NOTE(review): the method name implies that field is a host name -- confirm against the IDs callers pass.
+     *
+     * @param containerID
+     *            identifier made of at least five underscore-separated fields
+     * @return the field at index 4
+     */
+    public static String hostFromContainerID(String containerID) {
+        String[] fields = containerID.split("_");
+        return fields[4];
+    }
+
+    /**
+     * Gets the metadata node from an AsterixDB cluster description file
+     * 
+     * @param cluster
+     *            The cluster description in question.
+     * @return the node whose ID matches the configured metadata node, or null if none matches.
+     *         When no metadata node is configured, the second node of the cluster is chosen,
+     *         or the first one if the cluster has a single node.
+     */
+    public static Node getMetadataNode(Cluster cluster) {
+        List<Node> nodes = cluster.getNode();
+        if (cluster.getMetadataNode() == null) {
+            //I will pick one for you. Index 1 is kept for multi-node clusters so the choice
+            //does not change; a single-node cluster used to fail here with an out-of-bounds access
+            return nodes.get(nodes.size() > 1 ? 1 : 0);
+        }
+        for (Node node : nodes) {
+            if (node.getId().equals(cluster.getMetadataNode())) {
+                return node;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Sends a "poison pill" message to an AsterixDB instance for it to shut down safely.
+     * The request is a POST to /admin/shutdown. It is then issued a second time, and the
+     * shutdown only counts as successful if that second attempt cannot connect.
+     * 
+     * @param host
+     *            The host to shut down.
+     * @param port
+     *            The port on that host the shutdown request is sent to.
+     * @throws IOException
+     *             if the second request does not fail with a ConnectException, or on any other I/O failure
+     */
+
+    public static void sendShutdownCall(String host, int port) throws IOException {
+        final String url = "http://" + host + ":" + port + "/admin/shutdown";
+        PostMethod method = new PostMethod(url);
+        try {
+            executeHTTPCall(method);
+        } catch (NoHttpResponseException e) {
+            //do nothing... this is expected
+        }
+        //now let's test that the instance is really down, or throw an exception
+        //(the same method object is executed again; a refused connection is the success signal)
+        try {
+            executeHTTPCall(method);
+        } catch (ConnectException e) {
+            return;
+        }
+        throw new IOException("Instance did not shut down cleanly.");
+    }
+
+    /**
+     * Simple test via the AsterixDB HTTP query API to determine if an instance is truly live or not.
+     * Queries the Metadata dataset and returns true if the query completes successfully, false otherwise.
+     * 
+     * @param host
+     *            The host to run the query against
+     * @param port
+     *            The port on that host the query is sent to
+     * @return
+     *         True if the instance is OK, false otherwise.
+     * @throws IOException
+     *             on I/O failures other than a refused connection
+     */
+    public static boolean probeLiveness(String host, int port) throws IOException {
+        final String url = "http://" + host + ":" + port + "/query";
+        final String test = "for $x in dataset Metadata.Dataset return $x;";
+        GetMethod method = new GetMethod(url);
+        method.setQueryString(new NameValuePair[] { new NameValuePair("query", test) });
+        try {
+            InputStream response;
+            try {
+                response = executeHTTPCall(method);
+            } catch (ConnectException e) {
+                //nothing is listening yet
+                return false;
+            }
+            if (response == null) {
+                return false;
+            }
+            BufferedReader br = new BufferedReader(new InputStreamReader(response));
+            if (br.readLine() == null) {
+                return false;
+            }
+            return method.getStatusCode() == HttpStatus.SC_OK;
+        } finally {
+            //this is called once a second while polling; release the connection (which also
+            //closes the response stream) so sockets do not pile up
+            method.releaseConnection();
+        }
+    }
+
+    /**
+     * Executes the given method on a fresh client whose retry handler always declines to retry,
+     * then hands back the response body as a stream.
+     */
+    private static InputStream executeHTTPCall(HttpMethod method) throws HttpException, IOException {
+        HttpMethodRetryHandler neverRetry = new HttpMethodRetryHandler() {
+            @Override
+            public boolean retryMethod(final HttpMethod failed, final IOException cause, int attempts) {
+                return false;
+            }
+        };
+        HttpClient http = new HttpClient();
+        http.getParams().setParameter(HttpMethodParams.RETRY_HANDLER, neverRetry);
+        http.executeMethod(method);
+        return method.getResponseBodyAsStream();
+    }
+
+    //**
+
+    /**
+     * Builds a three-character progress frame: all spaces except for a single dot
+     * at position {@code iter % 3}.
+     */
+    public static String makeDots(int iter) {
+        char[] frame = new char[] { ' ', ' ', ' ' };
+        frame[iter % 3] = '.';
+        return String.valueOf(frame);
+    }
+
+    /**
+     * Prints a warning and asks for a yes/no confirmation on standard input,
+     * repeating the prompt until one of the two is typed.
+     *
+     * @param warning
+     *            text shown before the question
+     * @return true if "yes" was entered; false if "no" was entered or the input ended first
+     */
+    @SuppressWarnings("resource")
+    public static boolean confirmAction(String warning) {
+        System.out.println(warning);
+        System.out.print("Are you sure you want to do this? (yes/no): ");
+        //deliberately not closed: closing the Scanner closes System.in, and doing so inside
+        //the loop made every retry after an unrecognised answer fail
+        Scanner in = new Scanner(System.in);
+        while (in.hasNextLine()) {
+            String input = in.nextLine();
+            if ("yes".equals(input)) {
+                return true;
+            } else if ("no".equals(input)) {
+                return false;
+            } else {
+                System.out.println("Please type yes or no");
+            }
+        }
+        //input ended without an answer; never treat that as consent
+        return false;
+    }
+
+    /**
+     * Prints every AsterixDB instance deployed on the YARN cluster, one per line,
+     * saying whether it is running (with its Application ID) or stopped.
+     * 
+     * @param conf
+     *            Hadoop configuration object
+     * @param confDirRel
+     *            AsterixDB configuration path, relative to the DFS home directory
+     * @throws IOException
+     */
+
+    public static void listInstances(Configuration conf, String confDirRel) throws IOException {
+        FileSystem dfs = FileSystem.get(conf);
+        Path root = new Path(dfs.getHomeDirectory(), confDirRel);
+        if (!dfs.exists(root)) {
+            System.out.println("No running or stopped AsterixDB instances exist in this cluster.");
+            return;
+        }
+        FileStatus[] found = dfs.listStatus(root);
+        if (found.length == 0) {
+            System.out.println("No running or stopped AsterixDB instances exist in this cluster");
+            return;
+        }
+        System.out.println("Existing AsterixDB instances: ");
+        for (FileStatus entry : found) {
+            String name = entry.getPath().getName();
+            //a lock file is what marks an instance as running
+            ApplicationId appId = AsterixYARNClient.getLockFile(name, conf);
+            if (appId == null) {
+                System.out.println("Instance " + name + " is stopped");
+            } else {
+                System.out.println("Instance " + name + " is running with Application ID: " + appId.toString());
+            }
+        }
+    }
+
+    /**
+     * Prints the names of the backups stored in the DFS for one instance.
+     * 
+     * @param conf
+     *            YARN configuration
+     * @param confDirRel
+     *            Relative config path
+     * @param instance
+     *            Instance name
+     * @throws IOException
+     */
+    public static void listBackups(Configuration conf, String confDirRel, String instance) throws IOException {
+        List<String> names = getBackups(conf, confDirRel, instance);
+        if (names.isEmpty()) {
+            System.out.println("No backups found for instance " + instance + ".");
+            return;
+        }
+        System.out.println("Backups for instance " + instance + ": ");
+        for (String backupName : names) {
+            System.out.println("Backup: " + backupName);
+        }
+    }
+    /**
+     * Return the available snapshot names
+     * 
+     * @param conf
+     *            DFS configuration
+     * @param confDirRel
+     *            Configuration relative directory
+     * @param instance
+     *            The asterix instance name
+     * @return the names of the entries in the instance's backups folder; empty if the folder does not exist
+     * @throws IOException
+     */
+    public static List<String> getBackups(Configuration conf, String confDirRel, String instance) throws IOException {
+        FileSystem fs = FileSystem.get(conf);
+        Path backupFolder = new Path(fs.getHomeDirectory(), confDirRel + "/" + instance + "/" + "backups");
+        List<String> backupNames = new ArrayList<String>();
+        //an instance that was never backed up has no backups folder; report that as "no backups"
+        //rather than letting listStatus fail on the missing path
+        if (!fs.exists(backupFolder)) {
+            return backupNames;
+        }
+        for (FileStatus f : fs.listStatus(backupFolder)) {
+            backupNames.add(f.getPath().getName());
+        }
+        return backupNames;
+    }
+
+    /**
+     * Removes backup snapshots from the DFS
+     * 
+     * @param conf
+     *            DFS Configuration
+     * @param confDirRel
+     *            Configuration relative directory
+     * @param instance
+     *            The asterix instance name
+     * @param timestamp
+     *            The snapshot timestamp (ID)
+     * @throws IOException
+     */
+    public static void rmBackup(Configuration conf, String confDirRel, String instance, long timestamp)
+            throws IOException {
+        FileSystem fs = FileSystem.get(conf);
+        Path backupFolder = new Path(fs.getHomeDirectory(), confDirRel + "/" + instance + "/" + "backups");
+        FileStatus[] backups = fs.listStatus(backupFolder);
+        if (backups.length != 0) {
+            System.out.println("Backups for instance " + instance + ": ");
+        } else {
+            System.out.println("No backups found for instance " + instance + ".");
+        }
+        for (FileStatus f : backups) {
+            long file_ts;
+            try {
+                file_ts = Long.parseLong(f.getPath().getName());
+            } catch (NumberFormatException e) {
+                //not a snapshot directory; one stray entry must not abort the whole search
+                continue;
+            }
+            if (file_ts == timestamp) {
+                System.out.println("Deleting backup " + timestamp);
+                if (!fs.delete(f.getPath(), true)) {
+                    System.out.println("Backup could not be deleted");
+                }
+                return;
+            }
+        }
+        System.out.println("No backup found with specified timestamp");
+
+    }
+
+    /**
+     * Unmarshals the YARN cluster description stored at the given path.
+     * 
+     * @param path
+     *            location of the cluster description XML file
+     * @return The object representing the configuration
+     * @throws YarnException
+     *             wrapping any JAXBException raised while unmarshalling
+     */
+    public static Cluster parseYarnClusterConfig(String path) throws YarnException {
+        File source = new File(path);
+        try {
+            Unmarshaller reader = JAXBContext.newInstance(Cluster.class).createUnmarshaller();
+            return (Cluster) reader.unmarshal(source);
+        } catch (JAXBException e) {
+            throw new YarnException(e);
+        }
+    }
+
+    /**
+     * Marshals the given cluster description to the file at the given path.
+     * 
+     * @param path
+     *            location the XML is written to
+     * @param cl
+     *            the cluster description to write
+     * @throws YarnException
+     *             wrapping any JAXBException raised while marshalling
+     */
+    public static void writeYarnClusterConfig(String path, Cluster cl) throws YarnException {
+        File target = new File(path);
+        try {
+            Marshaller writer = JAXBContext.newInstance(Cluster.class).createMarshaller();
+            writer.marshal(cl, target);
+        } catch (JAXBException e) {
+            throw new YarnException(e);
+        }
+    }
+
+    /**
+     * Looks in the current class path for AsterixDB libraries and gets the version number from the name of the first match.
+     * The jar name is split on dashes and the third component is taken as the version,
+     * e.g. asterix-yarn-0.8.7-SNAPSHOT.jar gives 0.8.7-SNAPSHOT.
+     * 
+     * @return The version found, as a string, or null if no suitably named jar is on the class path.
+     */
+
+    public static String getAsterixVersionFromClasspath() {
+        String[] cp = System.getProperty("java.class.path").split(System.getProperty("path.separator"));
+        String asterixJarPattern = "^(asterix).*(jar)$"; //starts with asterix,ends with jar
+
+        for (String j : cp) {
+            //quote the separator: on windows it is a backslash, a regex metacharacter
+            String[] pathComponents = j.split(Pattern.quote(File.separator));
+            if (pathComponents.length == 0) {
+                continue;
+            }
+            String jarName = pathComponents[pathComponents.length - 1];
+            if (!jarName.matches(asterixJarPattern)) {
+                continue;
+            }
+            //get components of maven version
+            String[] byDash = jarName.split("-");
+            if (byDash.length < 3) {
+                //no version component in this name; keep looking instead of indexing out of bounds
+                continue;
+            }
+            //drop only the trailing '.jar'; splitting on every dot kept just the first digit group
+            String version = stripJarSuffix(byDash[2]);
+            //SNAPSHOT suffix
+            if (byDash.length == 4) {
+                return version + '-' + stripJarSuffix(byDash[3]);
+            }
+            //stable version
+            return version;
+        }
+        return null;
+    }
+
+    //Removes a trailing ".jar" from a jar-name component, if present.
+    private static String stripJarSuffix(String component) {
+        final String suffix = ".jar";
+        return component.endsWith(suffix) ? component.substring(0, component.length() - suffix.length())
+                : component;
+    }
+
+    /**
+     * Polls YARN, and optionally the instance itself, until the application looks alive.
+     * Up to 120 passes are made. On each pass where the last seen state is not RUNNING, the
+     * application report is refreshed and the thread sleeps for a second. A FAILED, FINISHED
+     * or KILLED state ends the wait with false.
+     * NOTE(review): with probe == false the method returns true on the first pass unless the state
+     * is one of those three, even if the application has not reached RUNNING -- confirm this is intended.
+     *
+     * @param appId
+     *            the application to watch
+     * @param probe
+     *            whether to additionally query the instance via probeLiveness
+     * @param print
+     *            whether to print the message and progress dots to standard output
+     * @param message
+     *            text printed in front of the progress dots
+     * @param yarnClient
+     *            client used to fetch application reports
+     * @param instanceName
+     *            instance name, used to look up the CC host when probing
+     * @param conf
+     *            configuration, used to look up the CC host when probing
+     * @param port
+     *            port handed to probeLiveness
+     * @return true once the wait succeeds as described above, false on a terminal state or after all passes
+     * @throws YarnException
+     *             wrapping any IOException from the report or the probe
+     */
+    public static boolean waitForLiveness(ApplicationId appId, boolean probe, boolean print, String message,
+            YarnClient yarnClient, String instanceName, Configuration conf, int port) throws YarnException {
+        ApplicationReport report;
+        try {
+            report = yarnClient.getApplicationReport(appId);
+        } catch (IOException e) {
+            throw new YarnException(e);
+        }
+        YarnApplicationState st = report.getYarnApplicationState();
+        for (int i = 0; i < 120; i++) {
+            if (st != YarnApplicationState.RUNNING) {
+                try {
+                    report = yarnClient.getApplicationReport(appId);
+                    st = report.getYarnApplicationState();
+                    if (print) {
+                        System.out.print(message + Utils.makeDots(i) + "\r");
+                    }
+                    Thread.sleep(1000);
+                } catch (InterruptedException e1) {
+                    //restore the interrupt flag; the loop itself carries on
+                    Thread.currentThread().interrupt();
+                } catch (IOException e1) {
+                    throw new YarnException(e1);
+                }
+                if (st == YarnApplicationState.FAILED || st == YarnApplicationState.FINISHED
+                        || st == YarnApplicationState.KILLED) {
+                    return false;
+                }
+            }
+            //reached on every pass that did not return above, whether or not the state is RUNNING
+            if (probe) {
+                String host;
+                host = getCCHostname(instanceName, conf);
+                try {
+                    //up to 60 probes, one second apart, before falling back to the outer loop
+                    for (int j = 0; j < 60; j++) {
+                        if (!Utils.probeLiveness(host, port)) {
+                            try {
+                                if (print) {
+                                    System.out.print(message + Utils.makeDots(i) + "\r");
+                                }
+                                Thread.sleep(1000);
+                            } catch (InterruptedException e2) {
+                                Thread.currentThread().interrupt();
+                            }
+                        } else {
+                            if (print) {
+                                System.out.println("");
+                            }
+                            return true;
+                        }
+                    }
+                } catch (IOException e1) {
+                    throw new YarnException(e1);
+                }
+            } else {
+                if (print) {
+                    System.out.println("");
+                }
+                return true;
+            }
+        }
+        //all passes used up without a successful probe
+        if (print) {
+            System.out.println("");
+        }
+        return false;
+    }
+
+    /**
+     * Convenience overload of the eight-argument waitForLiveness with probing and printing both switched on.
+     */
+    public static boolean waitForLiveness(ApplicationId appId, String message, YarnClient yarnClient,
+            String instanceName, Configuration conf, int port) throws YarnException, IOException {
+        return waitForLiveness(appId, true, true, message, yarnClient, instanceName, conf, port);
+    }
+
+    /**
+     * Calls waitForLiveness with probing off and printing on, passing an empty instance name
+     * and a null configuration.
+     */
+    public static boolean waitForApplication(ApplicationId appId, YarnClient yarnClient, String message, int port)
+            throws YarnException, IOException {
+        return waitForLiveness(appId, false, true, message, yarnClient, "", null, port);
+    }
+
+    /**
+     * Calls waitForLiveness with probing and printing both off, passing an empty message,
+     * an empty instance name and a null configuration.
+     */
+    public static boolean waitForApplication(ApplicationId appId, YarnClient yarnClient, int port) throws YarnException,
+            IOException, JAXBException {
+        return waitForLiveness(appId, false, false, "", yarnClient, "", null, port);
+    }
+
+    /**
+     * Reads the client IP of the master node out of an instance's cluster-config.xml on the DFS.
+     * The file is copied to a local temporary file, unmarshalled, and the temporary file removed.
+     *
+     * @param instanceName
+     *            name of the instance whose cluster description is read
+     * @param conf
+     *            configuration used to obtain the filesystem
+     * @return the client IP recorded for the master node
+     * @throws YarnException
+     *             wrapping any IOException or JAXBException raised along the way
+     */
+    public static String getCCHostname(String instanceName, Configuration conf) throws YarnException {
+        File tmp = null;
+        try {
+            FileSystem fs = FileSystem.get(conf);
+            String instanceFolder = instanceName + "/";
+            String pathSuffix = CONF_DIR_REL + instanceFolder + "cluster-config.xml";
+            Path dstConf = new Path(fs.getHomeDirectory(), pathSuffix);
+            //the suffix is appended verbatim, so it needs its own dot
+            tmp = File.createTempFile("cluster-config", ".xml");
+            fs.copyToLocalFile(dstConf, new Path(tmp.getPath()));
+            JAXBContext clusterConf = JAXBContext.newInstance(Cluster.class);
+            Unmarshaller unm = clusterConf.createUnmarshaller();
+            Cluster cl = (Cluster) unm.unmarshal(tmp);
+            return cl.getMasterNode().getClientIp();
+        } catch (IOException | JAXBException e) {
+            throw new YarnException(e);
+        } finally {
+            //this runs inside polling loops; remove the copy now rather than piling up
+            //temporary files until the JVM exits
+            if (tmp != null && !tmp.delete()) {
+                tmp.deleteOnExit();
+            }
+        }
+    }
+
+    /**
+     * Unmarshals the Asterix configuration stored at the given path.
+     *
+     * @param path
+     *            location of the configuration XML file
+     * @return the unmarshalled configuration
+     * @throws IOException
+     *             wrapping any JAXBException raised while unmarshalling
+     */
+    public static AsterixConfiguration loadAsterixConfig(String path) throws IOException {
+        File source = new File(path);
+        try {
+            Unmarshaller reader = JAXBContext.newInstance(AsterixConfiguration.class).createUnmarshaller();
+            return (AsterixConfiguration) reader.unmarshal(source);
+        } catch (JAXBException e) {
+            throw new IOException(e);
+        }
+    }
+
+}
diff --git a/asterix-yarn/src/main/resources/base-asterix-configuration.xml b/asterix-yarn/src/main/resources/base-asterix-configuration.xml
new file mode 100644
index 0000000..76c00db
--- /dev/null
+++ b/asterix-yarn/src/main/resources/base-asterix-configuration.xml
@@ -0,0 +1,246 @@
+<!--
+ ! Copyright 2009-2013 by The Regents of the University of California
+ ! Licensed under the Apache License, Version 2.0 (the "License");
+ ! you may not use this file except in compliance with the License.
+ ! you may obtain a copy of the License from
+ ! 
+ !     http://www.apache.org/licenses/LICENSE-2.0
+ ! 
+ ! Unless required by applicable law or agreed to in writing, software
+ ! distributed under the License is distributed on an "AS IS" BASIS,
+ ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ! See the License for the specific language governing permissions and
+ ! limitations under the License.
+ !-->
+<asterixConfiguration xmlns="asterixconf">
+
+	<property>
+		<name>nc.java.opts</name>
+		<value>-Xmx1536m</value>
+		<description>JVM parameters for each Node Controller (NC)</description>
+	</property>
+
+	<property>
+		<name>cc.java.opts</name>
+		<value>-Xmx1024m</value>
+		<description>JVM parameters for each Cluster Controller (CC)
+		</description>
+	</property>
+
+        <property>
+                <name>max.wait.active.cluster</name>
+                <value>60</value>
+                <description>Maximum wait (in seconds) for a cluster to be ACTIVE (all nodes are available)
+                        before a submitted query/statement can be executed. (Default = 60 seconds)
+                </description>
+        </property>
+
+	<property>
+		<name>storage.buffercache.pagesize</name>
+		<value>131072</value>
+		<description>The page size in bytes for pages in the buffer cache.
+			(Default = "131072" // 128KB)
+		</description>
+	</property>
+
+	<property>
+		<name>storage.buffercache.size</name>
+		<value>536870912</value>
+		<description>The size of memory allocated to the disk buffer cache.
+			The value should be a multiple of the buffer cache page size(Default
+			= "536870912" // 512MB)
+		</description>
+	</property>
+
+	<property>
+		<name>storage.buffercache.maxopenfiles</name>
+		<value>214748364</value>
+		<description>The maximum number of open files in the buffer cache.
+			(Default = "214748364")
+		</description>
+	</property>
+
+	<property>
+		<name>storage.memorycomponent.pagesize</name>
+		<value>131072</value>
+		<description>The page size in bytes for pages allocated to memory
+			components. (Default = "131072" // 128KB)
+		</description>
+	</property>
+
+	<property>
+		<name>storage.memorycomponent.numpages</name>
+		<value>256</value>
+		<description>The number of pages to allocate for a memory component.
+			(Default = 256)
+		</description>
+	</property>
+	
+	<property>
+		<name>storage.metadata.memorycomponent.numpages</name>
+		<value>64</value>
+		<description>The number of pages to allocate for a metadata memory component.
+			(Default = 64)
+		</description>
+	</property>
+
+    <property>
+		<name>storage.memorycomponent.numcomponents</name>
+		<value>2</value>
+		<description>The number of memory components to be used per lsm index.
+			(Default = 2)
+		</description>
+	</property>
+	
+	<property>
+		<name>storage.memorycomponent.globalbudget</name>
+		<value>536870912</value>
+		<description>The total size of memory in bytes that the sum of all
+			open memory
+			components cannot exceed. (Default = "536870912" // 512MB)
+		</description>
+	</property>
+
+	<property>
+		<name>storage.lsm.bloomfilter.falsepositiverate</name>
+		<value>0.01</value>
+		<description>The maximum acceptable false positive rate for bloom
+			filters associated with LSM indexes. (Default = "0.01" // 1%)
+		</description>
+	</property>
+	
+	<property>
+		<name>txn.log.buffer.numpages</name>
+		<value>8</value>
+		<description>The number of in-memory log buffer pages. (Default = "8")
+		</description>
+	</property>
+
+	<property>
+		<name>txn.log.buffer.pagesize</name>
+		<value>524288</value>
+		<description>The size of pages in the in-memory log buffer. (Default =
+			"524288" // 512KB)
+		</description>
+	</property>
+
+	<property>
+		<name>txn.log.partitionsize</name>
+		<value>2147483648</value>
+		<description>The maximum size of a log file partition allowed before
+			rotating the log to the next partition. (Default = "2147483648" //
+			2GB)
+		</description>
+	</property>
+
+	<property>
+		<name>txn.log.checkpoint.lsnthreshold</name>
+		<value>67108864</value>
+		<description>The size of the window that the maximum LSN is allowed to
+			be ahead of the checkpoint LSN by. (Default = "67108864" // 64MB)
+		</description>
+	</property>
+
+	<property>
+		<name>txn.log.checkpoint.pollfrequency</name>
+		<value>120</value>
+		<description>The time in seconds that the checkpoint thread
+			waits between polls. (Default = "120" // 120s)
+		</description>
+	</property>
+
+	<property>
+		<name>txn.log.checkpoint.history</name>
+		<value>0</value>
+		<description>The number of old log partition files to keep before
+			discarding. (Default = "0")
+		</description>
+	</property>
+
+	<property>
+		<name>txn.lock.escalationthreshold</name>
+		<value>1000</value>
+		<description>The number of entity level locks that need to be acquired
+			before the locks are coalesced and escalated into a dataset level
+			lock. (Default = "1000")
+		</description>
+	</property>
+
+	<property>
+		<name>txn.lock.shrinktimer</name>
+		<value>5000</value>
+		<description>The time in milliseconds to wait before deallocating
+			unused lock manager memory. (Default = "5000" // 5s)
+		</description>
+	</property>
+
+	<property>
+		<name>txn.lock.timeout.waitthreshold</name>
+		<value>60000</value>
+		<description>The time in milliseconds to wait before labeling a
+			transaction which has been waiting for a lock timed-out. (Default =
+			"60000" // 60s)
+		</description>
+	</property>
+
+	<property>
+		<name>txn.lock.timeout.sweepthreshold</name>
+		<value>10000</value>
+		<description>The time in milliseconds the timeout thread waits between
+			sweeps to detect timed-out transactions. (Default = "10000" // 10s)
+		</description>
+	</property>
+
+	<property>
+		<name>compiler.sortmemory</name>
+		<value>33554432</value>
+		<description>The amount of memory in bytes given to sort operations.
+			(Default = "33554432" // 32mb)
+		</description>
+	</property>
+
+	<property>
+		<name>compiler.joinmemory</name>
+		<value>33554432</value>
+		<description>The amount of memory in bytes given to join operations.
+			(Default = "33554432" // 32mb)
+		</description>
+	</property>
+
+	<property>
+		<name>compiler.framesize</name>
+		<value>131072</value>
+		<description>The Hyracks frame size that the compiler configures per
+			job. (Default = "131072" // 128KB)
+		</description>
+	</property>
+
+	<property>
+		<name>web.port</name>
+		<value>19001</value>
+		<description>The port for the ASTERIX web interface. (Default = 19001)
+		</description>
+	</property>
+
+	<property>
+		<name>api.port</name>
+		<value>19002</value>
+		<description>The port for the ASTERIX API server. (Default = 19002)
+		</description>
+	</property>
+
+	<property>
+		<name>log.level</name>
+		<value>INFO</value>
+		<description>The minimum log level to be displayed. (Default = INFO)
+		</description>
+	</property>
+
+	<property>
+		<name>plot.activate</name>
+		<value>false</value>
+		<description>Enabling plot of Algebricks plan to tmp folder. (Default = false)
+		</description>
+	</property>
+
+</asterixConfiguration>
diff --git a/asterix-yarn/src/main/resources/configs/asterix-client-log4j.properties b/asterix-yarn/src/main/resources/configs/asterix-client-log4j.properties
new file mode 100644
index 0000000..4b936b5
--- /dev/null
+++ b/asterix-yarn/src/main/resources/configs/asterix-client-log4j.properties
@@ -0,0 +1,23 @@
+#/*
+# Copyright 2009-2013 by The Regents of the University of California
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# you may obtain a copy of the License from
+# 
+#     http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#*/
+log4j.rootLogger=info, A1
+
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+# Print only the log level and the message (the pattern contains no date)
+log4j.appender.A1.layout.ConversionPattern=%-p: %m%n
+
+log4j.logger.edu.uci.ics.asterix.event.management=info
+log4j.logger.org.apache.zookeeper=info
diff --git a/asterix-yarn/src/main/resources/configs/base-asterix-configuration.xml b/asterix-yarn/src/main/resources/configs/base-asterix-configuration.xml
new file mode 100644
index 0000000..76c00db
--- /dev/null
+++ b/asterix-yarn/src/main/resources/configs/base-asterix-configuration.xml
@@ -0,0 +1,246 @@
+<!--
+ ! Copyright 2009-2013 by The Regents of the University of California
+ ! Licensed under the Apache License, Version 2.0 (the "License");
+ ! you may not use this file except in compliance with the License.
+ ! you may obtain a copy of the License from
+ ! 
+ !     http://www.apache.org/licenses/LICENSE-2.0
+ ! 
+ ! Unless required by applicable law or agreed to in writing, software
+ ! distributed under the License is distributed on an "AS IS" BASIS,
+ ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ! See the License for the specific language governing permissions and
+ ! limitations under the License.
+ !-->
+<asterixConfiguration xmlns="asterixconf">
+
+	<property>
+		<name>nc.java.opts</name>
+		<value>-Xmx1536m</value>
+		<description>JVM parameters for each Node Controller (NC)</description>
+	</property>
+
+	<property>
+		<name>cc.java.opts</name>
+		<value>-Xmx1024m</value>
+		<description>JVM parameters for each Cluster Controller (CC)
+		</description>
+	</property>
+
+        <property>
+                <name>max.wait.active.cluster</name>
+                <value>60</value>
+                <description>Maximum wait (in seconds) for a cluster to be ACTIVE (all nodes are available)
+                        before a submitted query/statement can be executed. (Default = 60 seconds)
+                </description>
+        </property>
+
+	<property>
+		<name>storage.buffercache.pagesize</name>
+		<value>131072</value>
+		<description>The page size in bytes for pages in the buffer cache.
+			(Default = "131072" // 128KB)
+		</description>
+	</property>
+
+	<property>
+		<name>storage.buffercache.size</name>
+		<value>536870912</value>
+		<description>The size of memory allocated to the disk buffer cache.
+			The value should be a multiple of the buffer cache page size.
+			(Default = "536870912" // 512MB)
+		</description>
+	</property>
+
+	<property>
+		<name>storage.buffercache.maxopenfiles</name>
+		<value>214748364</value>
+		<description>The maximum number of open files in the buffer cache.
+			(Default = "214748364")
+		</description>
+	</property>
+
+	<property>
+		<name>storage.memorycomponent.pagesize</name>
+		<value>131072</value>
+		<description>The page size in bytes for pages allocated to memory
+			components. (Default = "131072" // 128KB)
+		</description>
+	</property>
+
+	<property>
+		<name>storage.memorycomponent.numpages</name>
+		<value>256</value>
+		<description>The number of pages to allocate for a memory component.
+			(Default = 256)
+		</description>
+	</property>
+	
+	<property>
+		<name>storage.metadata.memorycomponent.numpages</name>
+		<value>64</value>
+		<description>The number of pages to allocate for a metadata memory
+			component. (Default = 64)
+		</description>
+	</property>
+
+    <property>
+		<name>storage.memorycomponent.numcomponents</name>
+		<value>2</value>
+		<description>The number of memory components to be used per lsm index.
+			(Default = 2)
+		</description>
+	</property>
+	
+	<property>
+		<name>storage.memorycomponent.globalbudget</name>
+		<value>536870912</value>
+		<description>The total size of memory in bytes that the sum of all
+			open memory components cannot exceed.
+			(Default = "536870912" // 512MB)
+		</description>
+	</property>
+
+	<property>
+		<name>storage.lsm.bloomfilter.falsepositiverate</name>
+		<value>0.01</value>
+		<description>The maximum acceptable false positive rate for bloom
+			filters associated with LSM indexes. (Default = "0.01" // 1%)
+		</description>
+	</property>
+	
+	<property>
+		<name>txn.log.buffer.numpages</name>
+		<value>8</value>
+		<description>The number of in-memory log buffer pages. (Default = "8")
+		</description>
+	</property>
+
+	<property>
+		<name>txn.log.buffer.pagesize</name>
+		<value>524288</value>
+		<description>The size of pages in the in-memory log buffer. (Default =
+			"524288" // 512KB)
+		</description>
+	</property>
+
+	<property>
+		<name>txn.log.partitionsize</name>
+		<value>2147483648</value>
+		<description>The maximum size of a log file partition allowed before
+			rotating the log to the next partition. (Default = "2147483648" //
+			2GB)
+		</description>
+	</property>
+
+	<property>
+		<name>txn.log.checkpoint.lsnthreshold</name>
+		<value>67108864</value>
+		<description>The size of the window that the maximum LSN is allowed to
+			be ahead of the checkpoint LSN by. (Default = "67108864" // 64MB)
+		</description>
+	</property>
+
+	<property>
+		<name>txn.log.checkpoint.pollfrequency</name>
+		<value>120</value>
+		<description>The time in seconds that the checkpoint thread waits
+			between polls. (Default = "120" // 120s)
+		</description>
+	</property>
+
+	<property>
+		<name>txn.log.checkpoint.history</name>
+		<value>0</value>
+		<description>The number of old log partition files to keep before
+			discarding. (Default = "0")
+		</description>
+	</property>
+
+	<property>
+		<name>txn.lock.escalationthreshold</name>
+		<value>1000</value>
+		<description>The number of entity level locks that need to be acquired
+			before the locks are coalesced and escalated into a dataset level
+			lock. (Default = "1000")
+		</description>
+	</property>
+
+	<property>
+		<name>txn.lock.shrinktimer</name>
+		<value>5000</value>
+		<description>The time in milliseconds to wait before deallocating
+			unused lock manager memory. (Default = "5000" // 5s)
+		</description>
+	</property>
+
+	<property>
+		<name>txn.lock.timeout.waitthreshold</name>
+		<value>60000</value>
+		<description>The time in milliseconds to wait before labeling a
+			transaction which has been waiting for a lock timed-out. (Default =
+			"60000" // 60s)
+		</description>
+	</property>
+
+	<property>
+		<name>txn.lock.timeout.sweepthreshold</name>
+		<value>10000</value>
+		<description>The time in milliseconds the timeout thread waits between
+			sweeps to detect timed-out transactions. (Default = "10000" // 10s)
+		</description>
+	</property>
+
+	<property>
+		<name>compiler.sortmemory</name>
+		<value>33554432</value>
+		<description>The amount of memory in bytes given to sort operations.
+			(Default = "33554432" // 32mb)
+		</description>
+	</property>
+
+	<property>
+		<name>compiler.joinmemory</name>
+		<value>33554432</value>
+		<description>The amount of memory in bytes given to join operations.
+			(Default = "33554432" // 32mb)
+		</description>
+	</property>
+
+	<property>
+		<name>compiler.framesize</name>
+		<value>131072</value>
+		<description>The Hyracks frame size that the compiler configures per
+			job. (Default = "131072" // 128KB)
+		</description>
+	</property>
+
+	<property>
+		<name>web.port</name>
+		<value>19001</value>
+		<description>The port for the ASTERIX web interface. (Default = 19001)
+		</description>
+	</property>
+
+	<property>
+		<name>api.port</name>
+		<value>19002</value>
+		<description>The port for the ASTERIX API server. (Default = 19002)
+		</description>
+	</property>
+
+	<property>
+		<name>log.level</name>
+		<value>INFO</value>
+		<description>The minimum log level to be displayed. (Default = INFO)
+		</description>
+	</property>
+
+	<property>
+		<name>plot.activate</name>
+		<value>false</value>
+		<description>Enables plotting of the Algebricks plan to the tmp folder. (Default = false)
+		</description>
+	</property>
+
+</asterixConfiguration>
diff --git a/asterix-yarn/src/main/resources/configs/local.xml b/asterix-yarn/src/main/resources/configs/local.xml
new file mode 100644
index 0000000..582254c
--- /dev/null
+++ b/asterix-yarn/src/main/resources/configs/local.xml
@@ -0,0 +1,31 @@
+    <cluster xmlns="yarn_cluster">
+
+      <!-- Name of the cluster -->
+      <name>local</name>
+
+      <log_dir>/tmp/</log_dir>
+      <txn_log_dir>/tmp/</txn_log_dir>
+
+      <!-- Mount point of an iodevice. Use a comma-separated list for a machine that
+           has multiple iodevices (disks).
+           This property can be overridden for a node by redefining it at the node level. -->
+      <iodevices>/tmp</iodevices>
+
+      <!-- Path on each iodevice where Asterix will store its data -->
+      <store>storage</store>
+
+      <!-- IP addresses and ports of the master node -->
+      <master_node>
+          <id>cc</id>
+	      <client_ip>localhost</client_ip>
+	      <cluster_ip>localhost</cluster_ip>
+	      <client_port>1098</client_port>
+	      <cluster_port>1099</cluster_port>
+	      <http_port>8888</http_port>
+      </master_node>
+      <node>
+      	      <id>nc1</id>
+      	      <cluster_ip>127.0.0.1</cluster_ip>
+      </node>
+    <metadata_node>nc1</metadata_node>
+</cluster>
diff --git a/asterix-yarn/src/main/resources/configs/my_awesome_cluster_desc.xml b/asterix-yarn/src/main/resources/configs/my_awesome_cluster_desc.xml
new file mode 100644
index 0000000..749bc6f
--- /dev/null
+++ b/asterix-yarn/src/main/resources/configs/my_awesome_cluster_desc.xml
@@ -0,0 +1,23 @@
+    <cluster xmlns="yarn_cluster">
+        <name>my_awesome_instance</name>
+        <txn_log_dir>/home/yarn/</txn_log_dir>
+        <iodevices>/home/yarn/</iodevices>
+        <store>asterix-data</store>
+        <master_node>
+            <id>cc</id>
+            <client_ip>10.10.0.2</client_ip>
+            <cluster_ip>10.10.0.2</cluster_ip>
+            <client_port>1098</client_port>
+            <cluster_port>1099</cluster_port>
+            <http_port>8888</http_port>
+        </master_node>
+        <node>
+            <id>nc1</id>
+            <cluster_ip>10.10.0.3</cluster_ip>
+        </node>
+        <node>
+            <id>nc2</id>
+            <cluster_ip>10.10.0.4</cluster_ip>
+        </node>
+        <metadata_node>nc1</metadata_node>
+    </cluster>
diff --git a/asterix-yarn/src/main/resources/scripts/asterix b/asterix-yarn/src/main/resources/scripts/asterix
new file mode 100644
index 0000000..c2fc20f
--- /dev/null
+++ b/asterix-yarn/src/main/resources/scripts/asterix
@@ -0,0 +1,25 @@
+#!/usr/bin/env bash
+if [ -z $ASTERIX_HOME ]
+ then
+   pushd $(dirname $0) >/dev/null
+   cd ..
+   export ASTERIX_HOME=$(pwd)
+   popd >/dev/null
+fi
+
+for jar in `ls $ASTERIX_HOME/lib/*.jar`
+  do 
+  if [ -z $ASTERIX_CLASSPATH ] 
+  then 
+    ASTERIX_CLASSPATH=$jar 
+  else
+    ASTERIX_CLASSPATH=$ASTERIX_CLASSPATH:$jar 
+  fi
+done
+
+ASTERIX_CLASSPATH=$ASTERIX_CLASSPATH:
+ASTERIX_CLASSPATH=$ASTERIX_CLASSPATH:$YARN_CONF_DIR:$HADOOP_CONF_DIR:$HADOOP_CONF_PATH
+pushd $(dirname $0) > /dev/null
+cd $ASTERIX_HOME
+java $JAVA_OPTS -cp $ASTERIX_CLASSPATH -Dlog4j.configuration=file://$ASTERIX_HOME/conf/asterix-client-log4j.properties edu.uci.ics.asterix.aoya.AsterixYARNClient $@
+popd > /dev/null
diff --git a/asterix-yarn/src/main/resources/scripts/asterix.cmd b/asterix-yarn/src/main/resources/scripts/asterix.cmd
new file mode 100644
index 0000000..a85e411
--- /dev/null
+++ b/asterix-yarn/src/main/resources/scripts/asterix.cmd
@@ -0,0 +1,103 @@
+@echo off
+@rem Licensed to the Apache Software Foundation (ASF) under one or more
+@rem contributor license agreements.  See the NOTICE file distributed with
+@rem this work for additional information regarding copyright ownership.
+@rem The ASF licenses this file to You under the Apache License, Version 2.0
+@rem (the "License"); you may not use this file except in compliance with
+@rem the License.  You may obtain a copy of the License at
+@rem
+@rem     http://www.apache.org/licenses/LICENSE-2.0
+@rem
+@rem Unless required by applicable law or agreed to in writing, software
+@rem distributed under the License is distributed on an "AS IS" BASIS,
+@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+@rem See the License for the specific language governing permissions and
+@rem limitations under the License.
+
+setlocal enabledelayedexpansion
+
+if not defined HADOOP_BIN_PATH ( 
+  set HADOOP_BIN_PATH=%~dp0
+)
+
+if "%HADOOP_BIN_PATH:~-1%" == "\" (
+  set HADOOP_BIN_PATH=%HADOOP_BIN_PATH:~0,-1%
+)
+
+set DEFAULT_LIBEXEC_DIR=%HADOOP_BIN_PATH%\..\libexec
+if not defined HADOOP_LIBEXEC_DIR (
+  set HADOOP_LIBEXEC_DIR=%DEFAULT_LIBEXEC_DIR%
+)
+
+:main
+
+  set CLASS=edu.uci.ics.asterix.aoya.AsterixYARNClient
+
+  @rem JAVA_HEAP_MAX comes from the environment unless YARN_HEAPSIZE overrides it
+
+  if defined YARN_HEAPSIZE (
+    @rem echo run with Java heapsize %YARN_HEAPSIZE%
+    set JAVA_HEAP_MAX=-Xmx%YARN_HEAPSIZE%m
+  )
+
+  @rem CLASSPATH initially contains HADOOP_CONF_DIR & YARN_CONF_DIR
+  if not defined HADOOP_CONF_DIR (
+    echo No HADOOP_CONF_DIR set. 
+    echo Please specify it either in yarn-env.cmd or in the environment.
+    exit /b 1
+  )
+  @rem Jars are taken from the lib directory beside this script's directory
+  set CLASSPATH=%HADOOP_CONF_DIR%;%YARN_CONF_DIR%;%CLASSPATH%;%~dp0..\lib\*
+
+  @rem for developers, add Hadoop classes to CLASSPATH
+  if exist "%HADOOP_YARN_HOME%\yarn-api\target\classes" (
+    set CLASSPATH=%CLASSPATH%;%HADOOP_YARN_HOME%\yarn-api\target\classes
+  )
+
+  if exist "%HADOOP_YARN_HOME%\yarn-common\target\classes" (
+    set CLASSPATH=%CLASSPATH%;%HADOOP_YARN_HOME%\yarn-common\target\classes
+  )
+
+  if exist "%HADOOP_YARN_HOME%\yarn-mapreduce\target\classes" (
+    set CLASSPATH=%CLASSPATH%;%HADOOP_YARN_HOME%\yarn-mapreduce\target\classes
+  )
+
+  if exist "%HADOOP_YARN_HOME%\yarn-master-worker\target\classes" (
+    set CLASSPATH=%CLASSPATH%;%HADOOP_YARN_HOME%\yarn-master-worker\target\classes
+  )
+
+  if exist "%HADOOP_YARN_HOME%\yarn-server\yarn-server-nodemanager\target\classes" (
+    set CLASSPATH=%CLASSPATH%;%HADOOP_YARN_HOME%\yarn-server\yarn-server-nodemanager\target\classes
+  )
+
+  if exist "%HADOOP_YARN_HOME%\yarn-server\yarn-server-common\target\classes" (
+    set CLASSPATH=%CLASSPATH%;%HADOOP_YARN_HOME%\yarn-server\yarn-server-common\target\classes
+  )
+
+  if exist "%HADOOP_YARN_HOME%\yarn-server\yarn-server-resourcemanager\target\classes" (
+    set CLASSPATH=%CLASSPATH%;%HADOOP_YARN_HOME%\yarn-server\yarn-server-resourcemanager\target\classes
+  )
+
+  if exist "%HADOOP_YARN_HOME%\build\test\classes" (
+    set CLASSPATH=%CLASSPATH%;%HADOOP_YARN_HOME%\build\test\classes
+  )
+
+  if exist "%HADOOP_YARN_HOME%\build\tools" (
+    set CLASSPATH=%CLASSPATH%;%HADOOP_YARN_HOME%\build\tools
+  )
+
+  set CLASSPATH=%CLASSPATH%;%HADOOP_YARN_HOME%\%YARN_DIR%\*
+  set CLASSPATH=%CLASSPATH%;%HADOOP_YARN_HOME%\%YARN_LIB_JARS_DIR%\*
+
+  @rem When JAVA_LIBRARY_PATH is set, hand it to the JVM as java.library.path
+  @rem by appending the option to YARN_OPTS.
+  if defined JAVA_LIBRARY_PATH (
+    set YARN_OPTS=%YARN_OPTS% -Djava.library.path=%JAVA_LIBRARY_PATH%
+  )
+
+  set java_arguments=%JAVA_HEAP_MAX% %YARN_OPTS% -classpath "%CLASSPATH%" %CLASS%
+  call java %java_arguments% %*
+
+@rem No explicit endlocal: cmd ends the setlocal scope implicitly when the
+@rem batch script terminates, and an endlocal placed after goto :eof never ran.
+goto :eof