YARN integration for AsterixDB
This is an initial version of YARN integration for AsterixDB.
- Uses static assignment of CC and NC nodes to NM locations
- Stores state locally on each NM, outside of HDFS
- "All or nothing" container allocation. We don't attempt to
move or reallocate containers the RM may kill (yet).
- Retains feature parity with managix.
Change-Id: I49c849179d17fc7faa446b9be57a0695df6836ab
Reviewed-on: https://asterix-gerrit.ics.uci.edu/161
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <hubailmor@gmail.com>
diff --git a/asterix-yarn/pom.xml b/asterix-yarn/pom.xml
new file mode 100644
index 0000000..e4eb3c6
--- /dev/null
+++ b/asterix-yarn/pom.xml
@@ -0,0 +1,245 @@
+<!--
+ ! Copyright 2009-2013 by The Regents of the University of California
+ ! Licensed under the Apache License, Version 2.0 (the "License");
+ ! you may not use this file except in compliance with the License.
+ ! you may obtain a copy of the License from
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing, software
+ ! distributed under the License is distributed on an "AS IS" BASIS,
+ ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ! See the License for the specific language governing permissions and
+ ! limitations under the License.
+ !-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>asterix</artifactId>
+ <groupId>edu.uci.ics.asterix</groupId>
+ <version>0.8.7-SNAPSHOT</version>
+ </parent>
+ <artifactId>asterix-yarn</artifactId>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.0.2</version>
+ <configuration>
+ <source>1.7</source>
+ <target>1.7</target>
+ <fork>true</fork>
+ </configuration>
+ </plugin>
+ <!-- Extra jar (classifier "aoya") holding only the edu.uci.ics.asterix.aoya classes; its manifest launches Client. -->
+ <plugin>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>aoya</id>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ <phase>package</phase>
+ <configuration>
+ <classifier>aoya</classifier>
+ <archive>
+ <manifest>
+ <mainClass>edu.uci.ics.asterix.aoya.Client</mainClass>
+ </manifest>
+ </archive>
+ <includes>
+ <include>**/uci/ics/asterix/aoya/*</include>
+ </includes>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <!-- Attaches the binary distribution described by src/main/assembly/binary-assembly.xml at package time. -->
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>2.2-beta-5</version>
+ <executions>
+ <execution>
+ <configuration>
+ <descriptors>
+ <descriptor>src/main/assembly/binary-assembly.xml</descriptor>
+ </descriptors>
+ </configuration>
+ <phase>package</phase>
+ <goals>
+ <goal>attached</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <version>2.6</version>
+ <configuration>
+ <runOrder>alphabetical</runOrder>
+ <forkMode>pertest</forkMode>
+ <excludes>
+ </excludes>
+ <environmentVariables>
+ <JAVA_HOME>${java.home}</JAVA_HOME>
+ </environmentVariables>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>integration-test</goal>
+ <goal>verify</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <!-- Maven Central is only served over HTTPS; plain-HTTP requests are rejected. -->
+ <repositories>
+ <repository>
+ <id>central</id>
+ <url>https://repo1.maven.org/maven2</url>
+ <releases>
+ <enabled>true</enabled>
+ </releases>
+ <snapshots>
+ <enabled>false</enabled>
+ </snapshots>
+ </repository>
+ <repository>
+ <releases>
+ <enabled>true</enabled>
+ <updatePolicy>always</updatePolicy>
+ <checksumPolicy>warn</checksumPolicy>
+ </releases>
+ <snapshots>
+ <enabled>false</enabled>
+ <updatePolicy>never</updatePolicy>
+ <checksumPolicy>fail</checksumPolicy>
+ </snapshots>
+ <id>HDPReleases</id>
+ <name>HDP Releases</name>
+ <url>http://repo.hortonworks.com/content/repositories/releases</url>
+ <layout>default</layout>
+ </repository>
+ </repositories>
+
+ <dependencies>
+ <dependency>
+ <groupId>edu.uci.ics.asterix</groupId>
+ <artifactId>asterix-events</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.asterix</groupId>
+ <artifactId>asterix-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.asterix</groupId>
+ <artifactId>asterix-test-framework</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.asterix</groupId>
+ <artifactId>asterix-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <!-- Hadoop/YARN: versions are managed by the parent POM; provided scope keeps them out of the runtime scope. -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-client</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ <version>1.4</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-net</groupId>
+ <artifactId>commons-net</artifactId>
+ <version>3.1</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ <version>2.5</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-collections</groupId>
+ <artifactId>commons-collections</artifactId>
+ <version>3.2.1</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-configuration</groupId>
+ <artifactId>commons-configuration</artifactId>
+ <version>1.6</version>
+ </dependency>
+
+ <dependency>
+ <groupId>commons-cli</groupId>
+ <artifactId>commons-cli</artifactId>
+ <version>1.2</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-math</artifactId>
+ <version>2.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>4.2.2</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpcore</artifactId>
+ <version>4.2.2</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.asterix</groupId>
+ <artifactId>asterix-server</artifactId>
+ <version>${project.version}</version>
+ <type>zip</type>
+ <classifier>binary-assembly</classifier>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.asterix</groupId>
+ <artifactId>asterix-runtime</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>edu.uci.ics.asterix</groupId>
+ <artifactId>asterix-app</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ <version>1.6.1</version>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/asterix-yarn/src/main/assembly/binary-assembly.xml b/asterix-yarn/src/main/assembly/binary-assembly.xml
new file mode 100644
index 0000000..8457c66
--- /dev/null
+++ b/asterix-yarn/src/main/assembly/binary-assembly.xml
@@ -0,0 +1,99 @@
+<!--
+ ! Copyright 2009-2013 by The Regents of the University of California
+ ! Licensed under the Apache License, Version 2.0 (the "License");
+ ! you may not use this file except in compliance with the License.
+ ! you may obtain a copy of the License from
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing, software
+ ! distributed under the License is distributed on an "AS IS" BASIS,
+ ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ! See the License for the specific language governing permissions and
+ ! limitations under the License.
+ !-->
+<assembly>
+ <id>binary-assembly</id>
+ <formats>
+ <format>dir</format>
+ <format>zip</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <fileSets>
+ <!-- Script installed into bin/; the only fileSet that keeps the executable bit. -->
+ <fileSet>
+ <directory>src/main/resources/scripts</directory>
+ <fileMode>0755</fileMode>
+ <includes>
+ <include>asterix</include>
+ </includes>
+ <outputDirectory>bin</outputDirectory>
+ </fileSet>
+ <!-- Cluster description files copied into configs/; plain data, so not executable. -->
+ <fileSet>
+ <directory>src/main/resources/configs</directory>
+ <fileMode>0644</fileMode>
+ <includes>
+ <include>local.xml</include>
+ <include>my_awesome_cluster_desc.xml</include>
+ </includes>
+ <outputDirectory>configs</outputDirectory>
+ </fileSet>
+ <!-- Base Asterix configuration and client log4j settings go to conf/. -->
+ <fileSet>
+ <directory>src/main/resources/</directory>
+ <fileMode>0644</fileMode>
+ <includes>
+ <include>base-asterix-configuration.xml</include>
+ </includes>
+ <outputDirectory>conf</outputDirectory>
+ </fileSet>
+ <fileSet>
+ <directory>src/main/resources/configs</directory>
+ <fileMode>0644</fileMode>
+ <includes>
+ <include>asterix-client-log4j.properties</include>
+ </includes>
+ <outputDirectory>conf</outputDirectory>
+ </fileSet>
+ </fileSets>
+ <dependencySets>
+ <!-- Jars copied into lib/. NOTE(review): dependencySets default to runtime scope, so the provided-scope Hadoop artifacts from pom.xml may not be picked up here; confirm. -->
+ <dependencySet>
+ <includes>
+ <include>edu.uci.ics.asterix:asterix-yarn</include>
+ <include>edu.uci.ics.asterix:asterix-common</include>
+ <include>log4j:log4j</include>
+ <include>org.slf4j:slf4j-api</include>
+ <include>org.slf4j:slf4j-simple</include>
+ <include>commons-io:commons-io</include>
+ <include>commons-cli:commons-cli</include>
+ <include>commons-configuration:commons-configuration</include>
+ <include>commons-logging:commons-logging</include>
+ <include>commons-codec:commons-codec</include>
+ <include>commons-lang:commons-lang</include>
+ <include>org.apache.hadoop:hadoop-common</include>
+ <include>org.apache.hadoop:hadoop-hdfs</include>
+ <include>org.apache.hadoop:hadoop-auth</include>
+ <include>org.apache.hadoop:hadoop-yarn-client</include>
+ <include>org.apache.hadoop:hadoop-yarn-common</include>
+ <include>org.apache.hadoop:hadoop-yarn-api</include>
+ <include>org.apache.httpcomponents:httpcore</include>
+ <include>org.apache.httpcomponents:httpclient</include>
+ <include>commons-httpclient:commons-httpclient</include>
+ <include>com.google.guava:guava</include>
+ <include>com.google.protobuf:protobuf-java</include>
+ </includes>
+ <outputDirectory>lib</outputDirectory>
+ </dependencySet>
+ <!-- The asterix-server binary zip dependency, copied into asterix/ without unpacking. -->
+ <dependencySet>
+ <outputDirectory>asterix</outputDirectory>
+ <includes>
+ <include>asterix-server*</include>
+ </includes>
+ <unpack>false</unpack>
+ <useTransitiveDependencies>false</useTransitiveDependencies>
+ </dependencySet>
+ </dependencySets>
+</assembly>
diff --git a/asterix-yarn/src/main/assembly/hyracks-assembly.xml b/asterix-yarn/src/main/assembly/hyracks-assembly.xml
new file mode 100644
index 0000000..ae362ca
--- /dev/null
+++ b/asterix-yarn/src/main/assembly/hyracks-assembly.xml
@@ -0,0 +1,40 @@
+<!--
+ ! Copyright 2009-2013 by The Regents of the University of California
+ ! Licensed under the Apache License, Version 2.0 (the "License");
+ ! you may not use this file except in compliance with the License.
+ ! you may obtain a copy of the License from
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing, software
+ ! distributed under the License is distributed on an "AS IS" BASIS,
+ ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ! See the License for the specific language governing permissions and
+ ! limitations under the License.
+ !-->
+<!-- NOTE(review): this module's pom.xml only references binary-assembly.xml, so this descriptor is not wired into its build. -->
+<assembly>
+ <!-- Distinct from the binary-assembly.xml id so the two descriptors never attach the same classifier. -->
+ <id>hyracks-assembly</id>
+ <formats>
+ <format>zip</format>
+ <format>dir</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <fileSets>
+ <!-- Everything from target/appassembler/bin is made executable (0755). -->
+ <fileSet>
+ <directory>target/appassembler/bin</directory>
+ <outputDirectory>bin</outputDirectory>
+ <fileMode>0755</fileMode>
+ </fileSet>
+ <fileSet>
+ <directory>target/appassembler/lib</directory>
+ <outputDirectory>lib</outputDirectory>
+ </fileSet>
+ <fileSet>
+ <directory>docs</directory>
+ <outputDirectory>docs</outputDirectory>
+ </fileSet>
+ </fileSets>
+</assembly>
diff --git a/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/AConstants.java b/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/AConstants.java
new file mode 100644
index 0000000..f8548db
--- /dev/null
+++ b/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/AConstants.java
@@ -0,0 +1,73 @@
+package edu.uci.ics.asterix.aoya;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Names of the environment variables shared by the Client and the Application Master.
+ * <p>
+ * The Application Master reads them from its own process environment. Each localized file
+ * (the AM jar, the Asterix archive, the cluster config and the parameters file) is described
+ * by a LOCATION / TIMESTAMP / LEN triple; the timestamp and length are used to validate the
+ * local resource.
+ */
+@InterfaceAudience.LimitedPrivate(value = { "For simplicity between Client and AM." })
+@InterfaceStability.Unstable
+public class AConstants {
+
+ // --- Application Master jar ---
+ /** Environment key name pointing to the app master jar location. */
+ public static final String APPLICATIONMASTERJARLOCATION = "APPLICATIONMASTERJARLOCATION";
+ /** Environment key name for the file timestamp of the app master jar. */
+ public static final String APPLICATIONMASTERJARTIMESTAMP = "APPLICATIONMASTERJARTIMESTAMP";
+ /** Environment key name for the file content length of the app master jar. */
+ public static final String APPLICATIONMASTERJARLEN = "APPLICATIONMASTERJARLEN";
+
+ // --- Asterix distributable archive (the AM stores this location as its asterix-server zip path) ---
+ /** Environment key name pointing to the Asterix distributable archive. */
+ public static final String TARLOCATION = "TARLOCATION";
+ /** Environment key name for the file timestamp of the Asterix archive. */
+ public static final String TARTIMESTAMP = "TARTIMESTAMP";
+ /** Environment key name for the file content length of the Asterix archive. */
+ public static final String TARLEN = "TARLEN";
+
+ // --- Asterix cluster configuration file ---
+ /** Environment key name pointing to the Asterix cluster configuration file. */
+ public static final String CONFLOCATION = "CONFLOCATION";
+ /** Environment key name for the file timestamp of the Asterix config. */
+ public static final String CONFTIMESTAMP = "CONFTIMESTAMP";
+ /** Environment key name for the file content length of the Asterix config. */
+ public static final String CONFLEN = "CONFLEN";
+
+ // --- Asterix parameters file ---
+ /** Environment key name pointing to the Asterix parameters file. */
+ public static final String PARAMLOCATION = "PARAMLOCATION";
+ /** Environment key name for the file timestamp of the Asterix parameters. */
+ public static final String PARAMTIMESTAMP = "PARAMTIMESTAMP";
+ /** Environment key name for the file content length of the Asterix parameters. */
+ public static final String PARAMLEN = "PARAMLEN";
+
+ // --- Instance layout on the DFS ---
+ /** Environment key name for a path suffix. NOTE(review): the AM's setEnvs does not read it; confirm its consumer. */
+ public static final String PATHSUFFIX = "PATHSUFFIX";
+ /** Environment key name for the instance directory, under DFS_BASE, holding files such as asterix-configuration.xml. */
+ public static final String INSTANCESTORE = "INSTANCESTORE";
+ /** Environment key name for the base DFS path under which all configs and binaries lie. */
+ public static final String DFS_BASE = "DFSBASE";
+
+ // --- ResourceManager endpoints, for NodeManagers whose environment lacks the Hadoop XML configs ---
+ /** Environment key name for the ResourceManager address (yarn.resourcemanager.address). */
+ public static final String RMADDRESS = "RMADDRESS";
+ /** Environment key name for the ResourceManager scheduler address (yarn.resourcemanager.scheduler.address). */
+ public static final String RMSCHEDULERADDRESS = "RMSCHEDULERADDRESS";
+
+ // --- JVM options and container memory for the CC and NC processes ---
+ /** Environment key name for the NC JVM options; the AM uses a default -Xmx setting when it is unset. */
+ public static final String NC_JAVA_OPTS = "NCJAVAOPTS";
+ /** Environment key name for the CC JVM options; the AM uses a default -Xmx setting when it is unset. */
+ public static final String CC_JAVA_OPTS = "CCJAVAOPTS";
+ /** Environment key name for the NC container memory request (units not shown here; presumably MB). */
+ public static final String NC_CONTAINER_MEM = "NC_CONTAINER_MEM";
+ /** Environment key name for the CC container memory request (units not shown here; presumably MB). */
+ public static final String CC_CONTAINER_MEM = "CC_CONTAINER_MEM";
+}
diff --git a/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/AsterixApplicationMaster.java b/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/AsterixApplicationMaster.java
new file mode 100644
index 0000000..1539a34
--- /dev/null
+++ b/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/AsterixApplicationMaster.java
@@ -0,0 +1,1288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.aoya;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+
+import edu.uci.ics.asterix.common.config.GlobalConfig;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.event.schema.yarnCluster.Cluster;
+import edu.uci.ics.asterix.event.schema.yarnCluster.MasterNode;
+import edu.uci.ics.asterix.event.schema.yarnCluster.Node;
+import org.apache.log4j.ConsoleAppender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+
+public class AsterixApplicationMaster {
+
+ // Route log4j output at INFO level and above to the console for this JVM.
+ static {
+ Logger rootLogger = Logger.getRootLogger();
+ rootLogger.setLevel(Level.INFO);
+ rootLogger.addAppender(new ConsoleAppender(
+ new PatternLayout("%-6r [%p] %c - %m%n")));
+ }
+ private static final Log LOG = LogFactory.getLog(AsterixApplicationMaster.class);
+ private static final String CLUSTER_DESC_PATH = "cluster-config.xml";
+ private static final String ASTERIX_CONF_NAME = "asterix-configuration.xml";
+ private static final String ASTERIX_ZIP_NAME = "asterix-server.zip";
+
+ private static final int CC_MEMORY_MBS_DEFAULT = 1024;
+ private static final int NC_MEMORY_MBS_DEFAULT = 1536;
+ private static final String EXTERNAL_CC_JAVA_OPTS_DEFAULT = "-Xmx" + CC_MEMORY_MBS_DEFAULT + "m";
+ private static final String EXTERNAL_NC_JAVA_OPTS_DEFAULT = "-Xmx" + NC_MEMORY_MBS_DEFAULT + "m";
+ private static final String OBLITERATOR_CLASSNAME = "edu.uci.ics.asterix.aoya.Deleter";
+ private static final String HDFS_BACKUP_CLASSNAME = "edu.uci.ics.asterix.aoya.HDFSBackup";
+ private static final String NC_CLASSNAME = "edu.uci.ics.hyracks.control.nc.NCDriver";
+ private static final String CC_CLASSNAME = "edu.uci.ics.hyracks.control.cc.CCDriver";
+ private static final String JAVA_HOME = System.getProperty("java.home");
+ private boolean doneAllocating = false;
+
+ // Configuration
+ private Configuration conf;
+
+ // Handle to communicate with the Resource Manager
+ private AMRMClientAsync<ContainerRequest> resourceManager;
+
+ // Handle to communicate with the Node Manager
+ private NMClientAsync nmClientAsync;
+ // Listener that processes responses from the Node Manager
+ private NMCallbackHandler containerListener;
+ // Application Attempt Id ( combination of attemptId and fail count )
+ private ApplicationAttemptId appAttemptID;
+
+ // TODO: status updates for clients are not implemented yet.
+ // Hostname of the container
+ private String appMasterHostname = "";
+ // Port on which the app master listens for status updates from clients, chosen at
+ // random from the dynamic/private port range 49152-65535 (inclusive).
+ private int appMasterRpcPort = 49152 + new Random().nextInt(65536 - 49152);
+ // Tracking url to which app master publishes info for clients to monitor
+ private String appMasterTrackingUrl = "";
+
+ // Counter for completed containers ( complete denotes successful or failed )
+ private AtomicInteger numCompletedContainers = new AtomicInteger();
+ // Allocated container count, i.e. how many containers the RM has
+ // allocated to us
+ private AtomicInteger numAllocatedContainers = new AtomicInteger();
+ // Count of failed containers
+ private AtomicInteger numFailedContainers = new AtomicInteger();
+ // Count of containers already requested from the RM
+ // Needed as once requested, we should not request for containers again.
+ // Only request for more if the original requirement changes.
+ private AtomicInteger numRequestedContainers = new AtomicInteger();
+ //Tells us whether the Cluster Controller is up so we can safely start some Node Controllers
+ private AtomicBoolean ccUp = new AtomicBoolean();
+ private AtomicBoolean ccStarted = new AtomicBoolean();
+ private Queue<Node> pendingNCs = new ArrayDeque<Node>();
+
+ //HDFS path to AsterixDB distributable zip
+ private String asterixZipPath = "";
+ // Timestamp needed for creating a local resource
+ private long asterixZipTimestamp = 0;
+ // File length needed for local resource
+ private long asterixZipLen = 0;
+
+ //HDFS path to AsterixDB cluster description
+ private String asterixConfPath = "";
+ // Timestamp needed for creating a local resource
+ private long asterixConfTimestamp = 0;
+ // File length needed for local resource
+ private long asterixConfLen = 0;
+ // Instance directory on the DFS, relative to dfsBasePath (read from the INSTANCESTORE variable)
+ private String instanceConfPath = "";
+
+ //base dir under which all configs and binaries lie
+ private String dfsBasePath;
+
+ private int numTotalContainers = 0;
+
+ // Set the local resources
+ private Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+
+ private Cluster clusterDesc = null;
+ private MasterNode cC = null;
+ private String ccJavaOpts = null;
+ private int ccMem = 0;
+ private String ncJavaOpts = null;
+ private int ncMem = 0;
+ private volatile boolean done;
+ private volatile boolean success;
+
+ private boolean obliterate = false;
+ private Path appMasterJar = null;
+ private boolean backup = false;
+ long backupTimestamp;
+ String snapName;
+ private boolean restore = false;
+ private boolean initial = false;
+
+ // Launch threads
+ private List<Thread> launchThreads = new CopyOnWriteArrayList<Thread>();
+
+ /**
+ * AM entry point: parses arguments, reads the launch environment, then initializes and runs
+ * the master. Exit codes: 0 on success or when init() returns false, 1 if an exception
+ * escapes, 2 if run() reports failure.
+ */
+ public static void main(String[] args) {
+ boolean succeeded = false;
+ try {
+ AsterixApplicationMaster master = new AsterixApplicationMaster();
+ LOG.info("Initializing ApplicationMaster");
+ CommandLine cmdLine = master.setArgs(args);
+ master.setEnvs(cmdLine);
+ // A false return from init() is treated as a clean exit.
+ if (!master.init()) {
+ System.exit(0);
+ }
+ succeeded = master.run();
+ } catch (Exception e) {
+ LOG.fatal("Error running ApplicationMaster", e);
+ System.exit(1);
+ }
+ LOG.info(succeeded ? "Application Master completed successfully. exiting"
+ : "Application Master failed. exiting");
+ System.exit(succeeded ? 0 : 2);
+ }
+
+ /**
+ * Logs every environment variable and a recursive listing of the working directory.
+ * The child's output is drained before waiting for it: "ls -R" can produce more output
+ * than the pipe buffer holds, and waiting first would then block forever.
+ */
+ private void dumpOutDebugInfo() {
+ LOG.info("Dump debug output");
+ for (Map.Entry<String, String> env : System.getenv().entrySet()) {
+ LOG.info("System env: key=" + env.getKey() + ", val=" + env.getValue());
+ System.out.println("System env: key=" + env.getKey() + ", val=" + env.getValue());
+ }
+ try {
+ // Merge stderr into stdout so unreadable directories cannot fill an undrained pipe.
+ Process pr = new ProcessBuilder("ls", "-alhLR").redirectErrorStream(true).start();
+ try (BufferedReader buf = new BufferedReader(new InputStreamReader(pr.getInputStream()))) {
+ String line;
+ while ((line = buf.readLine()) != null) {
+ LOG.info("System CWD content: " + line);
+ System.out.println("System CWD content: " + line);
+ }
+ }
+ pr.waitFor();
+ } catch (IOException e) {
+ LOG.info(e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.info(e);
+ }
+ }
+
+ /**
+ * Creates the master with a fresh {@link YarnConfiguration}; XML files found in
+ * HADOOP_CONF_DIR are added to it later by setEnvs.
+ */
+ public AsterixApplicationMaster() { this.conf = new YarnConfiguration(); }
+
+ /**
+ * Parses the AM command line and records the requested mode flags on this instance.
+ * "help" calls printUsage and "debug" dumps the environment right away; the mode flags
+ * (obliterate, initial, backup, restore) are only ever switched on here, never cleared.
+ * @param args raw AM arguments
+ * @return the parsed command line, which main() passes on to setEnvs
+ * @throws ParseException if the arguments do not match the declared options
+ */
+ public CommandLine setArgs(String[] args) throws ParseException {
+ Options options = new Options();
+ options.addOption("app_attempt_id", true, "App Attempt ID. Not to be used unless for testing purposes");
+ options.addOption("priority", true, "Application Priority. Default 0");
+ options.addOption("debug", false, "Dump out debug information");
+ options.addOption("help", false, "Print usage");
+ options.addOption("initial", false, "Initialize existing Asterix instance.");
+ options.addOption("obliterate", false, "Delete asterix instance completely.");
+ options.addOption("backup", false, "Back up AsterixDB instance");
+ options.addOption("restore", true, "Restore an AsterixDB instance");
+ CommandLine cmd = new GnuParser().parse(options, args);
+ if (cmd.hasOption("help")) {
+ printUsage(options);
+ }
+ if (cmd.hasOption("debug")) {
+ dumpOutDebugInfo();
+ }
+ // |= only ever sets these flags; a false hasOption() leaves them unchanged.
+ obliterate |= cmd.hasOption("obliterate");
+ initial |= cmd.hasOption("initial");
+ if (cmd.hasOption("backup")) {
+ backup = true;
+ backupTimestamp = System.currentTimeMillis();
+ }
+ if (cmd.hasOption("restore")) {
+ restore = true;
+ snapName = cmd.getOptionValue("restore");
+ LOG.info(snapName);
+ }
+ return cmd;
+ }
+
+ public void setEnvs(CommandLine cliParser) {
+ Map<String, String> envs = System.getenv();
+ if (envs.containsKey("HADOOP_CONF_DIR")) {
+ File hadoopConfDir = new File(envs.get("HADOOP_CONF_DIR"));
+ if (hadoopConfDir.isDirectory()) {
+ for (File config : hadoopConfDir.listFiles()) {
+ if (config.getName().matches("^.*(xml)$")) {
+ conf.addResource(new Path(config.getAbsolutePath()));
+ }
+ }
+ }
+ }
+ //the containerID might be in the arguments or the environment
+ if (!envs.containsKey(Environment.CONTAINER_ID.name())) {
+ if (cliParser.hasOption("app_attempt_id")) {
+ String appIdStr = cliParser.getOptionValue("app_attempt_id", "");
+ appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr);
+ } else {
+
+ throw new IllegalArgumentException(
+ "Environment is not set correctly- please check client submission settings");
+ }
+ } else {
+ ContainerId containerId = ConverterUtils.toContainerId(envs.get(Environment.CONTAINER_ID.name()));
+ appAttemptID = containerId.getApplicationAttemptId();
+ }
+
+ if (!envs.containsKey(ApplicationConstants.APP_SUBMIT_TIME_ENV)
+ || !envs.containsKey(Environment.NM_HOST.name()) || !envs.containsKey(Environment.NM_HTTP_PORT.name())
+ || !envs.containsKey(Environment.NM_PORT.name())) {
+ throw new IllegalArgumentException(
+ "Environment is not set correctly- please check client submission settings");
+ }
+ System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, envs.get("PWD") + File.separator + "bin" + File.separator
+ + ASTERIX_CONF_NAME);
+
+ LOG.info("Application master for app" + ", appId=" + appAttemptID.getApplicationId().getId()
+ + ", clustertimestamp=" + appAttemptID.getApplicationId().getClusterTimestamp() + ", attemptId="
+ + appAttemptID.getAttemptId());
+
+ asterixZipPath = envs.get(AConstants.TARLOCATION);
+ asterixZipTimestamp = Long.parseLong(envs.get(AConstants.TARTIMESTAMP));
+ asterixZipLen = Long.parseLong(envs.get(AConstants.TARLEN));
+
+ asterixConfPath = envs.get(AConstants.CONFLOCATION);
+ asterixConfTimestamp = Long.parseLong(envs.get(AConstants.CONFTIMESTAMP));
+ asterixConfLen = Long.parseLong(envs.get(AConstants.CONFLEN));
+
+ instanceConfPath = envs.get(AConstants.INSTANCESTORE);
+ //the only time this is null is during testing, when asterix-yarn isn't packaged in a JAR yet.
+ if(envs.get(AConstants.APPLICATIONMASTERJARLOCATION) != null
+ && !envs.get(AConstants.APPLICATIONMASTERJARLOCATION).endsWith(File.separator)){
+ appMasterJar = new Path(envs.get(AConstants.APPLICATIONMASTERJARLOCATION));
+ }
+ else{
+ appMasterJar = null;
+ }
+
+ dfsBasePath = envs.get(AConstants.DFS_BASE);
+ //If the NM has an odd environment where the proper hadoop XML configs dont get imported, we can end up not being able to talk to the RM
+ // this solves that!
+ //in a testing environment these can be null however.
+ if (envs.get(AConstants.RMADDRESS) != null) {
+ conf.set("yarn.resourcemanager.address", envs.get(AConstants.RMADDRESS));
+ LOG.info("RM Address: " + envs.get(AConstants.RMADDRESS));
+ }
+ if (envs.get(AConstants.RMADDRESS) != null) {
+ conf.set("yarn.resourcemanager.scheduler.address", envs.get(AConstants.RMSCHEDULERADDRESS));
+ }
+ ccJavaOpts = envs.get(AConstants.CC_JAVA_OPTS);
+ //set defaults if no special given options
+ if (ccJavaOpts == null) {
+ ccJavaOpts = EXTERNAL_CC_JAVA_OPTS_DEFAULT;
+ }
+ ncJavaOpts = envs.get(AConstants.NC_JAVA_OPTS);
+ if (ncJavaOpts == null) {
+ ncJavaOpts = EXTERNAL_NC_JAVA_OPTS_DEFAULT;
+ }
+
+ LOG.info("Path suffix: " + instanceConfPath);
+ }
+
+ /** Calls localizeDFSResources(), parses the YARN cluster description and registers
+  * asterix-configuration.xml as a local resource. Currently always returns true. */
+ public boolean init() throws ParseException, IOException, AsterixException, YarnException {
+ try {
+ localizeDFSResources();
+ clusterDesc = Utils.parseYarnClusterConfig(CLUSTER_DESC_PATH);
+ cC = clusterDesc.getMasterNode();
+ appMasterTrackingUrl = "http://" + cC.getClientIp() + ":" + cC.getClientPort() + Path.SEPARATOR;
+ distributeAsterixConfig();
+ LOG.debug("config file loc: " + System.getProperty(GlobalConfig.CONFIG_FILE_PROPERTY));
+ } catch (FileNotFoundException | IllegalStateException e) {
+ // Pass the exception as the throwable argument so its stack trace is logged, not just toString().
+ LOG.error("Could not deserialize Cluster Config from disk- aborting!", e);
+ throw e;
+ }
+ return true;
+ }
+
+ /**
+ * Registers the instance's asterix-configuration.xml, stored on the DFS at
+ * dfsBasePath/instanceConfPath/asterix-configuration.xml, as a PRIVATE FILE local resource.
+ *
+ * @throws IOException if the file's status cannot be read (FileNotFoundException if it is missing)
+ */
+ private void distributeAsterixConfig() throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+ // DFS paths always use '/', independent of the local platform's File.separator.
+ Path dst = new Path(dfsBasePath, instanceConfPath + Path.SEPARATOR + ASTERIX_CONF_NAME);
+ URI confLocation = dst.toUri();
+ FileStatus confStatus = fs.getFileStatus(dst);
+ LocalResource asterixConfResource = Records.newRecord(LocalResource.class);
+ asterixConfResource.setType(LocalResourceType.FILE);
+ asterixConfResource.setVisibility(LocalResourceVisibility.PRIVATE);
+ asterixConfResource.setResource(ConverterUtils.getYarnUrlFromURI(confLocation));
+ // YARN compares this timestamp with the DFS copy when localizing the file.
+ asterixConfResource.setTimestamp(confStatus.getModificationTime());
+ asterixConfResource.setSize(confStatus.getLen());
+
+ localResources.put(ASTERIX_CONF_NAME, asterixConfResource);
+ }
+
+ /**
+ * Asks the RM for every container the cluster needs: first the CC, then, once the CC container
+ * has been allocated and given a fixed grace period to start, one container per NC.
+ * Side effects: sets ccMem/ncMem, fills pendingNCs, and sets numTotalContainers/doneAllocating.
+ *
+ * @param c
+ * The cluster description whose CC and NC nodes should be allocated by the RM
+ * @throws YarnException
+ * if the CC container is not allocated within roughly 60 seconds
+ * @throws UnknownHostException
+ * if a configured CC/NC host name cannot be resolved
+ */
+ private void requestResources(Cluster c) throws YarnException, UnknownHostException {
+ //set memory, falling back to the defaults when the description leaves it unset
+ ccMem = c.getCcContainerMem() != null ? Integer.parseInt(c.getCcContainerMem()) : CC_MEMORY_MBS_DEFAULT;
+ // NOTE(review): the NC fallback reuses the CC default; confirm an NC-specific default wasn't intended.
+ ncMem = c.getNcContainerMem() != null ? Integer.parseInt(c.getNcContainerMem()) : CC_MEMORY_MBS_DEFAULT;
+ //request CC
+ int numNodes = 0;
+ ContainerRequest ccAsk = hostToRequest(cC.getClusterIp(), true);
+ resourceManager.addContainerRequest(ccAsk);
+ LOG.info("Asked for CC: " + Arrays.toString(ccAsk.getNodes().toArray()));
+ numNodes++;
+ //now we wait (polling once a second, for up to a minute) to be given the CC before starting the NCs
+ int deathClock = 60;
+ while (!ccUp.get() && deathClock > 0) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ex) {
+ LOG.debug(ex);
+ }
+ --deathClock;
+ }
+ if (deathClock == 0 && !ccUp.get()) {
+ throw new YarnException("Couldn't allocate container for CC. Abort!");
+ }
+ LOG.info("Waiting for CC process to start");
+ //TODO: inspect for actual liveness instead of sleeping a fixed 10 seconds.
+ // is there a good way to do this? maybe try opening a socket to it...
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException ex) {
+ LOG.debug(ex);
+ }
+ //request NCs
+ for (Node n : c.getNode()) {
+ // Mark the NC pending *before* asking for it: allocations arrive on the RM callback thread,
+ // which stops any container whose node is not pending once the CC is up.
+ synchronized (pendingNCs) {
+ pendingNCs.add(n);
+ }
+ resourceManager.addContainerRequest(hostToRequest(n.getClusterIp(), false));
+ LOG.info("Asked for NC: " + n.getClusterIp());
+ numNodes++;
+ }
+ LOG.info("Requested all NCs and CCs. Wait for things to settle!");
+ numRequestedContainers.set(numNodes);
+ numTotalContainers = numNodes;
+ doneAllocating = true;
+ }
+
+ /**
+ * Builds a container request pinned to one CC or NC host.
+ *
+ * @param host
+ * Host name or IP address of the node to request
+ * @param cc
+ * True to size the container with ccMem, false to use ncMem
+ * @return A request for exactly that host (no racks, relaxLocality off) at priority 0.
+ * @throws UnknownHostException
+ * if the host cannot be resolved
+ */
+ private ContainerRequest hostToRequest(String host, boolean cc) throws UnknownHostException {
+ // The ask names the node by its resolved host name rather than the configured address.
+ String resolvedName = InetAddress.getByName(host).getHostName();
+
+ // Only memory is requested; no other capability is set.
+ Resource capability = Records.newRecord(Resource.class);
+ capability.setMemory(cc ? ccMem : ncMem);
+ Priority pri = Records.newRecord(Priority.class);
+ pri.setPriority(0);
+
+ LOG.info("IP addr: " + host + " resolved to " + resolvedName);
+ String[] nodes = { resolvedName };
+ // relaxLocality = false: a container on any other host is not acceptable.
+ ContainerRequest request = new ContainerRequest(capability, nodes, null, pri, false);
+ LOG.info("Requested host ask: " + request.getNodes());
+ return request;
+ }
+
+ /**
+ * Determines whether a container is the one the CC should reside on, by canonical host name.
+ * @param c The container in question
+ * @return True if the container should have the CC process on it, false otherwise (including
+ * when a host name cannot be resolved).
+ */
+ boolean containerIsCC(Container c) {
+ String containerHost = c.getNodeId().getHost();
+ try {
+ String containerName = InetAddress.getByName(containerHost).getCanonicalHostName();
+ LOG.info(containerName);
+ String ccName = InetAddress.getByName(cC.getClusterIp()).getCanonicalHostName();
+ LOG.info(ccName);
+ return containerName.equals(ccName);
+ } catch (UnknownHostException e) {
+ LOG.warn("Could not resolve container host " + containerHost + " or CC host " + cC.getClusterIp(), e);
+ return false;
+ }
+ }
+
+ /**
+ * Finds the Node in the Cluster Description whose cluster IP resolves to this container's host address.
+ * @param c The container to resolve
+ * @param cl The cluster description whose nodes are searched
+ * @return The node this container corresponds to
+ * @throws UnknownHostException
+ * if a host can't be resolved, or the container isn't present in the description
+ */
+ Node containerToNode(Container c, Cluster cl) throws UnknownHostException {
+ String containerHost = c.getNodeId().getHost();
+ InetAddress containerIp = InetAddress.getByName(containerHost);
+ LOG.info("Resolved Container IP: " + containerIp);
+ for (Node node : cl.getNode()) {
+ InetAddress nodeIp = InetAddress.getByName(node.getClusterIp());
+ LOG.info(nodeIp + "?=" + containerIp);
+ if (nodeIp.equals(containerIp)) {
+ return node;
+ }
+ }
+ // No match; callers rely on this exception, e.g. to recognise a CC host with no NC on it.
+ throw new UnknownHostException("Could not resolve container " + containerHost + " to node");
+ }
+
+ /**
+ * Points the containers at the resources the client already uploaded to the DFS, by registering
+ * them in {@code localResources} (which every launched container is given).
+ * <p>
+ * The 'offline' modes (obliterate, backup, restore) only need the AM jar. Otherwise the Asterix
+ * distribution archive, the cluster description and any external libraries stored under
+ * {@code dfsBasePath/instanceConfPath/library/<dataverse>/<archive>} are registered.
+ *
+ * @throws IOException
+ * if a resource path is malformed, a DFS call fails, or the library layout is incorrect
+ * @throws IllegalStateException
+ * if an offline task has no AM jar in its environment outside a mini YARN cluster
+ */
+ private void localizeDFSResources() throws IOException {
+ //if performing an 'offline' task, skip a lot of resource distribution
+ if (obliterate || backup || restore) {
+ // appMasterJar is a Path, which can never be empty, so only a missing value is checked.
+ if (appMasterJar == null) {
+ //this can happen in a jUnit testing environment. we don't need to set it there.
+ if (!conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
+ throw new IllegalStateException("AM jar not provided in environment.");
+ }
+ return;
+ }
+ FileSystem fs = FileSystem.get(conf);
+ FileStatus appMasterJarStatus = fs.getFileStatus(appMasterJar);
+ localResources.put("asterix-yarn.jar", privateResource(appMasterJar.toUri(), LocalResourceType.FILE,
+ appMasterJarStatus.getLen(), appMasterJarStatus.getModificationTime()));
+ LOG.info(localResources.values());
+ return;
+ }
+ //otherwise, distribute everything needed to start up asterix.
+ //the distribution is an ARCHIVE, so it gets unpacked; start commands reference ASTERIX_ZIP_NAME/repo.
+ try {
+ localResources.put(ASTERIX_ZIP_NAME, privateResource(new URI(asterixZipPath),
+ LocalResourceType.ARCHIVE, asterixZipLen, asterixZipTimestamp));
+ } catch (URISyntaxException e) {
+ LOG.error("Error locating Asterix zip" + " in env, path=" + asterixZipPath);
+ throw new IOException(e);
+ }
+ //now let's do the same for the cluster description XML.
+ //TODO: I could avoid localizing this everywhere by only calling this block on the metadata node.
+ try {
+ localResources.put("cluster-config.xml", privateResource(new URI(asterixConfPath),
+ LocalResourceType.FILE, asterixConfLen, asterixConfTimestamp));
+ } catch (URISyntaxException e) {
+ LOG.error("Error locating Asterix config" + " in env, path=" + asterixConfPath);
+ throw new IOException(e);
+ }
+ //now add the libraries if there are any: one directory per dataverse, one archive per library.
+ try {
+ FileSystem fs = FileSystem.get(conf);
+ Path p = new Path(dfsBasePath, instanceConfPath + Path.SEPARATOR + "library" + Path.SEPARATOR);
+ if (fs.exists(p)) {
+ for (FileStatus d : fs.listStatus(p)) {
+ if (!d.isDirectory()) {
+ throw new IOException("Library configuration directory structure is incorrect");
+ }
+ for (FileStatus l : fs.listStatus(d.getPath())) {
+ if (l.isDirectory()) {
+ throw new IOException("Library configuration directory structure is incorrect");
+ }
+ //unpacked under library/<dataverse>/<archive name up to its first '.'>
+ String libName = l.getPath().getName().split("\\.")[0];
+ localResources.put("library" + Path.SEPARATOR + d.getPath().getName() + Path.SEPARATOR + libName,
+ privateResource(l.getPath().toUri(), LocalResourceType.ARCHIVE, l.getLen(),
+ l.getModificationTime()));
+ LOG.info("Found library: " + l.getPath().toString());
+ LOG.info(l.getPath().getName());
+ }
+ }
+ }
+ } catch (FileNotFoundException e) {
+ LOG.info("No external libraries present");
+ //do nothing, it just means there aren't libraries. that is possible and ok;
+ // it should be handled by the fs.exists(p) check though.
+ }
+ LOG.info(localResources.values());
+ }
+
+ /**
+ * Builds a PRIVATE local resource record for a file or archive stored in the DFS.
+ *
+ * @param location the DFS URI of the resource
+ * @param type FILE to localize as-is, ARCHIVE to have it unpacked
+ * @param size length in bytes of the DFS copy
+ * @param timestamp modification time of the DFS copy
+ * @return the populated LocalResource record
+ */
+ private static LocalResource privateResource(URI location, LocalResourceType type, long size, long timestamp) {
+ LocalResource resource = Records.newRecord(LocalResource.class);
+ resource.setType(type);
+ resource.setVisibility(LocalResourceVisibility.PRIVATE);
+ resource.setResource(ConverterUtils.getYarnUrlFromURI(location));
+ resource.setSize(size);
+ resource.setTimestamp(timestamp);
+ return resource;
+ }
+
+ private void printUsage(Options opts) { // prints the CLI help for opts, titled "ApplicationMaster"
+ HelpFormatter usageFormatter = new HelpFormatter();
+ usageFormatter.printHelp("ApplicationMaster", opts);
+ }
+ /**
+ * Start the AM and request all necessary resources.
+ *
+ * @return True if the run fully succeeded, false otherwise.
+ * @throws YarnException
+ * @throws IOException
+ */
+ public boolean run() throws YarnException, IOException {
+ LOG.info("Starting ApplicationMaster");
+
+ AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
+ resourceManager = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
+ resourceManager.init(conf);
+ resourceManager.start();
+
+ containerListener = new NMCallbackHandler();
+ nmClientAsync = new NMClientAsyncImpl(containerListener);
+ nmClientAsync.init(conf);
+ nmClientAsync.start();
+
+ // Register self with ResourceManager
+ // This will start heartbeating to the RM
+ try {
+ appMasterHostname = InetAddress.getLocalHost().toString();
+ } catch (java.net.UnknownHostException uhe) {
+ appMasterHostname = uhe.toString();
+ }
+ RegisterApplicationMasterResponse response = resourceManager.registerApplicationMaster(appMasterHostname,
+ appMasterRpcPort, appMasterTrackingUrl);
+
+ // Dump out information about cluster capability as seen by the
+ // resource manager
+ int maxMem = response.getMaximumResourceCapability().getMemory();
+ LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
+
+ try {
+ requestResources(clusterDesc);
+ } catch (YarnException e) {
+ LOG.error("Could not allocate resources properly:" + e.getMessage());
+ done = true;
+ throw e;
+ }
+ //now we just sit and listen for messages from the RM
+
+ while (!done) {
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException ex) {
+ }
+ }
+ finish();
+ return success;
+ }
+
+ /**
+ * Clean up, whether or not we were successful: wait up to 10 s for each launch thread, stop the
+ * NM client, unregister from the RM with a final status, and stop the RM client. {@code success}
+ * is set to true only if no container failed and every requested container completed. An
+ * interrupt received while joining is re-asserted once cleanup has finished.
+ */
+ private void finish() {
+ // Join all launched threads (needed for when we time out and need to release containers)
+ boolean interrupted = false;
+ for (Thread launchThread : launchThreads) {
+ try {
+ launchThread.join(10000);
+ } catch (InterruptedException e) {
+ // log with the stack trace; remember the interrupt so cleanup still runs to completion
+ LOG.error("Exception thrown in thread join: " + e.getMessage(), e);
+ interrupted = true;
+ }
+ }
+
+ // When the application completes, it should stop all running containers
+ LOG.info("Application completed. Stopping running containers");
+ nmClientAsync.stop();
+
+ // When the application completes, it should send a finish application
+ // signal to the RM
+ LOG.info("Application completed. Signalling finish to RM");
+
+ FinalApplicationStatus appStatus;
+ String appMessage = null;
+ success = numFailedContainers.get() == 0 && numCompletedContainers.get() == numTotalContainers;
+ if (success) {
+ appStatus = FinalApplicationStatus.SUCCEEDED;
+ } else {
+ appStatus = FinalApplicationStatus.FAILED;
+ appMessage = "Diagnostics." + ", total=" + numTotalContainers + ", completed="
+ + numCompletedContainers.get() + ", allocated=" + numAllocatedContainers.get() + ", failed="
+ + numFailedContainers.get();
+ }
+ try {
+ resourceManager.unregisterApplicationMaster(appStatus, appMessage, null);
+ } catch (YarnException | IOException ex) {
+ LOG.error("Failed to unregister application", ex);
+ }
+ done = true;
+ resourceManager.stop();
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /**
+ * This handles the information that comes in from the RM while the AM
+ * is running.
+ */
+ private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
+ public void onContainersCompleted(List<ContainerStatus> completedContainers) {
+ LOG.info("Got response from RM for container ask, completedCnt=" + completedContainers.size());
+ for (ContainerStatus containerStatus : completedContainers) {
+ LOG.info("Got container status for containerID=" + containerStatus.getContainerId() + ", state="
+ + containerStatus.getState() + ", exitStatus=" + containerStatus.getExitStatus()
+ + ", diagnostics=" + containerStatus.getDiagnostics());
+
+ // non complete containers should not be here
+ if(containerStatus.getState() != ContainerState.COMPLETE){
+ throw new IllegalStateException("Non-completed container given as completed by RM.");
+ }
+
+ // increment counters for completed/failed containers
+ int exitStatus = containerStatus.getExitStatus();
+ if (0 != exitStatus) {
+ // container failed
+ numCompletedContainers.incrementAndGet();
+ numFailedContainers.incrementAndGet();
+ } else {
+ // nothing to do
+ // container completed successfully
+ numCompletedContainers.incrementAndGet();
+ LOG.info("Container completed successfully." + ", containerId=" + containerStatus.getContainerId());
+ }
+ }
+ //stop infinite looping of run()
+ if (numCompletedContainers.get() + numFailedContainers.get() == numAllocatedContainers.get()
+ && doneAllocating)
+ done = true;
+ }
+
+ public void onContainersAllocated(List<Container> allocatedContainers) {
+ LOG.info("Got response from RM for container ask, allocatedCnt=" + allocatedContainers.size());
+ numAllocatedContainers.addAndGet(allocatedContainers.size());
+ for (Container allocatedContainer : allocatedContainers) {
+ synchronized(pendingNCs){
+ try {
+ if (!pendingNCs.contains(containerToNode(allocatedContainer, clusterDesc)) && ccUp.get()) {
+ nmClientAsync.stopContainerAsync(allocatedContainer.getId(), allocatedContainer.getNodeId());
+ continue;
+ }
+ } catch(UnknownHostException ex){
+ LOG.error("Unknown host allocated for us by RM- this shouldn't happen.", ex);
+ }
+ }
+ LOG.info("Launching shell command on a new container." + ", containerId=" + allocatedContainer.getId()
+ + ", containerNode=" + allocatedContainer.getNodeId().getHost() + ":"
+ + allocatedContainer.getNodeId().getPort() + ", containerNodeURI="
+ + allocatedContainer.getNodeHttpAddress() + ", containerResourceMemory"
+ + allocatedContainer.getResource().getMemory());
+
+ LaunchAsterixContainer runnableLaunchContainer = new LaunchAsterixContainer(allocatedContainer,
+ containerListener);
+ Thread launchThread = new Thread(runnableLaunchContainer, "Asterix CC/NC");
+
+ // I want to know if this node is the CC, because it must start before the NCs.
+ LOG.info("Allocated: " + allocatedContainer.getNodeId().getHost());
+ LOG.info("CC : " + cC.getId());
+ synchronized(pendingNCs){
+ try {
+ if (ccUp.get()) {
+ pendingNCs.remove(containerToNode(allocatedContainer, clusterDesc));
+ }
+ } catch(UnknownHostException ex){
+ LOG.error("Unknown host allocated for us by RM- this shouldn't happen.", ex);
+ }
+ }
+
+ if (containerIsCC(allocatedContainer)) {
+ ccUp.set(true);
+ }
+ // launch and start the container on a separate thread to keep
+ // the main thread unblocked
+ // as all containers may not be allocated at one go.
+ launchThreads.add(launchThread);
+ launchThread.start();
+ }
+ }
+
+ /**
+ * Ask the processes on the container to gracefully exit.
+ */
+ public void onShutdownRequest() {
+ LOG.info("AM shutting down per request");
+ done = true;
+ }
+
+ public void onNodesUpdated(List<NodeReport> updatedNodes) {
+ //TODO: This will become important when we deal with what happens if an NC dies
+ }
+
+ public float getProgress() {
+ //return half way because progress is basically meaningless for us
+ if (!doneAllocating) {
+ return 0.0f;
+ }
+ return (float) 0.5;
+ }
+
+ public void onError(Throwable arg0) {
+ LOG.error("Fatal Error recieved by AM: " + arg0);
+ done = true;
+ }
+ }
+
+ private class NMCallbackHandler implements NMClientAsync.CallbackHandler {
+
+ private ConcurrentMap<ContainerId, Container> containers = new ConcurrentHashMap<ContainerId, Container>();
+
+ public void addContainer(ContainerId containerId, Container container) {
+ containers.putIfAbsent(containerId, container);
+ }
+
+ public void onContainerStopped(ContainerId containerId) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Succeeded to stop Container " + containerId);
+ }
+ containers.remove(containerId);
+ }
+
+ public void onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Container Status: id=" + containerId + ", status=" + containerStatus);
+ }
+ }
+
+ public void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer> allServiceResponse) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Succeeded to start Container " + containerId);
+ }
+ Container container = containers.get(containerId);
+ if (container != null) {
+ nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
+ }
+ }
+
+ public void onStartContainerError(ContainerId containerId, Throwable t) {
+ LOG.error("Failed to start Container " + containerId);
+ containers.remove(containerId);
+ }
+
+ public void onGetContainerStatusError(ContainerId containerId, Throwable t) {
+ LOG.error("Failed to query the status of Container " + containerId);
+ }
+
+ public void onStopContainerError(ContainerId containerId, Throwable t) {
+ LOG.error("Failed to stop Container " + containerId);
+ containers.remove(containerId);
+ }
+ }
+
+ /**
+ * Runnable that builds the launch context for one allocated container (local resources,
+ * environment and the command for the current mode) and asks the NM, through
+ * {@code nmClientAsync}, to start it.
+ */
+ private class LaunchAsterixContainer implements Runnable {
+
+ // Allocated container
+ final Container container;
+
+ final NMCallbackHandler containerListener;
+
+ /**
+ * @param lcontainer
+ * Allocated container
+ * @param containerListener
+ * Callback handler of the container
+ */
+ public LaunchAsterixContainer(Container lcontainer, NMCallbackHandler containerListener) {
+ this.container = lcontainer;
+ this.containerListener = containerListener;
+ }
+
+ /**
+ * Sets up the container launch context (local resources, CLASSPATH and the command line for
+ * the current mode) and dispatches the asynchronous start request to the NM. If no command
+ * can be produced for this container, nothing is started.
+ */
+ public void run() {
+ LOG.info("Setting up container launch container for containerid=" + container.getId());
+ ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
+ // Set the local resources
+ ctx.setLocalResources(localResources);
+
+ //Set the env variables to be setup in the env where the application master will be run
+ LOG.info("Set the environment for the node");
+ Map<String, String> env = new HashMap<String, String>();
+
+ // Classpath: the container's $CLASSPATH, the working directory and its jars, the YARN
+ // application classpath, and ./log4j.properties.
+ StringBuilder classPathEnv = new StringBuilder(Environment.CLASSPATH.$()).append(File.pathSeparatorChar)
+ .append("." + File.pathSeparatorChar + "*");
+ for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+ YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
+ classPathEnv.append(File.pathSeparatorChar);
+ classPathEnv.append(c.trim());
+ }
+ // the separator must come first, otherwise '.' would be glued onto the last YARN entry
+ classPathEnv.append(File.pathSeparatorChar).append("." + File.separator + "log4j.properties");
+
+ // add the runtime classpath needed for tests to work
+ if (conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
+ classPathEnv.append(File.pathSeparatorChar);
+ classPathEnv.append(System.getProperty("java.class.path"));
+ env.put("HADOOP_CONF_DIR", System.getProperty("user.dir") + File.separator + "target" + File.separator);
+ }
+
+ env.put("CLASSPATH", classPathEnv.toString());
+
+ ctx.setEnvironment(env);
+ LOG.info(ctx.getEnvironment().toString());
+ List<String> startCmd;
+ if (obliterate) {
+ LOG.debug("AM in obliterate mode");
+ startCmd = produceObliterateCommand(container);
+ } else if (backup) {
+ startCmd = produceBackupCommand(container);
+ LOG.debug("AM in backup mode");
+ } else if (restore) {
+ startCmd = produceRestoreCommand(container);
+ LOG.debug("AM in restore mode");
+ } else {
+ startCmd = produceStartCmd(container);
+ }
+
+ if (startCmd == null || startCmd.isEmpty()) {
+ LOG.fatal("Could not map one or more NCs to NM container hosts- aborting!");
+ return;
+ }
+
+ for (String s : startCmd) {
+ LOG.info("Command to execute: " + s);
+ }
+ ctx.setCommands(startCmd);
+ containerListener.addContainer(container.getId(), container);
+ //finally start the container
+ nmClientAsync.startContainerAsync(container, ctx);
+ }
+
+ /**
+ * Determines for a given container what the necessary command line arguments are to start
+ * the Asterix processes on that instance: the CC if this is the CC's host and the CC has not
+ * been started yet, otherwise the NC configured for the host.
+ *
+ * @param container
+ * The container to produce the commands for
+ * @return A list of the commands that should be executed, or null if no NC is configured for
+ * the container's host (run() then does not start the container).
+ */
+ private List<String> produceStartCmd(Container container) {
+ // Set the necessary command to execute on the allocated container
+ List<CharSequence> vargs = new ArrayList<CharSequence>();
+ vargs.add(JAVA_HOME + File.separator + "bin" + File.separator + "java");
+ vargs.add("-classpath " + '\'' + ASTERIX_ZIP_NAME + File.separator + "repo" + File.separator + "*\'");
+ vargs.add("-Dapp.repo=" + ASTERIX_ZIP_NAME + File.separator + "repo" + File.separator);
+ //first see if this node is the CC
+ if (containerIsCC(container) && !ccStarted.get()) {
+ LOG.info("CC found on container " + container.getNodeId().getHost());
+ //get our java opts
+ vargs.add(ccJavaOpts);
+ vargs.add(CC_CLASSNAME);
+ vargs.add("-app-cc-main-class edu.uci.ics.asterix.hyracks.bootstrap.CCApplicationEntryPoint");
+ vargs.add("-cluster-net-ip-address " + cC.getClusterIp());
+ vargs.add("-client-net-ip-address " + cC.getClientIp());
+ ccStarted.set(true);
+ } else {
+ //now we need to know what node we are on, so we can apply the correct properties
+ Node local;
+ try {
+ local = containerToNode(container, clusterDesc);
+ } catch (UnknownHostException e) {
+ // without a node there is no valid NC command; null makes run() refuse to launch it
+ LOG.error("Unable to find NC or CC configured for host: " + container.getId() + " " + e);
+ return null;
+ }
+ LOG.info("Attempting to start NC on host " + local.getId());
+ // every iodevice gets the storage directory appended, as in the obliterate/backup/restore commands
+ StringBuilder storagePaths = new StringBuilder();
+ for (String iodevice : ioDevicesFor(local)) {
+ if (storagePaths.length() > 0) {
+ storagePaths.append(',');
+ }
+ storagePaths.append(iodevice).append(File.separator).append(storeFor(local));
+ }
+ vargs.add(ncJavaOpts);
+ vargs.add(NC_CLASSNAME);
+ vargs.add("-app-nc-main-class edu.uci.ics.asterix.hyracks.bootstrap.NCApplicationEntryPoint");
+ vargs.add("-node-id " + local.getId());
+ vargs.add("-cc-host " + cC.getClusterIp());
+ vargs.add("-iodevices " + storagePaths);
+ vargs.add("-cluster-net-ip-address " + local.getClusterIp());
+ vargs.add("-data-ip-address " + local.getClusterIp());
+ vargs.add("-result-ip-address " + local.getClusterIp());
+ vargs.add("--");
+ if (initial) {
+ vargs.add("-initial-run ");
+ }
+ }
+ List<String> commands = toCommand(vargs);
+ LOG.info(Arrays.toString(commands.toArray()));
+ return commands;
+ }
+
+ /**
+ * Builds the command that deletes this container's NC data: the storage directory on every
+ * iodevice plus the transaction log directory (listed once, with the first iodevice).
+ *
+ * @return the command, a single blank command for a CC host without an NC, or null if the
+ * host is neither a configured NC nor the CC
+ */
+ private List<String> produceObliterateCommand(Container container) {
+ //if this container has no NCs on it, nothing will be there to delete.
+ Node local;
+ try {
+ local = containerToNode(container, clusterDesc);
+ } catch (UnknownHostException e) {
+ //we expect this may happen for the CC if it isn't colocated with an NC. otherwise it is not supposed to happen.
+ if (!containerIsCC(container)) {
+ LOG.error("Unable to find NC configured for host: " + container.getId() + e);
+ return null;
+ }
+ return Arrays.asList("");
+ }
+ List<String> iodevices = ioDevicesFor(local);
+ List<CharSequence> vargs = new ArrayList<CharSequence>();
+ vargs.add(JAVA_HOME + File.separator + "bin" + File.separator + "java");
+ vargs.add("-cp " + "*" + File.pathSeparatorChar + "log4j.properties");
+ vargs.add(OBLITERATOR_CLASSNAME);
+ for (int i = 0; i < iodevices.size(); i++) {
+ String s = iodevices.get(i);
+ vargs.add(s + File.separator + storeFor(local));
+ LOG.debug("Deleting from: " + s);
+ //logs only exist on 1st iodevice
+ if (i == 0) {
+ vargs.add(clusterDesc.getTxnLogDir() + "txnLogs" + File.separator);
+ LOG.debug("Deleting logs from: " + clusterDesc.getTxnLogDir());
+ }
+ }
+ return toCommand(vargs);
+ }
+
+ /**
+ * Builds the command that copies this container's NC data into the DFS under
+ * {@code instanceConfPath + "backups/" + backupTimestamp + "/" + nodeId + flattened iodevice},
+ * creating the destination folder first.
+ *
+ * @return the command, a single blank command for a CC host without an NC, or null on failure
+ */
+ private List<String> produceBackupCommand(Container container) {
+ Node local;
+ try {
+ local = containerToNode(container, clusterDesc);
+ } catch (UnknownHostException e) {
+ //we expect this may happen for the CC if it isn't colocated with an NC. otherwise it is not supposed to happen.
+ if (!containerIsCC(container)) {
+ LOG.error("Unable to find NC configured for host: " + container.getId() + e);
+ return null;
+ }
+ return Arrays.asList("");
+ }
+ List<String> iodevices = ioDevicesFor(local);
+ List<CharSequence> vargs = new ArrayList<CharSequence>();
+ vargs.add(JAVA_HOME + File.separator + "bin" + File.separator + "java");
+ vargs.add("-cp " + backupToolClassPath());
+ vargs.add(HDFS_BACKUP_CLASSNAME);
+ vargs.add("-backup");
+
+ String dstBase = instanceConfPath + "backups" + Path.SEPARATOR + backupTimestamp + Path.SEPARATOR
+ + local.getId();
+ try {
+ createBackupFolder(dstBase);
+ } catch (IOException e) {
+ //something very bad happened- return null to cause attempt to abort
+ LOG.error("Could not create backup folder " + dstBase, e);
+ return null;
+ }
+ for (int i = 0; i < iodevices.size(); i++) {
+ String s = iodevices.get(i);
+ String dst = dstBase + flattenIoDevice(s) + Path.SEPARATOR;
+ vargs.add(s + File.separator + storeFor(local) + "," + dst);
+ LOG.debug("Backing up from: " + s);
+ //logs only exist on 1st iodevice
+ if (i == 0) {
+ LOG.debug("Backing up logs from: " + clusterDesc.getTxnLogDir());
+ vargs.add(clusterDesc.getTxnLogDir() + "txnLogs" + File.separator + "," + dst);
+ }
+ }
+ LOG.debug("Backing up to: " + instanceConfPath + "backups" + Path.SEPARATOR + local.getId());
+ return toCommand(vargs);
+ }
+
+ /**
+ * Creates (with any missing parents) the given DFS folder.
+ *
+ * @throws IOException if the DFS call fails or reports that the folder could not be created
+ */
+ private void createBackupFolder(String path) throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+ Path backupFolder = new Path(path);
+ if (!fs.mkdirs(backupFolder)) {
+ throw new IOException("Could not create backup folder " + path);
+ }
+ }
+
+ /**
+ * Builds the command that copies a backup (selected by {@code snapName}) from the DFS back onto
+ * this container's iodevices and transaction log directory.
+ *
+ * @return the command, a single blank command for the CC's host, or null if the host is not a
+ * configured NC
+ */
+ private List<String> produceRestoreCommand(Container container) {
+ // NOTE(review): every container on the CC's host is skipped, so an NC colocated with the CC
+ // is not restored (obliterate and backup do handle that case) - confirm this is intended.
+ if (containerIsCC(container)) {
+ return Arrays.asList("");
+ }
+ Node local;
+ try {
+ local = containerToNode(container, clusterDesc);
+ } catch (UnknownHostException e) {
+ // the CC's host returned above, so this host is not configured at all
+ LOG.error("Unable to find NC configured for host: " + container.getId() + e);
+ return null;
+ }
+ List<String> iodevices = ioDevicesFor(local);
+ String store = storeFor(local);
+ List<CharSequence> vargs = new ArrayList<CharSequence>();
+ vargs.add(JAVA_HOME + File.separator + "bin" + File.separator + "java");
+ vargs.add("-cp " + backupToolClassPath());
+ vargs.add(HDFS_BACKUP_CLASSNAME);
+ vargs.add("-restore");
+ String srcBase = instanceConfPath + "backups" + Path.SEPARATOR + Long.parseLong(snapName) + Path.SEPARATOR
+ + local.getId();
+ for (int i = 0; i < iodevices.size(); i++) {
+ String s = iodevices.get(i);
+ String src = srcBase + flattenIoDevice(s) + Path.SEPARATOR;
+ try {
+ FileSystem fs = FileSystem.get(conf);
+ for (FileStatus b : fs.listStatus(new Path(src))) {
+ if (!b.getPath().toString().contains("txnLogs")
+ && !b.getPath().toString().contains(File.separator + "asterix_root_metadata")) {
+ vargs.add(b.getPath() + "," + s + File.separator + store);
+ }
+ }
+ } catch (IOException e) {
+ LOG.error("Could not stat backup directory in DFS", e);
+ }
+ // iodevice and store are joined with File.separator, like every other storage path here
+ vargs.add(src + "," + s + File.separator + store);
+ LOG.debug("Restoring from: " + s);
+ //logs only exist on 1st iodevice
+ if (i == 0) {
+ vargs.add(src + "txnLogs" + File.separator + "," + clusterDesc.getTxnLogDir() + File.separator);
+ LOG.debug("Restoring logs from: " + clusterDesc.getTxnLogDir());
+ }
+ }
+ LOG.debug("Restoring to: " + instanceConfPath + "backups" + Path.SEPARATOR + local.getId());
+ return toCommand(vargs);
+ }
+
+ /**
+ * @return the node's own iodevices if it declares any, otherwise the cluster-wide ones,
+ * split on ',' (empty entries are kept)
+ */
+ private List<String> ioDevicesFor(Node local) {
+ String iodevices = local.getIodevices() == null ? clusterDesc.getIodevices() : local.getIodevices();
+ return Arrays.asList(iodevices.split(",", -1));
+ }
+
+ /** @return the node's own storage directory if it declares one, otherwise the cluster-wide one */
+ private String storeFor(Node local) {
+ return local.getStore() == null ? clusterDesc.getStore() : local.getStore();
+ }
+
+ /**
+ * Flattens an iodevice path into a backup directory name suffix by joining its '/'-separated
+ * components with '_' (e.g. "/mnt/data" becomes "_mnt_data"; a trailing '/' is ignored).
+ */
+ private String flattenIoDevice(String iodevice) {
+ String[] components = iodevice.split("\\/");
+ StringBuilder flat = new StringBuilder();
+ // join by position, so repeated components (e.g. "/data/x/data") are handled correctly
+ for (int i = 0; i < components.length; i++) {
+ if (i > 0) {
+ flat.append('_');
+ }
+ flat.append(components[i]);
+ }
+ return flat.toString();
+ }
+
+ /**
+ * @return the backup tool's classpath: working-dir jars, YARN application classpath, ./log4j.properties
+ */
+ private String backupToolClassPath() {
+ StringBuilder classPathEnv = new StringBuilder("." + File.separator + "*");
+ for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+ YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
+ classPathEnv.append(File.pathSeparatorChar).append(c.trim());
+ }
+ classPathEnv.append(File.pathSeparatorChar).append("." + File.separator + "log4j.properties");
+ return classPathEnv.toString();
+ }
+
+ /**
+ * Appends the stdout/stderr redirects into the container's log directory and joins all the
+ * arguments (each followed by a space) into a single shell command.
+ *
+ * @return a one-element list holding the command
+ */
+ private List<String> toCommand(List<CharSequence> vargs) {
+ vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + File.separator + "stdout");
+ vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + File.separator + "stderr");
+ StringBuilder command = new StringBuilder();
+ for (CharSequence str : vargs) {
+ command.append(str).append(" ");
+ }
+ List<String> commands = new ArrayList<String>();
+ commands.add(command.toString());
+ return commands;
+ }
+ }
+}
diff --git a/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/AsterixYARNClient.java b/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/AsterixYARNClient.java
new file mode 100644
index 0000000..63b85e6
--- /dev/null
+++ b/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/AsterixYARNClient.java
@@ -0,0 +1,1387 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.aoya;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+import java.util.regex.Pattern;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Marshaller;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.service.Service.STATE;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+
+import com.google.common.collect.ImmutableMap;
+
+import edu.uci.ics.asterix.common.configuration.AsterixConfiguration;
+import edu.uci.ics.asterix.common.configuration.Coredump;
+import edu.uci.ics.asterix.common.configuration.Store;
+import edu.uci.ics.asterix.common.configuration.TransactionLogDir;
+import edu.uci.ics.asterix.event.schema.yarnCluster.Cluster;
+import edu.uci.ics.asterix.event.schema.yarnCluster.Node;
+
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class AsterixYARNClient {
+
+    /**
+     * Actions the client can perform, each identified by the verb typed on the command line.
+     */
+    public static enum Mode {
+        INSTALL("install"),
+        START("start"),
+        STOP("stop"),
+        KILL("kill"),
+        DESTROY("destroy"),
+        ALTER("alter"),
+        LIBINSTALL("libinstall"),
+        DESCRIBE("describe"),
+        BACKUP("backup"),
+        LSBACKUP("lsbackups"),
+        RMBACKUP("rmbackup"),
+        RESTORE("restore"),
+        NOOP("");
+
+        /** The command-line verb for this mode. */
+        public final String alias;
+
+        Mode(String alias) {
+            this.alias = alias;
+        }
+
+        /**
+         * Looks up a mode by its command-line verb, ignoring case.
+         *
+         * @param a
+         *            the verb given by the user
+         * @return the matching mode, or {@code null} if the verb is not registered in STRING_TO_MODE
+         */
+        public static Mode fromAlias(String a) {
+            // Use Locale.ROOT: the default locale's case mapping can break ASCII verbs
+            // (e.g. in a Turkish locale "INSTALL" lower-cases to a dotless-i "ınstall").
+            return STRING_TO_MODE.get(a.toLowerCase(java.util.Locale.ROOT));
+        }
+    }
+
+ // Lookup table from command-line verb (Mode.alias) to Mode, used by Mode.fromAlias.
+ // NOOP is not registered, so an unknown or empty verb resolves to null (initMode maps that to NOOP).
+ public static final Map<String, AsterixYARNClient.Mode> STRING_TO_MODE = ImmutableMap
+ .<String, AsterixYARNClient.Mode> builder().put(Mode.INSTALL.alias, Mode.INSTALL)
+ .put(Mode.START.alias, Mode.START).put(Mode.STOP.alias, Mode.STOP).put(Mode.KILL.alias, Mode.KILL)
+ .put(Mode.DESTROY.alias, Mode.DESTROY).put(Mode.ALTER.alias, Mode.ALTER)
+ .put(Mode.LIBINSTALL.alias, Mode.LIBINSTALL).put(Mode.DESCRIBE.alias, Mode.DESCRIBE)
+ .put(Mode.BACKUP.alias, Mode.BACKUP).put(Mode.LSBACKUP.alias, Mode.LSBACKUP)
+ .put(Mode.RMBACKUP.alias, Mode.RMBACKUP).put(Mode.RESTORE.alias, Mode.RESTORE).build();
+ private static final Log LOG = LogFactory.getLog(AsterixYARNClient.class);
+ // Per-user state directory; instance paths are built as <DFS home>/CONF_DIR_REL/<instance>/...
+ public static final String CONF_DIR_REL = ".asterix" + File.separator;
+ // File name of the per-instance lock file checked by makeApplicationContext
+ private static final String instanceLock = "instance";
+ // DFS file name of the uploaded cluster configuration (see installConfig/deployConfig)
+ public static final String CONFIG_DEFAULT_NAME = "cluster-config.xml";
+ // DFS file name of the uploaded AsterixDB parameters (see installAsterixConfig)
+ public static final String PARAMS_DEFAULT_NAME = "asterix-configuration.xml";
+ // Local path, relative to the working directory, of the base parameters file checked in checkModeSanity
+ private static String DEFAULT_PARAMETERS_PATH = "conf" + File.separator + "base-asterix-configuration.xml";
+ // Local path of the parameters file uploaded by installAsterixConfig
+ // (presumably produced by writeAsterixConfig - TODO confirm)
+ private static String MERGED_PARAMETERS_PATH = "conf" + File.separator + PARAMS_DEFAULT_NAME;
+ // Java home of the JVM running this client; printed by execute
+ private static final String JAVA_HOME = System.getProperty("java.home");
+ public static final String NC_JAVA_OPTS_KEY = "nc.java.opts";
+ public static final String CC_JAVA_OPTS_KEY = "cc.java.opts";
+ public static final String CC_REST_PORT_KEY = "api.port";
+ // Action requested on the command line; set by initMode, NOOP until then
+ private Mode mode = Mode.NOOP;
+
+ // Hadoop Configuration
+ private Configuration conf;
+ private YarnClient yarnClient;
+ // Application master specific info to register a new Application with
+ // RM/ASM
+ private String appName = "";
+ // App master priority
+ private int amPriority = 0;
+ // Queue for App master
+ private String amQueue = "";
+ // Memory (MB) requested for the App Master container. init overwrites this from -master_memory
+ // (CLI default "10"), and makeApplicationContext caps it to the cluster maximum.
+ private int amMemory = 1000;
+
+ // Main class to invoke application master
+ private final String appMasterMainClass = "edu.uci.ics.asterix.aoya.AsterixApplicationMaster";
+
+ //instance name
+ private String instanceName = "";
+ //location of distributable AsterixDB zip
+ private String asterixZip = "";
+ // Location of cluster configuration
+ private String asterixConf = "";
+ // Location of optional external libraries
+ private String extLibs = "";
+
+ // Instance name followed by '/', used as the instance's sub-directory under CONF_DIR_REL
+ private String instanceFolder = "";
+
+ // log4j.properties file
+ // if available, add to local resources and set into classpath
+ private String log4jPropFile = "";
+
+ // Debug flag
+ boolean debugFlag = false;
+ // -refresh: re-upload binaries to DFS even if they are already there
+ private boolean refresh = false;
+ // -force: skip interactive confirmations
+ private boolean force = false;
+
+ // Command line options
+ private Options opts;
+ private String libDataverse;
+ // Backup timestamp given with -snapshot (parsed with Long.parseLong by restore/rmbackup)
+ private String snapName = "";
+ private String baseConfig = ".";
+ private String ccJavaOpts = "";
+ private String ncJavaOpts = "";
+
+ //Ports
+ // CC REST API port passed to Utils.waitForLiveness
+ private int ccRestPort = 19002;
+
+    /**
+     * Command-line entry point.
+     * <p>
+     * Exits with 0 on success, -1 on a usage error (bad arguments or unknown application),
+     * and 1 on any other failure.
+     *
+     * @param args
+     *            Command line arguments
+     */
+    public static void main(String[] args) {
+        int status;
+        try {
+            status = runFromCommandLine(args);
+        } catch (Exception e) {
+            LOG.fatal("Error running client", e);
+            status = 1;
+        }
+        if (status == 0) {
+            LOG.info("Command executed successfully.");
+        }
+        System.exit(status);
+    }
+
+    /**
+     * Creates, initializes and executes a client.
+     *
+     * @return 0 on success, -1 after printing usage for a parse error or a missing application
+     */
+    private static int runFromCommandLine(String[] args) throws Exception {
+        AsterixYARNClient client = new AsterixYARNClient();
+        try {
+            client.init(args);
+            AsterixYARNClient.execute(client);
+            return 0;
+        } catch (ParseException | ApplicationNotFoundException e) {
+            LOG.fatal(e);
+            client.printUsage();
+            return -1;
+        }
+    }
+
+ public static void execute(AsterixYARNClient client) throws IOException, YarnException {
+ YarnClientApplication app;
+ List<DFSResourceCoordinate> res;
+
+ System.out.println("JAVA HOME: " + JAVA_HOME);
+ switch (client.mode) {
+ case START:
+ startAction(client);
+ break;
+ case STOP:
+ try {
+ client.stopInstance();
+ } catch (ApplicationNotFoundException e) {
+ LOG.info(e);
+ System.out.println("Asterix instance by that name already exited or was never started");
+ client.deleteLockFile();
+ }
+ break;
+ case KILL:
+ if (client.isRunning() &&
+ Utils.confirmAction("Are you sure you want to kill this instance? In-progress tasks will be aborted")) {
+ try {
+ AsterixYARNClient.killApplication(client.getLockFile(), client.yarnClient);
+ } catch (ApplicationNotFoundException e) {
+ LOG.info(e);
+ System.out.println("Asterix instance by that name already exited or was never started");
+ client.deleteLockFile();
+ }
+ }
+ else if(!client.isRunning()){
+ System.out.println("Asterix instance by that name already exited or was never started");
+ client.deleteLockFile();
+ }
+ break;
+ case DESCRIBE:
+ Utils.listInstances(client.conf, CONF_DIR_REL);
+ break;
+ case INSTALL:
+ installAction(client);
+ break;
+ case LIBINSTALL:
+ client.installExtLibs();
+ break;
+ case ALTER:
+ client.writeAsterixConfig(Utils.parseYarnClusterConfig(client.asterixConf));
+ client.installAsterixConfig(true);
+ System.out.println("Configuration successfully modified");
+ break;
+ case DESTROY:
+ try {
+ if (client.force
+ || Utils.confirmAction("Are you really sure you want to obliterate this instance? This action cannot be undone!")) {
+ app = client.makeApplicationContext();
+ res = client.deployConfig();
+ res.addAll(client.distributeBinaries());
+ client.removeInstance(app, res);
+ }
+ } catch (YarnException | IOException e) {
+ LOG.error("Asterix failed to deploy on to cluster");
+ throw e;
+ }
+ break;
+ case BACKUP:
+ if (client.force || Utils.confirmAction("Performing a backup will stop a running instance.")) {
+ app = client.makeApplicationContext();
+ res = client.deployConfig();
+ res.addAll(client.distributeBinaries());
+ client.backupInstance(app, res);
+ }
+ break;
+ case LSBACKUP:
+ Utils.listBackups(client.conf, CONF_DIR_REL, client.instanceName);
+ break;
+ case RMBACKUP:
+ Utils.rmBackup(client.conf, CONF_DIR_REL, client.instanceName, Long.parseLong(client.snapName));
+ break;
+ case RESTORE:
+ if (client.force || Utils.confirmAction("Performing a restore will stop a running instance.")) {
+ app = client.makeApplicationContext();
+ res = client.deployConfig();
+ res.addAll(client.distributeBinaries());
+ client.restoreInstance(app, res);
+ }
+ break;
+ default:
+ LOG.fatal("Unknown mode. Known client modes are: start, stop, install, describe, kill, destroy, describe, backup, restore, lsbackup, rmbackup");
+ client.printUsage();
+ System.exit(-1);
+ }
+ }
+
+    /**
+     * Starts an already-installed instance and waits for it to report liveness.
+     *
+     * @param client
+     *            initialized client in START mode
+     * @throws YarnException
+     *             if deployment fails (IOExceptions are wrapped) or the instance never becomes live
+     */
+    private static void startAction(AsterixYARNClient client) throws YarnException {
+        try {
+            YarnClientApplication app = client.makeApplicationContext();
+            List<DFSResourceCoordinate> resources = client.deployConfig();
+            resources.addAll(client.distributeBinaries());
+            ApplicationId appId = client.deployAM(app, resources, client.mode);
+            LOG.info("Asterix started up with Application ID: " + appId.toString());
+            boolean live = Utils.waitForLiveness(appId, "Waiting for AsterixDB instance to resume ",
+                    client.yarnClient, client.instanceName, client.conf, client.ccRestPort);
+            if (!live) {
+                LOG.fatal("AsterixDB appears to have failed to install and start");
+                throw new YarnException("AsterixDB appears to have failed to install and start");
+            }
+            System.out.println("Asterix successfully deployed and is now running.");
+        } catch (IOException e) {
+            throw new YarnException(e);
+        }
+    }
+
+    /**
+     * Uploads a new instance's configuration and binaries, starts it, and waits for it to report liveness.
+     *
+     * @param client
+     *            initialized client in INSTALL mode
+     * @throws YarnException
+     *             if deployment fails (IOExceptions are logged and wrapped) or the instance never becomes live
+     */
+    private static void installAction(AsterixYARNClient client) throws YarnException {
+        try {
+            YarnClientApplication app = client.makeApplicationContext();
+            // persist the instance's cluster and parameter configuration before deploying anything
+            client.installConfig();
+            client.writeAsterixConfig(Utils.parseYarnClusterConfig(client.asterixConf));
+            client.installAsterixConfig(false);
+            List<DFSResourceCoordinate> resources = client.deployConfig();
+            resources.addAll(client.distributeBinaries());
+
+            ApplicationId appId = client.deployAM(app, resources, client.mode);
+            LOG.info("Asterix started up with Application ID: " + appId.toString());
+            boolean live = Utils.waitForLiveness(appId, "Waiting for new AsterixDB Instance to start ",
+                    client.yarnClient, client.instanceName, client.conf, client.ccRestPort);
+            if (!live) {
+                LOG.fatal("AsterixDB appears to have failed to install and start");
+                throw new YarnException("AsterixDB appears to have failed to install and start");
+            }
+            System.out.println("Asterix successfully deployed and is now running.");
+        } catch (IOException e) {
+            LOG.fatal("Asterix failed to deploy on to cluster");
+            throw new YarnException(e);
+        }
+    }
+
+    /**
+     * Creates a client bound to the given configuration and initializes (without starting) its YARN client.
+     *
+     * @param conf
+     *            Hadoop/YARN configuration. Default filesystem implementations are added to it when neither
+     *            {@code fs.hdfs.impl} nor {@code fs.file.impl} is set.
+     * @throws Exception
+     *             as declared by the signature
+     */
+    public AsterixYARNClient(Configuration conf) throws Exception {
+        this.conf = conf;
+        yarnClient = YarnClient.createYarnClient();
+        // If the HDFS jars aren't on the classpath these keys won't be set. Test both for null explicitly
+        // rather than comparing the two String references with '=='.
+        if (conf.get("fs.hdfs.impl", null) == null && conf.get("fs.file.impl", null) == null) {
+            conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
+            conf.set("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem");
+        }
+        yarnClient.init(conf);
+        opts = parseConf(conf);
+    }
+
+    /**
+     * Builds the command-line option definitions understood by the client.
+     *
+     * @param conf
+     *            not used
+     * @return the options, registered in the order listed below
+     */
+    private static Options parseConf(Configuration conf) {
+        Option[] definitions = {
+                new Option("appname", true, "Application Name. Default value - Asterix"),
+                new Option("priority", true, "Application Priority. Default 0"),
+                new Option("queue", true, "RM Queue in which this application is to be submitted"),
+                new Option("master_memory", true,
+                        "Amount of memory in MB to be requested to run the application master"),
+                new Option("log_properties", true, "log4j.properties file"),
+                new Option("n", "name", true, "Asterix instance name (required)"),
+                new Option("zip", "asterixZip", true, "zip file with AsterixDB inside- if in non-default location"),
+                new Option("bc", "baseConfig", true,
+                        "base Asterix parameters configuration file if not in default position"),
+                new Option("c", "asterixConf", true, "Asterix cluster config (required on install)"),
+                new Option("l", "externalLibs", true, "Libraries to deploy along with Asterix instance"),
+                new Option("ld", "libDataverse", true, "Dataverse to deploy external libraries to"),
+                new Option("r", "refresh", false,
+                        "If starting an existing instance, this will replace them with the local copy on startup"),
+                new Option("appId", true, "ApplicationID to monitor if running client in status monitor mode"),
+                new Option("masterLibsDir", true, "Directory that contains the JARs needed to run the AM"),
+                new Option("s", "snapshot", true,
+                        "Backup timestamp for arguments requiring a specific backup (rm, restore)"),
+                new Option("v", "debug", false, "Dump out debug information"),
+                new Option("help", false, "Print usage"),
+                new Option("f", "force", false,
+                        "Execute this command as fully as possible, disregarding any caution") };
+
+        Options result = new Options();
+        for (Option definition : definitions) {
+            result.addOption(definition);
+        }
+        return result;
+    }
+
+    /**
+     * Creates a client backed by a newly constructed {@link YarnConfiguration}.
+     *
+     * @throws Exception
+     *             propagated from {@link #AsterixYARNClient(Configuration)}
+     */
+    public AsterixYARNClient() throws Exception {
+        this(new YarnConfiguration());
+    }
+
+    /**
+     * Prints the command syntax and the option summary using commons-cli's HelpFormatter.
+     */
+    private void printUsage() {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp("Asterix YARN client. Usage: asterix [options] [mode]", opts);
+    }
+
+ /**
+ * Initialize the client's arguments and parameters before execution.
+ *
+ * @param args
+ * - Standard command-line arguments.
+ * @throws ParseException
+ */
+ public void init(String[] args) throws ParseException {
+
+ CommandLine cliParser = new GnuParser().parse(opts, args);
+ if (cliParser.hasOption("help")) {
+ printUsage();
+ return;
+ }
+ //initialize most things
+ debugFlag = cliParser.hasOption("debug");
+ force = cliParser.hasOption("force");
+ baseConfig = cliParser.getOptionValue("baseConfig");
+ extLibs = cliParser.getOptionValue("externalLibs");
+ libDataverse = cliParser.getOptionValue("libDataverse");
+
+ appName = cliParser.getOptionValue("appname", "AsterixDB");
+ amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
+ amQueue = cliParser.getOptionValue("queue", "default");
+ amMemory = Integer.parseInt(cliParser.getOptionValue("master_memory", "10"));
+
+ instanceName = cliParser.getOptionValue("name");
+ instanceFolder = instanceName + '/';
+ appName = appName + ": " + instanceName;
+
+ asterixConf = cliParser.getOptionValue("asterixConf");
+
+ log4jPropFile = cliParser.getOptionValue("log_properties", "");
+
+ //see if the given argument values are sane in general
+ checkConfSanity(args, cliParser);
+
+ //intialize the mode, see if it is a valid one.
+ initMode(args, cliParser);
+
+ //now check the validity of the arguments given the mode
+ checkModeSanity(args, cliParser);
+
+ //if we are going to refresh the binaries, find that out
+ refresh = cliParser.hasOption("refresh");
+ //same goes for snapshot restoration/removal
+ snapName = cliParser.getOptionValue("snapshot");
+
+ if (!cliParser.hasOption("asterixZip")
+ && (mode == Mode.INSTALL || mode == Mode.ALTER || mode == Mode.DESTROY || mode == Mode.BACKUP)) {
+
+ asterixZip = cliParser.getOptionValue("asterixZip", getAsterixDistributableLocation().getAbsolutePath());
+ } else {
+ asterixZip = cliParser.getOptionValue("asterixZip");
+ }
+
+ }
+
+    /**
+     * Mode-independent sanity checks on the arguments.
+     *
+     * @param args
+     *            the raw command-line arguments
+     * @param cliParser
+     *            The parsed arguments.
+     * @throws ParseException
+     *             if no arguments were given, or the application master memory is negative
+     */
+    private void checkConfSanity(String[] args, CommandLine cliParser) throws ParseException {
+        if (args.length == 0) {
+            throw new ParseException("No args specified for client to initialize");
+        }
+        if (amMemory < 0) {
+            throw new ParseException("Invalid memory specified for application master, exiting."
+                    + " Specified memory=" + amMemory);
+        }
+    }
+
+    /**
+     * Sets the client mode from the single non-option argument (the verb).
+     * An unrecognized verb results in {@link Mode#NOOP}.
+     *
+     * @param args
+     *            the raw command-line arguments (unused)
+     * @param cliParser
+     *            the parsed arguments
+     * @throws ParseException
+     *             if no verb, or more than one, was given
+     */
+    private void initMode(String[] args, CommandLine cliParser) throws ParseException {
+        @SuppressWarnings("unchecked")
+        List<String> verbs = cliParser.getArgList();
+        if (verbs == null || verbs.isEmpty()) {
+            throw new ParseException("You must specify an action.");
+        }
+        if (verbs.size() > 1) {
+            throw new ParseException(
+                    "Trailing arguments, or too many arguments. Only one action may be performed at a time.");
+        }
+        Mode requested = Mode.fromAlias(verbs.get(0));
+        mode = (requested != null) ? requested : Mode.NOOP;
+    }
+
+    /**
+     * Determine if the command line arguments are sufficient for the requested client mode.
+     *
+     * @param args
+     *            The command line arguments.
+     * @param cliParser
+     *            Parsed command line arguments.
+     * @throws ParseException
+     *             describing the first argument problem found for the mode
+     */
+    private void checkModeSanity(String[] args, CommandLine cliParser) throws ParseException {
+        boolean needsSnapshot = mode == Mode.RESTORE || mode == Mode.RMBACKUP;
+        String message = null;
+        //The only time you can use the client without specifying an instance, is to list all of the instances it sees.
+        if (!cliParser.hasOption("name") && mode != Mode.DESCRIBE) {
+            message = "You must give a name for the instance to be acted upon";
+        } else if (mode == Mode.INSTALL && !cliParser.hasOption("asterixConf")) {
+            message = "No Configuration XML given. Please specify a config for cluster installation";
+        } else if (mode != Mode.START && cliParser.hasOption("refresh")) {
+            message = "Cannot specify refresh in any mode besides start, mode is: " + mode;
+        } else if (cliParser.hasOption("snapshot") && !needsSnapshot) {
+            message = "Cannot specify a snapshot to restore in any mode besides restore or rmbackup, mode is: " + mode;
+        } else if (needsSnapshot && !isParsableSnapshotTimestamp(cliParser.getOptionValue("snapshot"))) {
+            // restore and rmbackup parse the snapshot with Long.parseLong; report a usage error up front
+            // instead of a NumberFormatException later.
+            message = "You must specify a numeric backup timestamp (-snapshot) in mode: " + mode;
+        } else if (mode == Mode.LIBINSTALL
+                && !(cliParser.hasOption("externalLibs") && cliParser.hasOption("libDataverse"))) {
+            // installExtLibs builds DFS paths from both values
+            message = "Installing a library requires both an external library (-externalLibs) and a dataverse (-libDataverse)";
+        } else if ((mode == Mode.ALTER || mode == Mode.INSTALL) && baseConfig == null
+                && !(new File(DEFAULT_PARAMETERS_PATH).exists())) {
+            message = "Default asterix parameters file is not in the default location, and no custom location is specified";
+        }
+        if (message != null) {
+            throw new ParseException(message);
+        }
+    }
+
+    /** Returns true if the value is non-null and parses as a long, as restore/rmbackup require. */
+    private static boolean isParsableSnapshotTimestamp(String value) {
+        if (value == null) {
+            return false;
+        }
+        try {
+            Long.parseLong(value);
+            return true;
+        } catch (NumberFormatException e) {
+            return false;
+        }
+    }
+
+    /**
+     * Finds the AsterixDB distributable in the default location: exactly one {@code asterix-server*.zip}
+     * inside the {@code asterix} directory under the working directory.
+     *
+     * @return the distributable zip
+     * @throws IllegalArgumentException
+     *             if the directory is missing or unreadable, or it contains zero or several matching zips
+     */
+    private File getAsterixDistributableLocation() {
+        //Look in the PWD for the "asterix" folder
+        File tarDir = new File("asterix");
+        if (!tarDir.exists()) {
+            throw new IllegalArgumentException(
+                    "Default directory structure not in use- please specify an asterix zip and base config file to distribute");
+        }
+        // typed as FileFilter to pick the listFiles(FileFilter) overload
+        FileFilter tarFilter = new WildcardFileFilter("asterix-server*.zip");
+        File[] tarFiles = tarDir.listFiles(tarFilter);
+        // listFiles returns null if the path is not a readable directory
+        if (tarFiles == null || tarFiles.length == 0) {
+            throw new IllegalArgumentException(
+                    "No canonically named asterix distributable (asterix-server*.zip) found in the default directory. Please specify an asterix zip to distribute.");
+        }
+        if (tarFiles.length > 1) {
+            throw new IllegalArgumentException(
+                    "There is more than one canonically named asterix distributable in the default directory. Please leave only one there.");
+        }
+        return tarFiles[0];
+    }
+
+ /**
+ * Initialize and register the application attempt with the YARN ResourceManager.
+ *
+ * @return
+ * @throws IOException
+ * @throws YarnException
+ */
+ public YarnClientApplication makeApplicationContext() throws IOException, YarnException {
+
+ //first check to see if an instance already exists.
+ FileSystem fs = FileSystem.get(conf);
+ Path lock = new Path(fs.getHomeDirectory(), CONF_DIR_REL + instanceFolder + instanceLock);
+ LOG.info("Running Deployment");
+ yarnClient.start();
+ if (fs.exists(lock)) {
+ ApplicationId lockAppId = getLockFile();
+ try {
+ ApplicationReport previousAppReport = yarnClient.getApplicationReport(lockAppId);
+ YarnApplicationState prevStatus = previousAppReport.getYarnApplicationState();
+ if (!(prevStatus == YarnApplicationState.FAILED || prevStatus == YarnApplicationState.KILLED || prevStatus == YarnApplicationState.FINISHED)
+ && mode != Mode.DESTROY && mode != Mode.BACKUP && mode != Mode.RESTORE) {
+ throw new IllegalStateException("Instance is already running in: " + lockAppId);
+ } else if (mode != Mode.DESTROY && mode != Mode.BACKUP && mode != Mode.RESTORE) {
+ //stale lock file
+ LOG.warn("Stale lockfile detected. Instance attempt " + lockAppId + " may have exited abnormally");
+ deleteLockFile();
+ }
+ } catch (YarnException e) {
+ LOG.warn("Stale lockfile detected, but the RM has no record of this application's last run. This is normal if the cluster was restarted.");
+ deleteLockFile();
+ }
+ }
+
+ // Get a new application id
+ YarnClientApplication app = yarnClient.createApplication();
+ GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
+ int maxMem = appResponse.getMaximumResourceCapability().getMemory();
+ LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
+
+ // A resource ask cannot exceed the max.
+ if (amMemory > maxMem) {
+ LOG.info("AM memory specified above max threshold of cluster. Using max value." + ", specified=" + amMemory
+ + ", max=" + maxMem);
+ amMemory = maxMem;
+ }
+
+ // set the application name
+ ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
+ appContext.setApplicationName(appName);
+
+ return app;
+ }
+
+    /**
+     * Registers the instance's cluster configuration, which must already be in DFS, as a local resource for the
+     * AM. Nothing is uploaded here; see installConfig.
+     *
+     * @return a list holding the configuration's resource coordinate
+     * @throws YarnException
+     *             if the configuration does not exist in DFS (the DFS error is attached as the cause)
+     * @throws IOException
+     *             on other DFS errors
+     */
+    private List<DFSResourceCoordinate> deployConfig() throws YarnException, IOException {
+        FileSystem fs = FileSystem.get(conf);
+        List<DFSResourceCoordinate> resources = new ArrayList<DFSResourceCoordinate>(2);
+
+        String pathSuffix = CONF_DIR_REL + instanceFolder + CONFIG_DEFAULT_NAME;
+        Path dstConf = new Path(fs.getHomeDirectory(), pathSuffix);
+        FileStatus destStatus;
+        try {
+            destStatus = fs.getFileStatus(dstConf);
+        } catch (IOException e) {
+            throw new YarnException("Asterix instance by that name does not appear to exist in DFS", e);
+        }
+        LocalResource asterixConfLoc = Records.newRecord(LocalResource.class);
+        asterixConfLoc.setType(LocalResourceType.FILE);
+        asterixConfLoc.setVisibility(LocalResourceVisibility.PRIVATE);
+        asterixConfLoc.setResource(ConverterUtils.getYarnUrlFromPath(dstConf));
+        asterixConfLoc.setTimestamp(destStatus.getModificationTime());
+        // Set the real size (as installAmLibs does) so the CONFLEN entry below carries it.
+        asterixConfLoc.setSize(destStatus.getLen());
+
+        // named confCoord so it does not shadow the Configuration field
+        DFSResourceCoordinate confCoord = new DFSResourceCoordinate();
+        confCoord.envs.put(dstConf.toUri().toString(), AConstants.CONFLOCATION);
+        confCoord.envs.put(Long.toString(asterixConfLoc.getSize()), AConstants.CONFLEN);
+        confCoord.envs.put(Long.toString(asterixConfLoc.getTimestamp()), AConstants.CONFTIMESTAMP);
+        confCoord.name = CONFIG_DEFAULT_NAME;
+        confCoord.res = asterixConfLoc;
+        resources.add(confCoord);
+
+        return resources;
+    }
+
+    /**
+     * Uploads the local cluster configuration ({@code -asterixConf}) to the instance's DFS directory.
+     * <p>
+     * The upload only happens in INSTALL mode, which fails if the instance already exists. START fails if the
+     * configuration is missing. Other modes only probe for the file.
+     *
+     * @throws YarnException
+     *             as declared by the signature
+     * @throws IOException
+     *             on DFS errors
+     * @throws IllegalStateException
+     *             when the instance's existence contradicts the mode
+     */
+    private void installConfig() throws YarnException, IOException {
+        FileSystem fs = FileSystem.get(conf);
+        Path dstConf = new Path(fs.getHomeDirectory(), CONF_DIR_REL + instanceFolder + CONFIG_DEFAULT_NAME);
+        try {
+            fs.getFileStatus(dstConf);
+            if (mode == Mode.INSTALL) {
+                throw new IllegalStateException("Instance with this name already exists.");
+            }
+        } catch (FileNotFoundException e) {
+            if (mode == Mode.START) {
+                throw new IllegalStateException("Instance does not exist for this user", e);
+            }
+        }
+        if (mode != Mode.INSTALL) {
+            return;
+        }
+        fs.copyFromLocalFile(false, true, new Path(asterixConf), dstConf);
+    }
+
+    /**
+     * Uploads the external library ({@code -externalLibs}) into the instance's DFS library directory for the
+     * dataverse given by {@code -libDataverse}, so a later start can pick it up.
+     *
+     * @throws IllegalStateException
+     *             if the instance does not exist or is currently running
+     * @throws IOException
+     *             on DFS errors
+     */
+    private void installExtLibs() throws IllegalStateException, IOException {
+        FileSystem fs = FileSystem.get(conf);
+        if (!instanceExists()) {
+            throw new IllegalStateException("No instance by name " + instanceName + " found.");
+        }
+        if (isRunning()) {
+            throw new IllegalStateException("Instance " + instanceName
+                    + " is running. Please stop it before installing any libraries.");
+        }
+        Path localLib = new Path(extLibs);
+        String dataverseDir = CONF_DIR_REL + instanceFolder + "library" + Path.SEPARATOR + libDataverse
+                + Path.SEPARATOR;
+        Path dfsLib = new Path(fs.getHomeDirectory(), dataverseDir + localLib.getName());
+        LOG.info("Copying Asterix external library to DFS");
+        fs.copyFromLocalFile(false, true, localLib, dfsLib);
+    }
+
+    /**
+     * Finds the minimal classes and JARs needed to start the AM only, uploads them to the instance's
+     * {@code am_jars} DFS directory (again if {@code -refresh} is set), and registers them as local resources.
+     *
+     * @return Resources the AM needs to start on the initial container.
+     * @throws IllegalStateException
+     *             as declared by the signature
+     * @throws IOException
+     *             on DFS errors, or if no matching classpath entries were found
+     */
+    private List<DFSResourceCoordinate> installAmLibs() throws IllegalStateException, IOException {
+        List<DFSResourceCoordinate> resources = new ArrayList<DFSResourceCoordinate>(2);
+        FileSystem fs = FileSystem.get(conf);
+        String fullLibPath = CONF_DIR_REL + instanceFolder + "am_jars" + Path.SEPARATOR;
+        String[] cp = System.getProperty("java.class.path").split(Pattern.quote(System.getProperty("path.separator")));
+
+        // A classpath entry is shipped to the AM when its file name matches one of these patterns.
+        Pattern[] amLibPatterns = {
+                Pattern.compile("^(asterix).*(jar)$"), // starts with asterix, ends with jar
+                Pattern.compile("^(commons).*(jar)$"),
+                Pattern.compile("^(surefire).*(jar)$"), // for maven tests
+                // NOTE(review): this is matched against a single file name, which never contains a separator,
+                // so it can never match. Confirm which test-classpath entry it was meant to catch.
+                Pattern.compile("^(asterix-yarn" + Pattern.quote(File.separator) + "target)$") };
+
+        for (String entry : cp) {
+            File f = new File(entry);
+            // getName() is the last path component. Unlike splitting on the separator, it does not fail on
+            // entries such as "/" that have no components.
+            String fileName = f.getName();
+            LOG.debug("Classpath entry: " + entry + " (file name: " + fileName + ")");
+            boolean shipToAm = false;
+            for (Pattern p : amLibPatterns) {
+                if (p.matcher(fileName).matches()) {
+                    shipToAm = true;
+                    break;
+                }
+            }
+            if (!shipToAm) {
+                continue;
+            }
+            LOG.info("Loading JAR/classpath: " + entry);
+            Path dst = new Path(fs.getHomeDirectory(), fullLibPath + fileName);
+            if (!fs.exists(dst) || refresh) {
+                fs.copyFromLocalFile(false, true, new Path(f.getAbsolutePath()), dst);
+            }
+            FileStatus dstSt = fs.getFileStatus(dst);
+            LocalResource amLib = Records.newRecord(LocalResource.class);
+            amLib.setType(LocalResourceType.FILE);
+            amLib.setVisibility(LocalResourceVisibility.PRIVATE);
+            amLib.setResource(ConverterUtils.getYarnUrlFromPath(dst));
+            amLib.setTimestamp(dstSt.getModificationTime());
+            amLib.setSize(dstSt.getLen());
+            DFSResourceCoordinate amLibCoord = new DFSResourceCoordinate();
+            amLibCoord.res = amLib;
+            amLibCoord.name = fileName;
+            // The asterix-yarn jar (or the surefire jar under maven tests) is also published via the
+            // APPLICATIONMASTERJAR* environment keys.
+            if (fileName.contains("asterix-yarn") || fileName.contains("surefire")) {
+                amLibCoord.envs.put(dst.toUri().toString(), AConstants.APPLICATIONMASTERJARLOCATION);
+                amLibCoord.envs.put(Long.toString(dstSt.getLen()), AConstants.APPLICATIONMASTERJARLEN);
+                amLibCoord.envs.put(Long.toString(dstSt.getModificationTime()),
+                        AConstants.APPLICATIONMASTERJARTIMESTAMP);
+            }
+            resources.add(amLibCoord);
+        }
+        if (resources.isEmpty()) {
+            throw new IOException("Required JARs are missing. Please check your directory structure");
+        }
+        return resources;
+    }
+
+    /**
+     * Uploads the local AsterixDB parameters file (MERGED_PARAMETERS_PATH) to the instance's DFS directory as
+     * PARAMS_DEFAULT_NAME.
+     *
+     * @param overwrite
+     *            replace an existing parameters file for this instance
+     * @throws IllegalStateException
+     *             if the file already exists in DFS and {@code overwrite} is false
+     * @throws IOException
+     *             on local or DFS errors
+     */
+    private void installAsterixConfig(boolean overwrite) throws IllegalStateException, IOException {
+        FileSystem fs = FileSystem.get(conf);
+        File srcfile = new File(MERGED_PARAMETERS_PATH);
+        Path src = new Path(srcfile.getCanonicalPath());
+        // instanceFolder already ends with '/'; adding the local File.separator here gave a doubled
+        // (and, on Windows, wrong) separator in a DFS path.
+        String pathSuffix = CONF_DIR_REL + instanceFolder + PARAMS_DEFAULT_NAME;
+        Path dst = new Path(fs.getHomeDirectory(), pathSuffix);
+        if (fs.exists(dst) && !overwrite) {
+            throw new IllegalStateException(
+                    "Instance exists. Please delete an existing instance before trying to overwrite");
+        }
+        fs.copyFromLocalFile(false, true, src, dst);
+    }
+
+    /**
+     * Uploads binary resources to DFS for use by the AM and describes them as local resources.
+     * <p>
+     * The returned list contains:
+     * <ul>
+     * <li>the AsterixDB zip, as an ARCHIVE (uploaded only if missing, or always with {@code -refresh});</li>
+     * <li>the log4j properties file, if {@code -log_properties} was given;</li>
+     * <li>the AM jars found by installAmLibs.</li>
+     * </ul>
+     *
+     * @return resource coordinates for the AM container
+     * @throws IOException
+     *             on local or DFS errors
+     * @throws YarnException
+     *             as declared by the signature
+     */
+    public List<DFSResourceCoordinate> distributeBinaries() throws IOException, YarnException {
+        List<DFSResourceCoordinate> resources = new ArrayList<DFSResourceCoordinate>(2);
+        FileSystem fs = FileSystem.get(conf);
+
+        // Keep the AsterixDB distributable archived in DFS and register it as an ARCHIVE resource.
+        Path dst = new Path(fs.getHomeDirectory(), CONF_DIR_REL + instanceFolder + "asterix-server.zip");
+        if (refresh && fs.exists(dst)) {
+            fs.delete(dst, false);
+        }
+        if (!fs.exists(dst)) {
+            Path src = new Path(asterixZip);
+            LOG.info("Copying Asterix distributable to DFS");
+            fs.copyFromLocalFile(false, true, src, dst);
+        }
+        FileStatus destStatus = fs.getFileStatus(dst);
+        LocalResource asterixTarLoc = Records.newRecord(LocalResource.class);
+        asterixTarLoc.setType(LocalResourceType.ARCHIVE);
+        asterixTarLoc.setVisibility(LocalResourceVisibility.PRIVATE);
+        asterixTarLoc.setResource(ConverterUtils.getYarnUrlFromPath(dst));
+        asterixTarLoc.setTimestamp(destStatus.getModificationTime());
+        // set the real size so the TARLEN entry below carries it
+        asterixTarLoc.setSize(destStatus.getLen());
+
+        // adding info so we can add the tarball to the App master container path
+        DFSResourceCoordinate tar = new DFSResourceCoordinate();
+        tar.envs.put(dst.toUri().toString(), AConstants.TARLOCATION);
+        tar.envs.put(Long.toString(asterixTarLoc.getSize()), AConstants.TARLEN);
+        tar.envs.put(Long.toString(asterixTarLoc.getTimestamp()), AConstants.TARTIMESTAMP);
+        tar.res = asterixTarLoc;
+        tar.name = "asterix-server.zip";
+        resources.add(tar);
+
+        // Set the log4j properties if needed
+        if (!log4jPropFile.isEmpty()) {
+            Path log4jSrc = new Path(log4jPropFile);
+            Path log4jDst = new Path(fs.getHomeDirectory(), "log4j.props");
+            fs.copyFromLocalFile(false, true, log4jSrc, log4jDst);
+            FileStatus log4jFileStatus = fs.getFileStatus(log4jDst);
+            LocalResource log4jRsrc = Records.newRecord(LocalResource.class);
+            log4jRsrc.setType(LocalResourceType.FILE);
+            log4jRsrc.setVisibility(LocalResourceVisibility.PRIVATE);
+            log4jRsrc.setResource(ConverterUtils.getYarnUrlFromURI(log4jDst.toUri()));
+            log4jRsrc.setTimestamp(log4jFileStatus.getModificationTime());
+            log4jRsrc.setSize(log4jFileStatus.getLen());
+            // Populate l4j itself. Assigning through 'tar' here would overwrite the AsterixDB zip
+            // resource and add an empty coordinate.
+            DFSResourceCoordinate l4j = new DFSResourceCoordinate();
+            l4j.res = log4jRsrc;
+            l4j.name = "log4j.properties";
+            resources.add(l4j);
+        }
+
+        resources.addAll(installAmLibs());
+        return resources;
+    }
+
+    /**
+     * Submits the request to start the AsterixApplicationMaster to the YARN ResourceManager.
+     *
+     * @param app
+     *            The application attempt handle.
+     * @param resources
+     *            Resources to be distributed as part of the container launch
+     * @param mode
+     *            The mode of the ApplicationMaster
+     * @return The application ID of the new Asterix instance.
+     * @throws IOException
+     * @throws YarnException
+     * @throws IllegalStateException
+     *             if mode is INSTALL or START and the instance lock file already exists. This is checked
+     *             before submission, so no duplicate application is launched.
+     */
+    public ApplicationId deployAM(YarnClientApplication app, List<DFSResourceCoordinate> resources, Mode mode)
+            throws IOException, YarnException {
+        FileSystem fs = FileSystem.get(conf);
+
+        // Set up the container launch context for the application master
+        ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
+        ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
+
+        // Set local resource info into app master container launch context
+        Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+        for (DFSResourceCoordinate res : resources) {
+            localResources.put(res.name, res.res);
+        }
+        amContainer.setLocalResources(localResources);
+
+        // Environment of the application master. Using the env info, the application master will create
+        // the correct local resources for the containers it launches.
+        LOG.info("Set the environment for the application master");
+        Map<String, String> env = new HashMap<String, String>();
+        for (DFSResourceCoordinate res : resources) {
+            if (res.envs == null) { // some entries may not have environment variables.
+                continue;
+            }
+            // envs is stored inverted (value -> variable name), so swap key and value here
+            for (Map.Entry<String, String> e : res.envs.entrySet()) {
+                env.put(e.getValue(), e.getKey());
+            }
+        }
+        // this is needed for when the RM address isn't known from the environment of the AM
+        env.put(AConstants.RMADDRESS, conf.get("yarn.resourcemanager.address"));
+        env.put(AConstants.RMSCHEDULERADDRESS, conf.get("yarn.resourcemanager.scheduler.address"));
+        // add miscellaneous environment variables.
+        env.put(AConstants.INSTANCESTORE, CONF_DIR_REL + instanceFolder);
+        env.put(AConstants.DFS_BASE, fs.getHomeDirectory().toUri().toString());
+        env.put(AConstants.CC_JAVA_OPTS, ccJavaOpts);
+        env.put(AConstants.NC_JAVA_OPTS, ncJavaOpts);
+
+        // Classpath: everything in the container working directory, the YARN application classpath and
+        // the localized log4j.properties.
+        StringBuilder classPathEnv = new StringBuilder("./*");
+        for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+                YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
+            classPathEnv.append(File.pathSeparatorChar).append(c.trim());
+        }
+        classPathEnv.append(File.pathSeparatorChar).append("." + File.separator + "log4j.properties");
+
+        // add the runtime classpath needed for tests to work
+        if (conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
+            LOG.info("In YARN MiniCluster");
+            classPathEnv.append(System.getProperty("path.separator"));
+            classPathEnv.append(System.getProperty("java.class.path"));
+            env.put("HADOOP_CONF_DIR", System.getProperty("user.dir") + File.separator + "target" + File.separator);
+        }
+        LOG.info("AM Classpath:" + classPathEnv.toString());
+        env.put("CLASSPATH", classPathEnv.toString());
+        amContainer.setEnvironment(env);
+
+        // Set the necessary command to execute the application master
+        Vector<CharSequence> vargs = new Vector<CharSequence>(30);
+        LOG.info("Setting up app master command");
+        vargs.add(JAVA_HOME + File.separator + "bin" + File.separator + "java");
+        vargs.add(appMasterMainClass);
+        if (debugFlag) {
+            vargs.add("-debug");
+        }
+        if (mode == Mode.DESTROY) {
+            vargs.add("-obliterate");
+        } else if (mode == Mode.BACKUP) {
+            vargs.add("-backup");
+        } else if (mode == Mode.RESTORE) {
+            vargs.add("-restore " + snapName);
+        } else if (mode == Mode.INSTALL) {
+            vargs.add("-initial ");
+        }
+        if (refresh) {
+            vargs.add("-refresh");
+        }
+        vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + File.separator + "AppMaster.stdout");
+        vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + File.separator + "AppMaster.stderr");
+        StringBuilder command = new StringBuilder();
+        for (CharSequence str : vargs) {
+            command.append(str).append(" ");
+        }
+        LOG.info("Completed setting up app master command " + command.toString());
+        List<String> commands = new ArrayList<String>();
+        commands.add(command.toString());
+        amContainer.setCommands(commands);
+
+        // For now, only memory is supported so we set memory requirements
+        Resource capability = Records.newRecord(Resource.class);
+        capability.setMemory(amMemory);
+        appContext.setResource(capability);
+        appContext.setAMContainerSpec(amContainer);
+
+        Priority pri = Records.newRecord(Priority.class);
+        // TODO - what is the range for priority? how to decide?
+        pri.setPriority(amPriority);
+        appContext.setPriority(pri);
+
+        // Set the queue to which this application is to be submitted in the RM
+        appContext.setQueue(amQueue);
+
+        // Check the instance lock before submitting. Checking afterwards (as before) left a duplicate
+        // application running when the check failed.
+        boolean takesLock = mode == Mode.INSTALL || mode == Mode.START;
+        Path lock = new Path(fs.getHomeDirectory(), CONF_DIR_REL + instanceFolder + instanceLock);
+        if (takesLock && fs.exists(lock)) {
+            throw new IllegalStateException("Somehow, this instance has been launched twice. ");
+        }
+
+        // Either succeeds or throws to denote some form of failure; there is no response to inspect.
+        LOG.info("Submitting application to ASM");
+        yarnClient.submitApplication(appContext);
+
+        ApplicationId appId = appContext.getApplicationId();
+        if (takesLock) {
+            // now write the instance lock, recording the application id
+            BufferedWriter out = new BufferedWriter(new OutputStreamWriter(fs.create(lock, true)));
+            try {
+                out.write(appId.toString());
+            } finally {
+                out.close();
+            }
+        }
+        return appId;
+    }
+
+    /**
+     * Asks YARN to kill a given application by appId. If the application has already reached FINISHED,
+     * KILLED or FAILED, this only logs that fact.
+     *
+     * @param appId The application to kill.
+     * @param yarnClient The YARN client object that is connected to the RM; it is started here if it is
+     *            only INITED.
+     * @throws YarnException if appId is null, or as propagated from the YARN client
+     * @throws IOException
+     */
+    public static void killApplication(ApplicationId appId, YarnClient yarnClient) throws YarnException, IOException {
+        if (appId == null) {
+            throw new YarnException("No Application given to kill");
+        }
+        if (yarnClient.isInState(STATE.INITED)) {
+            yarnClient.start();
+        }
+        YarnApplicationState st = yarnClient.getApplicationReport(appId).getYarnApplicationState();
+        if (st == YarnApplicationState.FINISHED || st == YarnApplicationState.KILLED
+                || st == YarnApplicationState.FAILED) {
+            LOG.info("Application " + appId + " already exited.");
+            return;
+        }
+        LOG.info("Killing application with ID: " + appId);
+        yarnClient.killApplication(appId);
+    }
+
+    /**
+     * Stops the instance if isRunning() reports it as running. Otherwise it only verifies that a
+     * configuration for the instance exists.
+     *
+     * @throws IOException
+     * @throws YarnException
+     *             if no configuration exists for the instance, or wrapping an IOException thrown by
+     *             stopInstance()
+     */
+    private void stopInstanceIfRunning() throws IOException, YarnException {
+        FileSystem dfs = FileSystem.get(conf);
+        Path configFile = new Path(dfs.getHomeDirectory(), CONF_DIR_REL + instanceFolder + CONFIG_DEFAULT_NAME);
+        if (!isRunning()) {
+            if (!dfs.exists(configFile)) {
+                throw new YarnException("No instance configured with that name exists");
+            }
+            return;
+        }
+        try {
+            this.stopInstance();
+        } catch (IOException stopFailure) {
+            throw new YarnException(stopFailure);
+        }
+    }
+
+    /**
+     * Start a YARN job to delete local AsterixDB resources of an extant instance, then remove the
+     * instance's configuration directory from the DFS.
+     *
+     * If the deletion job does not start, the DFS directory is still removed when force is set, and a
+     * YarnException is thrown either way. If the job starts but does not complete, the DFS directory is
+     * only removed when force is set.
+     *
+     * @param app The Client connection
+     * @param resources AM resources
+     * @throws IOException
+     * @throws YarnException
+     */
+    private void removeInstance(YarnClientApplication app, List<DFSResourceCoordinate> resources) throws IOException,
+            YarnException {
+        FileSystem fs = FileSystem.get(conf);
+        // Resolve against the home directory like every other instance path here. A bare relative path
+        // resolves against the FS working directory, which need not be the home directory.
+        Path instanceDir = new Path(fs.getHomeDirectory(), CONF_DIR_REL + instanceFolder);
+        // if the instance is up, fix that
+        stopInstanceIfRunning();
+        // now try deleting all of the on-disk artifacts on the cluster
+        ApplicationId deleter = deployAM(app, resources, Mode.DESTROY);
+        boolean deleteStart = Utils.waitForApplication(deleter, yarnClient, "Waiting for deletion to start", ccRestPort);
+        if (!deleteStart) {
+            if (force) {
+                LOG.error("Forcing deletion of HDFS resources");
+                fs.delete(instanceDir, true);
+            }
+            LOG.fatal("Deletion of on-disk persistent resources on individual nodes failed.");
+            throw new YarnException("Deletion of on-disk persistent resources on individual nodes failed.");
+        }
+        boolean deleted = waitForCompletion(deleter, "Deletion in progress");
+        if (!deleted && !force) {
+            LOG.fatal("Cleanup of on-disk persistent resources failed.");
+            return;
+        }
+        fs.delete(instanceDir, true);
+        System.out.println("Deletion of instance succeeded.");
+    }
+
+    /**
+     * Start a YARN job to copy all data-containing resources of an AsterixDB instance to HDFS. A running
+     * instance is stopped first.
+     *
+     * @param app
+     *            the YARN client application used to submit the backup job
+     * @param resources
+     *            AM resources to localize
+     * @throws IOException
+     * @throws YarnException
+     *             if the backup job does not start
+     */
+    private void backupInstance(YarnClientApplication app, List<DFSResourceCoordinate> resources) throws IOException,
+            YarnException {
+        stopInstanceIfRunning();
+        ApplicationId backerUpper = deployAM(app, resources, Mode.BACKUP);
+        boolean backupStart = Utils.waitForApplication(backerUpper, yarnClient, "Waiting for backup "
+                + backerUpper.toString() + " to start", ccRestPort);
+        if (!backupStart) {
+            LOG.fatal("Backup failed to start");
+            throw new YarnException("Backup failed to start");
+        }
+        boolean complete = waitForCompletion(backerUpper, "Backup in progress");
+        if (!complete) {
+            LOG.fatal("Backup failed- timeout waiting for completion");
+            return;
+        }
+        System.out.println("Backup of instance succeeded.");
+    }
+
+    /**
+     * Start a YARN job to copy a set of resources from backupInstance to restore the state of an extant
+     * AsterixDB instance. The snapshot comes from snapName, which deployAM passes to the AM. A running
+     * instance is stopped first.
+     *
+     * @param app
+     *            the YARN client application used to submit the restore job
+     * @param resources
+     *            AM resources to localize
+     * @throws IOException
+     * @throws YarnException
+     *             if the restore job does not start
+     */
+    private void restoreInstance(YarnClientApplication app, List<DFSResourceCoordinate> resources) throws IOException,
+            YarnException {
+        stopInstanceIfRunning();
+        ApplicationId restorer = deployAM(app, resources, Mode.RESTORE);
+        boolean restoreStart = Utils.waitForApplication(restorer, yarnClient, "Waiting for restore to start", ccRestPort);
+        if (!restoreStart) {
+            LOG.fatal("Restore failed to start");
+            throw new YarnException("Restore failed to start");
+        }
+        boolean complete = waitForCompletion(restorer, "Restore in progress");
+        if (!complete) {
+            LOG.fatal("Restore failed- timeout waiting for completion");
+            return;
+        }
+        System.out.println("Restoration of instance succeeded.");
+    }
+
+    /**
+     * Stops the instance and removes the lockfile to allow a restart.
+     *
+     * A safe shutdown is first requested over the CC REST API. If the application does not finish and
+     * force is set, it is killed through YARN. The lock file is removed only once the instance is known
+     * to be gone.
+     *
+     * @throws IOException
+     * @throws YarnException
+     *             if the instance has no lock file (see getLockFile())
+     */
+    private void stopInstance() throws IOException, YarnException {
+        ApplicationId appId = getLockFile();
+        // get CC rest API port if it is nonstandard
+        readConfigParams(locateConfig());
+        if (yarnClient.isInState(STATE.INITED)) {
+            yarnClient.start();
+        }
+        System.out.println("Stopping instance " + instanceName);
+        if (!isRunning()) {
+            LOG.fatal("AsterixDB instance by that name is stopped already");
+            return;
+        }
+        try {
+            String ccIp = Utils.getCCHostname(instanceName, conf);
+            Utils.sendShutdownCall(ccIp, ccRestPort);
+        } catch (IOException e) {
+            LOG.error("Error while trying to issue safe shutdown:", e);
+        }
+        // now make sure it is actually gone and not "stuck"
+        boolean completed = waitForCompletion(appId, "Waiting for AsterixDB to shut down");
+        if (!completed && force) {
+            LOG.warn("Instance failed to stop gracefully, now killing it");
+            try {
+                AsterixYARNClient.killApplication(appId, yarnClient);
+                completed = true;
+            } catch (YarnException e1) {
+                LOG.fatal("Could not stop nor kill instance gracefully.", e1);
+                return;
+            }
+        }
+        if (completed) {
+            deleteLockFile();
+        } else {
+            // Previously this case was silent; the lock file is deliberately kept.
+            LOG.error("Instance did not shut down in time; its lock file was left in place.");
+        }
+    }
+
+    /**
+     * Removes this instance's lock file from the DFS home directory, if it exists. Does nothing when no
+     * instance name is set.
+     *
+     * @throws IOException
+     */
+    private void deleteLockFile() throws IOException {
+        // Compare by content: the old check (instanceName == "") only matched the interned literal.
+        if (instanceName == null || instanceName.isEmpty()) {
+            return;
+        }
+        FileSystem fs = FileSystem.get(conf);
+        Path lockPath = new Path(fs.getHomeDirectory(), CONF_DIR_REL + instanceName + '/' + instanceLock);
+        if (fs.exists(lockPath)) {
+            fs.delete(lockPath, false);
+        }
+    }
+
+    /**
+     * @return true if this instance's configuration file is present under the DFS home directory
+     * @throws IOException
+     */
+    private boolean instanceExists() throws IOException {
+        FileSystem dfs = FileSystem.get(conf);
+        Path configFile = new Path(dfs.getHomeDirectory(), CONF_DIR_REL + instanceFolder + CONFIG_DEFAULT_NAME);
+        return dfs.exists(configFile);
+    }
+
+    /**
+     * An instance counts as running when both its configuration file and its lock file exist in the DFS.
+     *
+     * @return true if the instance is configured and holds a lock file
+     * @throws IOException
+     */
+    private boolean isRunning() throws IOException {
+        FileSystem dfs = FileSystem.get(conf);
+        Path home = dfs.getHomeDirectory();
+        if (!dfs.exists(new Path(home, CONF_DIR_REL + instanceFolder + CONFIG_DEFAULT_NAME))) {
+            return false;
+        }
+        Path lockFile = new Path(home, CONF_DIR_REL + instanceFolder + instanceLock);
+        return dfs.exists(lockFile);
+    }
+
+    /**
+     * Reads the application id recorded in this instance's lock file.
+     *
+     * @return the application id of the running instance
+     * @throws IllegalStateException
+     *             if no instance name (folder) is set
+     * @throws YarnException
+     *             if the lock file is missing or empty
+     * @throws IOException
+     */
+    private ApplicationId getLockFile() throws IOException, YarnException {
+        // isEmpty(), not ==: reference comparison only matched the interned "" literal
+        if (instanceFolder == null || instanceFolder.isEmpty()) {
+            throw new IllegalStateException("Instance name not given.");
+        }
+        FileSystem fs = FileSystem.get(conf);
+        Path lockPath = new Path(fs.getHomeDirectory(), CONF_DIR_REL + instanceFolder + instanceLock);
+        if (!fs.exists(lockPath)) {
+            throw new YarnException("Instance appears to not be running. If you know it is, try using kill");
+        }
+        BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(lockPath)));
+        String lockAppId;
+        try {
+            lockAppId = br.readLine();
+        } finally {
+            br.close();
+        }
+        if (lockAppId == null) {
+            throw new YarnException("Lock file " + lockPath + " is empty");
+        }
+        return ConverterUtils.toApplicationId(lockAppId);
+    }
+
+    /**
+     * Reads the application id recorded in the lock file of the named instance.
+     *
+     * @param instanceName
+     *            the instance to look up
+     * @param conf
+     *            configuration used to reach the DFS
+     * @return the application id, or null if the instance has no lock file (i.e. it is stopped)
+     * @throws IllegalStateException
+     *             if instanceName is null or empty
+     * @throws IOException
+     *             on DFS errors or if the lock file is empty
+     */
+    public static ApplicationId getLockFile(String instanceName, Configuration conf) throws IOException {
+        if (instanceName == null || instanceName.isEmpty()) {
+            throw new IllegalStateException("Instance name not given.");
+        }
+        FileSystem fs = FileSystem.get(conf);
+        Path lockPath = new Path(fs.getHomeDirectory(), CONF_DIR_REL + instanceName + '/' + instanceLock);
+        if (!fs.exists(lockPath)) {
+            return null;
+        }
+        BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(lockPath)));
+        String lockAppId;
+        try {
+            lockAppId = br.readLine();
+        } finally {
+            br.close();
+        }
+        if (lockAppId == null) {
+            throw new IOException("Lock file " + lockPath + " is empty");
+        }
+        return ConverterUtils.toApplicationId(lockAppId);
+    }
+
+    /**
+     * Locate and load the Asterix parameters file.
+     *
+     * If baseConfig is set, it is loaded, and MERGED_PARAMETERS_PATH is redirected to PARAMS_DEFAULT_NAME
+     * in that file's directory. Otherwise DEFAULT_PARAMETERS_PATH is loaded and MERGED_PARAMETERS_PATH is
+     * left unchanged.
+     *
+     * @return the loaded configuration
+     * @throws FileNotFoundException
+     * @throws IOException
+     */
+    private AsterixConfiguration locateConfig() throws FileNotFoundException, IOException {
+        if (baseConfig == null) {
+            return Utils.loadAsterixConfig(DEFAULT_PARAMETERS_PATH);
+        }
+        AsterixConfiguration configuration = Utils.loadAsterixConfig(baseConfig);
+        // Absolutize first: a bare file name has no parent, and getParentFile() would return null.
+        File configDir = new File(baseConfig).getAbsoluteFile().getParentFile();
+        MERGED_PARAMETERS_PATH = configDir.getAbsolutePath() + File.separator + PARAMS_DEFAULT_NAME;
+        return configuration;
+    }
+
+    /**
+     * Copies the CC Java options, NC Java options and CC REST port out of the given configuration into
+     * ccJavaOpts, ncJavaOpts and ccRestPort. Property names are matched case-insensitively. Other
+     * properties are ignored, and a field whose property is absent keeps its current value.
+     *
+     * @param configuration
+     *            the configuration to read; this is the "base" config that is inside the zip
+     * @throws NumberFormatException
+     *             if the CC REST port value is not an integer
+     */
+    private void readConfigParams(AsterixConfiguration configuration) {
+        for (edu.uci.ics.asterix.common.configuration.Property prop : configuration.getProperty()) {
+            String key = prop.getName();
+            if (key.equalsIgnoreCase(CC_JAVA_OPTS_KEY)) {
+                ccJavaOpts = prop.getValue();
+                continue;
+            }
+            if (key.equalsIgnoreCase(NC_JAVA_OPTS_KEY)) {
+                ncJavaOpts = prop.getValue();
+                continue;
+            }
+            if (key.equalsIgnoreCase(CC_REST_PORT_KEY)) {
+                ccRestPort = Integer.parseInt(prop.getValue());
+            }
+        }
+    }
+
+    /**
+     * Retrieves necessary information from the cluster configuration, splices it into the Asterix
+     * configuration parameters, and writes the result to MERGED_PARAMETERS_PATH.
+     *
+     * For every node it records a store directory, a "coredump" directory under its log dir and a
+     * "txnLogs" directory under its transaction log dir. Node-level settings take precedence over
+     * cluster-wide ones.
+     *
+     * @param cluster
+     *            the YARN cluster description
+     * @throws FileNotFoundException
+     * @throws IOException
+     *             also wrapping any JAXBException raised while marshalling
+     */
+    private void writeAsterixConfig(Cluster cluster) throws FileNotFoundException, IOException {
+        String metadataNodeId = Utils.getMetadataNode(cluster).getId();
+        String asterixInstanceName = instanceName;
+
+        AsterixConfiguration configuration = locateConfig();
+        readConfigParams(configuration);
+
+        String version = Utils.getAsterixVersionFromClasspath();
+        configuration.setVersion(version);
+        configuration.setInstanceName(asterixInstanceName);
+
+        List<Store> stores = new ArrayList<Store>();
+        for (Node node : cluster.getNode()) {
+            String storeDir = node.getStore() == null ? cluster.getStore() : node.getStore();
+            stores.add(new Store(node.getId(), storeDir));
+        }
+        configuration.setStore(stores);
+
+        List<Coredump> coredump = new ArrayList<Coredump>();
+        List<TransactionLogDir> txnLogDirs = new ArrayList<TransactionLogDir>();
+        for (Node node : cluster.getNode()) {
+            String coredumpDir = node.getLogDir() == null ? cluster.getLogDir() : node.getLogDir();
+            // Add a separator only if it is missing; "/logs" used to become "/logscoredump/".
+            if (coredumpDir != null && !coredumpDir.endsWith(File.separator)) {
+                coredumpDir += File.separator;
+            }
+            coredump.add(new Coredump(node.getId(), coredumpDir + "coredump" + File.separator));
+
+            String txnLogDir = node.getTxnLogDir() == null ? cluster.getTxnLogDir() : node.getTxnLogDir(); //node or cluster-wide
+            // If the string doesn't have a trailing separator, add one. The previous check was inverted: it
+            // added a separator only when one was already present.
+            if (!txnLogDir.endsWith(File.separator)) {
+                txnLogDir += File.separator;
+            }
+            txnLogDirs.add(new TransactionLogDir(node.getId(), txnLogDir + "txnLogs" + File.separator));
+        }
+        configuration.setMetadataNode(metadataNodeId);
+        configuration.setCoredump(coredump);
+        configuration.setTransactionLogDir(txnLogDirs);
+
+        FileOutputStream os = new FileOutputStream(MERGED_PARAMETERS_PATH);
+        try {
+            JAXBContext ctx = JAXBContext.newInstance(AsterixConfiguration.class);
+            Marshaller marshaller = ctx.createMarshaller();
+            marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);
+            marshaller.marshal(configuration, os);
+        } catch (JAXBException e) {
+            throw new IOException(e);
+        } finally {
+            os.close();
+        }
+    }
+
+ private boolean waitForCompletion(ApplicationId appId, String message) throws YarnException, IOException {
+ return Utils.waitForApplication(appId, yarnClient, message, ccRestPort);
+ }
+
+    /**
+     * A DFS-resident file to localize into the AM container, plus environment entries that describe it
+     * to the AM. It is static because it never uses the enclosing client instance.
+     */
+    private static class DFSResourceCoordinate {
+        // Name under which the resource appears in the container's local resources.
+        String name;
+        // YARN descriptor for the resource.
+        LocalResource res;
+        // Environment entries stored inverted: key = variable value, value = variable name.
+        // deployAM swaps them back when building the AM environment.
+        Map<String, String> envs;
+
+        public DFSResourceCoordinate() {
+            envs = new HashMap<String, String>(3);
+        }
+    }
+}
diff --git a/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/Deleter.java b/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/Deleter.java
new file mode 100644
index 0000000..55a9d0b
--- /dev/null
+++ b/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/Deleter.java
@@ -0,0 +1,31 @@
+package edu.uci.ics.asterix.aoya;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+
+/**
+ * Command-line helper that deletes every path given as an argument (directories recursively).
+ * Paths that do not exist are logged and skipped.
+ */
+public class Deleter {
+    private static final Log LOG = LogFactory.getLog(Deleter.class);
+
+    public static void main(String[] args) throws IOException {
+        LogManager.getRootLogger().setLevel(Level.DEBUG);
+
+        LOG.info("Obliterator args: " + Arrays.toString(args));
+        for (String arg : args) {
+            File f = new File(arg);
+            if (f.exists()) {
+                LOG.info("Deleting: " + f.getPath());
+                // forceDelete handles plain files as well as directories; deleteDirectory threw
+                // IllegalArgumentException when handed a regular file.
+                FileUtils.forceDelete(f);
+            } else {
+                LOG.error("Could not find file to delete: " + f.getPath());
+            }
+        }
+    }
+}
diff --git a/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/HDFSBackup.java b/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/HDFSBackup.java
new file mode 100644
index 0000000..13c9a6e
--- /dev/null
+++ b/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/HDFSBackup.java
@@ -0,0 +1,94 @@
+package edu.uci.ics.asterix.aoya;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+public class HDFSBackup {
+ Configuration conf = new YarnConfiguration();
+ private static final Log LOG = LogFactory.getLog(AsterixApplicationMaster.class);
+ boolean restore = false;
+ boolean backup = false;
+
+ public static void main(String[] args) throws ParseException, IllegalArgumentException, IOException {
+
+ HDFSBackup back = new HDFSBackup();
+ Map<String, String> envs = System.getenv();
+ if(envs.containsKey("HADOOP_CONF_DIR")){
+ File hadoopConfDir = new File(envs.get("HADOOP_CONF_DIR"));
+ if(hadoopConfDir.isDirectory()){
+ for(File config: hadoopConfDir.listFiles()){
+ if(config.getName().matches("^.*(xml)$")){
+ back.conf.addResource(new Path(config.getAbsolutePath()));
+ }
+ }
+ }
+ }
+ Options opts = new Options();
+ opts.addOption("restore", false, "");
+ opts.addOption("backup", false, "");
+ CommandLine cliParser = new GnuParser().parse(opts, args);
+ if (cliParser.hasOption("restore")) {
+ back.restore = true;
+ }
+ if (cliParser.hasOption("backup")) {
+ back.backup = true;
+ }
+ @SuppressWarnings("unchecked")
+ List<String> pairs = (List<String>) cliParser.getArgList();
+
+ List<Path[]> sources = new ArrayList<Path[]>(10);
+ for (String p : pairs) {
+ String[] s = p.split(",");
+ sources.add(new Path[] { new Path(s[0]), new Path(s[1]) });
+ }
+
+ try {
+ if (back.backup) {
+ back.performBackup(sources);
+ }
+ if (back.restore) {
+ back.performRestore(sources);
+ }
+ } catch (IOException e) {
+ back.LOG.fatal("Backup/restoration unsuccessful: " + e.getMessage());
+ throw e;
+ }
+ }
+
+ private void performBackup(List<Path[]> paths) throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+ for (Path[] p : paths) {
+ LOG.info("Backing up " + p[0] + " to " + p[1] + ".");
+ fs.copyFromLocalFile(p[0], p[1]);
+ }
+ }
+
+ private void performRestore(List<Path[]> paths) throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+ for (Path[] p : paths) {
+ LOG.info("Restoring " + p[0] + " to " + p[1] + ".");
+ File f = new File(p[1].toString() + File.separator + p[0].getName());
+ LOG.info(f.getAbsolutePath());
+ if (f.exists()) {
+ FileUtils.deleteDirectory(f);
+ }
+ LOG.info(f.exists());
+ fs.copyToLocalFile(false, p[0], p[1], true);
+ }
+ }
+}
diff --git a/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/Utils.java b/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/Utils.java
new file mode 100644
index 0000000..4eb8bc3
--- /dev/null
+++ b/asterix-yarn/src/main/java/edu/uci/ics/asterix/aoya/Utils.java
@@ -0,0 +1,462 @@
+package edu.uci.ics.asterix.aoya;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.ConnectException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Scanner;
+import java.util.regex.Pattern;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Marshaller;
+import javax.xml.bind.Unmarshaller;
+
+import org.apache.commons.httpclient.*;
+import org.apache.commons.httpclient.methods.GetMethod;
+import org.apache.commons.httpclient.methods.PostMethod;
+import org.apache.commons.httpclient.params.HttpMethodParams;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+import edu.uci.ics.asterix.common.configuration.AsterixConfiguration;
+import edu.uci.ics.asterix.event.schema.yarnCluster.Cluster;
+import edu.uci.ics.asterix.event.schema.yarnCluster.Node;
+
+public class Utils {
+
+    // Instance configuration directory relative to the DFS home directory (shared with AsterixYARNClient).
+    private static final String CONF_DIR_REL = AsterixYARNClient.CONF_DIR_REL;
+
+    /** Static helpers only; never instantiated. */
+    private Utils() {
+    }
+
+ public static String hostFromContainerID(String containerID) {
+ return containerID.split("_")[4];
+ }
+
+    /**
+     * Gets the metadata node from an AsterixDB cluster description file.
+     *
+     * If the cluster names a metadata node, the node with that id is returned. Otherwise the second node
+     * is chosen, or the first when the cluster has only one node; get(1) used to throw in that case.
+     *
+     * @param cluster
+     *            The cluster description in question.
+     * @return the metadata node, or null if the named metadata node is not among the cluster's nodes
+     * @throws IndexOutOfBoundsException
+     *             if no metadata node is named and the cluster has no nodes
+     */
+    public static Node getMetadataNode(Cluster cluster) {
+        List<Node> nodes = cluster.getNode();
+        if (cluster.getMetadataNode() == null) {
+            // I will pick one for you.
+            return nodes.get(nodes.size() > 1 ? 1 : 0);
+        }
+        for (Node node : nodes) {
+            if (node.getId().equals(cluster.getMetadataNode())) {
+                return node;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Sends a "poison pill" message to an AsterixDB instance for it to shut down safely. It then POSTs
+     * again to check that the instance no longer accepts connections.
+     *
+     * @param host
+     *            The host to shut down.
+     * @param port
+     *            The CC REST API port.
+     * @throws IOException
+     *             if the instance is still reachable after the shutdown request, or on other I/O errors
+     */
+    public static void sendShutdownCall(String host, int port) throws IOException {
+        final String url = "http://" + host + ":" + port + "/admin/shutdown";
+        PostMethod shutdown = new PostMethod(url);
+        try {
+            executeHTTPCall(shutdown);
+        } catch (NoHttpResponseException e) {
+            // do nothing... this is expected
+        } finally {
+            shutdown.releaseConnection();
+        }
+        // Now check that the instance is really down, or throw an exception. Use a fresh method object:
+        // the first one has already been executed.
+        PostMethod probe = new PostMethod(url);
+        try {
+            executeHTTPCall(probe);
+        } catch (ConnectException e) {
+            return;
+        } finally {
+            probe.releaseConnection();
+        }
+        throw new IOException("Instance did not shut down cleanly.");
+    }
+
+    /**
+     * Simple test via the AsterixDB HTTP query API to determine if an instance is truly live or not.
+     * Queries the Metadata dataset and returns true if the query completes successfully, false otherwise.
+     *
+     * @param host
+     *            The host to run the query against
+     * @param port
+     *            The port of the query API
+     * @return True if the instance is OK, false otherwise.
+     * @throws IOException
+     */
+    public static boolean probeLiveness(String host, int port) throws IOException {
+        final String url = "http://" + host + ":" + port + "/query";
+        final String test = "for $x in dataset Metadata.Dataset return $x;";
+        GetMethod method = new GetMethod(url);
+        method.setQueryString(new NameValuePair[] { new NameValuePair("query", test) });
+        try {
+            InputStream response;
+            try {
+                response = executeHTTPCall(method);
+            } catch (ConnectException e) {
+                return false;
+            }
+            if (response == null) {
+                return false;
+            }
+            BufferedReader br = new BufferedReader(new InputStreamReader(response));
+            String result = br.readLine();
+            if (result == null) {
+                return false;
+            }
+            return method.getStatusCode() == HttpStatus.SC_OK;
+        } finally {
+            // Always release the connection held by the method; it was never released before.
+            method.releaseConnection();
+        }
+    }
+
+    /**
+     * Executes the method on a new HttpClient configured never to retry, and returns the response body
+     * stream. The stream can be null; callers check for that.
+     */
+    private static InputStream executeHTTPCall(HttpMethod method) throws HttpException, IOException {
+        HttpMethodRetryHandler neverRetry = new HttpMethodRetryHandler() {
+            @Override
+            public boolean retryMethod(final HttpMethod attempted, final IOException cause, int attempts) {
+                return false;
+            }
+        };
+        HttpClient httpClient = new HttpClient();
+        httpClient.getParams().setParameter(HttpMethodParams.RETRY_HANDLER, neverRetry);
+        httpClient.executeMethod(method);
+        InputStream body = method.getResponseBodyAsStream();
+        return body;
+    }
+
+    /**
+     * Builds a three-character progress indicator whose single '.' moves with iter:
+     * 0 gives ".  ", 1 gives " . ", 2 gives "  .", 3 gives ".  " again, and so on.
+     *
+     * @param iter
+     *            iteration counter; negative values wrap around instead of throwing
+     */
+    public static String makeDots(int iter) {
+        int pos = ((iter % 3) + 3) % 3; // Java's % keeps the sign, so normalize into [0, 2]
+        char[] dots = { ' ', ' ', ' ' };
+        dots[pos] = '.';
+        return new String(dots);
+    }
+
+    /**
+     * Prints the warning and asks on standard input for "yes" or "no", re-prompting until one of them
+     * is entered.
+     *
+     * @param warning
+     *            text shown before the question
+     * @return true for "yes"; false for "no" or if standard input ends before an answer
+     */
+    public static boolean confirmAction(String warning) {
+        System.out.println(warning);
+        System.out.print("Are you sure you want to do this? (yes/no): ");
+        // Deliberately not closed: closing the Scanner closes System.in for the whole process. The old
+        // code closed it inside the loop, so any answer other than yes/no crashed the next read.
+        Scanner in = new Scanner(System.in);
+        while (in.hasNextLine()) {
+            String input = in.nextLine();
+            if ("yes".equals(input)) {
+                return true;
+            } else if ("no".equals(input)) {
+                return false;
+            }
+            System.out.println("Please type yes or no");
+        }
+        return false;
+    }
+
+    /**
+     * Prints every AsterixDB instance directory found under confDirRel in the DFS home directory. Each
+     * instance is reported as running (with its application id) when it has a lock file, else as stopped.
+     *
+     * @param conf
+     *            Hadoop configuration object
+     * @param confDirRel
+     *            Relative AsterixDB configuration path for DFS
+     * @throws IOException
+     */
+    public static void listInstances(Configuration conf, String confDirRel) throws IOException {
+        FileSystem dfs = FileSystem.get(conf);
+        Path root = new Path(dfs.getHomeDirectory(), confDirRel);
+        if (!dfs.exists(root)) {
+            System.out.println("No running or stopped AsterixDB instances exist in this cluster.");
+            return;
+        }
+        FileStatus[] entries = dfs.listStatus(root);
+        if (entries.length == 0) {
+            System.out.println("No running or stopped AsterixDB instances exist in this cluster");
+            return;
+        }
+        System.out.println("Existing AsterixDB instances: ");
+        for (FileStatus entry : entries) {
+            String name = entry.getPath().getName();
+            ApplicationId runningId = AsterixYARNClient.getLockFile(name, conf);
+            String line = runningId == null ? "Instance " + name + " is stopped"
+                    : "Instance " + name + " is running with Application ID: " + runningId.toString();
+            System.out.println(line);
+        }
+    }
+
+    /**
+     * Prints the names of the backups stored for an instance, as returned by getBackups.
+     *
+     * @param conf
+     *            YARN configuration
+     * @param confDirRel
+     *            Relative config path
+     * @param instance
+     *            Instance name
+     * @throws IOException
+     */
+    public static void listBackups(Configuration conf, String confDirRel, String instance) throws IOException {
+        List<String> names = getBackups(conf, confDirRel, instance);
+        if (names.isEmpty()) {
+            System.out.println("No backups found for instance " + instance + ".");
+            return;
+        }
+        System.out.println("Backups for instance " + instance + ": ");
+        for (int i = 0; i < names.size(); i++) {
+            System.out.println("Backup: " + names.get(i));
+        }
+    }
+    /**
+     * Return the available snapshot names, i.e. the entry names in the instance's "backups" directory
+     * under the DFS home directory.
+     *
+     * @param conf
+     *            configuration used to reach the DFS
+     * @param confDirRel
+     *            relative configuration directory
+     * @param instance
+     *            the instance name
+     * @return the snapshot names; empty if the instance has no backups directory
+     * @throws IOException
+     */
+    public static List<String> getBackups(Configuration conf, String confDirRel, String instance) throws IOException {
+        FileSystem fs = FileSystem.get(conf);
+        Path backupFolder = new Path(fs.getHomeDirectory(), confDirRel + "/" + instance + "/" + "backups");
+        List<String> backupNames = new ArrayList<String>();
+        // A missing backups folder just means no backups; listStatus would throw FileNotFoundException.
+        if (!fs.exists(backupFolder)) {
+            return backupNames;
+        }
+        for (FileStatus f : fs.listStatus(backupFolder)) {
+            backupNames.add(f.getPath().getName());
+        }
+        return backupNames;
+    }
+
+    /**
+     * Removes the backup snapshot whose directory name equals the given timestamp from the DFS.
+     *
+     * @param conf
+     *            DFS Configuration
+     * @param confDirRel
+     *            Configuration relative directory
+     * @param instance
+     *            The asterix instance name
+     * @param timestamp
+     *            The snapshot timestamp (ID)
+     * @throws IOException
+     */
+    public static void rmBackup(Configuration conf, String confDirRel, String instance, long timestamp)
+            throws IOException {
+        FileSystem fs = FileSystem.get(conf);
+        Path backupFolder = new Path(fs.getHomeDirectory(), confDirRel + "/" + instance + "/" + "backups");
+        // A missing backups folder means there are no backups; listStatus would throw FileNotFoundException.
+        FileStatus[] backups = fs.exists(backupFolder) ? fs.listStatus(backupFolder) : new FileStatus[0];
+        if (backups.length != 0) {
+            System.out.println("Backups for instance " + instance + ": ");
+        } else {
+            System.out.println("No backups found for instance " + instance + ".");
+        }
+        for (FileStatus f : backups) {
+            String name = f.getPath().getName();
+            long fileTs;
+            try {
+                fileTs = Long.parseLong(name);
+            } catch (NumberFormatException e) {
+                continue; // not a timestamp-named snapshot; skip instead of aborting
+            }
+            if (fileTs == timestamp) {
+                System.out.println("Deleting backup " + timestamp);
+                if (!fs.delete(f.getPath(), true)) {
+                    System.out.println("Backup could not be deleted");
+                }
+                return;
+            }
+        }
+        System.out.println("No backup found with specified timestamp");
+    }
+
+    /**
+     * Parses a YARN cluster description file into a {@link Cluster} object.
+     *
+     * @param path
+     *            local filesystem path of the cluster description XML
+     * @return the unmarshalled cluster description
+     * @throws YarnException
+     *             wrapping any {@link JAXBException} raised while reading or parsing the file
+     */
+    public static Cluster parseYarnClusterConfig(String path) throws YarnException {
+        File clusterFile = new File(path);
+        try {
+            Unmarshaller reader = JAXBContext.newInstance(Cluster.class).createUnmarshaller();
+            return (Cluster) reader.unmarshal(clusterFile);
+        } catch (JAXBException e) {
+            throw new YarnException(e);
+        }
+    }
+
+    /**
+     * Serializes a {@link Cluster} description as XML into a local file.
+     *
+     * @param path
+     *            local filesystem path to write to
+     * @param cl
+     *            the cluster description to write
+     * @throws YarnException
+     *             wrapping any {@link JAXBException} raised while marshalling
+     */
+    public static void writeYarnClusterConfig(String path, Cluster cl) throws YarnException {
+        File target = new File(path);
+        try {
+            Marshaller marshaller = JAXBContext.newInstance(Cluster.class).createMarshaller();
+            marshaller.marshal(cl, target);
+        } catch (JAXBException e) {
+            throw new YarnException(e);
+        }
+    }
+
+    /**
+     * Looks in the current class path for AsterixDB jars and derives the version number from the file name of the
+     * first one that carries a version.
+     * <p>
+     * Jar names are expected to look like {@code asterix-<module>-<version>[-<qualifier>].jar}, where
+     * {@code <module>} may itself contain dashes (e.g. {@code asterix-external-data-0.8.7-SNAPSHOT.jar}). The version
+     * is taken to start at the first dash-separated token that begins with a digit and to run to the end of the name.
+     *
+     * @return The version found (e.g. {@code 0.8.7-SNAPSHOT} or {@code 0.8.7}), or null if no versioned AsterixDB jar
+     *         is on the class path.
+     */
+    public static String getAsterixVersionFromClasspath() {
+        String[] cp = System.getProperty("java.class.path").split(System.getProperty("path.separator"));
+        for (String entry : cp) {
+            //last path component, using the platform separator; "" for a bare root entry
+            String jarName = new File(entry).getName();
+            if (!jarName.startsWith("asterix") || !jarName.endsWith(".jar")) {
+                continue;
+            }
+            //drop ".jar" before splitting so the extension never leaks into the version, and keep the dots
+            //inside the version itself (the old split on '.' reduced 0.8.7 to 0)
+            String[] byDash = jarName.substring(0, jarName.length() - ".jar".length()).split("-");
+            for (int k = 1; k < byDash.length; k++) {
+                if (!byDash[k].isEmpty() && Character.isDigit(byDash[k].charAt(0))) {
+                    StringBuilder version = new StringBuilder(byDash[k]);
+                    //re-attach any qualifier such as SNAPSHOT
+                    for (int q = k + 1; q < byDash.length; q++) {
+                        version.append('-').append(byDash[q]);
+                    }
+                    return version.toString();
+                }
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Polls the ResourceManager until the application is RUNNING and, optionally, until the instance's Cluster
+     * Controller passes {@code Utils.probeLiveness(host, port)}.
+     *
+     * @param appId
+     *            application to wait for
+     * @param probe
+     *            if true, once the application is RUNNING also wait for the CC host returned by
+     *            {@link #getCCHostname(String, Configuration)} to pass the liveness probe
+     * @param print
+     *            if true, print {@code message} plus a progress indicator while waiting
+     * @param message
+     *            progress message
+     * @param yarnClient
+     *            client used to fetch application reports
+     * @param instanceName
+     *            instance whose CC is probed (only used when {@code probe} is true)
+     * @param conf
+     *            DFS configuration used to look up the CC host (only used when {@code probe} is true)
+     * @param port
+     *            port passed to the liveness probe
+     * @return true once the application is RUNNING (and the probe succeeded, when probing); false if the
+     *         application ends FAILED, FINISHED or KILLED, the wait times out, or the thread is interrupted
+     * @throws YarnException
+     *             if the RM cannot be queried, the CC host cannot be resolved, or the probe fails with an I/O error
+     */
+    public static boolean waitForLiveness(ApplicationId appId, boolean probe, boolean print, String message,
+            YarnClient yarnClient, String instanceName, Configuration conf, int port) throws YarnException {
+        ApplicationReport report;
+        try {
+            report = yarnClient.getApplicationReport(appId);
+        } catch (IOException e) {
+            throw new YarnException(e);
+        }
+        YarnApplicationState st = report.getYarnApplicationState();
+        //resolved once, the first time the application is seen RUNNING (it is read from the DFS)
+        String host = null;
+        for (int i = 0; i < 120; i++) {
+            if (st != YarnApplicationState.RUNNING) {
+                try {
+                    report = yarnClient.getApplicationReport(appId);
+                    st = report.getYarnApplicationState();
+                    if (print) {
+                        System.out.print(message + Utils.makeDots(i) + "\r");
+                    }
+                    Thread.sleep(1000);
+                } catch (InterruptedException e1) {
+                    //keep the interrupt visible and stop waiting: with the flag set every later
+                    //sleep() throws immediately, so continuing would just spin
+                    Thread.currentThread().interrupt();
+                    if (print) {
+                        System.out.println("");
+                    }
+                    return false;
+                } catch (IOException e1) {
+                    throw new YarnException(e1);
+                }
+                if (st == YarnApplicationState.FAILED || st == YarnApplicationState.FINISHED
+                        || st == YarnApplicationState.KILLED) {
+                    return false;
+                }
+                if (st != YarnApplicationState.RUNNING) {
+                    //still NEW/SUBMITTED/ACCEPTED etc.: keep polling instead of reporting success or probing a CC
+                    //that cannot be up yet
+                    continue;
+                }
+            }
+            if (!probe) {
+                if (print) {
+                    System.out.println("");
+                }
+                return true;
+            }
+            if (host == null) {
+                host = getCCHostname(instanceName, conf);
+            }
+            try {
+                for (int j = 0; j < 60; j++) {
+                    if (Utils.probeLiveness(host, port)) {
+                        if (print) {
+                            System.out.println("");
+                        }
+                        return true;
+                    }
+                    if (print) {
+                        System.out.print(message + Utils.makeDots(i) + "\r");
+                    }
+                    try {
+                        Thread.sleep(1000);
+                    } catch (InterruptedException e2) {
+                        Thread.currentThread().interrupt();
+                        if (print) {
+                            System.out.println("");
+                        }
+                        return false;
+                    }
+                }
+            } catch (IOException e1) {
+                throw new YarnException(e1);
+            }
+        }
+        if (print) {
+            System.out.println("");
+        }
+        return false;
+    }
+
+    /**
+     * Delegates to the full {@code waitForLiveness} with CC probing and progress output both enabled.
+     */
+    public static boolean waitForLiveness(ApplicationId appId, String message, YarnClient yarnClient,
+            String instanceName, Configuration conf, int port) throws YarnException, IOException {
+        final boolean probeCc = true;
+        final boolean showProgress = true;
+        return waitForLiveness(appId, probeCc, showProgress, message, yarnClient, instanceName, conf, port);
+    }
+
+    /**
+     * Delegates to the full {@code waitForLiveness} without CC probing, printing {@code message} as progress.
+     */
+    public static boolean waitForApplication(ApplicationId appId, YarnClient yarnClient, String message, int port)
+            throws YarnException, IOException {
+        final boolean probeCc = false;
+        final boolean showProgress = true;
+        return waitForLiveness(appId, probeCc, showProgress, message, yarnClient, "", null, port);
+    }
+
+    /**
+     * Delegates to the full {@code waitForLiveness} without CC probing and without any progress output.
+     */
+    public static boolean waitForApplication(ApplicationId appId, YarnClient yarnClient, int port) throws YarnException,
+            IOException, JAXBException {
+        final boolean probeCc = false;
+        final boolean showProgress = false;
+        return waitForLiveness(appId, probeCc, showProgress, "", yarnClient, "", null, port);
+    }
+
+    /**
+     * Reads {@code cluster-config.xml} for an instance from the DFS home directory and returns the client IP of its
+     * master (CC) node.
+     *
+     * @param instanceName
+     *            the AsterixDB instance name
+     * @param conf
+     *            DFS configuration
+     * @return the master node's client IP as recorded in the instance's cluster configuration
+     * @throws YarnException
+     *             wrapping any I/O or JAXB error while reading or parsing the configuration
+     */
+    public static String getCCHostname(String instanceName, Configuration conf) throws YarnException {
+        try {
+            FileSystem fs = FileSystem.get(conf);
+            String instanceFolder = instanceName + "/";
+            String pathSuffix = CONF_DIR_REL + instanceFolder + "cluster-config.xml";
+            Path dstConf = new Path(fs.getHomeDirectory(), pathSuffix);
+            //stream the config straight from the DFS instead of copying it into a temp file that
+            //would otherwise linger until JVM exit
+            try (java.io.InputStream in = fs.open(dstConf)) {
+                JAXBContext clusterConf = JAXBContext.newInstance(Cluster.class);
+                Unmarshaller unm = clusterConf.createUnmarshaller();
+                Cluster cl = (Cluster) unm.unmarshal(in);
+                return cl.getMasterNode().getClientIp();
+            }
+        } catch (IOException | JAXBException e) {
+            throw new YarnException(e);
+        }
+    }
+
+    /**
+     * Parses an AsterixDB parameter file (e.g. base-asterix-configuration.xml) into an {@link AsterixConfiguration}.
+     *
+     * @param path
+     *            local filesystem path of the parameter file
+     * @return the unmarshalled configuration
+     * @throws IOException
+     *             wrapping any {@link JAXBException} raised while reading or parsing the file
+     */
+    public static AsterixConfiguration loadAsterixConfig(String path) throws IOException {
+        File configFile = new File(path);
+        AsterixConfiguration parsed;
+        try {
+            Unmarshaller reader = JAXBContext.newInstance(AsterixConfiguration.class).createUnmarshaller();
+            parsed = (AsterixConfiguration) reader.unmarshal(configFile);
+        } catch (JAXBException e) {
+            throw new IOException(e);
+        }
+        return parsed;
+    }
+
+}
diff --git a/asterix-yarn/src/main/resources/base-asterix-configuration.xml b/asterix-yarn/src/main/resources/base-asterix-configuration.xml
new file mode 100644
index 0000000..76c00db
--- /dev/null
+++ b/asterix-yarn/src/main/resources/base-asterix-configuration.xml
@@ -0,0 +1,246 @@
+<!--
+ ! Copyright 2009-2013 by The Regents of the University of California
+ ! Licensed under the Apache License, Version 2.0 (the "License");
+ ! you may not use this file except in compliance with the License.
+ ! you may obtain a copy of the License from
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing, software
+ ! distributed under the License is distributed on an "AS IS" BASIS,
+ ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ! See the License for the specific language governing permissions and
+ ! limitations under the License.
+ !-->
+<asterixConfiguration xmlns="asterixconf">
+
+ <property>
+ <name>nc.java.opts</name>
+ <value>-Xmx1536m</value>
+ <description>JVM parameters for each Node Controller (NC)</description>
+ </property>
+
+ <property>
+ <name>cc.java.opts</name>
+ <value>-Xmx1024m</value>
+ <description>JVM parameters for the Cluster Controller (CC)
+ </description>
+ </property>
+
+ <property>
+ <name>max.wait.active.cluster</name>
+ <value>60</value>
+ <description>Maximum wait (in seconds) for a cluster to be ACTIVE (all nodes are available)
+ before a submitted query/statement can be executed. (Default = 60 seconds)
+ </description>
+ </property>
+
+ <property>
+ <name>storage.buffercache.pagesize</name>
+ <value>131072</value>
+ <description>The page size in bytes for pages in the buffer cache.
+ (Default = "131072" // 128KB)
+ </description>
+ </property>
+
+ <property>
+ <name>storage.buffercache.size</name>
+ <value>536870912</value>
+ <description>The size of memory allocated to the disk buffer cache.
+ The value should be a multiple of the buffer cache page size. (Default
+ = "536870912" // 512MB)
+ </description>
+ </property>
+
+ <property>
+ <name>storage.buffercache.maxopenfiles</name>
+ <value>214748364</value>
+ <description>The maximum number of open files in the buffer cache.
+ (Default = "214748364")
+ </description>
+ </property>
+
+ <property>
+ <name>storage.memorycomponent.pagesize</name>
+ <value>131072</value>
+ <description>The page size in bytes for pages allocated to memory
+ components. (Default = "131072" // 128KB)
+ </description>
+ </property>
+
+ <property>
+ <name>storage.memorycomponent.numpages</name>
+ <value>256</value>
+ <description>The number of pages to allocate for a memory component.
+ (Default = 256)
+ </description>
+ </property>
+
+ <property>
+ <name>storage.metadata.memorycomponent.numpages</name>
+ <value>64</value>
+ <description>The number of pages to allocate for a metadata memory component.
+ (Default = 64)
+ </description>
+ </property>
+
+ <property>
+ <name>storage.memorycomponent.numcomponents</name>
+ <value>2</value>
+ <description>The number of memory components to be used per lsm index.
+ (Default = 2)
+ </description>
+ </property>
+
+ <property>
+ <name>storage.memorycomponent.globalbudget</name>
+ <value>536870912</value>
+ <description>The total size of memory in bytes that the sum of all
+ open memory
+ components cannot exceed. (Default = "536870912" // 512MB)
+ </description>
+ </property>
+
+ <property>
+ <name>storage.lsm.bloomfilter.falsepositiverate</name>
+ <value>0.01</value>
+ <description>The maximum acceptable false positive rate for bloom
+ filters associated with LSM indexes. (Default = "0.01" // 1%)
+ </description>
+ </property>
+
+ <property>
+ <name>txn.log.buffer.numpages</name>
+ <value>8</value>
+ <description>The number of in-memory log buffer pages. (Default = "8")
+ </description>
+ </property>
+
+ <property>
+ <name>txn.log.buffer.pagesize</name>
+ <value>524288</value>
+ <description>The size of pages in the in-memory log buffer. (Default =
+ "524288" // 512KB)
+ </description>
+ </property>
+
+ <property>
+ <name>txn.log.partitionsize</name>
+ <value>2147483648</value>
+ <description>The maximum size of a log file partition allowed before
+ rotating the log to the next partition. (Default = "2147483648" //
+ 2GB)
+ </description>
+ </property>
+
+ <property>
+ <name>txn.log.checkpoint.lsnthreshold</name>
+ <value>67108864</value>
+ <description>The size of the window that the maximum LSN is allowed to
+ be ahead of the checkpoint LSN by. (Default = "67108864" // 64MB)
+ </description>
+ </property>
+
+ <property>
+ <name>txn.log.checkpoint.pollfrequency</name>
+ <value>120</value>
+ <description>The time in seconds that the checkpoint thread
+ waits between polls. (Default = "120" // 120s)
+ </description>
+ </property>
+
+ <property>
+ <name>txn.log.checkpoint.history</name>
+ <value>0</value>
+ <description>The number of old log partition files to keep before
+ discarding. (Default = "0")
+ </description>
+ </property>
+
+ <property>
+ <name>txn.lock.escalationthreshold</name>
+ <value>1000</value>
+ <description>The number of entity level locks that need to be acquired
+ before the locks are coalesced and escalated into a dataset level
+ lock. (Default = "1000")
+ </description>
+ </property>
+
+ <property>
+ <name>txn.lock.shrinktimer</name>
+ <value>5000</value>
+ <description>The time in milliseconds to wait before deallocating
+ unused lock manager memory. (Default = "5000" // 5s)
+ </description>
+ </property>
+
+ <property>
+ <name>txn.lock.timeout.waitthreshold</name>
+ <value>60000</value>
+ <description>The time in milliseconds to wait before labeling a
+ transaction that has been waiting for a lock as timed out. (Default =
+ "60000" // 60s)
+ </description>
+ </property>
+
+ <property>
+ <name>txn.lock.timeout.sweepthreshold</name>
+ <value>10000</value>
+ <description>The time in milliseconds the timeout thread waits between
+ sweeps to detect timed-out transactions. (Default = "10000" // 10s)
+ </description>
+ </property>
+
+ <property>
+ <name>compiler.sortmemory</name>
+ <value>33554432</value>
+ <description>The amount of memory in bytes given to sort operations.
+ (Default = "33554432" // 32mb)
+ </description>
+ </property>
+
+ <property>
+ <name>compiler.joinmemory</name>
+ <value>33554432</value>
+ <description>The amount of memory in bytes given to join operations.
+ (Default = "33554432" // 32mb)
+ </description>
+ </property>
+
+ <property>
+ <name>compiler.framesize</name>
+ <value>131072</value>
+ <description>The Hyracks frame size that the compiler configures per
+ job. (Default = "131072" // 128KB)
+ </description>
+ </property>
+
+ <property>
+ <name>web.port</name>
+ <value>19001</value>
+ <description>The port for the ASTERIX web interface. (Default = 19001)
+ </description>
+ </property>
+
+ <property>
+ <name>api.port</name>
+ <value>19002</value>
+ <description>The port for the ASTERIX API server. (Default = 19002)
+ </description>
+ </property>
+
+ <property>
+ <name>log.level</name>
+ <value>INFO</value>
+ <description>The minimum log level to be displayed. (Default = INFO)
+ </description>
+ </property>
+
+ <property>
+ <name>plot.activate</name>
+ <value>false</value>
+ <description>Enables plotting of the Algebricks plan to the tmp folder. (Default = false)
+ </description>
+ </property>
+
+</asterixConfiguration>
diff --git a/asterix-yarn/src/main/resources/configs/asterix-client-log4j.properties b/asterix-yarn/src/main/resources/configs/asterix-client-log4j.properties
new file mode 100644
index 0000000..4b936b5
--- /dev/null
+++ b/asterix-yarn/src/main/resources/configs/asterix-client-log4j.properties
@@ -0,0 +1,23 @@
+#/*
+# Copyright 2009-2013 by The Regents of the University of California
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# you may obtain a copy of the License from
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#*/
+log4j.rootLogger=info, A1
+
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+# Print only the log level and the message (no timestamp)
+log4j.appender.A1.layout.ConversionPattern=%-p: %m%n
+
+log4j.logger.edu.uci.ics.asterix.event.management=info
+log4j.logger.org.apache.zookeeper=info
diff --git a/asterix-yarn/src/main/resources/configs/base-asterix-configuration.xml b/asterix-yarn/src/main/resources/configs/base-asterix-configuration.xml
new file mode 100644
index 0000000..76c00db
--- /dev/null
+++ b/asterix-yarn/src/main/resources/configs/base-asterix-configuration.xml
@@ -0,0 +1,246 @@
+<!--
+ ! Copyright 2009-2013 by The Regents of the University of California
+ ! Licensed under the Apache License, Version 2.0 (the "License");
+ ! you may not use this file except in compliance with the License.
+ ! you may obtain a copy of the License from
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing, software
+ ! distributed under the License is distributed on an "AS IS" BASIS,
+ ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ! See the License for the specific language governing permissions and
+ ! limitations under the License.
+ !-->
+<asterixConfiguration xmlns="asterixconf">
+
+ <property>
+ <name>nc.java.opts</name>
+ <value>-Xmx1536m</value>
+ <description>JVM parameters for each Node Controller (NC)</description>
+ </property>
+
+ <property>
+ <name>cc.java.opts</name>
+ <value>-Xmx1024m</value>
+ <description>JVM parameters for the Cluster Controller (CC)
+ </description>
+ </property>
+
+ <property>
+ <name>max.wait.active.cluster</name>
+ <value>60</value>
+ <description>Maximum wait (in seconds) for a cluster to be ACTIVE (all nodes are available)
+ before a submitted query/statement can be executed. (Default = 60 seconds)
+ </description>
+ </property>
+
+ <property>
+ <name>storage.buffercache.pagesize</name>
+ <value>131072</value>
+ <description>The page size in bytes for pages in the buffer cache.
+ (Default = "131072" // 128KB)
+ </description>
+ </property>
+
+ <property>
+ <name>storage.buffercache.size</name>
+ <value>536870912</value>
+ <description>The size of memory allocated to the disk buffer cache.
+ The value should be a multiple of the buffer cache page size. (Default
+ = "536870912" // 512MB)
+ </description>
+ </property>
+
+ <property>
+ <name>storage.buffercache.maxopenfiles</name>
+ <value>214748364</value>
+ <description>The maximum number of open files in the buffer cache.
+ (Default = "214748364")
+ </description>
+ </property>
+
+ <property>
+ <name>storage.memorycomponent.pagesize</name>
+ <value>131072</value>
+ <description>The page size in bytes for pages allocated to memory
+ components. (Default = "131072" // 128KB)
+ </description>
+ </property>
+
+ <property>
+ <name>storage.memorycomponent.numpages</name>
+ <value>256</value>
+ <description>The number of pages to allocate for a memory component.
+ (Default = 256)
+ </description>
+ </property>
+
+ <property>
+ <name>storage.metadata.memorycomponent.numpages</name>
+ <value>64</value>
+ <description>The number of pages to allocate for a metadata memory component.
+ (Default = 64)
+ </description>
+ </property>
+
+ <property>
+ <name>storage.memorycomponent.numcomponents</name>
+ <value>2</value>
+ <description>The number of memory components to be used per lsm index.
+ (Default = 2)
+ </description>
+ </property>
+
+ <property>
+ <name>storage.memorycomponent.globalbudget</name>
+ <value>536870912</value>
+ <description>The total size of memory in bytes that the sum of all
+ open memory
+ components cannot exceed. (Default = "536870912" // 512MB)
+ </description>
+ </property>
+
+ <property>
+ <name>storage.lsm.bloomfilter.falsepositiverate</name>
+ <value>0.01</value>
+ <description>The maximum acceptable false positive rate for bloom
+ filters associated with LSM indexes. (Default = "0.01" // 1%)
+ </description>
+ </property>
+
+ <property>
+ <name>txn.log.buffer.numpages</name>
+ <value>8</value>
+ <description>The number of in-memory log buffer pages. (Default = "8")
+ </description>
+ </property>
+
+ <property>
+ <name>txn.log.buffer.pagesize</name>
+ <value>524288</value>
+ <description>The size of pages in the in-memory log buffer. (Default =
+ "524288" // 512KB)
+ </description>
+ </property>
+
+ <property>
+ <name>txn.log.partitionsize</name>
+ <value>2147483648</value>
+ <description>The maximum size of a log file partition allowed before
+ rotating the log to the next partition. (Default = "2147483648" //
+ 2GB)
+ </description>
+ </property>
+
+ <property>
+ <name>txn.log.checkpoint.lsnthreshold</name>
+ <value>67108864</value>
+ <description>The size of the window that the maximum LSN is allowed to
+ be ahead of the checkpoint LSN by. (Default = "67108864" // 64MB)
+ </description>
+ </property>
+
+ <property>
+ <name>txn.log.checkpoint.pollfrequency</name>
+ <value>120</value>
+ <description>The time in seconds that the checkpoint thread
+ waits between polls. (Default = "120" // 120s)
+ </description>
+ </property>
+
+ <property>
+ <name>txn.log.checkpoint.history</name>
+ <value>0</value>
+ <description>The number of old log partition files to keep before
+ discarding. (Default = "0")
+ </description>
+ </property>
+
+ <property>
+ <name>txn.lock.escalationthreshold</name>
+ <value>1000</value>
+ <description>The number of entity level locks that need to be acquired
+ before the locks are coalesced and escalated into a dataset level
+ lock. (Default = "1000")
+ </description>
+ </property>
+
+ <property>
+ <name>txn.lock.shrinktimer</name>
+ <value>5000</value>
+ <description>The time in milliseconds to wait before deallocating
+ unused lock manager memory. (Default = "5000" // 5s)
+ </description>
+ </property>
+
+ <property>
+ <name>txn.lock.timeout.waitthreshold</name>
+ <value>60000</value>
+ <description>The time in milliseconds to wait before labeling a
+ transaction that has been waiting for a lock as timed out. (Default =
+ "60000" // 60s)
+ </description>
+ </property>
+
+ <property>
+ <name>txn.lock.timeout.sweepthreshold</name>
+ <value>10000</value>
+ <description>The time in milliseconds the timeout thread waits between
+ sweeps to detect timed-out transactions. (Default = "10000" // 10s)
+ </description>
+ </property>
+
+ <property>
+ <name>compiler.sortmemory</name>
+ <value>33554432</value>
+ <description>The amount of memory in bytes given to sort operations.
+ (Default = "33554432" // 32mb)
+ </description>
+ </property>
+
+ <property>
+ <name>compiler.joinmemory</name>
+ <value>33554432</value>
+ <description>The amount of memory in bytes given to join operations.
+ (Default = "33554432" // 32mb)
+ </description>
+ </property>
+
+ <property>
+ <name>compiler.framesize</name>
+ <value>131072</value>
+ <description>The Hyracks frame size that the compiler configures per
+ job. (Default = "131072" // 128KB)
+ </description>
+ </property>
+
+ <property>
+ <name>web.port</name>
+ <value>19001</value>
+ <description>The port for the ASTERIX web interface. (Default = 19001)
+ </description>
+ </property>
+
+ <property>
+ <name>api.port</name>
+ <value>19002</value>
+ <description>The port for the ASTERIX API server. (Default = 19002)
+ </description>
+ </property>
+
+ <property>
+ <name>log.level</name>
+ <value>INFO</value>
+ <description>The minimum log level to be displayed. (Default = INFO)
+ </description>
+ </property>
+
+ <property>
+ <name>plot.activate</name>
+ <value>false</value>
+ <description>Enables plotting of the Algebricks plan to the tmp folder. (Default = false)
+ </description>
+ </property>
+
+</asterixConfiguration>
diff --git a/asterix-yarn/src/main/resources/configs/local.xml b/asterix-yarn/src/main/resources/configs/local.xml
new file mode 100644
index 0000000..582254c
--- /dev/null
+++ b/asterix-yarn/src/main/resources/configs/local.xml
@@ -0,0 +1,31 @@
+ <cluster xmlns="yarn_cluster">
+
+ <!-- Name of the cluster -->
+ <name>local</name>
+
+ <log_dir>/tmp/</log_dir>
+ <txn_log_dir>/tmp/</txn_log_dir>
+
+ <!-- Mount point of an iodevice. Use a comma separated list for a machine that
+ has multiple iodevices (disks).
+ This property can be overridden for a node by redefining it at the node level. -->
+ <iodevices>/tmp</iodevices>
+
+ <!-- Path on each iodevice where Asterix will store its data -->
+ <store>storage</store>
+
+ <!-- IP addresses of the master machine -->
+ <master_node>
+ <id>cc</id>
+ <client_ip>localhost</client_ip>
+ <cluster_ip>localhost</cluster_ip>
+ <client_port>1098</client_port>
+ <cluster_port>1099</cluster_port>
+ <http_port>8888</http_port>
+ </master_node>
+ <node>
+ <id>nc1</id>
+ <cluster_ip>127.0.0.1</cluster_ip>
+ </node>
+ <metadata_node>nc1</metadata_node>
+</cluster>
diff --git a/asterix-yarn/src/main/resources/configs/my_awesome_cluster_desc.xml b/asterix-yarn/src/main/resources/configs/my_awesome_cluster_desc.xml
new file mode 100644
index 0000000..749bc6f
--- /dev/null
+++ b/asterix-yarn/src/main/resources/configs/my_awesome_cluster_desc.xml
@@ -0,0 +1,23 @@
+ <cluster xmlns="yarn_cluster">
+ <name>my_awesome_instance</name>
+ <txn_log_dir>/home/yarn/</txn_log_dir>
+ <iodevices>/home/yarn/</iodevices>
+ <store>asterix-data</store>
+ <master_node>
+ <id>cc</id>
+ <client_ip>10.10.0.2</client_ip>
+ <cluster_ip>10.10.0.2</cluster_ip>
+ <client_port>1098</client_port>
+ <cluster_port>1099</cluster_port>
+ <http_port>8888</http_port>
+ </master_node>
+ <node>
+ <id>nc1</id>
+ <cluster_ip>10.10.0.3</cluster_ip>
+ </node>
+ <node>
+ <id>nc2</id>
+ <cluster_ip>10.10.0.4</cluster_ip>
+ </node>
+ <metadata_node>nc1</metadata_node>
+ </cluster>
diff --git a/asterix-yarn/src/main/resources/scripts/asterix b/asterix-yarn/src/main/resources/scripts/asterix
new file mode 100644
index 0000000..c2fc20f
--- /dev/null
+++ b/asterix-yarn/src/main/resources/scripts/asterix
@@ -0,0 +1,25 @@
+#!/usr/bin/env bash
+# Launcher for the AsterixDB YARN client: builds a classpath from $ASTERIX_HOME/lib plus the
+# YARN/Hadoop configuration directories and runs AsterixYARNClient with all arguments passed through.
+
+# Default ASTERIX_HOME to the parent of the directory containing this script.
+if [ -z "$ASTERIX_HOME" ]
+  then
+    pushd "$(dirname "$0")" >/dev/null
+    cd ..
+    export ASTERIX_HOME=$(pwd)
+    popd >/dev/null
+fi
+
+for jar in "$ASTERIX_HOME"/lib/*.jar
+  do
+    # an unmatched glob stays literal; skip it instead of adding a bogus classpath entry
+    [ -e "$jar" ] || continue
+    if [ -z "$ASTERIX_CLASSPATH" ]
+      then
+        ASTERIX_CLASSPATH=$jar
+      else
+        ASTERIX_CLASSPATH=$ASTERIX_CLASSPATH:$jar
+    fi
+done
+
+ASTERIX_CLASSPATH=$ASTERIX_CLASSPATH:
+ASTERIX_CLASSPATH=$ASTERIX_CLASSPATH:$YARN_CONF_DIR:$HADOOP_CONF_DIR:$HADOOP_CONF_PATH
+pushd "$(dirname "$0")" > /dev/null
+cd "$ASTERIX_HOME"
+# JAVA_OPTS is deliberately unquoted so it can carry several JVM flags; "$@" forwards every
+# argument unchanged, including ones that contain spaces.
+java $JAVA_OPTS -cp "$ASTERIX_CLASSPATH" -Dlog4j.configuration="file://$ASTERIX_HOME/conf/asterix-client-log4j.properties" edu.uci.ics.asterix.aoya.AsterixYARNClient "$@"
+status=$?
+popd > /dev/null
+# report the client's exit status, not popd's
+exit $status
diff --git a/asterix-yarn/src/main/resources/scripts/asterix.cmd b/asterix-yarn/src/main/resources/scripts/asterix.cmd
new file mode 100644
index 0000000..a85e411
--- /dev/null
+++ b/asterix-yarn/src/main/resources/scripts/asterix.cmd
@@ -0,0 +1,103 @@
+@echo off
+@rem Licensed to the Apache Software Foundation (ASF) under one or more
+@rem contributor license agreements. See the NOTICE file distributed with
+@rem this work for additional information regarding copyright ownership.
+@rem The ASF licenses this file to You under the Apache License, Version 2.0
+@rem (the "License"); you may not use this file except in compliance with
+@rem the License. You may obtain a copy of the License at
+@rem
+@rem http://www.apache.org/licenses/LICENSE-2.0
+@rem
+@rem Unless required by applicable law or agreed to in writing, software
+@rem distributed under the License is distributed on an "AS IS" BASIS,
+@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+@rem See the License for the specific language governing permissions and
+@rem limitations under the License.
+
+@rem Windows launcher for the AsterixDB YARN client (edu.uci.ics.asterix.aoya.AsterixYARNClient).
+
+setlocal enabledelayedexpansion
+
+if not defined HADOOP_BIN_PATH (
+  set HADOOP_BIN_PATH=%~dp0
+)
+
+if "%HADOOP_BIN_PATH:~-1%" == "\" (
+  set HADOOP_BIN_PATH=%HADOOP_BIN_PATH:~0,-1%
+)
+
+set DEFAULT_LIBEXEC_DIR=%HADOOP_BIN_PATH%\..\libexec
+if not defined HADOOP_LIBEXEC_DIR (
+  set HADOOP_LIBEXEC_DIR=%DEFAULT_LIBEXEC_DIR%
+)
+
+:main
+
+  set CLASS=edu.uci.ics.asterix.aoya.AsterixYARNClient
+
+  @rem NOTE: this script does not call hadoop-config.cmd; JAVA_HEAP_MAX is only set below, from YARN_HEAPSIZE
+
+  if defined YARN_HEAPSIZE (
+    @rem echo run with Java heapsize %YARN_HEAPSIZE%
+    set JAVA_HEAP_MAX=-Xmx%YARN_HEAPSIZE%m
+  )
+
+  @rem CLASSPATH initially contains HADOOP_CONF_DIR & YARN_CONF_DIR
+  if not defined HADOOP_CONF_DIR (
+    echo No HADOOP_CONF_DIR set.
+    echo Please specify it either in yarn-env.cmd or in the environment.
+    goto :eof
+  )
+
+  @rem the client jars live in the lib directory next to this script's directory
+  set CLASSPATH=%HADOOP_CONF_DIR%;%YARN_CONF_DIR%;%CLASSPATH%;%~dp0..\lib\*
+
+  @rem for developers, add Hadoop classes to CLASSPATH
+  if exist "%HADOOP_YARN_HOME%\yarn-api\target\classes" (
+    set CLASSPATH=%CLASSPATH%;%HADOOP_YARN_HOME%\yarn-api\target\classes
+  )
+
+  if exist "%HADOOP_YARN_HOME%\yarn-common\target\classes" (
+    set CLASSPATH=%CLASSPATH%;%HADOOP_YARN_HOME%\yarn-common\target\classes
+  )
+
+  if exist "%HADOOP_YARN_HOME%\yarn-mapreduce\target\classes" (
+    set CLASSPATH=%CLASSPATH%;%HADOOP_YARN_HOME%\yarn-mapreduce\target\classes
+  )
+
+  if exist "%HADOOP_YARN_HOME%\yarn-master-worker\target\classes" (
+    set CLASSPATH=%CLASSPATH%;%HADOOP_YARN_HOME%\yarn-master-worker\target\classes
+  )
+
+  if exist "%HADOOP_YARN_HOME%\yarn-server\yarn-server-nodemanager\target\classes" (
+    set CLASSPATH=%CLASSPATH%;%HADOOP_YARN_HOME%\yarn-server\yarn-server-nodemanager\target\classes
+  )
+
+  if exist "%HADOOP_YARN_HOME%\yarn-server\yarn-server-common\target\classes" (
+    set CLASSPATH=%CLASSPATH%;%HADOOP_YARN_HOME%\yarn-server\yarn-server-common\target\classes
+  )
+
+  if exist "%HADOOP_YARN_HOME%\yarn-server\yarn-server-resourcemanager\target\classes" (
+    set CLASSPATH=%CLASSPATH%;%HADOOP_YARN_HOME%\yarn-server\yarn-server-resourcemanager\target\classes
+  )
+
+  if exist "%HADOOP_YARN_HOME%\build\test\classes" (
+    set CLASSPATH=%CLASSPATH%;%HADOOP_YARN_HOME%\build\test\classes
+  )
+
+  if exist "%HADOOP_YARN_HOME%\build\tools" (
+    set CLASSPATH=%CLASSPATH%;%HADOOP_YARN_HOME%\build\tools
+  )
+
+  set CLASSPATH=%CLASSPATH%;%HADOOP_YARN_HOME%\%YARN_DIR%\*
+  set CLASSPATH=%CLASSPATH%;%HADOOP_YARN_HOME%\%YARN_LIB_JARS_DIR%\*
+
+
+
+  if defined JAVA_LIBRARY_PATH (
+    set YARN_OPTS=%YARN_OPTS% -Djava.library.path=%JAVA_LIBRARY_PATH%
+  )
+
+  set java_arguments=%JAVA_HEAP_MAX% %YARN_OPTS% -classpath %CLASSPATH% %CLASS%
+  call java %java_arguments% %*
+
+goto :eof
+
+endlocal
diff --git a/asterix-yarn/src/test/java/edu/uci/ics/asterix/aoya/test/AsterixYARNInstanceUtil.java b/asterix-yarn/src/test/java/edu/uci/ics/asterix/aoya/test/AsterixYARNInstanceUtil.java
new file mode 100644
index 0000000..fd16159
--- /dev/null
+++ b/asterix-yarn/src/test/java/edu/uci/ics/asterix/aoya/test/AsterixYARNInstanceUtil.java
@@ -0,0 +1,107 @@
+package edu.uci.ics.asterix.aoya.test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.util.logging.Logger;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.junit.Assert;
+
+import edu.uci.ics.asterix.aoya.AsterixYARNClient;
+import edu.uci.ics.asterix.aoya.Utils;
+import edu.uci.ics.asterix.event.schema.yarnCluster.Cluster;
+import edu.uci.ics.asterix.event.schema.yarnCluster.Node;
+
+/**
+ * Test fixture that starts a MiniDFSCluster and a MiniYARNCluster for AsterixDB-on-YARN integration tests,
+ * localizes the packaged cluster description to the mini cluster's hostname, and cleans up afterwards.
+ */
+public class AsterixYARNInstanceUtil {
+    private static final String PATH_ACTUAL = "ittest/";
+    private static final String INSTANCE_NAME = "asterix-integration-test";
+    private MiniYARNCluster miniCluster;
+    //kept so tearDown() can stop it; otherwise each setUp() leaves an HDFS mini cluster running
+    //while the next setUp() deletes its base directory
+    private MiniDFSCluster hdfsCluster;
+    private YarnConfiguration appConf;
+    public String aoyaHome;
+    public String configPath;
+    public String aoyaServerPath;
+    public String parameterPath;
+
+    /**
+     * Locates the built binary assembly under ./target, starts HDFS and YARN mini clusters, writes a localized
+     * cluster description and yarn-site.xml into ./target, and clears any leftover instance state in the DFS.
+     *
+     * @return the YARN configuration bound to the running mini clusters
+     * @throws Exception
+     *             if the binaries cannot be found or a cluster fails to start
+     */
+    public YarnConfiguration setUp() throws Exception {
+        File asterixProjectDir = new File(System.getProperty("user.dir"));
+
+        File installerTargetDir = new File(asterixProjectDir, "target");
+
+        String[] dirsInTarget = installerTargetDir.list(new FilenameFilter() {
+            @Override
+            public boolean accept(File dir, String name) {
+                return new File(dir, name).isDirectory() && name.startsWith("asterix-yarn")
+                        && name.endsWith("binary-assembly");
+            }
+
+        });
+        //list() returns null when the directory does not exist
+        if (dirsInTarget == null || dirsInTarget.length != 1) {
+            throw new IllegalStateException("Could not find binary to run YARN integration test with");
+        }
+        aoyaHome = installerTargetDir.getAbsolutePath() + File.separator + dirsInTarget[0];
+        File asterixServerInstallerDir = new File(aoyaHome, "asterix");
+        String[] zipsInFolder = asterixServerInstallerDir.list(new FilenameFilter() {
+            @Override
+            public boolean accept(File dir, String name) {
+                return name.startsWith("asterix-server") && name.endsWith("binary-assembly.zip");
+            }
+        });
+        if (zipsInFolder == null || zipsInFolder.length != 1) {
+            throw new IllegalStateException("Could not find server binary to run YARN integration test with");
+        }
+        aoyaServerPath = asterixServerInstallerDir.getAbsolutePath() + File.separator + zipsInFolder[0];
+        configPath = aoyaHome + File.separator + "configs" + File.separator + "local.xml";
+        parameterPath = aoyaHome + File.separator + "conf" + File.separator + "base-asterix-configuration.xml";
+        YARNCluster.getInstance().setup();
+        appConf = new YarnConfiguration();
+        File baseDir = new File("./target/hdfs/").getAbsoluteFile();
+        FileUtil.fullyDelete(baseDir);
+        appConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+        MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(appConf);
+        hdfsCluster = builder.build();
+        miniCluster = YARNCluster.getInstance().getCluster();
+        appConf.set("fs.defaultFS", "hdfs://localhost:" + hdfsCluster.getNameNodePort());
+        miniCluster.init(appConf);
+        Cluster defaultConfig = Utils.parseYarnClusterConfig(configPath);
+        for (Node n : defaultConfig.getNode()) {
+            n.setClusterIp(MiniYARNCluster.getHostname());
+        }
+        defaultConfig.getMasterNode().setClusterIp(MiniYARNCluster.getHostname());
+        configPath = "target" + File.separator + "localized-aoya-config.xml";
+        Utils.writeYarnClusterConfig(configPath, defaultConfig);
+        miniCluster.start();
+        appConf = new YarnConfiguration(miniCluster.getConfig());
+        appConf.set("fs.defaultFS", "hdfs://localhost:" + hdfsCluster.getNameNodePort());
+        //TODO:why must I do this!? what is not being passed properly via environment variables???
+        //writeXml() does not close the stream, so close it here
+        try (FileOutputStream yarnSite = new FileOutputStream("target" + File.separator + "yarn-site.xml")) {
+            appConf.writeXml(yarnSite);
+        }
+
+        //once the cluster is created, you can get its configuration
+        //with the binding details to the cluster added from the minicluster
+        FileSystem fs = FileSystem.get(appConf);
+        Path instanceState = new Path(fs.getHomeDirectory(), AsterixYARNClient.CONF_DIR_REL + INSTANCE_NAME + "/");
+        fs.delete(instanceState, true);
+        Assert.assertFalse(fs.exists(instanceState));
+
+        File outdir = new File(PATH_ACTUAL);
+        outdir.mkdirs();
+        return appConf;
+    }
+
+    /**
+     * Removes all instance state from the DFS, stops the YARN and HDFS mini clusters, and deletes the
+     * output directory if it is empty.
+     *
+     * @throws Exception
+     *             if cleanup fails
+     */
+    public void tearDown() throws Exception {
+        FileSystem fs = FileSystem.get(appConf);
+        Path instance = new Path(fs.getHomeDirectory(), AsterixYARNClient.CONF_DIR_REL + "/");
+        fs.delete(instance, true);
+        miniCluster.close();
+        //shut HDFS down last: the delete above still needs it
+        if (hdfsCluster != null) {
+            hdfsCluster.shutdown();
+            hdfsCluster = null;
+        }
+        File outdir = new File(PATH_ACTUAL);
+        File[] files = outdir.listFiles();
+        if (files == null || files.length == 0) {
+            outdir.delete();
+        }
+    }
+}
\ No newline at end of file
diff --git a/asterix-yarn/src/test/java/edu/uci/ics/asterix/aoya/test/AsterixYARNLibraryTestIT.java b/asterix-yarn/src/test/java/edu/uci/ics/asterix/aoya/test/AsterixYARNLibraryTestIT.java
new file mode 100644
index 0000000..82f0818
--- /dev/null
+++ b/asterix-yarn/src/test/java/edu/uci/ics/asterix/aoya/test/AsterixYARNLibraryTestIT.java
@@ -0,0 +1,90 @@
+package edu.uci.ics.asterix.aoya.test;
+
+import java.io.File;
+import java.util.List;
+import java.util.logging.Logger;
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import edu.uci.ics.asterix.aoya.AsterixYARNClient;
+import edu.uci.ics.asterix.test.aql.TestsUtils;
+import edu.uci.ics.asterix.testframework.context.TestCaseContext;
+
+/** Runs the external-library test cases against an AsterixDB instance hosted on a MiniYARNCluster. */
+public class AsterixYARNLibraryTestIT {
+    private static final String LIBRARY_DATAVERSE = "externallibtest";
+    private static final String INSTANCE_NAME = "asterix-lib-test";
+    private static final String PATH_BASE = "src/test/resources/library";
+    private static final String PATH_ACTUAL = "ittest/";
+    private static final Logger LOGGER = Logger.getLogger(AsterixYARNLibraryTestIT.class.getName());
+    private static String configPath;
+    private static String aoyaServerPath;
+    private static String parameterPath;
+    private static AsterixYARNInstanceUtil instance;
+    private static YarnConfiguration appConf;
+    private static List<TestCaseContext> testCaseCollection;
+    private static final String LIBRARY_PATH = "asterix-external-data" + File.separator + "target" + File.separator
+            + "testlib-zip-binary-assembly.zip"; // resolved against the parent of user.dir in setUp()
+
+    /** Installs and stops an instance, installs the test library into it, restarts it and loads the test cases. */
+    @BeforeClass
+    public static void setUp() throws Exception {
+        instance = new AsterixYARNInstanceUtil();
+        appConf = instance.setUp();
+        configPath = instance.configPath;
+        aoyaServerPath = instance.aoyaServerPath;
+        parameterPath = instance.parameterPath;
+
+        String command = "-n " + INSTANCE_NAME + " -c " + configPath + " -bc " + parameterPath + " -zip "
+                + aoyaServerPath + " install";
+        executeAoyaCommand(command);
+        command = "-n " + INSTANCE_NAME + " -bc " + parameterPath + " stop";
+        executeAoyaCommand(command);
+        String asterixExternalLibraryPath = new File(System.getProperty("user.dir")).getParentFile().getAbsolutePath()
+                + File.separator + LIBRARY_PATH;
+        command = "-n " + INSTANCE_NAME + " -l " + asterixExternalLibraryPath + " -ld " + LIBRARY_DATAVERSE + " -bc "
+                + parameterPath + " libinstall";
+        executeAoyaCommand(command);
+        command = "-n " + INSTANCE_NAME + " -bc " + parameterPath + " start";
+        executeAoyaCommand(command);
+        TestCaseContext.Builder b = new TestCaseContext.Builder();
+        testCaseCollection = b.build(new File(PATH_BASE));
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        try {
+            executeAoyaCommand("-n " + INSTANCE_NAME + " -zip " + aoyaServerPath + " -f -bc " + parameterPath + " destroy");
+        } finally { // run instance.tearDown() (closes the YARN mini cluster) even if destroying the instance fails
+            instance.tearDown();
+        }
+    }
+
+    @Test
+    public void test() throws Exception {
+        for (TestCaseContext testCaseCtx : testCaseCollection) {
+            TestsUtils.executeTest(PATH_ACTUAL, testCaseCtx, null, false);
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        try {
+            setUp();
+            new AsterixYARNLibraryTestIT().test();
+        } catch (Exception e) {
+            e.printStackTrace();
+            LOGGER.info("TEST CASES FAILED");
+        } finally {
+            tearDown();
+        }
+    }
+
+    static void executeAoyaCommand(String cmd) throws Exception {
+        AsterixYARNClient aoyaClient = new AsterixYARNClient(appConf);
+        aoyaClient.init(cmd.split(" "));
+        AsterixYARNClient.execute(aoyaClient);
+    }
+}
diff --git a/asterix-yarn/src/test/java/edu/uci/ics/asterix/aoya/test/AsterixYARNLifecycleIT.java b/asterix-yarn/src/test/java/edu/uci/ics/asterix/aoya/test/AsterixYARNLifecycleIT.java
new file mode 100644
index 0000000..781158d
--- /dev/null
+++ b/asterix-yarn/src/test/java/edu/uci/ics/asterix/aoya/test/AsterixYARNLifecycleIT.java
@@ -0,0 +1,162 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.aoya.test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.logging.Logger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runners.Parameterized.Parameters;
+import org.junit.FixMethodOrder;
+import org.junit.runners.MethodSorters;
+
+import edu.uci.ics.asterix.aoya.AsterixYARNClient;
+import edu.uci.ics.asterix.aoya.Utils;
+import edu.uci.ics.asterix.event.error.VerificationUtil;
+import edu.uci.ics.asterix.event.model.AsterixInstance;
+import edu.uci.ics.asterix.event.model.AsterixInstance.State;
+import edu.uci.ics.asterix.event.model.AsterixRuntimeState;
+import edu.uci.ics.asterix.event.schema.yarnCluster.Cluster;
+import edu.uci.ics.asterix.event.schema.yarnCluster.Node;
+import edu.uci.ics.asterix.event.service.ServiceProvider;
+import edu.uci.ics.asterix.test.aql.TestsUtils;
+import edu.uci.ics.asterix.aoya.test.YARNCluster;
+import edu.uci.ics.asterix.common.configuration.AsterixConfiguration;
+import edu.uci.ics.asterix.testframework.context.TestCaseContext;
+
+/** AsterixDB-on-YARN lifecycle IT: each test_N_ step depends on the previous ones, so they run in name order. */
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class AsterixYARNLifecycleIT {
+
+    private static final Logger LOGGER = Logger.getLogger(AsterixYARNLifecycleIT.class.getName());
+    private static final String INSTANCE_NAME = "asterix-integration-test";
+    private static YarnConfiguration appConf;
+    private static String configPath;
+    private static String aoyaServerPath;
+    private static String parameterPath;
+    private static AsterixYARNInstanceUtil instance;
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        instance = new AsterixYARNInstanceUtil();
+        appConf = instance.setUp();
+        configPath = instance.configPath;
+        aoyaServerPath = instance.aoyaServerPath;
+        parameterPath = instance.parameterPath;
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        instance.tearDown();
+    }
+
+    @Parameters // NOTE(review): unused, this class is not run with the Parameterized runner
+    public static Collection<Object[]> tests() throws Exception {
+        Collection<Object[]> testArgs = new ArrayList<Object[]>();
+        return testArgs;
+    }
+
+    @Test
+    public void test_1_InstallActiveInstance() throws Exception {
+        String command = "-n " + INSTANCE_NAME + " -c " + configPath + " -bc " + parameterPath + " -zip "
+                + aoyaServerPath + " install";
+        executeAoyaCommand(command);
+    }
+
+    @Test
+    public void test_2_StopActiveInstance() throws Exception {
+        String command = "-n " + INSTANCE_NAME + " -bc " + parameterPath + " stop";
+        executeAoyaCommand(command);
+    }
+
+    @Test
+    public void test_3_BackupInActiveInstance() throws Exception {
+        String command = "-n " + INSTANCE_NAME + " -zip " + aoyaServerPath + " -f" + " backup";
+        executeAoyaCommand(command);
+    }
+
+    @Test
+    public void test_4_StartActiveInstance() throws Exception {
+        String command = "-n " + INSTANCE_NAME + " -bc " + parameterPath + " start";
+        executeAoyaCommand(command);
+    }
+
+    @Test
+    public void test_5_KillActiveInstance() throws Exception {
+        String command = "-n " + INSTANCE_NAME + " -bc " + parameterPath + " -f" + " stop";
+        executeAoyaCommand(command);
+    }
+
+    /** Restores the single backup (taken by test_3) into the stopped instance. */
+    @Test
+    public void test_6_RestoreInActiveInstance() throws Exception {
+        List<String> backupNames = Utils.getBackups(appConf, ".asterix" + File.separator, INSTANCE_NAME);
+        if (backupNames.size() != 1) {
+            throw new IllegalStateException("Expected one backup of " + INSTANCE_NAME + ", found " + backupNames.size());
+        }
+        // "-s" and the backup name must be separate tokens: executeAoyaCommand splits on single spaces.
+        String command = "-n " + INSTANCE_NAME + " -zip " + aoyaServerPath + " -s " + backupNames.get(0) + " -f restore";
+        executeAoyaCommand(command);
+    }
+
+    @Test
+    public void test_7_StartRestoredInstance() throws Exception {
+        String command = "-n " + INSTANCE_NAME + " -bc " + parameterPath + " start";
+        executeAoyaCommand(command);
+    }
+
+    @Test
+    public void test_8_DeleteActiveInstance() throws Exception {
+        String command = "-n " + INSTANCE_NAME + " -zip " + aoyaServerPath + " -f" + " -bc " + parameterPath + " destroy";
+        executeAoyaCommand(command);
+    }
+
+    static void executeAoyaCommand(String cmd) throws Exception {
+        AsterixYARNClient aoyaClient = new AsterixYARNClient(appConf);
+        aoyaClient.init(cmd.split(" "));
+        AsterixYARNClient.execute(aoyaClient);
+    }
+
+    /** Runs this class through JUnit so @BeforeClass/@AfterClass and the step ordering are honoured. */
+    public static void main(String[] args) throws Exception {
+        org.junit.runner.Result result = org.junit.runner.JUnitCore.runClasses(AsterixYARNLifecycleIT.class);
+        for (org.junit.runner.notification.Failure failure : result.getFailures()) {
+            LOGGER.info(failure.getTrace());
+        }
+        if (!result.wasSuccessful()) {
+            LOGGER.info("TEST CASE(S) FAILED");
+        }
+    }
+
+}
diff --git a/asterix-yarn/src/test/java/edu/uci/ics/asterix/aoya/test/YARNCluster.java b/asterix-yarn/src/test/java/edu/uci/ics/asterix/aoya/test/YARNCluster.java
new file mode 100644
index 0000000..a067d88
--- /dev/null
+++ b/asterix-yarn/src/test/java/edu/uci/ics/asterix/aoya/test/YARNCluster.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.aoya.test;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
+
+import edu.uci.ics.asterix.external.dataset.adapter.HDFSAdapter;
+
+/**
+ * Singleton wrapper around a MiniYARNCluster (local-VM YARN) with a configured number of NodeManagers.
+ * {@link #setup()} creates and initializes the cluster but does not start it.
+ */
+@SuppressWarnings("deprecation")
+public class YARNCluster {
+
+    private static final String PATH_TO_HADOOP_CONF = "src/test/resources/hadoop/conf";
+    private static final int nameNodePort = 31888;
+    private static final String DATA_PATH = "data/hdfs";
+    private static final String HDFS_PATH = "/asterix";
+    private static final YARNCluster INSTANCE = new YARNCluster();
+
+    private MiniYARNCluster miniCluster;
+    private int numNodeManagers = 2;
+    private Configuration conf = new YarnConfiguration();
+    private FileSystem dfs;
+
+    public static YARNCluster getInstance() {
+        return INSTANCE;
+    }
+
+    private YARNCluster() {
+        // singleton: obtain the shared instance through getInstance()
+    }
+
+    /**
+     * Loads core/mapred/hdfs-site.xml from PATH_TO_HADOOP_CONF, selects the FIFO scheduler with a 64 MB minimum
+     * allocation, deletes the local "build" directory (cleanupLocal()), then creates and initializes, but does not
+     * start, a MiniYARNCluster with {@code numNodeManagers} NodeManagers (one local dir and one log dir each).
+     */
+    public void setup() throws Exception {
+        conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/core-site.xml"));
+        conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/mapred-site.xml"));
+        conf.addResource(new Path(PATH_TO_HADOOP_CONF + "/hdfs-site.xml"));
+        conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 64);
+        conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, ResourceScheduler.class);
+        conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, "target/integrationts/data");
+        cleanupLocal();
+        //this constructor is deprecated in hadoop 2x
+        //dfsCluster = new MiniDFSCluster(nameNodePort, conf, numDataNodes, true, true, StartupOption.REGULAR, null);
+        miniCluster = new MiniYARNCluster("Asterix_testing", numNodeManagers, 1, 1);
+        miniCluster.init(conf);
+        dfs = FileSystem.get(conf);
+    }
+
+    public MiniYARNCluster getCluster() { // null until setup() has been called
+        return miniCluster;
+    }
+
+    private void cleanupLocal() throws IOException {
+        // cleanup artifacts created on the local file system
+        FileSystem lfs = FileSystem.getLocal(new Configuration());
+        lfs.delete(new Path("build"), true);
+        System.setProperty("hadoop.log.dir", "logs");
+    }
+
+    public void cleanup() throws Exception { // closes the cluster, if one was set up, and cleans local artifacts
+        if (miniCluster != null) {
+            miniCluster.close();
+            cleanupLocal();
+        }
+    }
+
+}
diff --git a/asterix-yarn/src/test/resources/hadoop/conf/core-site.xml b/asterix-yarn/src/test/resources/hadoop/conf/core-site.xml
new file mode 100644
index 0000000..0eca8e4
--- /dev/null
+++ b/asterix-yarn/src/test/resources/hadoop/conf/core-site.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0"?>
+<!--
+ ! Copyright 2009-2013 by The Regents of the University of California
+ ! Licensed under the Apache License, Version 2.0 (the "License");
+ ! you may not use this file except in compliance with the License.
+ ! you may obtain a copy of the License from
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing, software
+ ! distributed under the License is distributed on an "AS IS" BASIS,
+ ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ! See the License for the specific language governing permissions and
+ ! limitations under the License.
+ !-->
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+
+
+<property>
+ <name>hadoop.tmp.dir</name>
+ <value>/tmp/hadoop</value>
+</property>
+
+
+</configuration>
diff --git a/asterix-yarn/src/test/resources/hadoop/conf/hdfs-site.xml b/asterix-yarn/src/test/resources/hadoop/conf/hdfs-site.xml
new file mode 100644
index 0000000..4f3f777
--- /dev/null
+++ b/asterix-yarn/src/test/resources/hadoop/conf/hdfs-site.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0"?>
+<!--
+ ! Copyright 2009-2013 by The Regents of the University of California
+ ! Licensed under the Apache License, Version 2.0 (the "License");
+ ! you may not use this file except in compliance with the License.
+ ! you may obtain a copy of the License from
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing, software
+ ! distributed under the License is distributed on an "AS IS" BASIS,
+ ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ! See the License for the specific language governing permissions and
+ ! limitations under the License.
+ !-->
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+
+<property>
+ <name>dfs.replication</name>
+ <value>1</value>
+</property>
+
+<property>
+ <name>dfs.block.size</name>
+ <value>1048576</value>
+</property>
+
+</configuration>
diff --git a/asterix-yarn/src/test/resources/hadoop/conf/log4j.properties b/asterix-yarn/src/test/resources/hadoop/conf/log4j.properties
new file mode 100644
index 0000000..0be7ebf
--- /dev/null
+++ b/asterix-yarn/src/test/resources/hadoop/conf/log4j.properties
@@ -0,0 +1,108 @@
+#/*
+# Copyright 2009-2013 by The Regents of the University of California
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# you may obtain a copy of the License from
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#*/
+# Define some default values that can be overridden by system properties
+hadoop.root.logger=FATAL,console
+hadoop.log.dir=.
+hadoop.log.file=hadoop.log
+
+# Define the root logger to the system property "hadoop.root.logger".
+log4j.rootLogger=${hadoop.root.logger}, EventCounter
+
+# Logging Threshold
+log4j.threshhold=FATAL
+
+#
+# Daily Rolling File Appender
+#
+
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${hadoop.log.dir}/${hadoop.log.file}
+
+# Rollover at midnight
+log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+
+# 30-day backup
+#log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+# Debugging Pattern format
+#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+
+#
+# console
+# Already attached to the root logger via hadoop.root.logger (FATAL,console) above
+#
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
+
+#
+# TaskLog Appender
+#
+
+#Default values
+hadoop.tasklog.taskid=null
+hadoop.tasklog.noKeepSplits=4
+hadoop.tasklog.totalLogFileSize=100
+hadoop.tasklog.purgeLogSplits=true
+hadoop.tasklog.logsRetainHours=12
+
+log4j.appender.TLA=org.apache.hadoop.mapred.TaskLogAppender
+log4j.appender.TLA.taskId=${hadoop.tasklog.taskid}
+log4j.appender.TLA.totalLogFileSize=${hadoop.tasklog.totalLogFileSize}
+
+log4j.appender.TLA.layout=org.apache.log4j.PatternLayout
+log4j.appender.TLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+
+#
+# Rolling File Appender
+#
+
+#log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+#log4j.appender.RFA.File=${hadoop.log.dir}/${hadoop.log.file}
+
+# Logfile size and 30-day backups
+#log4j.appender.RFA.MaxFileSize=1MB
+#log4j.appender.RFA.MaxBackupIndex=30
+
+#log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} - %m%n
+#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+#
+# FSNamesystem Audit logging
+# All audit events are logged at INFO level
+#
+log4j.logger.org.apache.hadoop.fs.FSNamesystem.audit=WARN
+
+# Custom Logging levels
+
+#log4j.logger.org.apache.hadoop.mapred.JobTracker=DEBUG
+#log4j.logger.org.apache.hadoop.mapred.TaskTracker=DEBUG
+#log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG
+
+# Jets3t library
+log4j.logger.org.jets3t.service.impl.rest.httpclient.RestS3Service=ERROR
+
+#
+# Event Counter Appender
+# Sends counts of logging messages at different severity levels to Hadoop Metrics.
+#
+log4j.appender.EventCounter=org.apache.hadoop.metrics.jvm.EventCounter
diff --git a/asterix-yarn/src/test/resources/hadoop/conf/mapred-site.xml b/asterix-yarn/src/test/resources/hadoop/conf/mapred-site.xml
new file mode 100644
index 0000000..ab3b7c3
--- /dev/null
+++ b/asterix-yarn/src/test/resources/hadoop/conf/mapred-site.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0"?>
+<!--
+ ! Copyright 2009-2013 by The Regents of the University of California
+ ! Licensed under the Apache License, Version 2.0 (the "License");
+ ! you may not use this file except in compliance with the License.
+ ! you may obtain a copy of the License from
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing, software
+ ! distributed under the License is distributed on an "AS IS" BASIS,
+ ! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ! See the License for the specific language governing permissions and
+ ! limitations under the License.
+ !-->
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+
+ <property>
+ <name>mapred.job.tracker</name>
+ <value>localhost:29007</value>
+ </property>
+ <property>
+ <name>mapred.tasktracker.map.tasks.maximum</name>
+ <value>20</value>
+ </property>
+ <property>
+ <name>mapred.tasktracker.reduce.tasks.maximum</name>
+ <value>20</value>
+ </property>
+ <property>
+ <name>mapred.max.split.size</name>
+ <value>128</value>
+ </property>
+
+</configuration>
diff --git a/asterix-yarn/src/test/resources/library/queries/library-adapters/typed_adapter/typed_adapter.1.ddl.aql b/asterix-yarn/src/test/resources/library/queries/library-adapters/typed_adapter/typed_adapter.1.ddl.aql
new file mode 100644
index 0000000..5ca1022
--- /dev/null
+++ b/asterix-yarn/src/test/resources/library/queries/library-adapters/typed_adapter/typed_adapter.1.ddl.aql
@@ -0,0 +1,21 @@
+/*
+ * Description : Create a closed type and a dataset for the records generated by
+                 the external library's test_typed_adapter, and a feed that uses
+                 that adapter to produce 5 records of TestTypedAdapterOutputType.
+                 No user-defined function is applied; the feed is connected to
+                 the dataset, and the dataset is queried, in the next steps.
+ * Expected Res : Success
+ * Date : 23rd Apr 2013
+ */
+use dataverse externallibtest;
+
+create type TestTypedAdapterOutputType as closed {
+ tweetid: int64,
+ message-text: string
+}
+
+create dataset TweetsTestAdapter(TestTypedAdapterOutputType)
+primary key tweetid;
+
+create feed TestTypedAdapterFeed
+using "testlib-zip-binary-assembly#test_typed_adapter" (("num_output_records"="5"),("type-name"="TestTypedAdapterOutputType"));
diff --git a/asterix-yarn/src/test/resources/library/queries/library-adapters/typed_adapter/typed_adapter.2.update.aql b/asterix-yarn/src/test/resources/library/queries/library-adapters/typed_adapter/typed_adapter.2.update.aql
new file mode 100644
index 0000000..3efd63c
--- /dev/null
+++ b/asterix-yarn/src/test/resources/library/queries/library-adapters/typed_adapter/typed_adapter.2.update.aql
@@ -0,0 +1,14 @@
+/*
+ * Description : Connect TestTypedAdapterFeed, which uses the external library's
+                 test_typed_adapter, to dataset TweetsTestAdapter, with
+                 wait-for-completion-feed set to "true" so the statement waits
+                 for the feed to complete.
+                 No user-defined function is applied to the feed.
+ * Expected Res : Success
+ * Date : 23rd Apr 2013
+ */
+use dataverse externallibtest;
+
+set wait-for-completion-feed "true";
+
+connect feed TestTypedAdapterFeed to dataset TweetsTestAdapter;
diff --git a/asterix-yarn/src/test/resources/library/queries/library-adapters/typed_adapter/typed_adapter.3.query.aql b/asterix-yarn/src/test/resources/library/queries/library-adapters/typed_adapter/typed_adapter.3.query.aql
new file mode 100644
index 0000000..c0e216b
--- /dev/null
+++ b/asterix-yarn/src/test/resources/library/queries/library-adapters/typed_adapter/typed_adapter.3.query.aql
@@ -0,0 +1,14 @@
+/*
+ * Description : Return the records ingested into TweetsTestAdapter by the
+                 feed built on the external library's test_typed_adapter,
+                 ordered by tweetid.
+                 No user-defined function is applied; the expected result is
+                 the set of records generated by the adapter.
+ * Expected Res : Success
+ * Date : 23rd Apr 2013
+ */
+use dataverse externallibtest;
+
+for $x in dataset TweetsTestAdapter
+order by $x.tweetid
+return $x
diff --git a/asterix-yarn/src/test/resources/library/queries/library-feeds/feed_ingest/feed_ingest.1.ddl.aql b/asterix-yarn/src/test/resources/library/queries/library-feeds/feed_ingest/feed_ingest.1.ddl.aql
new file mode 100644
index 0000000..43ff18b
--- /dev/null
+++ b/asterix-yarn/src/test/resources/library/queries/library-feeds/feed_ingest/feed_ingest.1.ddl.aql
@@ -0,0 +1,35 @@
+/*
+ * Description : Create a feed dataset that uses the feed simulator adapter.
+ The feed simulator simulates feed from a file in the local fs.
+ Associate with the feed an external user-defined function. The UDF
+ finds topics in each tweet. A topic is identified by a #.
+ Begin ingestion and apply external user defined function
+ * Expected Res : Success
+ * Date : 23rd Apr 2013
+ */
+use dataverse externallibtest;
+
+create type TweetInputType as closed {
+ id: string,
+ username : string,
+ location : string,
+ text : string,
+ timestamp : string
+}
+
+create type TweetOutputType as closed {
+ id: string,
+ username : string,
+ location : string,
+ text : string,
+ timestamp : string,
+ topics : {{string}}
+}
+
+create feed TweetFeed
+using file_feed
+(("type-name"="TweetInputType"),("fs"="localfs"),("path"="127.0.0.1://../../../../../../asterix-app/data/twitter/obamatweets.adm"),("format"="adm"),("tuple-interval"="10"))
+apply function testlib#parseTweet;
+
+create dataset TweetsFeedIngest(TweetOutputType)
+primary key id;
diff --git a/asterix-yarn/src/test/resources/library/queries/library-feeds/feed_ingest/feed_ingest.2.update.aql b/asterix-yarn/src/test/resources/library/queries/library-feeds/feed_ingest/feed_ingest.2.update.aql
new file mode 100644
index 0000000..7414bba
--- /dev/null
+++ b/asterix-yarn/src/test/resources/library/queries/library-feeds/feed_ingest/feed_ingest.2.update.aql
@@ -0,0 +1,14 @@
+/*
+ * Description : Create a feed dataset that uses the feed simulator adapter.
+ The feed simulator simulates feed from a file in the local fs.
+ Associate with the feed an external user-defined function. The UDF
+ finds topics in each tweet. A topic is identified by a #.
+ Begin ingestion and apply external user defined function
+ * Expected Res : Success
+ * Date : 23rd Apr 2013
+ */
+use dataverse externallibtest;
+
+set wait-for-completion-feed "true";
+
+connect feed TweetFeed to dataset TweetsFeedIngest;
diff --git a/asterix-yarn/src/test/resources/library/queries/library-feeds/feed_ingest/feed_ingest.3.query.aql b/asterix-yarn/src/test/resources/library/queries/library-feeds/feed_ingest/feed_ingest.3.query.aql
new file mode 100644
index 0000000..7d838be
--- /dev/null
+++ b/asterix-yarn/src/test/resources/library/queries/library-feeds/feed_ingest/feed_ingest.3.query.aql
@@ -0,0 +1,13 @@
+/*
+ * Description : Create a feed dataset that uses the feed simulator adapter.
+ The feed simulator simulates feed from a file in the local fs.
+ Associate with the feed an external user-defined function. The UDF
+ finds topics in each tweet. A topic is identified by a #.
+ Begin ingestion and apply external user defined function
+ * Expected Res : Success
+ * Date : 23rd Apr 2013
+ */
+use dataverse externallibtest;
+
+for $x in dataset TweetsFeedIngest
+return $x
diff --git a/asterix-yarn/src/test/resources/library/queries/library-functions/getCapital/getCapital.1.ddl.aql b/asterix-yarn/src/test/resources/library/queries/library-functions/getCapital/getCapital.1.ddl.aql
new file mode 100644
index 0000000..e140d9a
--- /dev/null
+++ b/asterix-yarn/src/test/resources/library/queries/library-functions/getCapital/getCapital.1.ddl.aql
@@ -0,0 +1,6 @@
+use dataverse externallibtest;
+
+create type CountryCapitalType if not exists as closed {
+country: string,
+capital: string
+};
diff --git a/asterix-yarn/src/test/resources/library/queries/library-functions/getCapital/getCapital.2.query.aql b/asterix-yarn/src/test/resources/library/queries/library-functions/getCapital/getCapital.2.query.aql
new file mode 100644
index 0000000..16e0eee
--- /dev/null
+++ b/asterix-yarn/src/test/resources/library/queries/library-functions/getCapital/getCapital.2.query.aql
@@ -0,0 +1,5 @@
+use dataverse externallibtest;
+
+let $input:=["England","Italy","China","United States","India","Jupiter"]
+for $country in $input
+return testlib-zip-binary-assembly#getCapital($country)
diff --git a/asterix-yarn/src/test/resources/library/queries/library-functions/insert-from-select/insert-from-select.1.ddl.aql b/asterix-yarn/src/test/resources/library/queries/library-functions/insert-from-select/insert-from-select.1.ddl.aql
new file mode 100644
index 0000000..11a5ddc
--- /dev/null
+++ b/asterix-yarn/src/test/resources/library/queries/library-functions/insert-from-select/insert-from-select.1.ddl.aql
@@ -0,0 +1,9 @@
+use dataverse externallibtest;
+
+create type TextType if not exists as closed {
+id: int32,
+text: string
+};
+
+create dataset Check(TextType)
+primary key id;
diff --git a/asterix-yarn/src/test/resources/library/queries/library-functions/insert-from-select/insert-from-select.2.update.aql b/asterix-yarn/src/test/resources/library/queries/library-functions/insert-from-select/insert-from-select.2.update.aql
new file mode 100644
index 0000000..8a14669
--- /dev/null
+++ b/asterix-yarn/src/test/resources/library/queries/library-functions/insert-from-select/insert-from-select.2.update.aql
@@ -0,0 +1,6 @@
+use dataverse externallibtest;
+
+insert into dataset Check (
+{"id": 1, "text":"university of california, irvine"}
+);
+
diff --git a/asterix-yarn/src/test/resources/library/queries/library-functions/insert-from-select/insert-from-select.3.update.aql b/asterix-yarn/src/test/resources/library/queries/library-functions/insert-from-select/insert-from-select.3.update.aql
new file mode 100644
index 0000000..7d365b7
--- /dev/null
+++ b/asterix-yarn/src/test/resources/library/queries/library-functions/insert-from-select/insert-from-select.3.update.aql
@@ -0,0 +1,7 @@
+use dataverse externallibtest;
+
+insert into dataset Check (
+ for $x in dataset Check
+ let $y:=testlib-zip-binary-assembly#toUpper($x)
+ return $y
+);
diff --git a/asterix-yarn/src/test/resources/library/queries/library-functions/insert-from-select/insert-from-select.4.query.aql b/asterix-yarn/src/test/resources/library/queries/library-functions/insert-from-select/insert-from-select.4.query.aql
new file mode 100644
index 0000000..997c333
--- /dev/null
+++ b/asterix-yarn/src/test/resources/library/queries/library-functions/insert-from-select/insert-from-select.4.query.aql
@@ -0,0 +1,6 @@
+use dataverse externallibtest;
+
+for $x in dataset Check
+where $x.id < 0
+order by $x.id
+return $x
diff --git a/asterix-yarn/src/test/resources/library/queries/library-functions/mysum/mysum.1.query.aql b/asterix-yarn/src/test/resources/library/queries/library-functions/mysum/mysum.1.query.aql
new file mode 100644
index 0000000..83e565c
--- /dev/null
+++ b/asterix-yarn/src/test/resources/library/queries/library-functions/mysum/mysum.1.query.aql
@@ -0,0 +1,4 @@
+use dataverse externallibtest;
+
+let $x:=testlib-zip-binary-assembly#mysum(3,4)
+return $x
diff --git a/asterix-yarn/src/test/resources/library/queries/library-functions/toUpper/toUpper.1.ddl.aql b/asterix-yarn/src/test/resources/library/queries/library-functions/toUpper/toUpper.1.ddl.aql
new file mode 100644
index 0000000..67635f5
--- /dev/null
+++ b/asterix-yarn/src/test/resources/library/queries/library-functions/toUpper/toUpper.1.ddl.aql
@@ -0,0 +1,7 @@
+use dataverse externallibtest;
+
+create type TextType if not exists as closed {
+id: int32,
+text: string
+};
+
diff --git a/asterix-yarn/src/test/resources/library/queries/library-functions/toUpper/toUpper.2.query.aql b/asterix-yarn/src/test/resources/library/queries/library-functions/toUpper/toUpper.2.query.aql
new file mode 100644
index 0000000..d546f9f
--- /dev/null
+++ b/asterix-yarn/src/test/resources/library/queries/library-functions/toUpper/toUpper.2.query.aql
@@ -0,0 +1,5 @@
+use dataverse externallibtest;
+/* Apply toUpper to a literal TextType-shaped record; the expected output has the text upper-cased and the id negated. */
+let $rec := {"id": int32("1"), "text":"university of california, irvine"}
+let $upper := testlib-zip-binary-assembly#toUpper($rec)
+return $upper
diff --git a/asterix-yarn/src/test/resources/library/queries/library-metadata/dataverseDataset/dataverseDataset.1.query.aql b/asterix-yarn/src/test/resources/library/queries/library-metadata/dataverseDataset/dataverseDataset.1.query.aql
new file mode 100644
index 0000000..40316d8
--- /dev/null
+++ b/asterix-yarn/src/test/resources/library/queries/library-metadata/dataverseDataset/dataverseDataset.1.query.aql
@@ -0,0 +1,3 @@
+for $dv in dataset Metadata.Dataverse /* every dataverse, sorted by name */
+order by $dv.DataverseName
+return $dv
diff --git a/asterix-yarn/src/test/resources/library/queries/library-metadata/functionDataset/functionDataset.1.query.aql b/asterix-yarn/src/test/resources/library/queries/library-metadata/functionDataset/functionDataset.1.query.aql
new file mode 100644
index 0000000..fc47972
--- /dev/null
+++ b/asterix-yarn/src/test/resources/library/queries/library-metadata/functionDataset/functionDataset.1.query.aql
@@ -0,0 +1,3 @@
+for $fn in dataset Metadata.Function /* every function in the metadata, sorted by name */
+order by $fn.Name
+return $fn
diff --git a/asterix-yarn/src/test/resources/library/queries/library-metadata/libraryDataset/libraryDataset.1.query.aql b/asterix-yarn/src/test/resources/library/queries/library-metadata/libraryDataset/libraryDataset.1.query.aql
new file mode 100644
index 0000000..36a8a52
--- /dev/null
+++ b/asterix-yarn/src/test/resources/library/queries/library-metadata/libraryDataset/libraryDataset.1.query.aql
@@ -0,0 +1,3 @@
+for $lib in dataset Metadata.Library /* every library in the metadata, sorted by name */
+order by $lib.Name
+return $lib
diff --git a/asterix-yarn/src/test/resources/library/results/library-adapters/typed_adapter/typed_adapter.1.adm b/asterix-yarn/src/test/resources/library/results/library-adapters/typed_adapter/typed_adapter.1.adm
new file mode 100644
index 0000000..6ec20b5
--- /dev/null
+++ b/asterix-yarn/src/test/resources/library/results/library-adapters/typed_adapter/typed_adapter.1.adm
@@ -0,0 +1,6 @@
+[ { "tweetid": 1, "message-text": "1" }
+, { "tweetid": 2, "message-text": "2" }
+, { "tweetid": 3, "message-text": "3" }
+, { "tweetid": 4, "message-text": "4" }
+, { "tweetid": 5, "message-text": "5" }
+ ]
diff --git a/asterix-yarn/src/test/resources/library/results/library-feeds/feed_ingest/feed_ingest.1.adm b/asterix-yarn/src/test/resources/library/results/library-feeds/feed_ingest/feed_ingest.1.adm
new file mode 100644
index 0000000..b629c81
--- /dev/null
+++ b/asterix-yarn/src/test/resources/library/results/library-feeds/feed_ingest/feed_ingest.1.adm
@@ -0,0 +1,13 @@
+[ { "id": "nc1:1", "username": "BronsonMike", "location": "", "text": "@GottaLaff @reutersus Christie and obama just foul weather friends", "timestamp": "Thu Dec 06 16:53:06 PST 2012", "topics": {{ }} }
+, { "id": "nc1:100", "username": "KidrauhlProuds", "location": "", "text": "RT @01Direclieber: A filha do Michael Jackson uma Belieber,a filha do Eminem e uma Belieber,as filhas de Obama sao Beliebers, e a filha do meu pai e Belieber", "timestamp": "Thu Dec 06 16:53:16 PST 2012", "topics": {{ }} }
+, { "id": "nc1:102", "username": "jaysauce82", "location": "", "text": "Not voting for President Obama #BadDecision", "timestamp": "Thu Dec 06 16:53:16 PST 2012", "topics": {{ "#BadDecision" }} }
+, { "id": "nc1:104", "username": "princeofsupras", "location": "", "text": "RT @01Direclieber: A filha do Michael Jackson e uma Belieber,a filha do Eminem e uma Belieber,as filhas de Obama sao Beliebers, e a filha do meu pai e Belieber", "timestamp": "Thu Dec 06 16:53:15 PST 2012", "topics": {{ }} }
+, { "id": "nc1:106", "username": "GulfDogs", "location": "", "text": "Obama Admin Knew Libyan Terrorists Had US-Provided Weaponsteaparty #tcot #ccot #NewGuards #BreitbartArmy #patriotwttp://t.co/vJxzrQUE", "timestamp": "Thu Dec 06 16:53:14 PST 2012", "topics": {{ "#tcot", "#ccot", "#NewGuards", "#BreitbartArmy", "#patriotwttp://t.co/vJxzrQUE" }} }
+, { "id": "nc1:108", "username": "Laugzpz", "location": "", "text": "@AlfredoJalife Maestro Obama se hace de la vista gorda, es un acuerdo de siempre creo yo.", "timestamp": "Thu Dec 06 16:53:14 PST 2012", "topics": {{ }} }
+, { "id": "nc1:11", "username": "magarika", "location": "", "text": "RT @ken24xavier: Obama tells SOROS - our plan is ALMOST finished http://t.co/WvzK0GtU", "timestamp": "Thu Dec 06 16:53:05 PST 2012", "topics": {{ }} }
+, { "id": "nc1:111", "username": "ToucanMall", "location": "", "text": "RT @WorldWar3Watch: Michelle Obama Gets More Grammy Nominations Than Justin ... #Obama #WW3 http://t.co/0Wv2GKij", "timestamp": "Thu Dec 06 16:53:13 PST 2012", "topics": {{ "#Obama", "#WW3" }} }
+, { "id": "nc1:113", "username": "ToucanMall", "location": "", "text": "RT @ObamaPalooza: Tiffany Shared What $2,000 Meant to Her ... and the President Stopped by to Talk About It http://t.co/sgT7lsNV #Obama", "timestamp": "Thu Dec 06 16:53:12 PST 2012", "topics": {{ "#Obama" }} }
+, { "id": "nc1:115", "username": "thewildpitch", "location": "", "text": "RT @RevkahJC: Dennis Miller: Obama Should Just Say He Wants To Tax Successful People http://t.co/Ihlemy9Y", "timestamp": "Thu Dec 06 16:53:11 PST 2012", "topics": {{ }} }
+, { "id": "nc1:117", "username": "Rnugent24", "location": "", "text": "RT @ConservativeQuo: unemployment is above 8% again. I wonder how long it will take for Obama to start blaming Bush? 3-2-1 #tcot #antiobama", "timestamp": "Thu Dec 06 16:53:10 PST 2012", "topics": {{ "#tcot", "#antiobama" }} }
+, { "id": "nc1:119", "username": "ToucanMall", "location": "", "text": "RT @Newitrsdotcom: I hope #Obama will win re-election... Other four years without meaningless #wars", "timestamp": "Thu Dec 06 16:53:09 PST 2012", "topics": {{ "#Obama", "#wars" }} }
+ ]
diff --git a/asterix-yarn/src/test/resources/library/results/library-functions/getCapital/getCapital.1.adm b/asterix-yarn/src/test/resources/library/results/library-functions/getCapital/getCapital.1.adm
new file mode 100644
index 0000000..dd15f55
--- /dev/null
+++ b/asterix-yarn/src/test/resources/library/results/library-functions/getCapital/getCapital.1.adm
@@ -0,0 +1,7 @@
+[ { "country": "England", "capital": "London" }
+, { "country": "Italy", "capital": "Rome" }
+, { "country": "China", "capital": "Beijing" }
+, { "country": "United States", "capital": "Washington D.C." }
+, { "country": "India", "capital": "New Delhi" }
+, { "country": "Jupiter", "capital": "NOT_FOUND" }
+ ]
diff --git a/asterix-yarn/src/test/resources/library/results/library-functions/insert-from-select/insert-from-select.1.adm b/asterix-yarn/src/test/resources/library/results/library-functions/insert-from-select/insert-from-select.1.adm
new file mode 100644
index 0000000..44b8ed5
--- /dev/null
+++ b/asterix-yarn/src/test/resources/library/results/library-functions/insert-from-select/insert-from-select.1.adm
@@ -0,0 +1,2 @@
+[ { "id": -1i32, "text": "UNIVERSITY OF CALIFORNIA, IRVINE" }
+ ]
diff --git a/asterix-yarn/src/test/resources/library/results/library-functions/mysum/mysum.1.adm b/asterix-yarn/src/test/resources/library/results/library-functions/mysum/mysum.1.adm
new file mode 100644
index 0000000..b67d9d5
--- /dev/null
+++ b/asterix-yarn/src/test/resources/library/results/library-functions/mysum/mysum.1.adm
@@ -0,0 +1,2 @@
+[ 7i32
+ ]
diff --git a/asterix-yarn/src/test/resources/library/results/library-functions/toUpper/toUpper.1.adm b/asterix-yarn/src/test/resources/library/results/library-functions/toUpper/toUpper.1.adm
new file mode 100644
index 0000000..44b8ed5
--- /dev/null
+++ b/asterix-yarn/src/test/resources/library/results/library-functions/toUpper/toUpper.1.adm
@@ -0,0 +1,2 @@
+[ { "id": -1i32, "text": "UNIVERSITY OF CALIFORNIA, IRVINE" }
+ ]
diff --git a/asterix-yarn/src/test/resources/library/results/library-metadata/dataverseDataset/dataverseDataset.1.adm b/asterix-yarn/src/test/resources/library/results/library-metadata/dataverseDataset/dataverseDataset.1.adm
new file mode 100644
index 0000000..9c987ef
--- /dev/null
+++ b/asterix-yarn/src/test/resources/library/results/library-metadata/dataverseDataset/dataverseDataset.1.adm
@@ -0,0 +1,3 @@
+[ { "DataverseName": "Metadata", "DataFormat": "edu.uci.ics.asterix.runtime.formats.NonTaggedDataFormat", "Timestamp": "Thu Apr 25 11:17:56 PDT 2013", "PendingOp": 0 }
+, { "DataverseName": "externallibtest", "DataFormat": "edu.uci.ics.asterix.runtime.formats.NonTaggedDataFormat", "Timestamp": "Thu Apr 25 11:18:12 PDT 2013", "PendingOp": 0 }
+ ]
diff --git a/asterix-yarn/src/test/resources/library/results/library-metadata/functionDataset/functionDataset.1.adm b/asterix-yarn/src/test/resources/library/results/library-metadata/functionDataset/functionDataset.1.adm
new file mode 100644
index 0000000..374a1e9
--- /dev/null
+++ b/asterix-yarn/src/test/resources/library/results/library-metadata/functionDataset/functionDataset.1.adm
@@ -0,0 +1,9 @@
+[ { "DataverseName": "externallibtest", "Name": "testlib-zip-binary-assembly#addHashTags", "Arity": "1", "Params": [ "Tweet" ], "ReturnType": "ProcessedTweet", "Definition": "edu.uci.ics.asterix.external.library.AddHashTagsFactory", "Language": "JAVA", "Kind": "SCALAR" }
+, { "DataverseName": "externallibtest", "Name": "testlib-zip-binary-assembly#addHashTagsInPlace", "Arity": "1", "Params": [ "Tweet" ], "ReturnType": "ProcessedTweet", "Definition": "edu.uci.ics.asterix.external.library.AddHashTagsInPlaceFactory", "Language": "JAVA", "Kind": "SCALAR" }
+, { "DataverseName": "externallibtest", "Name": "testlib-zip-binary-assembly#allTypes", "Arity": "1", "Params": [ "AllType" ], "ReturnType": "AllType", "Definition": "edu.uci.ics.asterix.external.library.AllTypesFactory", "Language": "JAVA", "Kind": "SCALAR" }
+, { "DataverseName": "externallibtest", "Name": "testlib-zip-binary-assembly#echoDelay", "Arity": "1", "Params": [ "TweetMessageType" ], "ReturnType": "TweetMessageType", "Definition": "edu.uci.ics.asterix.external.library.EchoDelayFactory", "Language": "JAVA", "Kind": "SCALAR" }
+, { "DataverseName": "externallibtest", "Name": "testlib-zip-binary-assembly#getCapital", "Arity": "1", "Params": [ "ASTRING" ], "ReturnType": "CountryCapitalType", "Definition": "edu.uci.ics.asterix.external.library.CapitalFinderFactory", "Language": "JAVA", "Kind": "SCALAR" }
+, { "DataverseName": "externallibtest", "Name": "testlib-zip-binary-assembly#mysum", "Arity": "2", "Params": [ "AINT32", "AINT32" ], "ReturnType": "AINT32", "Definition": "edu.uci.ics.asterix.external.library.SumFactory", "Language": "JAVA", "Kind": "SCALAR" }
+, { "DataverseName": "externallibtest", "Name": "testlib-zip-binary-assembly#parseTweet", "Arity": "1", "Params": [ "TweetInputType" ], "ReturnType": "TweetOutputType", "Definition": "edu.uci.ics.asterix.external.library.ParseTweetFactory", "Language": "JAVA", "Kind": "SCALAR" }
+, { "DataverseName": "externallibtest", "Name": "testlib-zip-binary-assembly#toUpper", "Arity": "1", "Params": [ "TextType" ], "ReturnType": "TextType", "Definition": "edu.uci.ics.asterix.external.library.UpperCaseFactory", "Language": "JAVA", "Kind": "SCALAR" }
+ ]
diff --git a/asterix-yarn/src/test/resources/library/results/library-metadata/libraryDataset/libraryDataset.1.adm b/asterix-yarn/src/test/resources/library/results/library-metadata/libraryDataset/libraryDataset.1.adm
new file mode 100644
index 0000000..b6832ed
--- /dev/null
+++ b/asterix-yarn/src/test/resources/library/results/library-metadata/libraryDataset/libraryDataset.1.adm
@@ -0,0 +1,2 @@
+[ { "DataverseName": "externallibtest", "Name": "testlib-zip-binary-assembly", "Timestamp": "Mon Apr 22 23:36:55 PDT 2013" }
+ ]
diff --git a/asterix-yarn/src/test/resources/library/testsuite.xml b/asterix-yarn/src/test/resources/library/testsuite.xml
new file mode 100644
index 0000000..ef21a16
--- /dev/null
+++ b/asterix-yarn/src/test/resources/library/testsuite.xml
@@ -0,0 +1,67 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ! Test suite for the external-library ("testlib-zip-binary-assembly") tests.
+ ! Queries for a compilation unit are read from
+ ! queries/<FilePath>/<compilation-unit name>/ and the expected output from
+ ! results/<FilePath>/<output-dir>/, per QueryOffsetPath and ResultOffsetPath.
+ !-->
+<test-suite xmlns="urn:xml.testframework.asterix.ics.uci.edu" ResultOffsetPath="results" QueryOffsetPath="queries" QueryFileExtension=".aql">
+ <!-- Library UDFs (mysum, toUpper, getCapital) invoked from AQL queries. -->
+ <test-group name="library-functions">
+ <test-case FilePath="library-functions">
+ <compilation-unit name="mysum">
+ <output-dir compare="Text">mysum</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="library-functions">
+ <compilation-unit name="toUpper">
+ <output-dir compare="Text">toUpper</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="library-functions">
+ <compilation-unit name="insert-from-select">
+ <output-dir compare="Text">insert-from-select</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="library-functions">
+ <compilation-unit name="getCapital">
+ <output-dir compare="Text">getCapital</output-dir>
+ </compilation-unit>
+ </test-case>
+ </test-group>
+ <!-- Contents of the Metadata.Function, Metadata.Library and Metadata.Dataverse datasets. -->
+ <test-group name="library-metadata">
+ <test-case FilePath="library-metadata">
+ <compilation-unit name="functionDataset">
+ <output-dir compare="Text">functionDataset</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="library-metadata">
+ <compilation-unit name="libraryDataset">
+ <output-dir compare="Text">libraryDataset</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="library-metadata">
+ <compilation-unit name="dataverseDataset">
+ <output-dir compare="Text">dataverseDataset</output-dir>
+ </compilation-unit>
+ </test-case>
+ </test-group>
+ <!-- Feed ingestion; this case is tagged category="slow". -->
+ <test-group name="library-feeds">
+ <test-case FilePath="library-feeds" category="slow">
+ <compilation-unit name="feed_ingest">
+ <output-dir compare="Text">feed_ingest</output-dir>
+ </compilation-unit>
+ </test-case>
+ </test-group>
+ <!-- Adapter test; the expected result is five tweetid / message-text records. -->
+ <test-group name="library-adapters">
+ <test-case FilePath="library-adapters">
+ <compilation-unit name="typed_adapter">
+ <output-dir compare="Text">typed_adapter</output-dir>
+ </compilation-unit>
+ </test-case>
+ </test-group>
+</test-suite>
+