integration of asterix-installer: checkpoint 1
git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_stabilization_installer@1219 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-events/pom.xml b/asterix-events/pom.xml
index a9aeaa5..46bb96a 100644
--- a/asterix-events/pom.xml
+++ b/asterix-events/pom.xml
@@ -125,6 +125,21 @@
</execution>
</executions>
</plugin>
+ <!-- Runs the assembly plugin's "attached" goal in the package phase,
+ using the descriptor src/main/assembly/binary-assembly.xml.
+ NOTE(review): "attached" is deprecated in later maven-assembly-plugin
+ releases in favour of "single"; confirm before upgrading the plugin. -->
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>2.2-beta-2</version>
+ <executions>
+ <execution>
+ <configuration>
+ <descriptor>src/main/assembly/binary-assembly.xml</descriptor>
+ </configuration>
+ <phase>package</phase>
+ <goals>
+ <goal>attached</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
diff --git a/asterix-events/src/main/assembly/binary-assembly.xml b/asterix-events/src/main/assembly/binary-assembly.xml
new file mode 100644
index 0000000..29ebbdd
--- /dev/null
+++ b/asterix-events/src/main/assembly/binary-assembly.xml
@@ -0,0 +1,27 @@
+<assembly>
+ <!-- Assembly "bin": produced as tar.gz, tar.bz2 and zip archives. -->
+ <id>bin</id>
+ <formats>
+ <format>tar.gz</format>
+ <format>tar.bz2</format>
+ <format>zip</format>
+ </formats>
+ <fileSets>
+ <!-- src/main/resources/events is packaged under events/; the empty
+ <includes/> adds no include filter. -->
+ <fileSet>
+ <directory>src/main/resources/events</directory>
+ <outputDirectory>events</outputDirectory>
+ <includes></includes>
+ </fileSet>
+ <!-- src/main/resources/scripts is packaged under scripts/; the empty
+ <includes/> adds no include filter. -->
+ <fileSet>
+ <directory>src/main/resources/scripts</directory>
+ <outputDirectory>scripts</outputDirectory>
+ <includes></includes>
+ </fileSet>
+ <!-- Jar files located directly in target/ are packaged under target/. -->
+ <fileSet>
+ <directory>target</directory>
+ <outputDirectory>target</outputDirectory>
+ <includes>
+ <include>*.jar</include>
+ </includes>
+ </fileSet>
+ </fileSets>
+</assembly>
diff --git a/asterix-events/src/main/java/edu/uci/ics/asterix/event/driver/EventDriver.java b/asterix-events/src/main/java/edu/uci/ics/asterix/event/driver/EventDriver.java
index 379d811..ddefdd1 100644
--- a/asterix-events/src/main/java/edu/uci/ics/asterix/event/driver/EventDriver.java
+++ b/asterix-events/src/main/java/edu/uci/ics/asterix/event/driver/EventDriver.java
@@ -20,7 +20,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.logging.Logger;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
@@ -41,143 +40,134 @@
public class EventDriver {
- public static final String CLIENT_NODE_ID = "client_node";
- public static final Node CLIENT_NODE = new Node(CLIENT_NODE_ID, "127.0.0.1", null, null, null);
+ public static final String CLIENT_NODE_ID = "client_node";
+ public static final Node CLIENT_NODE = new Node(CLIENT_NODE_ID,
+ "127.0.0.1", null, null, null);
- private static final Logger LOGGER = Logger.getLogger(EventDriver.class.getName());
+ private static String eventsDir;
+ private static Events events;
+ private static Map<String, String> env = new HashMap<String, String>();
+ private static String scriptDirSuffix;
- private static String homeDir;
- private static Events events;
- private static Map<String, String> env = new HashMap<String, String>();
- private static String scriptDirSuffix;
+    /**
+     * Returns the events directory held by this driver.
+     *
+     * @return the static {@code eventsDir} value; {@code null} until
+     *         {@code main} has assigned it
+     */
+    public static String getEventsDir() {
+        return EventDriver.eventsDir;
+    }
- public static String getHomeDir() {
- return homeDir;
- }
+    /**
+     * Returns the event definitions held by this driver.
+     *
+     * NOTE(review): nothing visible in this revision of the class assigns the
+     * static {@code events} field, so this may always return {@code null} -
+     * confirm whether callers should read the events from the client instead.
+     *
+     * @return the static {@code events} value
+     */
+    public static Events getEvents() {
+        return EventDriver.events;
+    }
- public static void setHomeDir(String homeDir) {
- EventDriver.homeDir = homeDir;
- }
+    /**
+     * Returns the driver's environment map. The map itself is returned, not a
+     * copy; {@code initializeCluster} adds the cluster's properties to it.
+     *
+     * @return the static {@code env} map
+     */
+    public static Map<String, String> getEnvironment() {
+        return EventDriver.env;
+    }
- public static Events getEvents() {
- return events;
- }
+    /**
+     * Renders the cluster's environment properties as a single string of
+     * {@code key=value} pairs, each pair followed by one space.
+     *
+     * @param cluster cluster whose environment properties are rendered
+     * @return the concatenated pairs; empty when there are no properties
+     */
+    public static String getStringifiedEnv(Cluster cluster) {
+        StringBuilder rendered = new StringBuilder();
+        for (Property property : cluster.getEnv().getProperty()) {
+            rendered.append(property.getKey());
+            rendered.append("=");
+            rendered.append(property.getValue());
+            rendered.append(" ");
+        }
+        return rendered.toString();
+    }
- public static void setEvents(Events events) {
- EventDriver.events = events;
- }
+    /**
+     * Unmarshals a cluster description from an XML file and copies every
+     * environment property it declares into the driver's environment map.
+     *
+     * @param path location of the cluster XML file
+     * @return the unmarshalled cluster
+     * @throws JAXBException if the JAXB context cannot be created or the file
+     *             cannot be unmarshalled
+     * @throws IOException declared by the signature; not thrown directly here
+     */
+    public static Cluster initializeCluster(String path) throws JAXBException, IOException {
+        File clusterFile = new File(path);
+        Unmarshaller clusterReader = JAXBContext.newInstance(Cluster.class).createUnmarshaller();
+        Cluster parsed = (Cluster) clusterReader.unmarshal(clusterFile);
+        for (Property property : parsed.getEnv().getProperty()) {
+            env.put(property.getKey(), property.getValue());
+        }
+        return parsed;
+    }
- public static Map<String, String> getEnvironment() {
- return env;
- }
+    /**
+     * Unmarshals a pattern description from an XML file.
+     *
+     * @param path location of the pattern XML file
+     * @return the unmarshalled patterns
+     * @throws JAXBException if the JAXB context cannot be created or the file
+     *             cannot be unmarshalled
+     * @throws IOException declared by the signature; not thrown directly here
+     */
+    public static Patterns initializePatterns(String path) throws JAXBException, IOException {
+        File patternFile = new File(path);
+        Unmarshaller patternReader = JAXBContext.newInstance(Patterns.class).createUnmarshaller();
+        Patterns parsed = (Patterns) patternReader.unmarshal(patternFile);
+        return parsed;
+    }
- public static String getStringifiedEnv(Cluster cluster) {
- StringBuffer buffer = new StringBuffer();
- for (Property p : cluster.getEnv().getProperty()) {
- buffer.append(p.getKey() + "=" + p.getValue() + " ");
- }
- return buffer.toString();
- }
+ /**
+ * Driver-level initialization hook. The body is empty in this revision, so
+ * calling it has no effect.
+ *
+ * @param eventConfig parsed command-line configuration (currently unused)
+ */
+ private static void initialize(EventConfig eventConfig) throws IOException,
+ JAXBException {
- public static void initializeEvents(String path) throws IOException {
- try {
- File eventsFile = new File(path);
- JAXBContext ctx = JAXBContext.newInstance(Events.class);
- Unmarshaller unmarshaller = ctx.createUnmarshaller();
- events = (Events) unmarshaller.unmarshal(eventsFile);
- } catch (JAXBException e) {
- e.printStackTrace();
- }
- }
+ }
- public static Cluster initializeCluster(String path) throws JAXBException, IOException {
- File file = new File(path);
- JAXBContext ctx = JAXBContext.newInstance(Cluster.class);
- Unmarshaller unmarshaller = ctx.createUnmarshaller();
- Cluster cluster = (Cluster) unmarshaller.unmarshal(file);
- for (Property p : cluster.getEnv().getProperty()) {
- env.put(p.getKey(), p.getValue());
- }
- return cluster;
- }
+    /**
+     * Creates a client that uses a new {@code DefaultOutputHandler}.
+     *
+     * @param eventsDir directory handed to the client as its events directory
+     * @param cluster cluster the client operates on
+     * @param dryRun dry-run flag handed to the client
+     * @return the newly constructed client
+     * @throws Exception if the client constructor fails
+     */
+    public static EventrixClient getClient(String eventsDir, Cluster cluster,
+            boolean dryRun) throws Exception {
+        return getClient(eventsDir, cluster, dryRun, new DefaultOutputHandler());
+    }
- public static Patterns initializePatterns(String path) throws JAXBException, IOException {
- File file = new File(path);
- JAXBContext ctx = JAXBContext.newInstance(Patterns.class);
- Unmarshaller unmarshaller = ctx.createUnmarshaller();
- return (Patterns) unmarshaller.unmarshal(file);
- }
+    /**
+     * Creates a client that uses the supplied output handler.
+     *
+     * @param eventsDir directory handed to the client as its events directory
+     * @param cluster cluster the client operates on
+     * @param dryRun dry-run flag handed to the client
+     * @param outputHandler handler handed to the client
+     * @return the newly constructed client
+     * @throws Exception if the client constructor fails
+     */
+    public static EventrixClient getClient(String eventsDir, Cluster cluster,
+            boolean dryRun, IOutputHandler outputHandler) throws Exception {
+        EventrixClient created = new EventrixClient(eventsDir, cluster, dryRun, outputHandler);
+        return created;
+    }
- private static void initialize(EventConfig eventConfig) throws IOException, JAXBException {
- homeDir = System.getenv("EVENT_HOME");
- if (homeDir == null) {
- throw new IllegalStateException("EVENT_HOME is not set");
- }
- initializeEvents(homeDir + "/" + EventUtil.EVENTS_DIR + "/" + "events.xml");
- }
+    /**
+     * Command-line entry point. Requires the {@code EVENT_HOME} environment
+     * variable, parses the arguments into an {@code EventConfig}, loads the
+     * cluster and pattern files and submits the patterns through an
+     * {@code EventrixClient}. Unless running dry, the prepare script runs
+     * before submission and the cleanup script after it.
+     *
+     * Any exception raised after argument parsing starts is printed together
+     * with the usage text rather than propagated.
+     *
+     * @param args command-line arguments
+     * @throws IllegalStateException if {@code EVENT_HOME} is not set
+     */
+    public static void main(String[] args) throws Exception {
+        String eventsHome = System.getenv("EVENT_HOME");
+        if (eventsHome == null) {
+            throw new IllegalStateException("EVENT_HOME is not set");
+        }
+        eventsDir = eventsHome + File.separator + EventUtil.EVENTS_DIR;
+        EventConfig eventConfig = new EventConfig();
+        CmdLineParser parser = new CmdLineParser(eventConfig);
+        try {
+            parser.parseArgument(args);
+            if (eventConfig.help) {
+                parser.printUsage(System.out);
+                // Help was requested explicitly: stop here instead of falling
+                // through into cluster/pattern loading and event submission.
+                return;
+            }
+            if (eventConfig.seed > 0) {
+                Randomizer.getInstance(eventConfig.seed);
+            }
+            Cluster cluster = initializeCluster(eventConfig.clusterPath);
+            Patterns patterns = initializePatterns(eventConfig.patternPath);
+            initialize(eventConfig);
- public static EventrixClient getClient(Cluster cluster, boolean dryRun) throws Exception {
- return new EventrixClient(cluster, dryRun, new DefaultOutputHandler());
- }
+            if (!eventConfig.dryRun) {
+                prepare(cluster);
+            }
+            EventrixClient client = new EventrixClient(eventsDir, cluster,
+                    eventConfig.dryRun, new DefaultOutputHandler());
+            client.submit(patterns);
+            if (!eventConfig.dryRun) {
+                cleanup(cluster);
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+            parser.printUsage(System.err);
+        }
+    }
- public static EventrixClient getClient(Cluster cluster, boolean dryRun, IOutputHandler outputHandler)
- throws Exception {
- return new EventrixClient(cluster, dryRun, outputHandler);
- }
+ /**
+ * Launches the prepare script through EventUtil.executeLocalScript before
+ * events are submitted. A fresh suffix derived from System.nanoTime() is
+ * stored in scriptDirSuffix (cleanup passes the same value later) and is
+ * handed to the script as its first argument, followed by the IP of every
+ * cluster node.
+ *
+ * NOTE(review): main sets eventsDir to EVENT_HOME/events, so the script path
+ * built here resolves to EVENT_HOME/events/events/prepare.sh - confirm that
+ * this matches the installed layout.
+ *
+ * @param cluster cluster whose node IPs are passed to the script
+ */
+ private static void prepare(Cluster cluster) throws IOException,
+ InterruptedException {
- public static void main(String[] args) throws Exception {
- EventConfig eventConfig = new EventConfig();
- CmdLineParser parser = new CmdLineParser(eventConfig);
- try {
- parser.parseArgument(args);
- if (eventConfig.help) {
- parser.printUsage(System.out);
- }
- if (eventConfig.seed > 0) {
- Randomizer.getInstance(eventConfig.seed);
- }
- Cluster cluster = initializeCluster(eventConfig.clusterPath);
- Patterns patterns = initializePatterns(eventConfig.patternPath);
+ scriptDirSuffix = "" + System.nanoTime();
+ List<String> args = new ArrayList<String>();
+ args.add(scriptDirSuffix);
+ // The script is run against a throwaway node describing the local machine.
+ Node clientNode = new Node();
+ clientNode.setId("client");
+ clientNode.setIp("127.0.0.1");
+ for (Node node : cluster.getNode()) {
+ args.add(node.getIp());
+ }
+ EventUtil.executeLocalScript(clientNode, eventsDir + "/" + "events"
+ + "/" + "prepare.sh", args);
+ }
- initialize(eventConfig);
- if (!eventConfig.dryRun) {
- prepare(cluster);
- }
- EventrixClient client = new EventrixClient(cluster, eventConfig.dryRun, new DefaultOutputHandler());
- client.submit(patterns);
- if (!eventConfig.dryRun) {
- cleanup(cluster);
- }
- } catch (Exception e) {
- e.printStackTrace();
- parser.printUsage(System.err);
- }
- }
-
- private static void prepare(Cluster cluster) throws IOException, InterruptedException {
-
- scriptDirSuffix = "" + System.nanoTime();
- List<String> args = new ArrayList<String>();
- args.add(scriptDirSuffix);
- Node clientNode = new Node();
- clientNode.setId("client");
- clientNode.setIp("127.0.0.1");
- for (Node node : cluster.getNode()) {
- args.add(node.getIp());
- }
- EventUtil.executeLocalScript(clientNode, homeDir + "/" + "events" + "/" + "prepare.sh", args);
- }
-
- private static void cleanup(Cluster cluster) throws IOException, InterruptedException {
- List<String> args = new ArrayList<String>();
- args.add(scriptDirSuffix);
- Node clientNode = new Node();
- clientNode.setId("client");
- clientNode.setIp("127.0.0.1");
- for (Node node : cluster.getNode()) {
- args.add(node.getIp());
- }
- EventUtil.executeLocalScript(clientNode, homeDir + "/" + "events" + "/" + "cleanup.sh", args);
- }
+    /**
+     * Launches the cleanup script through {@code EventUtil.executeLocalScript}.
+     * The script receives the current {@code scriptDirSuffix} as its first
+     * argument, followed by the IP of every cluster node.
+     *
+     * @param cluster cluster whose node IPs are passed to the script
+     * @throws IOException if the script process cannot be started
+     * @throws InterruptedException declared by the script helper
+     */
+    private static void cleanup(Cluster cluster) throws IOException, InterruptedException {
+        // The script is run against a throwaway node describing the local machine.
+        Node localNode = new Node();
+        localNode.setId("client");
+        localNode.setIp("127.0.0.1");
+
+        List<String> scriptArgs = new ArrayList<String>();
+        scriptArgs.add(scriptDirSuffix);
+        for (Node member : cluster.getNode()) {
+            scriptArgs.add(member.getIp());
+        }
+
+        String cleanupScript = eventsDir + "/" + "events" + "/" + "cleanup.sh";
+        EventUtil.executeLocalScript(localNode, cleanupScript, scriptArgs);
+    }
}
diff --git a/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventExecutor.java b/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventExecutor.java
index 869edd7..92bdf72 100644
--- a/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventExecutor.java
+++ b/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventExecutor.java
@@ -39,16 +39,16 @@
private static final String DAEMON = "DAEMON";
public void executeEvent(Node node, String script, List<String> args, boolean isDaemon, Cluster cluster,
- Pattern pattern, IOutputHandler outputHandler) throws IOException {
+ Pattern pattern, IOutputHandler outputHandler, EventrixClient client) throws IOException {
List<String> pargs = new ArrayList<String>();
pargs.add("/bin/bash");
- pargs.add(EventDriver.getHomeDir() + File.separator + "events.pkg" + File.separator + EXECUTE_SCRIPT);
- StringBuffer argBuffer = new StringBuffer();
+ pargs.add(client.getEventsDir() + File.separator + "scripts" + File.separator + EXECUTE_SCRIPT);
StringBuffer envBuffer = new StringBuffer(IP_LOCATION + "=" + node.getIp());
if (!node.getId().equals(EventDriver.CLIENT_NODE_ID)) {
envBuffer.append(" " + EventDriver.getStringifiedEnv(cluster));
pargs.add(cluster.getUsername() == null ? System.getProperty("user.name") : cluster.getUsername());
}
+ StringBuffer argBuffer = new StringBuffer();
if (args != null && args.size() > 0) {
for (String arg : args) {
argBuffer.append(arg + " ");
diff --git a/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventTask.java b/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventTask.java
index aac2604..9f52642 100644
--- a/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventTask.java
+++ b/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventTask.java
@@ -25,131 +25,142 @@
import edu.uci.ics.asterix.event.driver.EventDriver;
import edu.uci.ics.asterix.event.schema.cluster.Node;
+import edu.uci.ics.asterix.event.schema.event.Event;
import edu.uci.ics.asterix.event.schema.pattern.Pattern;
import edu.uci.ics.asterix.event.schema.pattern.Period;
public class EventTask extends TimerTask {
- public static enum State {
- INITIALIZED,
- IN_PROGRESS,
- COMPLETED,
- FAILED
- }
+    /** States an {@code EventTask} can report through {@code getState()}. */
+    public static enum State {
+        INITIALIZED,
+        IN_PROGRESS,
+        COMPLETED,
+        FAILED
+    }
- private static final Logger logger = Logger.getLogger(EventTask.class.getName());
+ private static final Logger logger = Logger.getLogger(EventTask.class
+ .getName());
- private Pattern pattern;
- private long interval = 0;
- private long initialDelay = 0;
- private int maxOccurs = Integer.MAX_VALUE;
- private int occurrenceCount = 0;
- private Timer timer;
- private String taskScript;
- private Node location;
- private List<String> taskArgs;
- private EventrixClient scheduler;
- private List<Node> candidateLocations;
- private boolean dynamicLocation = false;
- private boolean reuseLocation = false;
- private State state;
+ private Pattern pattern;
+ private Event event;
+ private long interval = 0;
+ private long initialDelay = 0;
+ private int maxOccurs = Integer.MAX_VALUE;
+ private int occurrenceCount = 0;
+ private Timer timer;
+ private String taskScript;
+ private Node location;
+ private List<String> taskArgs;
+ private EventrixClient client;
+ private List<Node> candidateLocations;
+ private boolean dynamicLocation = false;
+ private boolean reuseLocation = false;
+ private State state;
- static {
- logger.setLevel(Level.WARNING);
- }
+ static {
+ logger.setLevel(Level.WARNING);
+ }
- public EventTask(Pattern pattern, EventrixClient client) {
- this.pattern = pattern;
- this.scheduler = client;
- Period period = pattern.getPeriod();
- if (period != null && period.getAbsvalue() != null) {
- this.interval = EventUtil.parseTimeInterval(period.getAbsvalue(), period.getUnit());
- }
- if (pattern.getDelay() != null) {
- this.initialDelay = EventUtil.parseTimeInterval(new ValueType(pattern.getDelay().getValue()), pattern
- .getDelay().getUnit());
- }
- if (pattern.getMaxOccurs() != null) {
- this.maxOccurs = pattern.getMaxOccurs();
- }
- this.timer = new Timer();
- taskArgs = EventUtil.getEventArgs(pattern);
- candidateLocations = EventUtil.getCandidateLocations(pattern, client.getCluster());
- if (pattern.getEvent().getNodeid().getValue().getRandom() != null && period != null && maxOccurs > 1) {
- dynamicLocation = true;
- reuseLocation = pattern.getEvent().getNodeid().getValue().getRandom().getRange().isReuse();
- } else {
- location = EventUtil.getEventLocation(pattern, candidateLocations, scheduler.getCluster());
- }
- String scriptsDir;
- if (location.getId().equals(EventDriver.CLIENT_NODE_ID)) {
- scriptsDir = EventDriver.getHomeDir() + File.separator + "events.pkg";
- } else {
- scriptsDir = client.getCluster().getWorkingDir().getDir() + File.separator + "events.pkg";
- }
- taskScript = scriptsDir + File.separator + EventUtil.getEvent(pattern).getScript();
- state = State.INITIALIZED;
- }
+    /**
+     * Builds a task for one pattern: derives the repeat interval, initial
+     * delay and maximum occurrence count from the pattern, resolves the
+     * candidate nodes, and works out the path of the script to run.
+     *
+     * When the pattern names a random node, has a period and may occur more
+     * than once, the node is chosen afresh on every run ("dynamic location")
+     * and {@code location} stays {@code null} until the first run.
+     *
+     * @param pattern pattern describing the event, its timing and placement
+     * @param client client supplying the cluster, event definitions and
+     *            events directory
+     * @throws IllegalArgumentException if the pattern's event type or node id
+     *             is unknown, or a time unit is invalid
+     */
+    public EventTask(Pattern pattern, EventrixClient client) {
+        this.pattern = pattern;
+        this.client = client;
+        Period period = pattern.getPeriod();
+        if (period != null && period.getAbsvalue() != null) {
+            this.interval = EventUtil.parseTimeInterval(period.getAbsvalue(),
+                    period.getUnit());
+        }
+        if (pattern.getDelay() != null) {
+            this.initialDelay = EventUtil.parseTimeInterval(new ValueType(
+                    pattern.getDelay().getValue()), pattern.getDelay()
+                    .getUnit());
+        }
+        if (pattern.getMaxOccurs() != null) {
+            this.maxOccurs = pattern.getMaxOccurs();
+        }
+        this.timer = new Timer();
+        taskArgs = EventUtil.getEventArgs(pattern);
+        candidateLocations = EventUtil.getCandidateLocations(pattern,
+                client.getCluster());
+        if (pattern.getEvent().getNodeid().getValue().getRandom() != null
+                && period != null && maxOccurs > 1) {
+            dynamicLocation = true;
+            reuseLocation = pattern.getEvent().getNodeid().getValue()
+                    .getRandom().getRange().isReuse();
+        } else {
+            location = EventUtil.getEventLocation(pattern, candidateLocations,
+                    client.getCluster());
+        }
+        // With a dynamic location no node has been picked yet, so location is
+        // still null here and must not be dereferenced. Candidates for a random
+        // placement are taken from the cluster's node list, so such an event
+        // uses the cluster working directory rather than the client's.
+        boolean runsOnClient = location != null
+                && location.getId().equals(EventDriver.CLIENT_NODE_ID);
+        String scriptsDir;
+        if (runsOnClient) {
+            scriptsDir = client.getEventsDir() + File.separator + "events";
+        } else {
+            scriptsDir = client.getCluster().getWorkingDir().getDir() + File.separator + "eventrix" + File.separator + "events";
+        }
+        event = EventUtil.getEvent(pattern, client.getEvents());
+        taskScript = scriptsDir + File.separator + event.getScript();
+        state = State.INITIALIZED;
+    }
- public void start() {
- if (interval > 0) {
- timer.schedule(this, initialDelay, interval);
- } else {
- timer.schedule(this, initialDelay);
- }
- }
+    /**
+     * Schedules this task on its timer after the initial delay: once when no
+     * positive interval is configured, repeatedly at that interval otherwise.
+     */
+    public void start() {
+        if (interval <= 0) {
+            timer.schedule(this, initialDelay);
+            return;
+        }
+        timer.schedule(this, initialDelay, interval);
+    }
- @Override
- public void run() {
- if (candidateLocations.size() == 0) {
- timer.cancel();
- scheduler.notifyCompletion(new EventTaskReport(this));
- } else {
- if (dynamicLocation) {
- location = EventUtil.getEventLocation(pattern, candidateLocations, scheduler.getCluster());
- if (!reuseLocation) {
- candidateLocations.remove(location);
- }
- }
+ /**
+ * One timer tick. If no candidate node is left, the timer is cancelled and
+ * completion is reported. Otherwise a node is picked when the location is
+ * dynamic, the event script is executed unless the client is in dry-run
+ * mode, and the occurrence count is advanced; reaching maxOccurs cancels the
+ * timer and reports completion. An IOException from execution cancels the
+ * timer and is passed on in the completion report.
+ */
+ @Override
+ public void run() {
+ if (candidateLocations.size() == 0) {
+ timer.cancel();
+ client.notifyCompletion(new EventTaskReport(this));
+ } else {
+ if (dynamicLocation) {
+ location = EventUtil.getEventLocation(pattern,
+ candidateLocations, client.getCluster());
+ // Without reuse, a node that has been picked is not offered again.
+ if (!reuseLocation) {
+ candidateLocations.remove(location);
+ }
+ }
- logger.info(EventUtil.dateFormat.format(new Date()) + " " + "EVENT "
- + pattern.getEvent().getType().toUpperCase() + " at " + location.getId().toUpperCase());
- try {
- if (!scheduler.isDryRun()) {
- new EventExecutor().executeEvent(location, taskScript, taskArgs, EventUtil.getEvent(pattern)
- .isDaemon(), scheduler.getCluster(), pattern, scheduler.getErrorHandler());
- }
- occurrenceCount++;
- if (occurrenceCount >= maxOccurs) {
- timer.cancel();
- scheduler.notifyCompletion(new EventTaskReport(this));
- }
- } catch (IOException ioe) {
- timer.cancel();
- scheduler.notifyCompletion(new EventTaskReport(this, false, ioe));
- }
- }
+ logger.info(EventUtil.dateFormat.format(new Date()) + " "
+ + "EVENT " + pattern.getEvent().getType().toUpperCase()
+ + " at " + location.getId().toUpperCase());
+ try {
+ if (!client.isDryRun()) {
+ new EventExecutor().executeEvent(location, taskScript,
+ taskArgs, event.isDaemon(), client.getCluster(),
+ pattern, client.getErrorHandler(), client);
+ }
+ // A dry run still counts as an occurrence.
+ occurrenceCount++;
+ if (occurrenceCount >= maxOccurs) {
+ timer.cancel();
+ client.notifyCompletion(new EventTaskReport(this));
+ }
+ } catch (IOException ioe) {
+ timer.cancel();
+ client
+ .notifyCompletion(new EventTaskReport(this, false, ioe));
+ }
+ }
- }
+ }
- public Node getLocation() {
- return location;
- }
+    /**
+     * Returns the node this task is currently bound to.
+     *
+     * @return the current location; {@code null} for a dynamically placed
+     *         task that has not picked a node yet
+     */
+    public Node getLocation() {
+        return this.location;
+    }
- public long getInterval() {
- return interval;
- }
+    /**
+     * Returns the repeat interval derived from the pattern's period.
+     *
+     * @return the interval; {@code 0} when the pattern defines none
+     */
+    public long getInterval() {
+        return this.interval;
+    }
- public long getInitialDelay() {
- return initialDelay;
- }
+    /**
+     * Returns the delay applied before the first run.
+     *
+     * @return the initial delay; {@code 0} when the pattern defines none
+     */
+    public long getInitialDelay() {
+        return this.initialDelay;
+    }
- public Pattern getPattern() {
- return pattern;
- }
+    /**
+     * Returns the pattern this task was created for.
+     *
+     * @return the pattern passed to the constructor
+     */
+    public Pattern getPattern() {
+        return this.pattern;
+    }
- public State getState() {
- return state;
- }
+    /**
+     * Returns the task's state, which the constructor sets to
+     * {@code State.INITIALIZED}.
+     *
+     * @return the current state
+     */
+    public State getState() {
+        return this.state;
+    }
}
diff --git a/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventUtil.java b/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventUtil.java
index 80bc0da..5f70c59 100644
--- a/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventUtil.java
+++ b/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventUtil.java
@@ -19,220 +19,235 @@
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
-import java.util.logging.Logger;
import edu.uci.ics.asterix.event.driver.EventDriver;
import edu.uci.ics.asterix.event.management.ValueType.Type;
import edu.uci.ics.asterix.event.schema.cluster.Cluster;
import edu.uci.ics.asterix.event.schema.cluster.Node;
import edu.uci.ics.asterix.event.schema.event.Event;
+import edu.uci.ics.asterix.event.schema.event.Events;
import edu.uci.ics.asterix.event.schema.pattern.Pattern;
public class EventUtil {
- public static final String EVENTS_DIR = "events";
- public static final String CLUSTER_CONF = "config/cluster.xml";
- public static final String PATTERN_CONF = "config/pattern.xml";
- private static final String IP_LOCATION = "IP_LOCATION";
- private static final String CLUSTER_ENV = "ENV";
- private static final String SCRIPT = "SCRIPT";
- private static final String ARGS = "ARGS";
- private static final String EXECUTE_SCRIPT = "events/execute.sh";
+ public static final String EVENTS_DIR = "events";
+ public static final String CLUSTER_CONF = "config/cluster.xml";
+ public static final String PATTERN_CONF = "config/pattern.xml";
+ public static final DateFormat dateFormat = new SimpleDateFormat(
+ "yyyy/MM/dd HH:mm:ss");
- public static final DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
+ private static final String IP_LOCATION = "IP_LOCATION";
+ private static final String CLUSTER_ENV = "ENV";
+ private static final String SCRIPT = "SCRIPT";
+ private static final String ARGS = "ARGS";
+ private static final String EXECUTE_SCRIPT = "events/execute.sh";
- private static final Logger logger = Logger.getLogger(EventDriver.class.getName());
+    /**
+     * Resolves a possibly randomised value to an integer amount and converts
+     * it with {@code computeInterval}. An absolute value is parsed as is, a
+     * min/max value is drawn from the {@code Randomizer}, and a range value is
+     * one randomly chosen entry of the range set.
+     *
+     * @param v value specification
+     * @param unit time unit, one of {@code hr}, {@code min}, {@code sec}
+     * @return the converted interval
+     * @throws IllegalArgumentException if the unit is not recognised or a
+     *             value is not a valid integer
+     */
+    public static long parseTimeInterval(ValueType v, String unit) throws IllegalArgumentException {
+        int amount = 0;
+        switch (v.getType()) {
+            case ABS:
+                amount = Integer.parseInt(v.getAbsoluteValue());
+                break;
+            case RANDOM_MIN_MAX:
+                amount = Randomizer.getInstance().getRandomInt(v.getMin(), v.getMax());
+                break;
+            case RANDOM_RANGE:
+                String[] choices = v.getRangeSet();
+                int pick = Randomizer.getInstance().getRandomInt(0, choices.length - 1);
+                amount = Integer.parseInt(choices[pick]);
+                break;
+        }
+        return computeInterval(amount, unit);
+    }
- public static long parseTimeInterval(ValueType v, String unit) throws IllegalArgumentException {
- int val = 0;
- switch (v.getType()) {
- case ABS:
- val = Integer.parseInt(v.getAbsoluteValue());
- break;
- case RANDOM_MIN_MAX:
- val = Randomizer.getInstance().getRandomInt(v.getMin(), v.getMax());
- break;
- case RANDOM_RANGE:
- String[] values = v.getRangeSet();
- val = Integer.parseInt(values[Randomizer.getInstance().getRandomInt(0, values.length - 1)]);
- break;
- }
- return computeInterval(val, unit);
- }
+    /**
+     * Parses an integer amount and converts it with {@code computeInterval}.
+     *
+     * @param v amount as a decimal integer string
+     * @param unit time unit, one of {@code hr}, {@code min}, {@code sec}
+     * @return the converted interval
+     * @throws IllegalArgumentException if the unit is not recognised or
+     *             {@code v} is not a valid integer
+     */
+    public static long parseTimeInterval(String v, String unit) throws IllegalArgumentException {
+        return computeInterval(Integer.parseInt(v), unit);
+    }
- public static long parseTimeInterval(String v, String unit) throws IllegalArgumentException {
- int value = Integer.parseInt(v);
- return computeInterval(value, unit);
- }
+ private static long computeInterval(int val, String unit) {
+ int vmult = 1;
+ if ("hr".equalsIgnoreCase(unit)) {
+ vmult = 3600 * 1000;
+ } else if ("min".equalsIgnoreCase(unit)) {
+ vmult = 60 * 1000;
+ } else if ("sec".equalsIgnoreCase(unit)) {
+ vmult = 1000;
+ } else
+ throw new IllegalArgumentException(
+ " invalid unit value specified for frequency (hr,min,sec)");
+ return val * vmult;
- private static long computeInterval(int val, String unit) {
- int vmult = 1;
- if ("hr".equalsIgnoreCase(unit)) {
- vmult = 3600 * 1000;
- } else if ("min".equalsIgnoreCase(unit)) {
- vmult = 60 * 1000;
- } else if ("sec".equalsIgnoreCase(unit)) {
- vmult = 1000;
- } else
- throw new IllegalArgumentException(" invalid unit value specified for frequency (hr,min,sec)");
- return val * vmult;
+ }
+
+ public static Event getEvent(Pattern pattern, Events events) {
+ for (Event event : events.getEvent()) {
+ if (event.getType().equals(pattern.getEvent().getType())) {
+ return event;
+ }
+ }
+ throw new IllegalArgumentException(" Unknown event type"
+ + pattern.getEvent().getType());
+ }
+
+ /**
+ * Resolves the node an event should run on. An absolute node id is looked up
+ * through getNodeFromId; a random range picks one entry of
+ * candidateLocations using the Randomizer; a min/max range is rejected.
+ *
+ * @param pattern pattern whose event carries the node id specification
+ * @param candidateLocations nodes eligible for a random pick
+ * @param cluster cluster used to resolve an absolute node id
+ * @return the resolved node
+ * @throws IllegalStateException if the node id is given as a min/max range
+ */
+ public static Node getEventLocation(Pattern pattern,
+ List<Node> candidateLocations, Cluster cluster) {
+ ValueType value = new ValueType(pattern.getEvent().getNodeid()
+ .getValue());
+ Node location = null;
+ Type vtype = value.getType();
- }
+ switch (vtype) {
+ case ABS:
+ location = getNodeFromId(value.getAbsoluteValue(), cluster);
+ break;
+ case RANDOM_RANGE:
+ int nodeIndex = Randomizer.getInstance().getRandomInt(0,
+ candidateLocations.size() - 1);
+ location = candidateLocations.get(nodeIndex);
+ break;
+ case RANDOM_MIN_MAX:
+ throw new IllegalStateException(
+ " Canont configure a min max value range for location");
+ }
+ return location;
- public static Event getEvent(Pattern pattern) {
- for (Event event : EventDriver.getEvents().getEvent()) {
- if (event.getType().equals(pattern.getEvent().getType())) {
- return event;
- }
- }
- throw new IllegalArgumentException(" Unknown event type" + pattern.getEvent().getType());
- }
+ }
- public static Node getEventLocation(Pattern pattern, List<Node> candidateLocations, Cluster cluster) {
- ValueType value = new ValueType(pattern.getEvent().getNodeid().getValue());
- Node location = null;
- Type vtype = value.getType();
+ /**
+ * Builds the list of nodes on which the pattern's event may run. An absolute
+ * node id yields exactly that node. A random range yields every cluster node
+ * when the range contains "ANY" (case-insensitive), otherwise the cluster
+ * nodes whose ids are listed; nodes named in the excluded set are then
+ * removed. A min/max range is rejected.
+ *
+ * @param pattern pattern whose event carries the node id specification
+ * @param cluster cluster providing the nodes
+ * @return a new list of candidate nodes
+ * @throws IllegalStateException if a listed node id is not in the cluster or
+ * the node id is given as a min/max range
+ */
+ public static List<Node> getCandidateLocations(Pattern pattern,
+ Cluster cluster) {
+ ValueType value = new ValueType(pattern.getEvent().getNodeid()
+ .getValue());
+ List<Node> candidateList = new ArrayList<Node>();
+ switch (value.getType()) {
+ case ABS:
+ candidateList.add(getNodeFromId(value.getAbsoluteValue(), cluster));
+ break;
+ case RANDOM_RANGE:
+ boolean anyOption = false;
+ String[] values = value.getRangeSet();
+ for (String v : values) {
+ if (v.equalsIgnoreCase("ANY")) {
+ anyOption = true;
+ }
+ }
+ if (anyOption) {
+ for (Node node : cluster.getNode()) {
+ candidateList.add(node);
+ }
+ } else {
+ boolean found = false;
+ for (String v : values) {
+ for (Node node : cluster.getNode()) {
+ if (node.getId().equals(v)) {
+ candidateList.add(node);
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ throw new IllegalStateException("Unknonw nodeId : " + v);
+ }
+ found = false;
+ }
- switch (vtype) {
- case ABS:
- location = getNodeFromId(value.getAbsoluteValue(), cluster);
- break;
- case RANDOM_RANGE:
- int nodeIndex = Randomizer.getInstance().getRandomInt(0, candidateLocations.size() - 1);
- location = candidateLocations.get(nodeIndex);
- break;
- case RANDOM_MIN_MAX:
- throw new IllegalStateException(" Canont configure a min max value range for location");
- }
- return location;
+ }
+ String[] excluded = value.getRangeExcluded();
+ if (excluded != null && excluded.length > 0) {
+ // Collect first, remove afterwards, so the list is not modified while
+ // it is being iterated.
+ List<Node> markedForRemoval = new ArrayList<Node>();
+ for (String exclusion : excluded) {
+ for (Node node : candidateList) {
+ if (node.getId().equals(exclusion)) {
+ markedForRemoval.add(node);
+ }
+ }
+ }
+ candidateList.removeAll(markedForRemoval);
+ }
+ break;
+ case RANDOM_MIN_MAX:
+ throw new IllegalStateException(
+ " Invalid value configured for location");
+ }
+ return candidateList;
+ }
- }
+ /**
+ * Resolves a node id. The client node id maps to EventDriver.CLIENT_NODE,
+ * the master node id to a new Node carrying the master's id and IP, and any
+ * other id to the cluster node with that id.
+ *
+ * @param nodeid id to resolve
+ * @param cluster cluster providing the master node and node list
+ * @return the matching node
+ * @throws IllegalArgumentException if the id matches none of them; the
+ * message lists the valid ids
+ */
+ private static Node getNodeFromId(String nodeid, Cluster cluster) {
+ if (nodeid.equals(EventDriver.CLIENT_NODE.getId())) {
+ return EventDriver.CLIENT_NODE;
+ }
- public static List<Node> getCandidateLocations(Pattern pattern, Cluster cluster) {
- ValueType value = new ValueType(pattern.getEvent().getNodeid().getValue());
- List<Node> candidateList = new ArrayList<Node>();
- switch (value.getType()) {
- case ABS:
- candidateList.add(getNodeFromId(value.getAbsoluteValue(), cluster));
- break;
- case RANDOM_RANGE:
- boolean anyOption = false;
- String[] values = value.getRangeSet();
- for (String v : values) {
- if (v.equalsIgnoreCase("ANY")) {
- anyOption = true;
- }
- }
- if (anyOption) {
- for (Node node : cluster.getNode()) {
- candidateList.add(node);
- }
- } else {
- boolean found = false;
- for (String v : values) {
- for (Node node : cluster.getNode()) {
- if (node.getId().equals(v)) {
- candidateList.add(node);
- found = true;
- break;
- }
- }
- if (!found) {
- throw new IllegalStateException("Unknonw nodeId : " + v);
- }
- found = false;
- }
+ if (nodeid.equals(cluster.getMasterNode().getId())) {
+ return new Node(cluster.getMasterNode().getId(), cluster
+ .getMasterNode().getIp(), null, null, null);
+ }
- }
- String[] excluded = value.getRangeExcluded();
- if (excluded != null && excluded.length > 0) {
- List<Node> markedForRemoval = new ArrayList<Node>();
- for (String exclusion : excluded) {
- for (Node node : candidateList) {
- if (node.getId().equals(exclusion)) {
- markedForRemoval.add(node);
- }
- }
- }
- candidateList.removeAll(markedForRemoval);
- }
- break;
- case RANDOM_MIN_MAX:
- throw new IllegalStateException(" Invalid value configured for location");
- }
- return candidateList;
- }
+ List<Node> nodeList = cluster.getNode();
+ for (Node node : nodeList) {
+ if (node.getId().equals(nodeid)) {
+ return node;
+ }
+ }
+ // No match: list every valid id (client, master, cluster nodes) in the error.
+ StringBuffer buffer = new StringBuffer();
+ buffer.append(EventDriver.CLIENT_NODE.getId() + ",");
+ buffer.append(cluster.getMasterNode().getId() + ",");
+ for (Node v : cluster.getNode()) {
+ buffer.append(v.getId() + ",");
+ }
+ buffer.deleteCharAt(buffer.length() - 1);
+ throw new IllegalArgumentException("Unknown node id :" + nodeid
+ + " valid ids:" + buffer);
+ }
- private static Node getNodeFromId(String nodeid, Cluster cluster) {
- if (nodeid.equals(EventDriver.CLIENT_NODE.getId())) {
- return EventDriver.CLIENT_NODE;
- }
+    /**
+     * Starts {@code /bin/bash} on the execute script found under the driver's
+     * events directory. The event script, its space-joined arguments, the
+     * node IP and the stringified cluster environment are handed over as
+     * environment variables of the new process. The process is started and
+     * not waited for.
+     *
+     * @param node node whose IP is exported to the script
+     * @param script event script path exported to the script
+     * @param args event arguments; may be {@code null}
+     * @param cluster cluster whose environment is exported
+     * @throws IOException if the process cannot be started
+     * @throws InterruptedException declared by the signature; not thrown
+     *             directly here
+     */
+    public static void executeEventScript(Node node, String script,
+            List<String> args, Cluster cluster) throws IOException, InterruptedException {
+        List<String> command = new ArrayList<String>();
+        command.add("/bin/bash");
+        command.add(EventDriver.getEventsDir() + "/" + EXECUTE_SCRIPT);
+
+        StringBuilder joinedArgs = new StringBuilder();
+        String clusterEnv = EventDriver.getStringifiedEnv(cluster) + " " + IP_LOCATION + "=" + node.getIp();
+        if (args != null) {
+            for (String arg : args) {
+                joinedArgs.append(arg).append(" ");
+            }
+        }
+
+        ProcessBuilder builder = new ProcessBuilder(command);
+        builder.environment().putAll(EventDriver.getEnvironment());
+        builder.environment().put(IP_LOCATION, node.getIp());
+        builder.environment().put(CLUSTER_ENV, clusterEnv);
+        builder.environment().put(SCRIPT, script);
+        builder.environment().put(ARGS, joinedArgs.toString());
+        builder.start();
+    }
- if (nodeid.equals(cluster.getMasterNode().getId())) {
- return new Node(cluster.getMasterNode().getId(), cluster.getMasterNode().getIp(), null, null, null);
- }
+    /**
+     * Starts {@code /bin/bash} on the given script, passing the arguments on
+     * the command line. The driver's environment map and the node IP are
+     * added to the process environment. The process is started and not
+     * waited for.
+     *
+     * @param node node whose IP is exported to the script
+     * @param script path of the script to run
+     * @param args command-line arguments; may be {@code null}
+     * @throws IOException if the process cannot be started
+     * @throws InterruptedException declared by the signature; not thrown
+     *             directly here
+     */
+    public static void executeLocalScript(Node node, String script,
+            List<String> args) throws IOException, InterruptedException {
+        List<String> command = new ArrayList<String>();
+        command.add("/bin/bash");
+        command.add(script);
+        if (args != null) {
+            for (String arg : args) {
+                command.add(arg);
+            }
+        }
+        ProcessBuilder builder = new ProcessBuilder(command);
+        builder.environment().putAll(EventDriver.getEnvironment());
+        builder.environment().put(IP_LOCATION, node.getIp());
+        builder.start();
+    }
- List<Node> nodeList = cluster.getNode();
- for (Node node : nodeList) {
- if (node.getId().equals(nodeid)) {
- return node;
- }
- }
- StringBuffer buffer = new StringBuffer();
- buffer.append(EventDriver.CLIENT_NODE.getId() + ",");
- buffer.append(cluster.getMasterNode().getId() + ",");
- for (Node v : cluster.getNode()) {
- buffer.append(v.getId() + ",");
- }
- buffer.deleteCharAt(buffer.length() - 1);
- throw new IllegalArgumentException("Unknown node id :" + nodeid + " valid ids:" + buffer);
- }
-
- public static void executeEventScript(Node node, String script, List<String> args, Cluster cluster)
- throws IOException, InterruptedException {
- List<String> pargs = new ArrayList<String>();
- pargs.add("/bin/bash");
- pargs.add(EventDriver.getHomeDir() + "/" + EXECUTE_SCRIPT);
- StringBuffer argBuffer = new StringBuffer();
- String env = EventDriver.getStringifiedEnv(cluster) + " " + IP_LOCATION + "=" + node.getIp();
- if (args != null) {
- for (String arg : args) {
- argBuffer.append(arg + " ");
- }
- }
- ProcessBuilder pb = new ProcessBuilder(pargs);
- pb.environment().putAll(EventDriver.getEnvironment());
- pb.environment().put(IP_LOCATION, node.getIp());
- pb.environment().put(CLUSTER_ENV, env);
- pb.environment().put(SCRIPT, script);
- pb.environment().put(ARGS, argBuffer.toString());
- pb.start();
- }
-
- public static void executeLocalScript(Node node, String script, List<String> args) throws IOException,
- InterruptedException {
- List<String> pargs = new ArrayList<String>();
- pargs.add("/bin/bash");
- pargs.add(script);
- if (args != null) {
- pargs.addAll(args);
- }
- ProcessBuilder pb = new ProcessBuilder(pargs);
- pb.environment().putAll(EventDriver.getEnvironment());
- pb.environment().put(IP_LOCATION, node.getIp());
- pb.start();
- }
-
- public static List<String> getEventArgs(Pattern pattern) {
- List<String> pargs = new ArrayList<String>();
- if (pattern.getEvent().getPargs() == null) {
- return pargs;
- }
- String[] args = pattern.getEvent().getPargs().split(" ");
- for (String arg : args) {
- pargs.add(arg.trim());
- }
- return pargs;
- }
+ public static List<String> getEventArgs(Pattern pattern) {
+ List<String> pargs = new ArrayList<String>();
+ if (pattern.getEvent().getPargs() == null) {
+ return pargs;
+ }
+ String[] args = pattern.getEvent().getPargs().split(" ");
+ for (String arg : args) {
+ pargs.add(arg.trim());
+ }
+ return pargs;
+ }
}
diff --git a/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventrixClient.java b/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventrixClient.java
index 3792eb8..4cd4b82 100644
--- a/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventrixClient.java
+++ b/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventrixClient.java
@@ -15,15 +15,21 @@
package edu.uci.ics.asterix.event.management;
import java.io.File;
+import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
+
import edu.uci.ics.asterix.event.driver.EventDriver;
import edu.uci.ics.asterix.event.schema.cluster.Cluster;
import edu.uci.ics.asterix.event.schema.cluster.Node;
+import edu.uci.ics.asterix.event.schema.event.Events;
import edu.uci.ics.asterix.event.schema.pattern.Event;
import edu.uci.ics.asterix.event.schema.pattern.Nodeid;
import edu.uci.ics.asterix.event.schema.pattern.Pattern;
@@ -32,146 +38,166 @@
public class EventrixClient {
- private static final Logger LOGGER = Logger.getLogger(EventrixClient.class.getName());
+ private static final Logger LOGGER = Logger.getLogger(EventrixClient.class
+ .getName());
- private EventTask[] tasks;
- private boolean dryRun = false;
- private LinkedBlockingQueue<EventTaskReport> msgInbox = new LinkedBlockingQueue<EventTaskReport>();
- private AtomicInteger pendingTasks = new AtomicInteger(0);
- private final Cluster cluster;
- private IPatternListener listener;
- private IOutputHandler outputHandler;
+ private EventTask[] tasks;
+ private boolean dryRun = false;
+ private LinkedBlockingQueue<EventTaskReport> msgInbox = new LinkedBlockingQueue<EventTaskReport>();
+ private AtomicInteger pendingTasks = new AtomicInteger(0);
+ private final Cluster cluster;
+ private IPatternListener listener;
+ private IOutputHandler outputHandler;
+ private Events events;
+ private String eventsDir;
- public EventrixClient(Cluster cluster, boolean dryRun, IOutputHandler outputHandler) throws Exception {
- this.cluster = cluster;
- this.dryRun = dryRun;
- this.outputHandler = outputHandler;
- if (!dryRun) {
- initializeCluster();
- }
- }
+ // Loads the event catalog from eventsDir and, unless dryRun is set, copies eventsDir out to the cluster.
+ public EventrixClient(String eventsDir, Cluster cluster, boolean dryRun, IOutputHandler outputHandler) throws Exception {
+     this.cluster = cluster;
+     this.dryRun = dryRun;
+     this.outputHandler = outputHandler;
+     this.eventsDir = eventsDir; // assigned before initializeEvents(), which reads the field
+     this.events = initializeEvents();
+     if (!this.dryRun) {
+         initializeCluster(this.eventsDir);
+     }
+ }
- public void submit(Patterns patterns) throws Exception {
- initTasks(patterns);
- try {
- waitForCompletion();
- } catch (InterruptedException ie) {
- LOGGER.info("Interrupted exception :" + ie);
- } catch (Exception e) {
- throw e;
- }
+ // Runs the patterns and blocks until every task has reported, or until the waiting thread is interrupted.
+ public void submit(Patterns patterns) throws Exception {
+     initTasks(patterns);
+     try {
+         waitForCompletion();
+     } catch (InterruptedException ie) {
+         LOGGER.info("Interrupted exception :" + ie);
+         Thread.currentThread().interrupt(); // restore the flag so callers can still see the interruption
+     }
- }
+ }
- public void submit(Patterns patterns, IPatternListener listener) throws Exception {
- this.listener = listener;
- initTasks(patterns);
- }
+ // Non-blocking variant: registers the listener, starts the tasks and returns without waiting for them.
+ public void submit(Patterns patterns, IPatternListener listener) throws Exception {
+     this.listener = listener;
+     initTasks(patterns);
+ }
- private void initTasks(Patterns patterns) {
- tasks = new EventTask[patterns.getPattern().size()];
- pendingTasks.set(tasks.length);
- int index = 0;
- for (Pattern pattern : patterns.getPattern()) {
- tasks[index] = new EventTask(pattern, this);
- tasks[index].start();
- index++;
- }
- }
+ // Creates and starts one EventTask per pattern. NOTE(review): later slots are still null while early tasks already run.
+ private void initTasks(Patterns patterns) {
+     List<Pattern> patternList = patterns.getPattern();
+     tasks = new EventTask[patternList.size()];
+     pendingTasks.set(tasks.length);
+     for (int slot = 0; slot < tasks.length; slot++) {
+         tasks[slot] = new EventTask(patternList.get(slot), this);
+         tasks[slot].start();
+     }
+ }
- public Cluster getCluster() {
- return cluster;
- }
+ public Cluster getCluster() { // the cluster description passed to the constructor
+ return cluster;
+ }
- public boolean isDryRun() {
- return dryRun;
- }
+ public boolean isDryRun() { // when true, the constructor skipped initializeCluster()
+ return dryRun;
+ }
- public void notifyCompletion(EventTaskReport report) {
+ public Events getEvents() { // event catalog unmarshalled by initializeEvents()
+ return events;
+ }
- if (report.isSuccess()) {
- pendingTasks.decrementAndGet();
- if (listener != null) {
- listener.eventCompleted(report);
- if (pendingTasks.get() == 0) {
- listener.jobCompleted();
- }
- } else {
- try {
- msgInbox.put(report);
- } catch (InterruptedException e) {
- }
- }
- } else {
- for (EventTask t : tasks) {
- if (t.getState() == EventTask.State.INITIALIZED || t.getState() == EventTask.State.IN_PROGRESS) {
- t.cancel();
- }
- }
- if (listener != null) {
- listener.jobFailed(report);
- } else {
- try {
- msgInbox.put(report);
- } catch (InterruptedException e) {
- }
- }
- }
- }
+ public String getEventsDir() { // directory given to the constructor; events/events.xml is read beneath it
+ return eventsDir;
+ }
- public void notifyFailure(EventTaskReport report) {
+ // Handles the report of one finished task. In listener mode (listener != null) the listener is
+ // called back directly; otherwise the report is queued for waitForCompletion(). A failed report
+ // first cancels every task that is still INITIALIZED or IN_PROGRESS.
+ public synchronized void notifyCompletion(EventTaskReport report) {
- }
+     boolean failed = !report.isSuccess();
+     if (failed) {
+         // One failure aborts the whole job, so stop everything that has not finished yet.
+         for (EventTask t : tasks) {
+             if (t.getState() == EventTask.State.INITIALIZED
+                     || t.getState() == EventTask.State.IN_PROGRESS) {
+                 t.cancel();
+             }
+         }
+     }
+     if (listener == null) {
+         // msgInbox is unbounded, so offer() always succeeds. Unlike put() it cannot throw
+         // InterruptedException, which would drop the report and leave the waiter blocked.
+         msgInbox.offer(report);
+         return;
+     }
+     if (failed) {
+         listener.jobFailed(report);
+         return;
+     }
+     // Listener mode counts completions here; in blocking mode waitForCompletion() does the counting.
+     pendingTasks.decrementAndGet();
+     listener.eventCompleted(report);
+     if (pendingTasks.get() == 0) {
+         listener.jobCompleted();
+     }
+ }
- private void waitForCompletion() throws Exception {
- while (true) {
- EventTaskReport report = msgInbox.take();
- if (report.isSuccess()) {
- if (pendingTasks.get() == 0) {
- break;
- }
- } else {
- throw new RuntimeException(report.getException().getMessage());
- }
- }
- }
+ // Blocks until every pending task has reported success and returns at once when nothing is
+ // pending (for example an empty pattern set). A failed report is rethrown as a RuntimeException
+ // that carries the task's exception as its cause, so the original stack trace is kept.
+ private void waitForCompletion() throws Exception {
+     while (pendingTasks.get() > 0) {
+         EventTaskReport report = msgInbox.take();
+         if (!report.isSuccess()) {
+             throw new RuntimeException(report.getException().getMessage(), report.getException());
+         }
+         pendingTasks.decrementAndGet();
+     }
+ }
- private void initializeCluster() throws Exception {
- Patterns patterns = initPattern();
- submit(patterns);
- }
+ // Submits the directory-transfer patterns for eventsDir and waits for them to complete.
+ private void initializeCluster(String eventsDir) throws Exception {
+     submit(initPattern(eventsDir));
+ }
- private Patterns initPattern() {
- Nodeid nodeid = new Nodeid(new Value(null, EventDriver.CLIENT_NODE.getId()));
- List<Pattern> patternList = new ArrayList<Pattern>();
- String workingDir = cluster.getWorkingDir().getDir();
- File eventsPkg = new File(EventDriver.getHomeDir() + File.separator + "events.pkg.tar");
- if (!eventsPkg.exists()) {
- throw new IllegalStateException(" Events package does not exist (" + eventsPkg.getAbsolutePath() + ")");
- }
+ // Builds one directory_transfer pattern for the master node, plus one per cluster node unless the working dir isNFS().
+ private Patterns initPattern(String eventsDir) {
+     Nodeid clientNode = new Nodeid(new Value(null, EventDriver.CLIENT_NODE.getId()));
+     String workingDir = cluster.getWorkingDir().getDir();
+     String username = cluster.getUsername();
+     if (username == null) {
+         username = System.getProperty("user.name");
+     }
+     List<Pattern> transfers = new ArrayList<Pattern>();
- String username = cluster.getUsername() == null ? System.getProperty("user.name") : cluster.getUsername();
- patternList.add(getFileTransferPattern(username, eventsPkg.getAbsolutePath(), nodeid, cluster.getMasterNode()
- .getIp(), workingDir, true));
+     String masterIp = cluster.getMasterNode().getIp();
+     transfers.add(getDirectoryTransferPattern(username, eventsDir, clientNode, masterIp, workingDir));
+     if (!cluster.getWorkingDir().isNFS()) {
+         for (Node node : cluster.getNode()) {
+             transfers.add(getDirectoryTransferPattern(username, eventsDir, clientNode, node.getIp(), workingDir));
+         }
+     }
+     return new Patterns(transfers);
+ }
- if (!cluster.getWorkingDir().isNFS()) {
- for (Node node : cluster.getNode()) {
- patternList.add(getFileTransferPattern(username, eventsPkg.getAbsolutePath(), nodeid, node.getIp(),
- workingDir, true));
- }
- }
- Patterns patterns = new Patterns(patternList);
- return patterns;
- }
+ // Wraps a "directory_transfer" event whose arguments are: username, source, destination ip, destination dir.
+ private Pattern getDirectoryTransferPattern(String username, String src, Nodeid srcNode, String destNodeIp,
+         String destDir) {
+     String transferArgs = String.format("%s %s %s %s", username, src, destNodeIp, destDir);
+     return new Pattern(null, 1, null, new Event("directory_transfer", srcNode, transferArgs));
+ }
- private Pattern getFileTransferPattern(String username, String src, Nodeid srcNode, String destNodeIp,
- String destDir, boolean unpack) {
- String pargs = username + " " + src + " " + destNodeIp + " " + destDir + " " + "unpack";
- Event event = new Event("file_transfer", srcNode, pargs);
- return new Pattern(null, 1, null, event);
- }
+ public IOutputHandler getErrorHandler() { // despite the name, returns the IOutputHandler given to the constructor
+ return outputHandler;
+ }
- public IOutputHandler getErrorHandler() {
- return outputHandler;
- }
+ // Unmarshals <eventsDir>/events/events.xml into an Events object, stores it in the events
+ // field and returns it.
+ private Events initializeEvents() throws JAXBException,
+         FileNotFoundException {
+     String catalogPath = eventsDir + File.separator + "events" + File.separator + "events.xml";
+     Unmarshaller unmarshaller = JAXBContext.newInstance(Events.class).createUnmarshaller();
+     events = (Events) unmarshaller.unmarshal(new File(catalogPath));
+     return events;
+ }
+
}
diff --git a/asterix-events/src/main/resources/events/asterix_deploy/asterix_deploy.sh b/asterix-events/src/main/resources/events/asterix_deploy/asterix_deploy.sh
new file mode 100755
index 0000000..532e559
--- /dev/null
+++ b/asterix-events/src/main/resources/events/asterix_deploy/asterix_deploy.sh
@@ -0,0 +1,14 @@
+#!/bin/bash
+# Deploys the Asterix application archive on a cluster running Hyracks, through hyrackscli.
+# Usage: asterix_deploy.sh <managix_home> <asterix_zip> <cc_host>
+MANAGIX_HOME=$1
+ASTERIX_ZIP=$2
+HOST=$3
+HYRACKS_CLI="$MANAGIX_HOME/asterix/hyracks-cli/bin/hyrackscli"
+if ! [ -x "$HYRACKS_CLI" ]
+then
+    chmod +x "$HYRACKS_CLI"
+fi
+# The CLI commands are piped in directly: a fixed "temp" file in the current directory would be
+# overwritten by concurrent runs and left behind afterwards.
+printf 'connect to "%s";\ncreate application asterix "%s";\n' "$HOST" "$ASTERIX_ZIP" | "$HYRACKS_CLI"
diff --git a/asterix-events/src/main/resources/events/backup/backup.sh b/asterix-events/src/main/resources/events/backup/backup.sh
new file mode 100755
index 0000000..cff37df
--- /dev/null
+++ b/asterix-events/src/main/resources/events/backup/backup.sh
@@ -0,0 +1,26 @@
+#!/bin/bash
+# Copies this node's data for one Asterix instance from each local store into HDFS.
+# Usage: backup.sh <working_dir> <instance_name> <data_dirs> <backup_id> <hdfs_url>
+#                  <hadoop_version> <hdfs_backup_dir> <node_id>
+# <data_dirs> is a comma-separated list of store directories.
+WORKING_DIR=$1
+ASTERIX_INSTANCE_NAME=$2
+ASTERIX_DATA_DIR=$3
+BACKUP_ID=$4
+HDFS_URL=$5
+HADOOP_VERSION=$6
+HDFS_BACKUP_DIR=$7
+NODE_ID=$8
+
+export HADOOP_HOME="$WORKING_DIR/hadoop-$HADOOP_VERSION"
+
+# Left unquoted on purpose: word splitting turns the list into one loop item per store.
+nodeStores=$(echo $ASTERIX_DATA_DIR | tr "," "\n")
+for nodeStore in $nodeStores
+do
+    NODE_BACKUP_DIR="$HDFS_BACKUP_DIR/$ASTERIX_INSTANCE_NAME/$BACKUP_ID/$NODE_ID/$nodeStore"
+    "$HADOOP_HOME/bin/hadoop" fs -mkdir "$HDFS_URL/$NODE_BACKUP_DIR"
+    # Log the command that is actually run, including the instance directory.
+    echo "$HADOOP_HOME/bin/hadoop fs -copyFromLocal $nodeStore/$NODE_ID/$ASTERIX_INSTANCE_NAME/* $HDFS_URL/$NODE_BACKUP_DIR/" >> ~/backup.log
+    "$HADOOP_HOME/bin/hadoop" fs -copyFromLocal "$nodeStore/$NODE_ID/$ASTERIX_INSTANCE_NAME"/* "$HDFS_URL/$NODE_BACKUP_DIR/"
+done
diff --git a/asterix-events/src/main/resources/events/cc_failure/cc_failure.sh b/asterix-events/src/main/resources/events/cc_failure/cc_failure.sh
new file mode 100755
index 0000000..0855f5f
--- /dev/null
+++ b/asterix-events/src/main/resources/events/cc_failure/cc_failure.sh
@@ -0,0 +1,6 @@
+# Kills the cluster controller: finds the cc_start process, then kill -9s the hyracks process whose ps line mentions its PID.
+CC_PARENT_ID_INFO=`ps -ef | grep hyracks | grep cc_start | grep -v ssh`
+CC_PARENT_ID=`echo $CC_PARENT_ID_INFO | tr -s " " | cut -d " " -f2` # second field of the ps -ef line (the PID column)
+CC_ID_INFO=`ps -ef | grep hyracks | grep $CC_PARENT_ID | grep -v bash` # presumably the child JVM started by cc_start - confirm
+CC_ID=`echo $CC_ID_INFO | tr -s " " | cut -d " " -f2`
+kill -9 $CC_ID
diff --git a/asterix-events/src/main/resources/events/cc_start/cc_start.sh b/asterix-events/src/main/resources/events/cc_start/cc_start.sh
new file mode 100755
index 0000000..e0b29e0
--- /dev/null
+++ b/asterix-events/src/main/resources/events/cc_start/cc_start.sh
@@ -0,0 +1,14 @@
+#!/bin/bash
+# Starts the Hyracks cluster controller in the foreground, logging to $LOG_DIR/cc.log.
+# Usage: cc_start.sh [debug_port]
+# Reads LOG_DIR, HYRACKS_HOME, CLIENT_NET_IP, CLUSTER_NET_IP and JAVA_OPTS from the environment.
+if [ ! -d "$LOG_DIR" ]
+then
+    mkdir -p "$LOG_DIR"
+fi
+if [ -n "$1" ]
+then
+    # Exported so the hyrackscc child process inherits the debug options even if JAVA_OPTS was only a shell variable.
+    export JAVA_OPTS="$JAVA_OPTS -Xdebug -Xrunjdwp:transport=dt_socket,address=$1,server=y,suspend=n"
+fi
+"$HYRACKS_HOME/bin/hyrackscc" -client-net-ip-address "$CLIENT_NET_IP" -client-net-port 1098 -cluster-net-ip-address "$CLUSTER_NET_IP" -cluster-net-port 1099 -http-port 8888 &> "$LOG_DIR/cc.log"
diff --git a/asterix-events/src/main/resources/events/events.xml b/asterix-events/src/main/resources/events/events.xml
new file mode 100644
index 0000000..857ee5a
--- /dev/null
+++ b/asterix-events/src/main/resources/events/events.xml
@@ -0,0 +1,100 @@
+<events xmlns="events">
+ <event>
+ <type>node_join</type>
+ <script>node_join/nc_join.sh</script>
+ <description>Creates a NodeController process at a specified location.</description>
+ <args>location_of_cc location(hostname/ip_address) node_controller_id</args>
+ <daemon>true</daemon>
+ </event>
+ <event>
+ <type>node_failure</type>
+ <script>node_failure/nc_failure.sh</script>
+ <description>Kills a NodeController process at a specified location.</description>
+ <args>node_controller_id</args>
+ <daemon>false</daemon>
+ </event>
+ <event>
+ <type>cc_start</type>
+ <script>cc_start/cc_start.sh</script>
+ <description>Starts a ClusterController process at a specified location.</description>
+ <args></args>
+ <daemon>true</daemon>
+ </event>
+ <event>
+ <type>cc_failure</type>
+ <script>cc_failure/cc_failure.sh</script>
+ <description>Kills the Cluster Controller process running at a specified location.</description>
+ <args></args>
+ <daemon>false</daemon>
+ </event>
+ <event>
+ <type>node_restart</type>
+ <script>node_restart/nc_restart.sh</script>
+ <description>Shuts down and restarts a NodeController process after a specified time interval, at a specified location</description>
+ <args>address of cc, node controller id and sleep interval(seconds)</args>
+ <daemon>true</daemon>
+ </event>
+ <event>
+ <type>asterix_deploy</type>
+ <script>asterix_deploy/asterix_deploy.sh</script>
+ <description>Deploys Asterix application on a cluster running hyracks</description>
+ <args>IP address of the node running the hyracks cluster controller</args>
+ <daemon>false</daemon>
+ </event>
+ <event>
+ <type>zookeeper_start</type>
+ <script>zookeeper/start.sh</script>
+ <description>Launches ZooKeeper server process</description>
+ <args>IP address of the ZooKeeper server</args>
+ <daemon>true</daemon>
+ </event>
+ <event>
+ <type>zookeeper_stop</type>
+ <script>zookeeper/stop.sh</script>
+ <description>Terminates ZooKeeper server process</description>
+ <args>IP address of the ZooKeeper server</args>
+ <daemon>false</daemon>
+ </event>
+ <event>
+ <type>file_transfer</type>
+ <script>file/transfer.sh</script>
+ <description>Copies a file on the local file system to a remote node</description>
+ <args>local_source_path destination_node destination_path</args>
+ <daemon>false</daemon>
+ </event>
+ <event>
+ <type>directory_transfer</type>
+ <script>file/dir_transfer.sh</script>
+ <description>Copies a directory (and its contents) on the local file system to a remote node</description>
+ <args>local_source_path destination_node destination_path</args>
+ <daemon>false</daemon>
+ </event>
+ <event>
+ <type>file_delete</type>
+ <script>file/delete.sh</script>
+ <description>Deletes a path (file or directory) on the file system of the target node</description>
+ <args>local_source_path destination_node destination_path</args>
+ <daemon>false</daemon>
+ </event>
+ <event>
+ <type>backup</type>
+ <script>backup/backup.sh</script>
+ <description>Takes a backup of an Asterix instance</description>
+ <args>Asterix_data_dir HDFSurl</args>
+ <daemon>false</daemon>
+ </event>
+ <event>
+ <type>restore</type>
+ <script>restore/restore.sh</script>
+ <description>Restores an Asterix instance from a back up</description>
+ <args>Asterix_data_dir HDFSurl</args>
+ <daemon>false</daemon>
+ </event>
+ <event>
+ <type>hdfs_delete</type>
+ <script>hdfs/delete.sh</script>
+ <description>Deletes an HDFS path</description>
+ <args>WorkingDir HadoopVersion HDFSUrl Path_to_Delete</args>
+ <daemon>false</daemon>
+ </event>
+</events>
diff --git a/asterix-events/src/main/resources/events/file/delete.sh b/asterix-events/src/main/resources/events/file/delete.sh
new file mode 100755
index 0000000..d5ac3ff
--- /dev/null
+++ b/asterix-events/src/main/resources/events/file/delete.sh
@@ -0,0 +1,7 @@
+#!/bin/bash
+# Recursively deletes the given path and appends the command to ~/backup.log.
+# Usage: delete.sh <path_to_delete>
+PATH_TO_DELETE=$1
+echo "rm -rf $PATH_TO_DELETE" >> ~/backup.log
+# Quoted and placed after "--" so spaces, glob characters or a leading dash cannot change what is removed.
+rm -rf -- "$PATH_TO_DELETE"
diff --git a/asterix-events/src/main/resources/events/file/dir_transfer.sh b/asterix-events/src/main/resources/events/file/dir_transfer.sh
new file mode 100755
index 0000000..af7da70
--- /dev/null
+++ b/asterix-events/src/main/resources/events/file/dir_transfer.sh
@@ -0,0 +1,11 @@
+#!/bin/bash
+# Creates <dest_dir> on the remote host, then copies a local directory tree into it with scp -r.
+# Usage: dir_transfer.sh <username> <dir_to_transfer> <dest_host> <dest_dir>
+USERNAME=$1
+DIR_TO_TRANSFER=$2
+DEST_HOST=$3
+DEST_DIR=$4
+ssh -l "$USERNAME" "$DEST_HOST" "mkdir -p $DEST_DIR"
+echo "scp -r $DIR_TO_TRANSFER $USERNAME@$DEST_HOST:$DEST_DIR/"
+# Local operands are quoted so a path containing spaces stays a single argument.
+scp -r "$DIR_TO_TRANSFER" "$USERNAME@$DEST_HOST:$DEST_DIR/"
diff --git a/asterix-events/src/main/resources/events/file/transfer.sh b/asterix-events/src/main/resources/events/file/transfer.sh
new file mode 100755
index 0000000..08f0b43
--- /dev/null
+++ b/asterix-events/src/main/resources/events/file/transfer.sh
@@ -0,0 +1,26 @@
+#!/bin/bash
+# Copies a local file to <dest_dir> on a remote host and, when asked, unpacks a .tar or .zip there.
+# Usage: transfer.sh <username> <file_to_transfer> <dest_host> <dest_dir> [unpack]
+USERNAME=$1
+FILE_TO_TRANSFER=$2
+DEST_HOST=$3
+DEST_DIR=$4
+POST_ACTION=$5
+ssh -l "$USERNAME" "$DEST_HOST" "mkdir -p $DEST_DIR"
+echo "scp $FILE_TO_TRANSFER $USERNAME@$DEST_HOST:$DEST_DIR/"
+scp "$FILE_TO_TRANSFER" "$USERNAME@$DEST_HOST:$DEST_DIR/"
+# Quoted so the test stays well-formed when the optional fifth argument is omitted.
+if [ "$POST_ACTION" == "unpack" ]
+then
+    filename=${FILE_TO_TRANSFER##*/}
+    fileType=${FILE_TO_TRANSFER##*.}
+    if [ "$fileType" == "tar" ]
+    then
+        echo "ssh -l $USERNAME $DEST_HOST cd $DEST_DIR && tar xf $filename"
+        ssh -l "$USERNAME" "$DEST_HOST" "cd $DEST_DIR && tar xf $filename"
+    elif [ "$fileType" == "zip" ]
+    then
+        echo "ssh -l $USERNAME $DEST_HOST unzip -o -q -d $DEST_DIR $DEST_DIR/$filename"
+        ssh -l "$USERNAME" "$DEST_HOST" "unzip -o -q -d $DEST_DIR $DEST_DIR/$filename"
+    fi
+fi
diff --git a/asterix-events/src/main/resources/events/hdfs/delete.sh b/asterix-events/src/main/resources/events/hdfs/delete.sh
new file mode 100755
index 0000000..6ff54ee
--- /dev/null
+++ b/asterix-events/src/main/resources/events/hdfs/delete.sh
@@ -0,0 +1,11 @@
+#!/bin/bash
+# Recursively removes a path from HDFS using the Hadoop install under <working_dir>.
+# Usage: delete.sh <working_dir> <hadoop_version> <hdfs_url> <hdfs_path>
+WORKING_DIR=$1
+HADOOP_VERSION=$2
+HDFS_URL=$3
+HDFS_PATH=$4
+export HADOOP_HOME="$WORKING_DIR/hadoop-$HADOOP_VERSION"
+echo "$HADOOP_HOME/bin/hadoop fs -rmr $HDFS_URL/$HDFS_PATH"
+# Quoted so the local shell neither splits nor glob-expands the HDFS path.
+"$HADOOP_HOME/bin/hadoop" fs -rmr "$HDFS_URL/$HDFS_PATH"
diff --git a/asterix-events/src/main/resources/events/node_failure/nc_failure.sh b/asterix-events/src/main/resources/events/node_failure/nc_failure.sh
new file mode 100755
index 0000000..b853be1
--- /dev/null
+++ b/asterix-events/src/main/resources/events/node_failure/nc_failure.sh
@@ -0,0 +1,19 @@
+# Kills the node controller process for the node id given as the first argument.
+NC_ID=$1
+
+#if [ $NC_ID == 'ANY' ]
+#then
+# NC_ID="."
+#fi
+#
+#USER=`who am i | tr -s " " | cut -d " " -f1`
+#PARENT_ID=`ps -ef | tr -s " " | grep nc_join | grep -v grep | grep -v ssh | grep $NC_ID | cut -d " " -f2 | head -n 1`
+#PID=`ps -ef | tr -s " " | grep hyracks | grep -v grep | grep -v nc_join | grep $PARENT_ID | cut -d " " -f2 | head -n 1`
+#kill -9 $PID
+#
+# Active logic: take field 2 of the nc_join ps line matching NC_ID, then kill -9 the hyracks process whose ps line mentions it.
+INFO=`ps -ef | grep nc_join | grep -v grep | grep -v ssh| grep $NC_ID | head -n 1`
+PARENT_ID=`echo $INFO | cut -d " " -f2`
+PID_INFO=`ps -ef | grep hyracks | grep -v grep | grep -v nc_join | grep $PARENT_ID`
+PID=`echo $PID_INFO | cut -d " " -f2`
+kill -9 $PID
diff --git a/asterix-events/src/main/resources/events/node_join/nc_join.sh b/asterix-events/src/main/resources/events/node_join/nc_join.sh
new file mode 100755
index 0000000..62d71ce
--- /dev/null
+++ b/asterix-events/src/main/resources/events/node_join/nc_join.sh
@@ -0,0 +1,16 @@
+#!/bin/bash
+# Starts a Hyracks node controller in the foreground, logging to $LOG_DIR/<nc_id>.log.
+# Usage: nc_join.sh <cc_host> <nc_id> [debug_port]
+# Reads LOG_DIR, HYRACKS_HOME, IP_LOCATION and JAVA_OPTS from the environment.
+CC_HOST=$1
+NC_ID=$2
+if [ ! -d "$LOG_DIR" ]
+then
+    mkdir -p "$LOG_DIR"
+fi
+if [ -n "$3" ]
+then
+    # Exported so the hyracksnc child process inherits the debug options even if JAVA_OPTS was only a shell variable.
+    export JAVA_OPTS="$JAVA_OPTS -Xdebug -Xrunjdwp:transport=dt_socket,address=$3,server=y,suspend=n"
+fi
+"$HYRACKS_HOME/bin/hyracksnc" -node-id "$NC_ID" -cc-host "$CC_HOST" -cc-port 1099 -cluster-net-ip-address "$IP_LOCATION" -data-ip-address "$IP_LOCATION" &> "$LOG_DIR/${NC_ID}.log"
diff --git a/asterix-events/src/main/resources/events/node_restart/nc_restart.sh b/asterix-events/src/main/resources/events/node_restart/nc_restart.sh
new file mode 100755
index 0000000..961ce8d
--- /dev/null
+++ b/asterix-events/src/main/resources/events/node_restart/nc_restart.sh
@@ -0,0 +1,26 @@
+#!/bin/bash
+# Kills a running node controller, sleeps, then starts a node controller again under the same node id.
+# Usage: nc_restart.sh <cc_host> <nc_id|ANY> <sleep_seconds>
+# With ANY, the first nc_join process found is used and the node id is the last word of its ps entry.
+# Reads HYRACKS_HOME and IP_LOCATION from the environment.
+CC_HOST=$1
+NC_ID=$2
+SLEEP_TIME=$3
+
+if [ "$NC_ID" == 'ANY' ]
+then
+    PARENT_ID=`ps -ej | tr -s " " | grep nc_join | grep -v grep | grep -v ssh | cut -d " " -f2 | head -n 1`
+    PARENT_PROCESS_ENTRY=`ps -ef | grep $PARENT_ID | grep -v grep | head -n 1`
+    NC_ID=`echo ${PARENT_PROCESS_ENTRY##* }`
+    echo "NCid is $NC_ID" >> ~/try.txt
+else
+    PARENT_ID=`ps -ej | tr -s " " | grep nc_join | grep -v grep | grep -v ssh | grep $NC_ID | cut -d " " -f2 | head -n 1`
+fi
+
+PID=`ps -ej | tr -s " " | grep hyracks | grep -v grep | grep -v nc_join | grep $PARENT_ID | cut -d " " -f2 | head -n 1`
+kill -9 $PID
+
+sleep "$SLEEP_TIME"
+
+# NOTE(review): nc_join.sh launches $HYRACKS_HOME/bin/hyracksnc; this build-tree path differs - confirm it is intended.
+"$HYRACKS_HOME/hyracks-server/target/hyracks-server-0.2.2-SNAPSHOT-binary-assembly/bin/hyracksnc" -node-id "$NC_ID" -cc-host "$CC_HOST" -cc-port 1099 -cluster-net-ip-address "$IP_LOCATION" -data-ip-address "$IP_LOCATION"
diff --git a/asterix-events/src/main/resources/events/restore/restore.sh b/asterix-events/src/main/resources/events/restore/restore.sh
new file mode 100755
index 0000000..96f3db4
--- /dev/null
+++ b/asterix-events/src/main/resources/events/restore/restore.sh
@@ -0,0 +1,38 @@
+#!/bin/bash
+# Restores this node's data for one Asterix instance from HDFS into each local store.
+# Usage: restore.sh <working_dir> <instance_name> <data_dirs> <backup_id> <hdfs_url>
+#                   <hadoop_version> <hdfs_backup_dir> <node_id>
+# <data_dirs> is a comma-separated list of store directories.
+if [ $# -lt 8 ]
+then
+    # With arguments missing, DEST_DIR below collapses towards the store root and rm -rf would wipe it.
+    echo "restore.sh: expected 8 arguments, got $#" >&2
+    exit 1
+fi
+WORKING_DIR=$1
+ASTERIX_INSTANCE_NAME=$2
+ASTERIX_DATA_DIR=$3
+BACKUP_ID=$4
+HDFS_URL=$5
+HADOOP_VERSION=$6
+HDFS_BACKUP_DIR=$7
+NODE_ID=$8
+
+export HADOOP_HOME="$WORKING_DIR/hadoop-$HADOOP_VERSION"
+
+# Left unquoted on purpose: word splitting turns the list into one loop item per store.
+nodeStores=$(echo $ASTERIX_DATA_DIR | tr "," "\n")
+for nodeStore in $nodeStores
+do
+    NODE_BACKUP_DIR="$HDFS_BACKUP_DIR/$ASTERIX_INSTANCE_NAME/$BACKUP_ID/$NODE_ID/$nodeStore"
+    DEST_DIR="$nodeStore/$NODE_ID/$ASTERIX_INSTANCE_NAME"
+    if [ ! -d "$DEST_DIR" ]
+    then
+        mkdir -p "$DEST_DIR"
+    else
+        rm -rf "$DEST_DIR"/*
+    fi
+    echo "$HADOOP_HOME/bin/hadoop fs -copyToLocal $HDFS_URL/$NODE_BACKUP_DIR/ $DEST_DIR/" >> ~/restore.log
+    # The HDFS glob is quoted so it reaches hadoop unexpanded instead of being matched against local files.
+    "$HADOOP_HOME/bin/hadoop" fs -copyToLocal "$HDFS_URL/$NODE_BACKUP_DIR/*" "$DEST_DIR/"
+done
diff --git a/asterix-events/src/main/resources/scripts/execute.sh b/asterix-events/src/main/resources/scripts/execute.sh
index 114a12a..72234c1 100755
--- a/asterix-events/src/main/resources/scripts/execute.sh
+++ b/asterix-events/src/main/resources/scripts/execute.sh
@@ -1,16 +1,22 @@
-#!/bin/bash
-val=0
-line=""
-for x in $@
-do
- if [[ $val == 0 ]]
- then
- line="$x="
- val=1
- else
- msg="$line$x"
- echo $line >> envr
- val=0
- fi
-done
-cat ./envr
+#!/bin/bash
+# Runs "$ENV $SCRIPT $ARGS" on $IP_LOCATION over ssh and logs the command line to ./execute.log.
+# Usage: execute.sh [username]   (without a username, ssh is called without -l)
+# Reads DAEMON, IP_LOCATION, ENV, SCRIPT and ARGS from the environment.
+USERNAME=$1
+if [ -z "$USERNAME" ]
+then
+    SSH_TARGET=(ssh "$IP_LOCATION")
+else
+    SSH_TARGET=(ssh -l "$USERNAME" "$IP_LOCATION")
+fi
+if [ "$DAEMON" == "false" ]
+then
+    echo "${SSH_TARGET[*]} $ENV $SCRIPT $ARGS" >> ./execute.log
+    # Only the remote stderr is captured and echoed; the remote stdout is discarded.
+    cmd_output=$("${SSH_TARGET[@]}" "$ENV $SCRIPT $ARGS" 2>&1 >/dev/null)
+    echo "$cmd_output"
+else
+    # Any other DAEMON value: start the remote command in the background without waiting for it.
+    echo "${SSH_TARGET[*]} $ENV $SCRIPT $ARGS &" >> ./execute.log
+    "${SSH_TARGET[@]}" "$ENV $SCRIPT $ARGS" &
+fi