managix changes post feedback (taking checkpoint)
git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_lsm_stabilization_installer@1446 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-events/src/main/java/edu/uci/ics/asterix/event/driver/EventDriver.java b/asterix-events/src/main/java/edu/uci/ics/asterix/event/driver/EventDriver.java
index d3c338d..66c0102 100644
--- a/asterix-events/src/main/java/edu/uci/ics/asterix/event/driver/EventDriver.java
+++ b/asterix-events/src/main/java/edu/uci/ics/asterix/event/driver/EventDriver.java
@@ -41,7 +41,7 @@
public class EventDriver {
public static final String CLIENT_NODE_ID = "client_node";
- public static final Node CLIENT_NODE = new Node(CLIENT_NODE_ID, "127.0.0.1", null, null, null, null, null);
+ public static final Node CLIENT_NODE = new Node(CLIENT_NODE_ID, "127.0.0.1", null, null, null, null, null, null);
private static String eventsDir;
private static Events events;
@@ -141,9 +141,9 @@
args.add(scriptDirSuffix);
Node clientNode = new Node();
clientNode.setId("client");
- clientNode.setIp("127.0.0.1");
+ clientNode.setClusterIp("127.0.0.1");
for (Node node : cluster.getNode()) {
- args.add(node.getIp());
+ args.add(node.getClusterIp());
}
EventUtil.executeLocalScript(clientNode, eventsDir + "/" + "events" + "/" + "prepare.sh", args);
}
@@ -153,11 +153,10 @@
args.add(scriptDirSuffix);
Node clientNode = new Node();
clientNode.setId("client");
- clientNode.setIp("127.0.0.1");
+ clientNode.setClusterIp("127.0.0.1");
for (Node node : cluster.getNode()) {
- args.add(node.getIp());
+ args.add(node.getClusterIp());
}
EventUtil.executeLocalScript(clientNode, eventsDir + "/" + "events" + "/" + "cleanup.sh", args);
}
-
}
diff --git a/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventExecutor.java b/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventExecutor.java
index 0157f65..6d89c88 100644
--- a/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventExecutor.java
+++ b/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventExecutor.java
@@ -18,6 +18,7 @@
import java.io.File;
import java.io.IOException;
import java.io.StringWriter;
+import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
@@ -44,21 +45,28 @@
List<String> pargs = new ArrayList<String>();
pargs.add("/bin/bash");
pargs.add(client.getEventsDir() + File.separator + "scripts" + File.separator + EXECUTE_SCRIPT);
- StringBuffer envBuffer = new StringBuffer(IP_LOCATION + "=" + node.getIp() + " ");
+ StringBuffer envBuffer = new StringBuffer(IP_LOCATION + "=" + node.getClusterIp() + " ");
if (!node.getId().equals(EventDriver.CLIENT_NODE_ID) && cluster.getEnv() != null) {
for (Property p : cluster.getEnv().getProperty()) {
if (p.getKey().equals("JAVA_HOME")) {
String val = node.getJavaHome() == null ? p.getValue() : node.getJavaHome();
envBuffer.append(p.getKey() + "=" + val + " ");
} else if (p.getKey().equals("JAVA_OPTS")) {
- String val = "\"" + "-Xmx"
- + (node.getJavaHeap() == null ? cluster.getJavaHeap() : node.getJavaHeap());
- if (node.getDebug() != null) {
- val = val + " " + "-Xdebug -Xrunjdwp:transport=dt_socket,address=" + node.getDebug().intValue()
- + "," + "server=y,suspend=n";
+ // Assemble the quoted JAVA_OPTS value; a node-level setting overrides the cluster-level one.
+ StringBuilder builder = new StringBuilder();
+ builder.append("\"");
+ String javaOpts = (node.getJavaOpts() == null ? cluster.getJavaOpts() : node.getJavaOpts());
+ if (javaOpts != null) {
+ builder.append(javaOpts);
}
- val = val + "\"";
- envBuffer.append(p.getKey() + "=" + val + " ");
+ // Debug flags are added only when debugging is enabled cluster-wide and a port is configured.
+ if (cluster.isDebugEnabled() != null && cluster.isDebugEnabled().booleanValue()) {
+ BigInteger debugPort = node.getDebug() == null ? cluster.getDebug() : node.getDebug();
+ if (debugPort != null) {
+ if (javaOpts != null) {
+ // Keep the debug flags a separate token from the options appended above.
+ builder.append(" ");
+ }
+ builder.append("-Xdebug -Xrunjdwp:transport=dt_socket,address=" + debugPort.intValue()
+ + "," + "server=y,suspend=n");
+ }
+ }
+ builder.append("\"");
+ envBuffer.append(p.getKey() + "=" + builder + " ");
} else {
envBuffer.append(p.getKey() + "=" + p.getValue() + " ");
}
@@ -66,6 +74,7 @@
}
pargs.add(cluster.getUsername() == null ? System.getProperty("user.name") : cluster.getUsername());
}
+
StringBuffer argBuffer = new StringBuffer();
if (args != null && args.size() > 0) {
for (String arg : args) {
@@ -74,7 +83,7 @@
}
ProcessBuilder pb = new ProcessBuilder(pargs);
- pb.environment().put(IP_LOCATION, node.getIp());
+ pb.environment().put(IP_LOCATION, node.getClusterIp());
pb.environment().put(CLUSTER_ENV, envBuffer.toString());
pb.environment().put(SCRIPT, script);
pb.environment().put(ARGS, argBuffer.toString());
diff --git a/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventUtil.java b/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventUtil.java
index bf31e9f..d484947 100644
--- a/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventUtil.java
+++ b/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventUtil.java
@@ -31,232 +31,215 @@
public class EventUtil {
- public static final String EVENTS_DIR = "events";
- public static final String CLUSTER_CONF = "config/cluster.xml";
- public static final String PATTERN_CONF = "config/pattern.xml";
- public static final DateFormat dateFormat = new SimpleDateFormat(
- "yyyy/MM/dd HH:mm:ss");
+ public static final String EVENTS_DIR = "events";
+ public static final String CLUSTER_CONF = "config/cluster.xml";
+ public static final String PATTERN_CONF = "config/pattern.xml";
+ public static final DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
- private static final String IP_LOCATION = "IP_LOCATION";
- private static final String CLUSTER_ENV = "ENV";
- private static final String SCRIPT = "SCRIPT";
- private static final String ARGS = "ARGS";
- private static final String EXECUTE_SCRIPT = "events/execute.sh";
+ private static final String IP_LOCATION = "IP_LOCATION";
+ private static final String CLUSTER_ENV = "ENV";
+ private static final String SCRIPT = "SCRIPT";
+ private static final String ARGS = "ARGS";
+ private static final String EXECUTE_SCRIPT = "events/execute.sh";
- public static long parseTimeInterval(ValueType v, String unit)
- throws IllegalArgumentException {
- int val = 0;
- switch (v.getType()) {
- case ABS:
- val = Integer.parseInt(v.getAbsoluteValue());
- break;
- case RANDOM_MIN_MAX:
- val = Randomizer.getInstance().getRandomInt(v.getMin(), v.getMax());
- break;
- case RANDOM_RANGE:
- String[] values = v.getRangeSet();
- val = Integer.parseInt(values[Randomizer.getInstance()
- .getRandomInt(0, values.length - 1)]);
- break;
- }
- return computeInterval(val, unit);
- }
+ /**
+ * Resolves a configured value to a concrete integer and scales it by the given unit.
+ *
+ * @param v value specification; its type selects an absolute value, a random value drawn
+ * from its min/max, or a randomly chosen member of its range set
+ * @param unit one of "hr", "min" or "sec" (case-insensitive)
+ * @return the resolved value multiplied by the unit factor (1000 per second, so presumably
+ * milliseconds -- confirm against callers)
+ * @throws IllegalArgumentException if the unit is not recognised
+ */
+ public static long parseTimeInterval(ValueType v, String unit) throws IllegalArgumentException {
+ // Stays 0 if the type matches none of the cases below.
+ int val = 0;
+ switch (v.getType()) {
+ case ABS:
+ val = Integer.parseInt(v.getAbsoluteValue());
+ break;
+ case RANDOM_MIN_MAX:
+ val = Randomizer.getInstance().getRandomInt(v.getMin(), v.getMax());
+ break;
+ case RANDOM_RANGE:
+ // Pick one entry of the range set at a random index (bounds presumably inclusive).
+ String[] values = v.getRangeSet();
+ val = Integer.parseInt(values[Randomizer.getInstance().getRandomInt(0, values.length - 1)]);
+ break;
+ }
+ return computeInterval(val, unit);
+ }
- public static long parseTimeInterval(String v, String unit)
- throws IllegalArgumentException {
- int value = Integer.parseInt(v);
- return computeInterval(value, unit);
- }
+ /**
+ * Parses {@code v} as an integer and scales it by the given unit.
+ *
+ * @param v decimal integer text
+ * @param unit one of "hr", "min" or "sec" (case-insensitive)
+ * @return the parsed value multiplied by the unit factor
+ * @throws IllegalArgumentException if {@code v} is not an integer or the unit is not recognised
+ */
+ public static long parseTimeInterval(String v, String unit) throws IllegalArgumentException {
+ int value = Integer.parseInt(v);
+ return computeInterval(value, unit);
+ }
- private static long computeInterval(int val, String unit) {
- int vmult = 1;
- if ("hr".equalsIgnoreCase(unit)) {
- vmult = 3600 * 1000;
- } else if ("min".equalsIgnoreCase(unit)) {
- vmult = 60 * 1000;
- } else if ("sec".equalsIgnoreCase(unit)) {
- vmult = 1000;
- } else
- throw new IllegalArgumentException(
- " invalid unit value specified for frequency (hr,min,sec)");
- return val * vmult;
+ /**
+ * Scales {@code val} by the factor for {@code unit}: 3600000 for "hr", 60000 for "min" and
+ * 1000 for "sec" (case-insensitive).
+ *
+ * @param val the amount expressed in {@code unit}
+ * @param unit the unit name
+ * @return {@code val} multiplied by the unit factor
+ * @throws IllegalArgumentException if the unit is none of the above
+ */
+ private static long computeInterval(int val, String unit) {
+ long vmult;
+ if ("hr".equalsIgnoreCase(unit)) {
+ vmult = 3600L * 1000L;
+ } else if ("min".equalsIgnoreCase(unit)) {
+ vmult = 60L * 1000L;
+ } else if ("sec".equalsIgnoreCase(unit)) {
+ vmult = 1000L;
+ } else {
+ throw new IllegalArgumentException(" invalid unit value specified for frequency (hr,min,sec)");
+ }
+ // Multiply in long arithmetic: as an int product, 597 hr and above would overflow.
+ return val * vmult;
- }
+ }
- public static Event getEvent(Pattern pattern, Events events) {
- for (Event event : events.getEvent()) {
- if (event.getType().equals(pattern.getEvent().getType())) {
- return event;
- }
- }
- throw new IllegalArgumentException(" Unknown event type"
- + pattern.getEvent().getType());
- }
+ /**
+ * Returns the first event definition whose type equals the type of the pattern's event.
+ *
+ * @param pattern the pattern whose event type is looked up
+ * @param events the known event definitions
+ * @return the matching event definition
+ * @throws IllegalArgumentException if no definition has that type
+ */
+ public static Event getEvent(Pattern pattern, Events events) {
+ for (Event event : events.getEvent()) {
+ if (event.getType().equals(pattern.getEvent().getType())) {
+ return event;
+ }
+ }
+ throw new IllegalArgumentException(" Unknown event type" + pattern.getEvent().getType());
+ }
- public static Node getEventLocation(Pattern pattern,
- List<Node> candidateLocations, Cluster cluster) {
- ValueType value = new ValueType(pattern.getEvent().getNodeid()
- .getValue());
- Node location = null;
- Type vtype = value.getType();
+ /**
+ * Chooses the node on which the pattern's event runs.
+ *
+ * @param pattern the pattern whose node id specification is evaluated
+ * @param candidateLocations nodes to choose from when the specification is a range
+ * @param cluster the cluster used to resolve an absolute node id
+ * @return the node named by an absolute id, or a randomly chosen candidate for a range
+ * @throws IllegalStateException if the specification is a min/max range
+ */
+ public static Node getEventLocation(Pattern pattern, List<Node> candidateLocations, Cluster cluster) {
+ ValueType value = new ValueType(pattern.getEvent().getNodeid().getValue());
+ Node location = null;
+ Type vtype = value.getType();
- switch (vtype) {
- case ABS:
- location = getNodeFromId(value.getAbsoluteValue(), cluster);
- break;
- case RANDOM_RANGE:
- int nodeIndex = Randomizer.getInstance().getRandomInt(0,
- candidateLocations.size() - 1);
- location = candidateLocations.get(nodeIndex);
- break;
- case RANDOM_MIN_MAX:
- throw new IllegalStateException(
- " Canont configure a min max value range for location");
- }
- return location;
+ switch (vtype) {
+ case ABS:
+ location = getNodeFromId(value.getAbsoluteValue(), cluster);
+ break;
+ case RANDOM_RANGE:
+ int nodeIndex = Randomizer.getInstance().getRandomInt(0, candidateLocations.size() - 1);
+ location = candidateLocations.get(nodeIndex);
+ break;
+ case RANDOM_MIN_MAX:
+ throw new IllegalStateException(" Cannot configure a min max value range for location");
+ }
+ return location;
- }
+ }
- public static List<Node> getCandidateLocations(Pattern pattern,
- Cluster cluster) {
- ValueType value = new ValueType(pattern.getEvent().getNodeid()
- .getValue());
- List<Node> candidateList = new ArrayList<Node>();
- switch (value.getType()) {
- case ABS:
- candidateList.add(getNodeFromId(value.getAbsoluteValue(), cluster));
- break;
- case RANDOM_RANGE:
- boolean anyOption = false;
- String[] values = value.getRangeSet();
- for (String v : values) {
- if (v.equalsIgnoreCase("ANY")) {
- anyOption = true;
- }
- }
- if (anyOption) {
- for (Node node : cluster.getNode()) {
- candidateList.add(node);
- }
- } else {
- boolean found = false;
- for (String v : values) {
- for (Node node : cluster.getNode()) {
- if (node.getId().equals(v)) {
- candidateList.add(node);
- found = true;
- break;
- }
- }
- if (!found) {
- throw new IllegalStateException("Unknonw nodeId : " + v);
- }
- found = false;
- }
+ /**
+ * Lists the nodes on which the pattern's event may run.
+ *
+ * An absolute node id yields that single node. A range yields every cluster node when it
+ * contains "ANY" (case-insensitive), otherwise the cluster nodes whose ids it names; nodes
+ * named in the range's exclusion list are then removed.
+ *
+ * @param pattern the pattern whose node id specification is evaluated
+ * @param cluster the cluster whose nodes are candidates
+ * @return the candidate nodes
+ * @throws IllegalStateException if a range names an id that is not a cluster node, or the
+ * specification is a min/max range
+ */
+ public static List<Node> getCandidateLocations(Pattern pattern, Cluster cluster) {
+ ValueType value = new ValueType(pattern.getEvent().getNodeid().getValue());
+ List<Node> candidateList = new ArrayList<Node>();
+ switch (value.getType()) {
+ case ABS:
+ candidateList.add(getNodeFromId(value.getAbsoluteValue(), cluster));
+ break;
+ case RANDOM_RANGE:
+ boolean anyOption = false;
+ String[] values = value.getRangeSet();
+ for (String v : values) {
+ if (v.equalsIgnoreCase("ANY")) {
+ anyOption = true;
+ }
+ }
+ if (anyOption) {
+ for (Node node : cluster.getNode()) {
+ candidateList.add(node);
+ }
+ } else {
+ boolean found = false;
+ for (String v : values) {
+ for (Node node : cluster.getNode()) {
+ if (node.getId().equals(v)) {
+ candidateList.add(node);
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ throw new IllegalStateException("Unknown nodeId : " + v);
+ }
+ // Reset the flag for the next id in the range.
+ found = false;
+ }
- }
- String[] excluded = value.getRangeExcluded();
- if (excluded != null && excluded.length > 0) {
- List<Node> markedForRemoval = new ArrayList<Node>();
- for (String exclusion : excluded) {
- for (Node node : candidateList) {
- if (node.getId().equals(exclusion)) {
- markedForRemoval.add(node);
- }
- }
- }
- candidateList.removeAll(markedForRemoval);
- }
- break;
- case RANDOM_MIN_MAX:
- throw new IllegalStateException(
- " Invalid value configured for location");
- }
- return candidateList;
- }
+ }
+ String[] excluded = value.getRangeExcluded();
+ if (excluded != null && excluded.length > 0) {
+ // Collect first, remove afterwards, so the list is not modified while iterating.
+ List<Node> markedForRemoval = new ArrayList<Node>();
+ for (String exclusion : excluded) {
+ for (Node node : candidateList) {
+ if (node.getId().equals(exclusion)) {
+ markedForRemoval.add(node);
+ }
+ }
+ }
+ candidateList.removeAll(markedForRemoval);
+ }
+ break;
+ case RANDOM_MIN_MAX:
+ throw new IllegalStateException(" Invalid value configured for location");
+ }
+ return candidateList;
+ }
- private static Node getNodeFromId(String nodeid, Cluster cluster) {
- if (nodeid.equals(EventDriver.CLIENT_NODE.getId())) {
- return EventDriver.CLIENT_NODE;
- }
+ /**
+ * Resolves a node id to a node: the client node, the master node, or a cluster node.
+ *
+ * @param nodeid the id to resolve
+ * @param cluster the cluster whose master node and nodes are searched
+ * @return {@code EventDriver.CLIENT_NODE} for the client id, a new node built from the master
+ * node for the master id, otherwise the cluster node with that id
+ * @throws IllegalArgumentException if the id matches none of them; the message lists the valid ids
+ */
+ private static Node getNodeFromId(String nodeid, Cluster cluster) {
+ if (nodeid.equals(EventDriver.CLIENT_NODE.getId())) {
+ return EventDriver.CLIENT_NODE;
+ }
- if (nodeid.equals(cluster.getMasterNode().getId())) {
- String ram = cluster.getMasterNode().getJavaHeap() == null ? cluster
- .getJavaHeap() : cluster.getMasterNode().getJavaHeap();
- String logDir = cluster.getMasterNode().getLogdir() == null ? cluster
- .getLogdir() : cluster.getMasterNode().getLogdir();
- String javaHome = cluster.getMasterNode().getJavaHome() == null ? cluster
- .getJavaHome() : cluster.getMasterNode().getJavaHome();
- BigInteger debug = cluster.getMasterNode().getDebug();
- return new Node(cluster.getMasterNode().getId(), cluster
- .getMasterNode().getIp(), ram, javaHome, logDir, null,
- debug);
- }
+ if (nodeid.equals(cluster.getMasterNode().getId())) {
+ // Master-node settings fall back to the cluster-level value when unset.
+ String javaOpts = cluster.getMasterNode().getJavaOpts() == null ? cluster.getJavaOpts() : cluster
+ .getMasterNode().getJavaOpts();
+ String logDir = cluster.getMasterNode().getLogdir() == null ? cluster.getLogdir() : cluster.getMasterNode()
+ .getLogdir();
+ String javaHome = cluster.getMasterNode().getJavaHome() == null ? cluster.getJavaHome() : cluster
+ .getMasterNode().getJavaHome();
+ BigInteger debug = cluster.getMasterNode().getDebug();
+ // NOTE(review): the two nulls are presumably store and iodevices, following the node
+ // element order in cluster.xsd -- confirm against the generated Node constructor.
+ return new Node(cluster.getMasterNode().getId(), cluster.getMasterNode().getClusterIp(), javaHome,
+ javaOpts, logDir, null, null, debug);
+ }
- List<Node> nodeList = cluster.getNode();
- for (Node node : nodeList) {
- if (node.getId().equals(nodeid)) {
- return node;
- }
- }
- StringBuffer buffer = new StringBuffer();
- buffer.append(EventDriver.CLIENT_NODE.getId() + ",");
- buffer.append(cluster.getMasterNode().getId() + ",");
- for (Node v : cluster.getNode()) {
- buffer.append(v.getId() + ",");
- }
- buffer.deleteCharAt(buffer.length() - 1);
- throw new IllegalArgumentException("Unknown node id :" + nodeid
- + " valid ids:" + buffer);
- }
+ List<Node> nodeList = cluster.getNode();
+ for (Node node : nodeList) {
+ if (node.getId().equals(nodeid)) {
+ return node;
+ }
+ }
+ // No match: build a comma-separated list of every valid id for the error message.
+ StringBuffer buffer = new StringBuffer();
+ buffer.append(EventDriver.CLIENT_NODE.getId() + ",");
+ buffer.append(cluster.getMasterNode().getId() + ",");
+ for (Node v : cluster.getNode()) {
+ buffer.append(v.getId() + ",");
+ }
+ // Drop the trailing comma.
+ buffer.deleteCharAt(buffer.length() - 1);
+ throw new IllegalArgumentException("Unknown node id :" + nodeid + " valid ids:" + buffer);
+ }
- public static void executeEventScript(Node node, String script,
- List<String> args, Cluster cluster) throws IOException,
- InterruptedException {
- List<String> pargs = new ArrayList<String>();
- pargs.add("/bin/bash");
- pargs.add(EventDriver.getEventsDir() + "/" + EXECUTE_SCRIPT);
- StringBuffer argBuffer = new StringBuffer();
- String env = EventDriver.getStringifiedEnv(cluster) + " " + IP_LOCATION
- + "=" + node.getIp();
- if (args != null) {
- for (String arg : args) {
- argBuffer.append(arg + " ");
- }
- }
- ProcessBuilder pb = new ProcessBuilder(pargs);
- pb.environment().putAll(EventDriver.getEnvironment());
- pb.environment().put(IP_LOCATION, node.getIp());
- pb.environment().put(CLUSTER_ENV, env);
- pb.environment().put(SCRIPT, script);
- pb.environment().put(ARGS, argBuffer.toString());
- pb.start();
- }
+ /**
+ * Launches the events execute script through /bin/bash for the given node.
+ *
+ * The script name, its space-joined arguments, the node's cluster IP and the stringified
+ * cluster environment are handed over as the SCRIPT, ARGS, IP_LOCATION and ENV environment
+ * variables. The process is started and not waited for.
+ *
+ * @param node the node whose cluster IP is exported as IP_LOCATION
+ * @param script value exported as SCRIPT
+ * @param args arguments joined with spaces into ARGS; may be null
+ * @param cluster the cluster whose environment is stringified into ENV
+ * @throws IOException if the process cannot be started
+ * @throws InterruptedException declared, but nothing in this body appears to throw it
+ */
+ public static void executeEventScript(Node node, String script, List<String> args, Cluster cluster)
+ throws IOException, InterruptedException {
+ List<String> pargs = new ArrayList<String>();
+ pargs.add("/bin/bash");
+ pargs.add(EventDriver.getEventsDir() + "/" + EXECUTE_SCRIPT);
+ StringBuffer argBuffer = new StringBuffer();
+ String env = EventDriver.getStringifiedEnv(cluster) + " " + IP_LOCATION + "=" + node.getClusterIp();
+ if (args != null) {
+ for (String arg : args) {
+ argBuffer.append(arg + " ");
+ }
+ }
+ ProcessBuilder pb = new ProcessBuilder(pargs);
+ pb.environment().putAll(EventDriver.getEnvironment());
+ pb.environment().put(IP_LOCATION, node.getClusterIp());
+ pb.environment().put(CLUSTER_ENV, env);
+ pb.environment().put(SCRIPT, script);
+ pb.environment().put(ARGS, argBuffer.toString());
+ // The returned Process is discarded: the script runs without being waited for.
+ pb.start();
+ }
- public static void executeLocalScript(Node node, String script,
- List<String> args) throws IOException, InterruptedException {
- List<String> pargs = new ArrayList<String>();
- pargs.add("/bin/bash");
- pargs.add(script);
- if (args != null) {
- pargs.addAll(args);
- }
- ProcessBuilder pb = new ProcessBuilder(pargs);
- pb.environment().putAll(EventDriver.getEnvironment());
- pb.environment().put(IP_LOCATION, node.getIp());
- pb.start();
- }
+ /**
+ * Launches the given script through /bin/bash with {@code args} as its command-line arguments.
+ *
+ * The environment from {@code EventDriver.getEnvironment()} is added and IP_LOCATION is set
+ * to the node's cluster IP. The process is started and not waited for.
+ *
+ * @param node the node whose cluster IP is exported as IP_LOCATION
+ * @param script path of the script to run
+ * @param args command-line arguments for the script; may be null
+ * @throws IOException if the process cannot be started
+ * @throws InterruptedException declared, but nothing in this body appears to throw it
+ */
+ public static void executeLocalScript(Node node, String script, List<String> args) throws IOException,
+ InterruptedException {
+ List<String> pargs = new ArrayList<String>();
+ pargs.add("/bin/bash");
+ pargs.add(script);
+ if (args != null) {
+ pargs.addAll(args);
+ }
+ ProcessBuilder pb = new ProcessBuilder(pargs);
+ pb.environment().putAll(EventDriver.getEnvironment());
+ pb.environment().put(IP_LOCATION, node.getClusterIp());
+ pb.start();
+ }
- public static List<String> getEventArgs(Pattern pattern) {
- List<String> pargs = new ArrayList<String>();
- if (pattern.getEvent().getPargs() == null) {
- return pargs;
- }
- String[] args = pattern.getEvent().getPargs().split(" ");
- for (String arg : args) {
- pargs.add(arg.trim());
- }
- return pargs;
- }
+ /**
+ * Splits the pattern's event argument string into individual arguments.
+ *
+ * @param pattern the pattern whose event arguments are read
+ * @return the whitespace-separated arguments; empty if the event has no argument string
+ */
+ public static List<String> getEventArgs(Pattern pattern) {
+ List<String> pargs = new ArrayList<String>();
+ String rawArgs = pattern.getEvent().getPargs();
+ if (rawArgs == null) {
+ return pargs;
+ }
+ // Split on runs of whitespace and skip empty tokens, so repeated, leading or trailing
+ // blanks do not become empty script arguments.
+ for (String arg : rawArgs.trim().split("\\s+")) {
+ if (!arg.isEmpty()) {
+ pargs.add(arg);
+ }
+ }
+ return pargs;
+ }
}
diff --git a/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventrixClient.java b/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventrixClient.java
index 4cd4b82..52d46c8 100644
--- a/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventrixClient.java
+++ b/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventrixClient.java
@@ -38,166 +38,159 @@
public class EventrixClient {
- private static final Logger LOGGER = Logger.getLogger(EventrixClient.class
- .getName());
+ private static final Logger LOGGER = Logger.getLogger(EventrixClient.class.getName());
- private EventTask[] tasks;
- private boolean dryRun = false;
- private LinkedBlockingQueue<EventTaskReport> msgInbox = new LinkedBlockingQueue<EventTaskReport>();
- private AtomicInteger pendingTasks = new AtomicInteger(0);
- private final Cluster cluster;
- private IPatternListener listener;
- private IOutputHandler outputHandler;
- private Events events;
- private String eventsDir;
+ private EventTask[] tasks;
+ private boolean dryRun = false;
+ private LinkedBlockingQueue<EventTaskReport> msgInbox = new LinkedBlockingQueue<EventTaskReport>();
+ private AtomicInteger pendingTasks = new AtomicInteger(0);
+ private final Cluster cluster;
+ private IPatternListener listener;
+ private IOutputHandler outputHandler;
+ private Events events;
+ private String eventsDir;
- public EventrixClient(String eventsDir, Cluster cluster, boolean dryRun,
- IOutputHandler outputHandler) throws Exception {
- this.eventsDir = eventsDir;
- this.events = initializeEvents();
- this.cluster = cluster;
- this.dryRun = dryRun;
- this.outputHandler = outputHandler;
- if (!dryRun) {
- initializeCluster(eventsDir);
- }
- }
+ /**
+ * Creates a client for the given cluster and loads the event definitions from
+ * {@code eventsDir}.
+ *
+ * Unless {@code dryRun} is set, the constructor also submits the directory-transfer patterns
+ * built by {@code initPattern} and waits for them to complete.
+ *
+ * @param eventsDir directory holding the events definitions and scripts
+ * @param cluster the cluster to operate on
+ * @param dryRun when true, the cluster initialisation step is skipped
+ * @param outputHandler handler later returned by {@code getErrorHandler()}
+ * @throws Exception if the event definitions cannot be loaded or initialisation fails
+ */
+ public EventrixClient(String eventsDir, Cluster cluster, boolean dryRun, IOutputHandler outputHandler)
+ throws Exception {
+ // eventsDir must be assigned first: initializeEvents() reads it.
+ this.eventsDir = eventsDir;
+ this.events = initializeEvents();
+ this.cluster = cluster;
+ this.dryRun = dryRun;
+ this.outputHandler = outputHandler;
+ if (!dryRun) {
+ initializeCluster(eventsDir);
+ }
+ }
- public void submit(Patterns patterns) throws Exception {
- initTasks(patterns);
- try {
- waitForCompletion();
- } catch (InterruptedException ie) {
- LOGGER.info("Interrupted exception :" + ie);
- } catch (Exception e) {
- throw e;
- }
+ /**
+ * Starts one task per pattern and blocks until all of them have reported success.
+ *
+ * If the wait is interrupted, the interruption is logged, the thread's interrupt status is
+ * restored and the method returns.
+ *
+ * @param patterns the patterns to execute
+ * @throws Exception if a task reports failure
+ */
+ public void submit(Patterns patterns) throws Exception {
+ initTasks(patterns);
+ try {
+ waitForCompletion();
+ } catch (InterruptedException ie) {
+ LOGGER.info("Interrupted exception :" + ie);
+ // Restore the interrupt status so callers can still observe the interruption.
+ Thread.currentThread().interrupt();
+ }
- }
+ }
- public void submit(Patterns patterns, IPatternListener listener)
- throws Exception {
- this.listener = listener;
- initTasks(patterns);
- }
+ /**
+ * Starts one task per pattern and returns without waiting; progress is reported to
+ * {@code listener} from {@code notifyCompletion}.
+ *
+ * @param patterns the patterns to execute
+ * @param listener receiver of completion and failure callbacks
+ * @throws Exception declared for callers; nothing in this body appears to throw a checked exception
+ */
+ public void submit(Patterns patterns, IPatternListener listener) throws Exception {
+ this.listener = listener;
+ initTasks(patterns);
+ }
- private void initTasks(Patterns patterns) {
- tasks = new EventTask[patterns.getPattern().size()];
- pendingTasks.set(tasks.length);
- int index = 0;
- for (Pattern pattern : patterns.getPattern()) {
- tasks[index] = new EventTask(pattern, this);
- tasks[index].start();
- index++;
- }
- }
+ /**
+ * Creates and starts one {@code EventTask} per pattern and sets the pending-task counter
+ * to the number of tasks.
+ *
+ * @param patterns the patterns to create tasks for
+ */
+ private void initTasks(Patterns patterns) {
+ tasks = new EventTask[patterns.getPattern().size()];
+ // Set the counter before any task starts, so an early completion is counted correctly.
+ pendingTasks.set(tasks.length);
+ int index = 0;
+ for (Pattern pattern : patterns.getPattern()) {
+ tasks[index] = new EventTask(pattern, this);
+ tasks[index].start();
+ index++;
+ }
+ }
- public Cluster getCluster() {
- return cluster;
- }
+ /** Returns the cluster this client was created for. */
+ public Cluster getCluster() {
+ return cluster;
+ }
- public boolean isDryRun() {
- return dryRun;
- }
+ /** Returns the dry-run flag given at construction. */
+ public boolean isDryRun() {
+ return dryRun;
+ }
- public Events getEvents() {
- return events;
- }
+ /** Returns the event definitions loaded at construction. */
+ public Events getEvents() {
+ return events;
+ }
- public String getEventsDir() {
- return eventsDir;
- }
+ /** Returns the events directory given at construction. */
+ public String getEventsDir() {
+ return eventsDir;
+ }
- public synchronized void notifyCompletion(EventTaskReport report) {
+ /**
+ * Records the outcome of one task.
+ *
+ * On success, a registered listener is told the event completed (and the job completed once
+ * the pending counter reaches zero); without a listener the report is queued for
+ * {@code waitForCompletion}. On failure, every task still INITIALIZED or IN_PROGRESS is
+ * cancelled, then the listener is told the job failed or, without a listener, the report is
+ * queued.
+ *
+ * @param report the outcome reported by a task
+ */
+ public synchronized void notifyCompletion(EventTaskReport report) {
- if (report.isSuccess()) {
- if (listener != null) {
- pendingTasks.decrementAndGet();
- listener.eventCompleted(report);
- if (pendingTasks.get() == 0) {
- listener.jobCompleted();
- }
- } else {
- try {
- msgInbox.put(report);
- } catch (InterruptedException e) {
- }
- }
- } else {
- for (EventTask t : tasks) {
- if (t.getState() == EventTask.State.INITIALIZED
- || t.getState() == EventTask.State.IN_PROGRESS) {
- t.cancel();
- }
- }
- if (listener != null) {
- listener.jobFailed(report);
- } else {
- try {
- msgInbox.put(report);
- } catch (InterruptedException e) {
- }
- }
- }
- }
+ if (report.isSuccess()) {
+ if (listener != null) {
+ pendingTasks.decrementAndGet();
+ listener.eventCompleted(report);
+ if (pendingTasks.get() == 0) {
+ listener.jobCompleted();
+ }
+ } else {
+ try {
+ msgInbox.put(report);
+ } catch (InterruptedException e) {
+ // Restore the interrupt status instead of silently discarding it.
+ Thread.currentThread().interrupt();
+ }
+ }
+ } else {
+ for (EventTask t : tasks) {
+ if (t.getState() == EventTask.State.INITIALIZED || t.getState() == EventTask.State.IN_PROGRESS) {
+ t.cancel();
+ }
+ }
+ if (listener != null) {
+ listener.jobFailed(report);
+ } else {
+ try {
+ msgInbox.put(report);
+ } catch (InterruptedException e) {
+ // Restore the interrupt status instead of silently discarding it.
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ }
- private void waitForCompletion() throws Exception {
- while (true) {
- EventTaskReport report = msgInbox.take();
- if (report.isSuccess()) {
- if (pendingTasks.decrementAndGet() == 0) {
- break;
- }
- } else {
- throw new RuntimeException(report.getException().getMessage());
- }
- }
- }
+ /**
+ * Blocks on the report queue until every pending task has reported success.
+ *
+ * @throws InterruptedException if interrupted while waiting for a report
+ * @throws RuntimeException on the first failure report, carrying the task's exception as cause
+ */
+ private void waitForCompletion() throws Exception {
+ while (true) {
+ EventTaskReport report = msgInbox.take();
+ if (report.isSuccess()) {
+ if (pendingTasks.decrementAndGet() == 0) {
+ break;
+ }
+ } else {
+ // Chain the task's exception as the cause so its stack trace is not lost.
+ throw new RuntimeException(report.getException().getMessage(), report.getException());
+ }
+ }
+ }
- private void initializeCluster(String eventsDir) throws Exception {
- Patterns patterns = initPattern(eventsDir);
- submit(patterns);
- }
+ /**
+ * Builds the directory-transfer patterns for {@code eventsDir} and submits them, waiting
+ * for completion.
+ *
+ * @param eventsDir the local directory to transfer
+ * @throws Exception if a transfer task reports failure
+ */
+ private void initializeCluster(String eventsDir) throws Exception {
+ Patterns patterns = initPattern(eventsDir);
+ submit(patterns);
+ }
- private Patterns initPattern(String eventsDir) {
- Nodeid nodeid = new Nodeid(new Value(null,
- EventDriver.CLIENT_NODE.getId()));
- List<Pattern> patternList = new ArrayList<Pattern>();
- String workingDir = cluster.getWorkingDir().getDir();
- String username = cluster.getUsername() == null ? System
- .getProperty("user.name") : cluster.getUsername();
- patternList.add(getDirectoryTransferPattern(username, eventsDir,
- nodeid, cluster.getMasterNode().getIp(), workingDir));
+ /**
+ * Builds the patterns that transfer {@code eventsDir} from the client node into the
+ * cluster's working directory: one for the master node and, unless the working directory
+ * is on NFS, one for every cluster node.
+ *
+ * @param eventsDir the local directory to transfer
+ * @return the directory-transfer patterns
+ */
+ private Patterns initPattern(String eventsDir) {
+ Nodeid nodeid = new Nodeid(new Value(null, EventDriver.CLIENT_NODE.getId()));
+ List<Pattern> patternList = new ArrayList<Pattern>();
+ String workingDir = cluster.getWorkingDir().getDir();
+ // Fall back to the current OS user when the cluster does not name one.
+ String username = cluster.getUsername() == null ? System.getProperty("user.name") : cluster.getUsername();
+ patternList.add(getDirectoryTransferPattern(username, eventsDir, nodeid,
+ cluster.getMasterNode().getClusterIp(), workingDir));
- if (!cluster.getWorkingDir().isNFS()) {
- for (Node node : cluster.getNode()) {
- patternList.add(getDirectoryTransferPattern(username,
- eventsDir, nodeid, node.getIp(), workingDir));
- }
- }
- Patterns patterns = new Patterns(patternList);
- return patterns;
- }
+ if (!cluster.getWorkingDir().isNFS()) {
+ for (Node node : cluster.getNode()) {
+ patternList.add(getDirectoryTransferPattern(username, eventsDir, nodeid, node.getClusterIp(),
+ workingDir));
+ }
+ }
+ Patterns patterns = new Patterns(patternList);
+ return patterns;
+ }
- private Pattern getDirectoryTransferPattern(String username, String src,
- Nodeid srcNode, String destNodeIp, String destDir) {
- String pargs = username + " " + src + " " + destNodeIp + " " + destDir;
- Event event = new Event("directory_transfer", srcNode, pargs);
- return new Pattern(null, 1, null, event);
- }
+ /**
+ * Builds a pattern holding a single "directory_transfer" event whose argument string is
+ * {@code username src destNodeIp destDir}, separated by spaces.
+ *
+ * @param username user name placed first in the argument string
+ * @param src source directory
+ * @param srcNode node id the event is attached to
+ * @param destNodeIp IP address of the destination node
+ * @param destDir destination directory
+ * @return the pattern wrapping the event
+ */
+ private Pattern getDirectoryTransferPattern(String username, String src, Nodeid srcNode, String destNodeIp,
+ String destDir) {
+ String pargs = username + " " + src + " " + destNodeIp + " " + destDir;
+ Event event = new Event("directory_transfer", srcNode, pargs);
+ // NOTE(review): meaning of the null/1/null arguments is not visible here -- confirm
+ // against the Pattern constructor.
+ return new Pattern(null, 1, null, event);
+ }
- public IOutputHandler getErrorHandler() {
- return outputHandler;
- }
+ /** Returns the output handler given at construction. */
+ public IOutputHandler getErrorHandler() {
+ return outputHandler;
+ }
- private Events initializeEvents() throws JAXBException,
- FileNotFoundException {
- File file = new File(eventsDir + File.separator + "events"
- + File.separator + "events.xml");
- JAXBContext eventCtx = JAXBContext.newInstance(Events.class);
- Unmarshaller unmarshaller = eventCtx.createUnmarshaller();
- events = (Events) unmarshaller.unmarshal(file);
- return events;
- }
+ /**
+ * Loads the event definitions by unmarshalling {@code <eventsDir>/events/events.xml} with
+ * JAXB. The result is also stored in the {@code events} field.
+ *
+ * @return the unmarshalled event definitions
+ * @throws JAXBException if the file cannot be unmarshalled
+ * @throws FileNotFoundException declared, but nothing in this body appears to throw it directly
+ */
+ private Events initializeEvents() throws JAXBException, FileNotFoundException {
+ File file = new File(eventsDir + File.separator + "events" + File.separator + "events.xml");
+ JAXBContext eventCtx = JAXBContext.newInstance(Events.class);
+ Unmarshaller unmarshaller = eventCtx.createUnmarshaller();
+ events = (Events) unmarshaller.unmarshal(file);
+ return events;
+ }
}
diff --git a/asterix-events/src/main/resources/events/node_join/nc_join.sh b/asterix-events/src/main/resources/events/node_join/nc_join.sh
index 5aa86db..6b5280b 100755
--- a/asterix-events/src/main/resources/events/node_join/nc_join.sh
+++ b/asterix-events/src/main/resources/events/node_join/nc_join.sh
@@ -1,7 +1,8 @@
CC_HOST=$1
NC_ID=$2
+IO_DEVICES=$3
+# Pass -iodevices only when a device list was supplied; with an empty third argument the
+# option would otherwise be left without a value.
+IO_DEVICES_OPT=""
+if [ -n "$IO_DEVICES" ];
+then
+IO_DEVICES_OPT="-iodevices $IO_DEVICES"
+fi
if [ ! -d $LOG_DIR ];
then
mkdir -p $LOG_DIR
fi
-$ASTERIX_HOME/bin/asterixnc -node-id $NC_ID -cc-host $CC_HOST -cc-port 1099 -cluster-net-ip-address $IP_LOCATION -data-ip-address $IP_LOCATION -result-ip-address $IP_LOCATION &> $LOG_DIR/${NC_ID}.log
+$ASTERIX_HOME/bin/asterixnc -node-id $NC_ID -cc-host $CC_HOST -cc-port 1099 -cluster-net-ip-address $IP_LOCATION -data-ip-address $IP_LOCATION $IO_DEVICES_OPT -result-ip-address $IP_LOCATION &> $LOG_DIR/${NC_ID}.log
diff --git a/asterix-events/src/main/resources/schema/cluster.xsd b/asterix-events/src/main/resources/schema/cluster.xsd
index 38afda3..0e1adce 100644
--- a/asterix-events/src/main/resources/schema/cluster.xsd
+++ b/asterix-events/src/main/resources/schema/cluster.xsd
@@ -3,19 +3,21 @@
<!-- definition of simple types -->
<xs:element name="name" type="xs:string"/>
-<xs:element name="java_heap" type="xs:string"/>
+<xs:element name="java_opts" type="xs:string"/>
<xs:element name="logdir" type="xs:string"/>
<xs:element name="id" type="xs:string"/>
-<xs:element name="ip" type="xs:string"/>
+<xs:element name="client-ip" type="xs:string"/>
<xs:element name="cluster-ip" type="xs:string"/>
<xs:element name="key" type="xs:string"/>
<xs:element name="value" type="xs:string"/>
<xs:element name="dir" type="xs:string"/>
<xs:element name="NFS" type="xs:boolean"/>
<xs:element name="store" type="xs:string"/>
+<xs:element name="iodevices" type="xs:string"/>
<xs:element name="java_home" type="xs:string"/>
<xs:element name="username" type="xs:string"/>
<xs:element name="debug" type="xs:integer"/>
+<xs:element name="debugEnabled" type="xs:boolean"/>
<!-- definition of complex elements -->
<xs:element name="workingDir">
@@ -31,10 +33,10 @@
<xs:complexType>
<xs:sequence>
<xs:element ref="cl:id"/>
- <xs:element ref="cl:ip"/>
+ <xs:element ref="cl:client-ip"/>
<xs:element ref="cl:cluster-ip"/>
<xs:element ref="cl:java_home" minOccurs="0"/>
- <xs:element ref="cl:java_heap" minOccurs="0"/>
+ <xs:element ref="cl:java_opts" minOccurs="0"/>
<xs:element ref="cl:logdir" minOccurs="0"/>
<xs:element ref="cl:debug" minOccurs="0"/>
</xs:sequence>
@@ -62,11 +64,12 @@
<xs:complexType>
<xs:sequence>
<xs:element ref="cl:id"/>
- <xs:element ref="cl:ip"/>
- <xs:element ref="cl:java_heap" minOccurs="0"/>
+ <xs:element ref="cl:cluster-ip"/>
<xs:element ref="cl:java_home" minOccurs="0"/>
+ <xs:element ref="cl:java_opts" minOccurs="0"/>
<xs:element ref="cl:logdir" minOccurs="0"/>
<xs:element ref="cl:store" minOccurs="0"/>
+ <xs:element ref="cl:iodevices" minOccurs="0"/>
<xs:element ref="cl:debug" minOccurs="0"/>
</xs:sequence>
</xs:complexType>
@@ -78,11 +81,14 @@
<xs:element ref="cl:name"/>
<xs:element ref="cl:username"/>
<xs:element ref="cl:env" minOccurs="0"/>
- <xs:element ref="cl:java_heap" minOccurs="0"/>
<xs:element ref="cl:java_home" minOccurs="0"/>
+ <xs:element ref="cl:java_opts" minOccurs="0"/>
<xs:element ref="cl:logdir" minOccurs="0"/>
<xs:element ref="cl:store" minOccurs="0"/>
+ <xs:element ref="cl:iodevices" minOccurs="0"/>
<xs:element ref="cl:workingDir"/>
+ <xs:element ref="cl:debugEnabled" minOccurs="0"/>
+ <xs:element ref="cl:debug" minOccurs="0"/>
<xs:element ref="cl:master-node"/>
<xs:element ref="cl:node" maxOccurs="unbounded"/>
</xs:sequence>