Merge branch 'master' of https://code.google.com/p/asterixdb
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index 428781f..66a3980 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -64,6 +64,7 @@
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.common.functions.FunctionSignature;
import edu.uci.ics.asterix.file.DatasetOperations;
+import edu.uci.ics.asterix.file.DataverseOperations;
import edu.uci.ics.asterix.file.FeedOperations;
import edu.uci.ics.asterix.file.IndexOperations;
import edu.uci.ics.asterix.formats.base.IDataFormat;
@@ -765,6 +766,7 @@
jobsToExecute.add(DatasetOperations.createDropDatasetJobSpec(cds, metadataProvider));
}
}
+ jobsToExecute.add(DataverseOperations.createDropDataverseJobSpec(dv, metadataProvider));
//#. mark PendingDropOp on the dataverse record by
// first, deleting the dataverse record from the DATAVERSE_DATASET
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/DataverseOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/DataverseOperations.java
new file mode 100644
index 0000000..0654ff7
--- /dev/null
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/DataverseOperations.java
@@ -0,0 +1,22 @@
+package edu.uci.ics.asterix.file;
+
+import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+import edu.uci.ics.asterix.metadata.entities.Dataverse;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.dataflow.std.file.FileRemoveOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+
+public class DataverseOperations {
+ public static JobSpecification createDropDataverseJobSpec(Dataverse dataverse, AqlMetadataProvider metadata) {
+ JobSpecification jobSpec = new JobSpecification();
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadata
+ .splitProviderAndPartitionConstraintsForDataverse(dataverse.getDataverseName());
+ FileRemoveOperatorDescriptor frod = new FileRemoveOperatorDescriptor(jobSpec, splitsAndConstraint.first);
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, frod, splitsAndConstraint.second);
+ jobSpec.addRoot(frod);
+ return jobSpec;
+ }
+}
diff --git a/asterix-events/pom.xml b/asterix-events/pom.xml
index 7b187bb..94042e1 100644
--- a/asterix-events/pom.xml
+++ b/asterix-events/pom.xml
@@ -166,5 +166,12 @@
<artifactId>commons-io</artifactId>
<version>1.4</version>
</dependency>
+ <dependency>
+ <groupId>edu.uci.ics.asterix</groupId>
+ <artifactId>asterix-common</artifactId>
+ <version>0.0.6-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventExecutor.java b/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventExecutor.java
index 352d787..9f75973 100644
--- a/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventExecutor.java
+++ b/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventExecutor.java
@@ -32,84 +32,95 @@
public class EventExecutor {
- public static final String EVENTS_DIR = "events";
- private static final String EXECUTE_SCRIPT = "execute.sh";
- private static final String IP_LOCATION = "IP_LOCATION";
- private static final String CLUSTER_ENV = "ENV";
- private static final String SCRIPT = "SCRIPT";
- private static final String ARGS = "ARGS";
- private static final String DAEMON = "DAEMON";
+ public static final String EVENTS_DIR = "events";
+ private static final String EXECUTE_SCRIPT = "execute.sh";
+ private static final String IP_LOCATION = "IP_LOCATION";
+ private static final String CLUSTER_ENV = "ENV";
+ private static final String SCRIPT = "SCRIPT";
+ private static final String ARGS = "ARGS";
+ private static final String DAEMON = "DAEMON";
- public void executeEvent(Node node, String script, List<String> args, boolean isDaemon, Cluster cluster,
- Pattern pattern, IOutputHandler outputHandler, EventrixClient client) throws IOException {
- List<String> pargs = new ArrayList<String>();
- pargs.add("/bin/bash");
- pargs.add(client.getEventsDir() + File.separator + "scripts" + File.separator + EXECUTE_SCRIPT);
- StringBuffer envBuffer = new StringBuffer(IP_LOCATION + "=" + node.getClusterIp() + " ");
- boolean isMasterNode = node.getId().equals(cluster.getMasterNode().getId());
+ public void executeEvent(Node node, String script, List<String> args,
+ boolean isDaemon, Cluster cluster, Pattern pattern,
+ IOutputHandler outputHandler, EventrixClient client)
+ throws IOException {
+ List<String> pargs = new ArrayList<String>();
+ pargs.add("/bin/bash");
+ pargs.add(client.getEventsDir() + File.separator + "scripts"
+ + File.separator + EXECUTE_SCRIPT);
+ StringBuffer envBuffer = new StringBuffer(IP_LOCATION + "="
+ + node.getClusterIp() + " ");
+ boolean isMasterNode = node.getId().equals(
+ cluster.getMasterNode().getId());
- if (!node.getId().equals(EventDriver.CLIENT_NODE_ID) && cluster.getEnv() != null) {
- for (Property p : cluster.getEnv().getProperty()) {
- if (p.getKey().equals("JAVA_HOME")) {
- String val = node.getJavaHome() == null ? p.getValue() : node.getJavaHome();
- envBuffer.append(p.getKey() + "=" + val + " ");
- } else if (p.getKey().equals("NC_JAVA_OPTS")) {
- if (!isMasterNode) {
- StringBuilder builder = new StringBuilder();
- builder.append("\"");
- String javaOpts = p.getValue();
- if (javaOpts != null) {
- builder.append(javaOpts);
- }
- builder.append("\"");
- envBuffer.append("JAVA_OPTS" + "=" + builder + " ");
- }
- } else if (p.getKey().equals("CC_JAVA_OPTS")) {
- if (isMasterNode) {
- StringBuilder builder = new StringBuilder();
- builder.append("\"");
- String javaOpts = p.getValue();
- if (javaOpts != null) {
- builder.append(javaOpts);
- }
- builder.append("\"");
- envBuffer.append("JAVA_OPTS" + "=" + builder + " ");
- }
- } else if (p.getKey().equals("LOG_DIR")) {
- String val = node.getLogDir() == null ? p.getValue() : node.getLogDir();
- envBuffer.append(p.getKey() + "=" + val + " ");
- } else {
- envBuffer.append(p.getKey() + "=" + p.getValue() + " ");
- }
+ if (!node.getId().equals(EventDriver.CLIENT_NODE_ID)
+ && cluster.getEnv() != null) {
+ for (Property p : cluster.getEnv().getProperty()) {
+ if (p.getKey().equals("JAVA_HOME")) {
+ String val = node.getJavaHome() == null ? p.getValue()
+ : node.getJavaHome();
+ envBuffer.append(p.getKey() + "=" + val + " ");
+ } else if (p.getKey().equals(EventUtil.NC_JAVA_OPTS)) {
+ if (!isMasterNode) {
+ StringBuilder builder = new StringBuilder();
+ builder.append("\"");
+ String javaOpts = p.getValue();
+ if (javaOpts != null) {
+ builder.append(javaOpts);
+ }
+ builder.append("\"");
+ envBuffer.append("JAVA_OPTS" + "=" + builder + " ");
+ }
+ } else if (p.getKey().equals(EventUtil.CC_JAVA_OPTS)) {
+ if (isMasterNode) {
+ StringBuilder builder = new StringBuilder();
+ builder.append("\"");
+ String javaOpts = p.getValue();
+ if (javaOpts != null) {
+ builder.append(javaOpts);
+ }
+ builder.append("\"");
+ envBuffer.append("JAVA_OPTS" + "=" + builder + " ");
+ }
+ } else if (p.getKey().equals("LOG_DIR")) {
+ String val = node.getLogDir() == null ? p.getValue() : node
+ .getLogDir();
+ envBuffer.append(p.getKey() + "=" + val + " ");
+ } else {
+ envBuffer.append(p.getKey() + "=" + p.getValue() + " ");
+ }
- }
- pargs.add(cluster.getUsername() == null ? System.getProperty("user.name") : cluster.getUsername());
- }
+ }
+ pargs.add(cluster.getUsername() == null ? System
+ .getProperty("user.name") : cluster.getUsername());
+ }
- StringBuffer argBuffer = new StringBuffer();
- if (args != null && args.size() > 0) {
- for (String arg : args) {
- argBuffer.append(arg + " ");
- }
- }
+ StringBuffer argBuffer = new StringBuffer();
+ if (args != null && args.size() > 0) {
+ for (String arg : args) {
+ argBuffer.append(arg + " ");
+ }
+ }
- ProcessBuilder pb = new ProcessBuilder(pargs);
- pb.environment().put(IP_LOCATION, node.getClusterIp());
- pb.environment().put(CLUSTER_ENV, envBuffer.toString());
- pb.environment().put(SCRIPT, script);
- pb.environment().put(ARGS, argBuffer.toString());
- pb.environment().put(DAEMON, isDaemon ? "true" : "false");
+ ProcessBuilder pb = new ProcessBuilder(pargs);
+ pb.environment().put(IP_LOCATION, node.getClusterIp());
+ pb.environment().put(CLUSTER_ENV, envBuffer.toString());
+ pb.environment().put(SCRIPT, script);
+ pb.environment().put(ARGS, argBuffer.toString());
+ pb.environment().put(DAEMON, isDaemon ? "true" : "false");
- Process p = pb.start();
- if (!isDaemon) {
- BufferedInputStream bis = new BufferedInputStream(p.getInputStream());
- StringWriter writer = new StringWriter();
- IOUtils.copy(bis, writer, "UTF-8");
- String result = writer.getBuffer().toString();
- OutputAnalysis analysis = outputHandler.reportEventOutput(pattern.getEvent(), result);
- if (!analysis.isExpected()) {
- throw new IOException(analysis.getErrorMessage() + result);
- }
- }
- }
+ Process p = pb.start();
+ if (!isDaemon) {
+ BufferedInputStream bis = new BufferedInputStream(p
+ .getInputStream());
+ StringWriter writer = new StringWriter();
+ IOUtils.copy(bis, writer, "UTF-8");
+ String result = writer.getBuffer().toString();
+ OutputAnalysis analysis = outputHandler.reportEventOutput(pattern
+ .getEvent(), result);
+ if (!analysis.isExpected()) {
+ throw new IOException(analysis.getErrorMessage() + result);
+ }
+ }
+ }
}
diff --git a/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventUtil.java b/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventUtil.java
index 7cbb515..533b2a4 100644
--- a/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventUtil.java
+++ b/asterix-events/src/main/java/edu/uci/ics/asterix/event/management/EventUtil.java
@@ -36,230 +36,252 @@
public class EventUtil {
- public static final String EVENTS_DIR = "events";
- public static final String CLUSTER_CONF = "config/cluster.xml";
- public static final String PATTERN_CONF = "config/pattern.xml";
- public static final DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
+ public static final String EVENTS_DIR = "events";
+ public static final String CLUSTER_CONF = "config/cluster.xml";
+ public static final String PATTERN_CONF = "config/pattern.xml";
+ public static final DateFormat dateFormat = new SimpleDateFormat(
+ "yyyy/MM/dd HH:mm:ss");
+ public static final String NC_JAVA_OPTS = "nc.java.opts";
+ public static final String CC_JAVA_OPTS = "cc.java.opts";
- private static final String IP_LOCATION = "IP_LOCATION";
- private static final String CLUSTER_ENV = "ENV";
- private static final String SCRIPT = "SCRIPT";
- private static final String ARGS = "ARGS";
- private static final String EXECUTE_SCRIPT = "events/execute.sh";
- private static final String LOCALHOST = "localhost";
- private static final String LOCALHOST_IP = "127.0.0.1";
+ private static final String IP_LOCATION = "IP_LOCATION";
+ private static final String CLUSTER_ENV = "ENV";
+ private static final String SCRIPT = "SCRIPT";
+ private static final String ARGS = "ARGS";
+ private static final String EXECUTE_SCRIPT = "events/execute.sh";
+ private static final String LOCALHOST = "localhost";
+ private static final String LOCALHOST_IP = "127.0.0.1";
- public static Cluster getCluster(String clusterConfigurationPath) throws JAXBException {
- File file = new File(clusterConfigurationPath);
- JAXBContext ctx = JAXBContext.newInstance(Cluster.class);
- Unmarshaller unmarshaller = ctx.createUnmarshaller();
- Cluster cluster = (Cluster) unmarshaller.unmarshal(file);
- if (cluster.getMasterNode().getClusterIp().equals(LOCALHOST)) {
- cluster.getMasterNode().setClusterIp(LOCALHOST_IP);
- }
- for (Node node : cluster.getNode()) {
- if (node.getClusterIp().equals(LOCALHOST)) {
- node.setClusterIp(LOCALHOST_IP);
- }
- }
- return cluster;
- }
+ public static Cluster getCluster(String clusterConfigurationPath)
+ throws JAXBException {
+ File file = new File(clusterConfigurationPath);
+ JAXBContext ctx = JAXBContext.newInstance(Cluster.class);
+ Unmarshaller unmarshaller = ctx.createUnmarshaller();
+ Cluster cluster = (Cluster) unmarshaller.unmarshal(file);
+ if (cluster.getMasterNode().getClusterIp().equals(LOCALHOST)) {
+ cluster.getMasterNode().setClusterIp(LOCALHOST_IP);
+ }
+ for (Node node : cluster.getNode()) {
+ if (node.getClusterIp().equals(LOCALHOST)) {
+ node.setClusterIp(LOCALHOST_IP);
+ }
+ }
+ return cluster;
+ }
- public static long parseTimeInterval(ValueType v, String unit) throws IllegalArgumentException {
- int val = 0;
- switch (v.getType()) {
- case ABS:
- val = Integer.parseInt(v.getAbsoluteValue());
- break;
- case RANDOM_MIN_MAX:
- val = Randomizer.getInstance().getRandomInt(v.getMin(), v.getMax());
- break;
- case RANDOM_RANGE:
- String[] values = v.getRangeSet();
- val = Integer.parseInt(values[Randomizer.getInstance().getRandomInt(0, values.length - 1)]);
- break;
- }
- return computeInterval(val, unit);
- }
+ public static long parseTimeInterval(ValueType v, String unit)
+ throws IllegalArgumentException {
+ int val = 0;
+ switch (v.getType()) {
+ case ABS:
+ val = Integer.parseInt(v.getAbsoluteValue());
+ break;
+ case RANDOM_MIN_MAX:
+ val = Randomizer.getInstance().getRandomInt(v.getMin(), v.getMax());
+ break;
+ case RANDOM_RANGE:
+ String[] values = v.getRangeSet();
+ val = Integer.parseInt(values[Randomizer.getInstance()
+ .getRandomInt(0, values.length - 1)]);
+ break;
+ }
+ return computeInterval(val, unit);
+ }
- public static long parseTimeInterval(String v, String unit) throws IllegalArgumentException {
- int value = Integer.parseInt(v);
- return computeInterval(value, unit);
- }
+ public static long parseTimeInterval(String v, String unit)
+ throws IllegalArgumentException {
+ int value = Integer.parseInt(v);
+ return computeInterval(value, unit);
+ }
- private static long computeInterval(int val, String unit) {
- int vmult = 1;
- if ("hr".equalsIgnoreCase(unit)) {
- vmult = 3600 * 1000;
- } else if ("min".equalsIgnoreCase(unit)) {
- vmult = 60 * 1000;
- } else if ("sec".equalsIgnoreCase(unit)) {
- vmult = 1000;
- } else
- throw new IllegalArgumentException(" invalid unit value specified for frequency (hr,min,sec)");
- return val * vmult;
+ private static long computeInterval(int val, String unit) {
+ int vmult = 1;
+ if ("hr".equalsIgnoreCase(unit)) {
+ vmult = 3600 * 1000;
+ } else if ("min".equalsIgnoreCase(unit)) {
+ vmult = 60 * 1000;
+ } else if ("sec".equalsIgnoreCase(unit)) {
+ vmult = 1000;
+ } else
+ throw new IllegalArgumentException(
+ " invalid unit value specified for frequency (hr,min,sec)");
+ return val * vmult;
- }
+ }
- public static Event getEvent(Pattern pattern, Events events) {
- for (Event event : events.getEvent()) {
- if (event.getType().equals(pattern.getEvent().getType())) {
- return event;
- }
- }
- throw new IllegalArgumentException(" Unknown event type" + pattern.getEvent().getType());
- }
+ public static Event getEvent(Pattern pattern, Events events) {
+ for (Event event : events.getEvent()) {
+ if (event.getType().equals(pattern.getEvent().getType())) {
+ return event;
+ }
+ }
+ throw new IllegalArgumentException(" Unknown event type"
+ + pattern.getEvent().getType());
+ }
- public static Node getEventLocation(Pattern pattern, List<Node> candidateLocations, Cluster cluster) {
- ValueType value = new ValueType(pattern.getEvent().getNodeid().getValue());
- Node location = null;
- Type vtype = value.getType();
+ public static Node getEventLocation(Pattern pattern,
+ List<Node> candidateLocations, Cluster cluster) {
+ ValueType value = new ValueType(pattern.getEvent().getNodeid()
+ .getValue());
+ Node location = null;
+ Type vtype = value.getType();
- switch (vtype) {
- case ABS:
- location = getNodeFromId(value.getAbsoluteValue(), cluster);
- break;
- case RANDOM_RANGE:
- int nodeIndex = Randomizer.getInstance().getRandomInt(0, candidateLocations.size() - 1);
- location = candidateLocations.get(nodeIndex);
- break;
- case RANDOM_MIN_MAX:
- throw new IllegalStateException(" Canont configure a min max value range for location");
- }
- return location;
+ switch (vtype) {
+ case ABS:
+ location = getNodeFromId(value.getAbsoluteValue(), cluster);
+ break;
+ case RANDOM_RANGE:
+ int nodeIndex = Randomizer.getInstance().getRandomInt(0,
+ candidateLocations.size() - 1);
+ location = candidateLocations.get(nodeIndex);
+ break;
+ case RANDOM_MIN_MAX:
+ throw new IllegalStateException(
+ " Canont configure a min max value range for location");
+ }
+ return location;
- }
+ }
- public static List<Node> getCandidateLocations(Pattern pattern, Cluster cluster) {
- ValueType value = new ValueType(pattern.getEvent().getNodeid().getValue());
- List<Node> candidateList = new ArrayList<Node>();
- switch (value.getType()) {
- case ABS:
- candidateList.add(getNodeFromId(value.getAbsoluteValue(), cluster));
- break;
- case RANDOM_RANGE:
- boolean anyOption = false;
- String[] values = value.getRangeSet();
- for (String v : values) {
- if (v.equalsIgnoreCase("ANY")) {
- anyOption = true;
- }
- }
- if (anyOption) {
- for (Node node : cluster.getNode()) {
- candidateList.add(node);
- }
- } else {
- boolean found = false;
- for (String v : values) {
- for (Node node : cluster.getNode()) {
- if (node.getId().equals(v)) {
- candidateList.add(node);
- found = true;
- break;
- }
- }
- if (!found) {
- throw new IllegalStateException("Unknonw nodeId : " + v);
- }
- found = false;
- }
+ public static List<Node> getCandidateLocations(Pattern pattern,
+ Cluster cluster) {
+ ValueType value = new ValueType(pattern.getEvent().getNodeid()
+ .getValue());
+ List<Node> candidateList = new ArrayList<Node>();
+ switch (value.getType()) {
+ case ABS:
+ candidateList.add(getNodeFromId(value.getAbsoluteValue(), cluster));
+ break;
+ case RANDOM_RANGE:
+ boolean anyOption = false;
+ String[] values = value.getRangeSet();
+ for (String v : values) {
+ if (v.equalsIgnoreCase("ANY")) {
+ anyOption = true;
+ }
+ }
+ if (anyOption) {
+ for (Node node : cluster.getNode()) {
+ candidateList.add(node);
+ }
+ } else {
+ boolean found = false;
+ for (String v : values) {
+ for (Node node : cluster.getNode()) {
+ if (node.getId().equals(v)) {
+ candidateList.add(node);
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ throw new IllegalStateException("Unknonw nodeId : " + v);
+ }
+ found = false;
+ }
- }
- String[] excluded = value.getRangeExcluded();
- if (excluded != null && excluded.length > 0) {
- List<Node> markedForRemoval = new ArrayList<Node>();
- for (String exclusion : excluded) {
- for (Node node : candidateList) {
- if (node.getId().equals(exclusion)) {
- markedForRemoval.add(node);
- }
- }
- }
- candidateList.removeAll(markedForRemoval);
- }
- break;
- case RANDOM_MIN_MAX:
- throw new IllegalStateException(" Invalid value configured for location");
- }
- return candidateList;
- }
+ }
+ String[] excluded = value.getRangeExcluded();
+ if (excluded != null && excluded.length > 0) {
+ List<Node> markedForRemoval = new ArrayList<Node>();
+ for (String exclusion : excluded) {
+ for (Node node : candidateList) {
+ if (node.getId().equals(exclusion)) {
+ markedForRemoval.add(node);
+ }
+ }
+ }
+ candidateList.removeAll(markedForRemoval);
+ }
+ break;
+ case RANDOM_MIN_MAX:
+ throw new IllegalStateException(
+ " Invalid value configured for location");
+ }
+ return candidateList;
+ }
- private static Node getNodeFromId(String nodeid, Cluster cluster) {
- if (nodeid.equals(EventDriver.CLIENT_NODE.getId())) {
- return EventDriver.CLIENT_NODE;
- }
+ private static Node getNodeFromId(String nodeid, Cluster cluster) {
+ if (nodeid.equals(EventDriver.CLIENT_NODE.getId())) {
+ return EventDriver.CLIENT_NODE;
+ }
- if (nodeid.equals(cluster.getMasterNode().getId())) {
- String logDir = cluster.getMasterNode().getLogDir() == null ? cluster.getLogDir() : cluster.getMasterNode()
- .getLogDir();
- String javaHome = cluster.getMasterNode().getJavaHome() == null ? cluster.getJavaHome() : cluster
- .getMasterNode().getJavaHome();
- return new Node(cluster.getMasterNode().getId(), cluster.getMasterNode().getClusterIp(), javaHome, logDir,
- null, null, null);
- }
+ if (nodeid.equals(cluster.getMasterNode().getId())) {
+ String logDir = cluster.getMasterNode().getLogDir() == null ? cluster
+ .getLogDir()
+ : cluster.getMasterNode().getLogDir();
+ String javaHome = cluster.getMasterNode().getJavaHome() == null ? cluster
+ .getJavaHome()
+ : cluster.getMasterNode().getJavaHome();
+ return new Node(cluster.getMasterNode().getId(), cluster
+ .getMasterNode().getClusterIp(), javaHome, logDir, null,
+ null, null);
+ }
- List<Node> nodeList = cluster.getNode();
- for (Node node : nodeList) {
- if (node.getId().equals(nodeid)) {
- return node;
- }
- }
- StringBuffer buffer = new StringBuffer();
- buffer.append(EventDriver.CLIENT_NODE.getId() + ",");
- buffer.append(cluster.getMasterNode().getId() + ",");
- for (Node v : cluster.getNode()) {
- buffer.append(v.getId() + ",");
- }
- buffer.deleteCharAt(buffer.length() - 1);
- throw new IllegalArgumentException("Unknown node id :" + nodeid + " valid ids:" + buffer);
- }
+ List<Node> nodeList = cluster.getNode();
+ for (Node node : nodeList) {
+ if (node.getId().equals(nodeid)) {
+ return node;
+ }
+ }
+ StringBuffer buffer = new StringBuffer();
+ buffer.append(EventDriver.CLIENT_NODE.getId() + ",");
+ buffer.append(cluster.getMasterNode().getId() + ",");
+ for (Node v : cluster.getNode()) {
+ buffer.append(v.getId() + ",");
+ }
+ buffer.deleteCharAt(buffer.length() - 1);
+ throw new IllegalArgumentException("Unknown node id :" + nodeid
+ + " valid ids:" + buffer);
+ }
- public static void executeEventScript(Node node, String script, List<String> args, Cluster cluster)
- throws IOException, InterruptedException {
- List<String> pargs = new ArrayList<String>();
- pargs.add("/bin/bash");
- pargs.add(EventDriver.getEventsDir() + "/" + EXECUTE_SCRIPT);
- StringBuffer argBuffer = new StringBuffer();
- String env = EventDriver.getStringifiedEnv(cluster) + " " + IP_LOCATION + "=" + node.getClusterIp();
- if (args != null) {
- for (String arg : args) {
- argBuffer.append(arg + " ");
- }
- }
- ProcessBuilder pb = new ProcessBuilder(pargs);
- pb.environment().putAll(EventDriver.getEnvironment());
- pb.environment().put(IP_LOCATION, node.getClusterIp());
- pb.environment().put(CLUSTER_ENV, env);
- pb.environment().put(SCRIPT, script);
- pb.environment().put(ARGS, argBuffer.toString());
- pb.start();
- }
+ public static void executeEventScript(Node node, String script,
+ List<String> args, Cluster cluster) throws IOException,
+ InterruptedException {
+ List<String> pargs = new ArrayList<String>();
+ pargs.add("/bin/bash");
+ pargs.add(EventDriver.getEventsDir() + "/" + EXECUTE_SCRIPT);
+ StringBuffer argBuffer = new StringBuffer();
+ String env = EventDriver.getStringifiedEnv(cluster) + " " + IP_LOCATION
+ + "=" + node.getClusterIp();
+ if (args != null) {
+ for (String arg : args) {
+ argBuffer.append(arg + " ");
+ }
+ }
+ ProcessBuilder pb = new ProcessBuilder(pargs);
+ pb.environment().putAll(EventDriver.getEnvironment());
+ pb.environment().put(IP_LOCATION, node.getClusterIp());
+ pb.environment().put(CLUSTER_ENV, env);
+ pb.environment().put(SCRIPT, script);
+ pb.environment().put(ARGS, argBuffer.toString());
+ pb.start();
+ }
- public static void executeLocalScript(Node node, String script, List<String> args) throws IOException,
- InterruptedException {
- List<String> pargs = new ArrayList<String>();
- pargs.add("/bin/bash");
- pargs.add(script);
- if (args != null) {
- pargs.addAll(args);
- }
- ProcessBuilder pb = new ProcessBuilder(pargs);
- pb.environment().putAll(EventDriver.getEnvironment());
- pb.environment().put(IP_LOCATION, node.getClusterIp());
- pb.start();
- }
+ public static void executeLocalScript(Node node, String script,
+ List<String> args) throws IOException, InterruptedException {
+ List<String> pargs = new ArrayList<String>();
+ pargs.add("/bin/bash");
+ pargs.add(script);
+ if (args != null) {
+ pargs.addAll(args);
+ }
+ ProcessBuilder pb = new ProcessBuilder(pargs);
+ pb.environment().putAll(EventDriver.getEnvironment());
+ pb.environment().put(IP_LOCATION, node.getClusterIp());
+ pb.start();
+ }
- public static List<String> getEventArgs(Pattern pattern) {
- List<String> pargs = new ArrayList<String>();
- if (pattern.getEvent().getPargs() == null) {
- return pargs;
- }
- String[] args = pattern.getEvent().getPargs().split(" ");
- for (String arg : args) {
- pargs.add(arg.trim());
- }
- return pargs;
- }
+ public static List<String> getEventArgs(Pattern pattern) {
+ List<String> pargs = new ArrayList<String>();
+ if (pattern.getEvent().getPargs() == null) {
+ return pargs;
+ }
+ String[] args = pattern.getEvent().getPargs().split(" ");
+ for (String arg : args) {
+ pargs.add(arg.trim());
+ }
+ return pargs;
+ }
}
diff --git a/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/driver/InstallerUtil.java b/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/driver/InstallerUtil.java
index c9395ca..2b884ef 100644
--- a/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/driver/InstallerUtil.java
+++ b/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/driver/InstallerUtil.java
@@ -51,6 +51,7 @@
import edu.uci.ics.asterix.common.configuration.Store;
import edu.uci.ics.asterix.event.driver.EventDriver;
import edu.uci.ics.asterix.event.management.EventrixClient;
+import edu.uci.ics.asterix.event.management.EventUtil;
import edu.uci.ics.asterix.event.schema.cluster.Cluster;
import edu.uci.ics.asterix.event.schema.cluster.Env;
import edu.uci.ics.asterix.event.schema.cluster.Node;
@@ -96,10 +97,10 @@
clusterProperties = new ArrayList<Property>();
}
for (edu.uci.ics.asterix.common.configuration.Property property : asterixConfiguration.getProperty()) {
- if (property.getName().equalsIgnoreCase(AsterixInstance.CC_JAVA_OPTS)) {
- clusterProperties.add(new Property("CC_JAVA_OPTS", property.getValue()));
- } else if (property.getName().equalsIgnoreCase(AsterixInstance.NC_JAVA_OPTS)) {
- clusterProperties.add(new Property("NC_JAVA_OPTS", property.getValue()));
+ if (property.getName().equalsIgnoreCase(EventUtil.CC_JAVA_OPTS)) {
+ clusterProperties.add(new Property(EventUtil.CC_JAVA_OPTS, property.getValue()));
+ } else if (property.getName().equalsIgnoreCase(EventUtil.NC_JAVA_OPTS)) {
+ clusterProperties.add(new Property(EventUtil.NC_JAVA_OPTS, property.getValue()));
}
}
clusterProperties.add(new Property("ASTERIX_HOME", cluster.getWorkingDir().getDir() + File.separator
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index fa42aed..4773ed0 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -1299,6 +1299,17 @@
public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
String dataverseName, String datasetName, String targetIdxName) throws AlgebricksException {
FileSplit[] splits = splitsForInternalOrFeedDataset(mdTxnCtx, dataverseName, datasetName, targetIdxName);
+ return splitProviderAndPartitionConstraints(splits);
+ }
+
+ public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForDataverse(
+ String dataverse) {
+ FileSplit[] splits = splitsForDataverse(mdTxnCtx, dataverse);
+ return splitProviderAndPartitionConstraints(splits);
+ }
+
+ private Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraints(
+ FileSplit[] splits) {
IFileSplitProvider splitProvider = new ConstantFileSplitProvider(splits);
String[] loc = new String[splits.length];
for (int p = 0; p < splits.length; p++) {
@@ -1308,6 +1319,23 @@
return new Pair<IFileSplitProvider, AlgebricksPartitionConstraint>(splitProvider, pc);
}
+ private FileSplit[] splitsForDataverse(MetadataTransactionContext mdTxnCtx, String dataverseName) {
+ File relPathFile = new File(dataverseName);
+ List<FileSplit> splits = new ArrayList<FileSplit>();
+ for (Map.Entry<String, String[]> entry : stores.entrySet()) {
+ String node = entry.getKey();
+ String[] nodeStores = entry.getValue();
+ if (nodeStores == null) {
+ continue;
+ }
+ for (int i = 0; i < nodeStores.length; i++) {
+ File f = new File(nodeStores[i] + File.separator + relPathFile);
+ splits.add(new FileSplit(node, new FileReference(f)));
+ }
+ }
+ return splits.toArray(new FileSplit[] {});
+ }
+
private FileSplit[] splitsForInternalOrFeedDataset(MetadataTransactionContext mdTxnCtx, String dataverseName,
String datasetName, String targetIdxName) throws AlgebricksException {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/IndexOperationTracker.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/IndexOperationTracker.java
index b3a6533..2016d08 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/IndexOperationTracker.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/IndexOperationTracker.java
@@ -15,6 +15,8 @@
package edu.uci.ics.asterix.transaction.management.opcallbacks;
+import java.util.concurrent.atomic.AtomicInteger;
+
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
@@ -30,7 +32,7 @@
public class IndexOperationTracker implements ILSMOperationTracker {
// Number of active operations on a ILSMIndex instance.
- private int numActiveOperations = 0;
+ private AtomicInteger numActiveOperations;
private long lastLSN;
private long firstLSN;
private final ILSMIndex index;
@@ -38,6 +40,7 @@
private ILSMIndexAccessor accessor;
public IndexOperationTracker(ILSMIndex index, ILSMIOOperationCallbackFactory ioOpCallbackFactory) {
+ this.numActiveOperations = new AtomicInteger(0);
this.index = index;
//TODO
//This code is added to avoid NullPointException when the index's comparatorFactory is null.
@@ -54,7 +57,7 @@
public void beforeOperation(LSMOperationType opType, ISearchOperationCallback searchCallback,
IModificationOperationCallback modificationCallback) throws HyracksDataException {
if (opType != LSMOperationType.FORCE_MODIFICATION) {
- numActiveOperations++;
+ numActiveOperations.incrementAndGet();
// Increment transactor-local active operations count.
AbstractOperationCallback opCallback = getOperationCallback(searchCallback, modificationCallback);
@@ -76,7 +79,6 @@
@Override
public void completeOperation(LSMOperationType opType, ISearchOperationCallback searchCallback,
IModificationOperationCallback modificationCallback) throws HyracksDataException {
- numActiveOperations--;
// Decrement transactor-local active operations count.
AbstractOperationCallback opCallback = getOperationCallback(searchCallback, modificationCallback);
@@ -85,7 +87,7 @@
}
// If we need a flush, and this is the last completing operation, then schedule the flush.
// Once the flush has completed notify all waiting operations.
- if (index.getFlushStatus() && numActiveOperations == 0 && opType != LSMOperationType.FLUSH) {
+ if (index.getFlushStatus() && numActiveOperations.decrementAndGet() == 0 && opType != LSMOperationType.FLUSH) {
if (accessor == null) {
accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);