checkpoint post merge from master
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
index e56ed92..9fcbb29 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
@@ -62,6 +62,9 @@
import edu.uci.ics.hyracks.storage.common.file.ResourceIdFactoryProvider;
public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAsterixPropertiesProvider {
+
+ public static final AsterixPropertiesAccessor ASTERIX_PROPERTIES_ACCESSOR = createAsterixPropertiesAccessor();
+
private static final int METADATA_IO_DEVICE_ID = 0;
private final INCApplicationContext ncApplicationContext;
@@ -84,18 +87,26 @@
private IIOManager ioManager;
private boolean isShuttingdown;
- public AsterixAppRuntimeContext(INCApplicationContext ncApplicationContext) {
+ public AsterixAppRuntimeContext(INCApplicationContext ncApplicationContext) throws AsterixException {
this.ncApplicationContext = ncApplicationContext;
+ compilerProperties = new AsterixCompilerProperties(ASTERIX_PROPERTIES_ACCESSOR);
+ externalProperties = new AsterixExternalProperties(ASTERIX_PROPERTIES_ACCESSOR);
+ metadataProperties = new AsterixMetadataProperties(ASTERIX_PROPERTIES_ACCESSOR);
+ storageProperties = new AsterixStorageProperties(ASTERIX_PROPERTIES_ACCESSOR);
+ txnProperties = new AsterixTransactionProperties(ASTERIX_PROPERTIES_ACCESSOR);
+ }
+
+ private static AsterixPropertiesAccessor createAsterixPropertiesAccessor() {
+ AsterixPropertiesAccessor propertiesAccessor = null;
+ try {
+ propertiesAccessor = new AsterixPropertiesAccessor();
+ } catch (AsterixException e) {
+ throw new IllegalStateException("Unable to create properties accessor");
+ }
+ return propertiesAccessor;
}
public void initialize() throws IOException, ACIDException, AsterixException {
- AsterixPropertiesAccessor propertiesAccessor = new AsterixPropertiesAccessor();
- compilerProperties = new AsterixCompilerProperties(propertiesAccessor);
- externalProperties = new AsterixExternalProperties(propertiesAccessor);
- metadataProperties = new AsterixMetadataProperties(propertiesAccessor);
- storageProperties = new AsterixStorageProperties(propertiesAccessor);
- txnProperties = new AsterixTransactionProperties(propertiesAccessor);
-
Logger.getLogger("edu.uci.ics").setLevel(externalProperties.getLogLevel());
fileMapManager = new AsterixFileMapManager();
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index 97a536d..ed0cab1 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -33,7 +33,6 @@
import edu.uci.ics.asterix.metadata.MetadataManager;
import edu.uci.ics.asterix.metadata.api.IAsterixStateProxy;
import edu.uci.ics.asterix.metadata.bootstrap.AsterixStateProxy;
-import edu.uci.ics.asterix.metadata.feeds.FeedJobLifecycleListener;
import edu.uci.ics.asterix.metadata.feeds.FeedLifecycleListener;
import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
@@ -71,9 +70,6 @@
AsterixAppContextInfo.getInstance().getCCApplicationContext()
.addJobLifecycleListener(FeedLifecycleListener.INSTANCE);
- AsterixAppContextInfo.getInstance().getCCApplicationContext()
- .addClusterLifecycleListener(FeedLifecycleListener.INSTANCE);
-
AsterixExternalProperties externalProperties = AsterixAppContextInfo.getInstance().getExternalProperties();
setupWebServer(externalProperties);
webServer.start();
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterLifecycleListener.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
index a864e61..941733e 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
@@ -14,11 +14,22 @@
*/
package edu.uci.ics.asterix.hyracks.bootstrap;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.event.schema.cluster.Node;
+import edu.uci.ics.asterix.metadata.api.IClusterEventsSubscriber;
+import edu.uci.ics.asterix.metadata.api.IClusterManagementWork;
+import edu.uci.ics.asterix.metadata.cluster.AddNodeWork;
+import edu.uci.ics.asterix.metadata.cluster.AddNodeWorkResponse;
+import edu.uci.ics.asterix.metadata.cluster.ClusterManager;
+import edu.uci.ics.asterix.metadata.cluster.IClusterManagementWorkResponse.Status;
+import edu.uci.ics.asterix.metadata.cluster.RemoveNodeWork;
import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
import edu.uci.ics.hyracks.api.application.IClusterLifecycleListener;
@@ -37,6 +48,7 @@
LOGGER.info("NC: " + nodeId + " joined");
}
AsterixClusterProperties.INSTANCE.addNCConfiguration(nodeId, ncConfiguration);
+
}
public void notifyNodeFailure(Set<String> deadNodeIds) {
@@ -46,7 +58,58 @@
}
AsterixClusterProperties.INSTANCE.removeNCConfiguration(deadNode);
}
+ Set<IClusterEventsSubscriber> subscribers = ClusterManager.INSTANCE.getRegisteredClusterEventSubscribers();
+ Set<IClusterManagementWork> work = new HashSet<IClusterManagementWork>();
+ for (IClusterEventsSubscriber sub : subscribers) {
+ work.addAll(sub.notifyNodeFailure(deadNodeIds));
+ }
+
+ int nodesToAdd = 0;
+ Set<String> nodesToRemove = new HashSet<String>();
+ Set<IClusterManagementWork> nodeAdditionRequests = new HashSet<IClusterManagementWork>();
+ Set<IClusterManagementWork> nodeRemovalRequests = new HashSet<IClusterManagementWork>();
+ for (IClusterManagementWork w : work) {
+ switch (w.getClusterManagementWorkType()) {
+ case ADD_NODE:
+ if (nodesToAdd < ((AddNodeWork) w).getNumberOfNodes()) {
+ nodesToAdd = ((AddNodeWork) w).getNumberOfNodes();
+ }
+ nodeAdditionRequests.add(w);
+ break;
+ case REMOVE_NODE:
+ nodesToRemove.addAll(((RemoveNodeWork) w).getNodesToBeRemoved());
+ nodeRemovalRequests.add(w);
+ break;
+ }
+ }
+
+ Set<Node> addedNodes = new HashSet<Node>();
+ for (int i = 0; i < nodesToAdd; i++) {
+ Node node = AsterixClusterProperties.INSTANCE.getAvailableSubstitutionNode();
+ if (node != null) {
+ try {
+ ClusterManager.INSTANCE.addNode(node);
+ addedNodes.add(node);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Added NC at:" + node.getId());
+ }
+ } catch (AsterixException e) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Unable to add NC at:" + node.getId());
+ }
+ e.printStackTrace();
+ }
+ } else {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Unable to add NC: no more available nodes");
+ }
+ }
+ }
+
+ for (IClusterManagementWork w : nodeAdditionRequests) {
+ w.getSourceSubscriber().notifyRequestCompletion(
+ new AddNodeWorkResponse((AddNodeWork) w, Status.SUCCESS, addedNodes));
+ }
}
-
}
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index 069cdb6..e4ef300 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -25,6 +25,7 @@
import edu.uci.ics.asterix.common.api.AsterixThreadFactory;
import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
import edu.uci.ics.asterix.common.config.AsterixMetadataProperties;
+import edu.uci.ics.asterix.common.config.AsterixTransactionProperties;
import edu.uci.ics.asterix.common.config.IAsterixPropertiesProvider;
import edu.uci.ics.asterix.common.transactions.IRecoveryManager;
import edu.uci.ics.asterix.common.transactions.IRecoveryManager.SystemState;
@@ -63,6 +64,14 @@
Runtime.getRuntime().addShutdownHook(sHook);
runtimeContext = new AsterixAppRuntimeContext(ncApplicationContext);
+ AsterixMetadataProperties metadataProperties = ((IAsterixPropertiesProvider) runtimeContext)
+ .getMetadataProperties();
+ if (!metadataProperties.getNodeNames().contains(ncApplicationContext.getNodeId())) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Substitute node joining : " + ncApplicationContext.getNodeId());
+ }
+ updateOnNodeJoin();
+ }
runtimeContext.initialize();
ncApplicationContext.setApplicationObject(runtimeContext);
@@ -115,25 +124,6 @@
IAsterixStateProxy proxy = (IAsterixStateProxy) ncApplicationContext.getDistributedState();
AsterixMetadataProperties metadataProperties = ((IAsterixPropertiesProvider) runtimeContext)
.getMetadataProperties();
- if (!metadataProperties.getNodeNames().contains(nodeId)) {
- metadataProperties.getNodeNames().add(nodeId);
- Cluster cluster = AsterixClusterProperties.INSTANCE.getCluster();
- Node self = null;
- for (Node node : cluster.getSubstituteNodes().getNode()) {
- if (node.getId().equalsIgnoreCase(nodeId)) {
- String storeDir = node.getStore() == null ? cluster.getStore() : node.getStore();
- metadataProperties.getStores().put(nodeId, storeDir.split(","));
- self = node;
- break;
- }
- }
- if (self != null) {
- cluster.getSubstituteNodes().getNode().remove(self);
- cluster.getNode().add(self);
- } else {
- throw new IllegalStateException("Unknown node joining the cluster");
- }
- }
if (systemState == SystemState.NEW_UNIVERSE) {
if (LOGGER.isLoggable(Level.INFO)) {
@@ -201,6 +191,47 @@
}
}
+ private void updateOnNodeJoin() {
+ AsterixMetadataProperties metadataProperties = ((IAsterixPropertiesProvider) runtimeContext)
+ .getMetadataProperties();
+ AsterixTransactionProperties txnProperties = ((IAsterixPropertiesProvider) runtimeContext)
+ .getTransactionProperties();
+ if (!metadataProperties.getNodeNames().contains(nodeId)) {
+ metadataProperties.getNodeNames().add(nodeId);
+ Cluster cluster = AsterixClusterProperties.INSTANCE.getCluster();
+ String asterixInstanceName = cluster.getInstanceName();
+ Node self = null;
+ for (Node node : cluster.getSubstituteNodes().getNode()) {
+ String ncId = asterixInstanceName + "_" + node.getId();
+ if (ncId.equalsIgnoreCase(nodeId)) {
+ String storeDir = node.getStore() == null ? cluster.getStore() : node.getStore();
+ metadataProperties.getStores().put(nodeId, storeDir.split(","));
+
+ String coredumpPath = node.getLogDir() == null ? cluster.getLogDir() : node.getLogDir();
+ metadataProperties.getCoredumpPaths().put(nodeId, coredumpPath);
+
+ String txnLogDir = node.getTxnLogDir() == null ? cluster.getTxnLogDir() : node.getTxnLogDir();
+ txnProperties.getLogDirectories().put(nodeId, txnLogDir);
+
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Store set to : " + storeDir);
+ LOGGER.info("Coredump dir set to : " + coredumpPath);
+ LOGGER.info("Transaction log dir set to :" + txnLogDir);
+ }
+ self = node;
+ break;
+ }
+ }
+ if (self != null) {
+ cluster.getSubstituteNodes().getNode().remove(self);
+ cluster.getNode().add(self);
+ } else {
+ throw new IllegalStateException("Unknown node joining the cluster");
+ }
+ }
+
+ }
+
/**
* Shutdown hook that invokes {@link NCApplicationEntryPoint#stop() stop} method.
*/
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixExternalProperties.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixExternalProperties.java
index 7bab09d..98c7c91 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixExternalProperties.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixExternalProperties.java
@@ -27,6 +27,9 @@
private static final String EXTERNAL_APISERVER_KEY = "api.port";
private static int EXTERNAL_APISERVER_DEFAULT = 19002;
+ private static final String EXTERNAL_CC_JAVA_OPTS_KEY = "cc.java.opts";
+ private static String EXTERNAL_CC_JAVA_OPTS_DEFAULT = "-Xmx1024m";
+
private static final String EXTERNAL_NC_JAVA_OPTS_KEY = "nc.java.opts";
private static String EXTERNAL_NC_JAVA_OPTS_DEFAULT = "-Xmx1024m";
@@ -53,4 +56,9 @@
return accessor.getProperty(EXTERNAL_NC_JAVA_OPTS_KEY, EXTERNAL_NC_JAVA_OPTS_DEFAULT,
PropertyInterpreters.getStringPropertyInterpreter());
}
+
+ public String getCCJavaParams() {
+ return accessor.getProperty(EXTERNAL_CC_JAVA_OPTS_KEY, EXTERNAL_CC_JAVA_OPTS_DEFAULT,
+ PropertyInterpreters.getStringPropertyInterpreter());
+ }
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixMetadataProperties.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixMetadataProperties.java
index 73a1f46..1d68dfc 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixMetadataProperties.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixMetadataProperties.java
@@ -42,9 +42,12 @@
public Set<String> getNodeNames() {
return accessor.getNodeNames();
}
-
- public String getCoredumpPath(String nodeId){
+
+ public String getCoredumpPath(String nodeId) {
return accessor.getCoredumpPath(nodeId);
}
+ public Map<String, String> getCoredumpPaths() {
+ return accessor.getCoredumpConfig();
+ }
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixPropertiesAccessor.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixPropertiesAccessor.java
index 881bae2..7654aa3 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixPropertiesAccessor.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixPropertiesAccessor.java
@@ -115,8 +115,26 @@
return coredumpConfig.get(nodeId);
}
- public String getTransactionLogDir(String nodeId) {
- return transactionLogDirs.get(nodeId);
+ public Map<String, String> getTransactionLogDirs() {
+ return transactionLogDirs;
+ }
+
+ public Map<String, String> getCoredumpConfig() {
+ return coredumpConfig;
+ }
+
+ public void putCoredumpPaths(String nodeId, String coredumpPath) {
+ if (coredumpConfig.containsKey(nodeId)) {
+ throw new IllegalStateException("Cannot override value for coredump path");
+ }
+ coredumpConfig.put(nodeId, coredumpPath);
+ }
+
+ public void putTransactionLogDir(String nodeId, String txnLogDir) {
+ if (transactionLogDirs.containsKey(nodeId)) {
+ throw new IllegalStateException("Cannot override value for txnLogDir");
+ }
+ transactionLogDirs.put(nodeId, txnLogDir);
}
public <T> T getProperty(String property, T defaultValue, IPropertyInterpreter<T> interpreter) {
@@ -143,4 +161,5 @@
public String getInstanceName() {
return instanceName;
}
+
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixTransactionProperties.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixTransactionProperties.java
index 6fa5beb..6094a27 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixTransactionProperties.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/config/AsterixTransactionProperties.java
@@ -14,6 +14,8 @@
*/
package edu.uci.ics.asterix.common.config;
+import java.util.Map;
+
public class AsterixTransactionProperties extends AbstractAsterixProperties {
private static final String TXN_LOG_BUFFER_NUMPAGES_KEY = "txn.log.buffer.numpages";
@@ -57,7 +59,11 @@
}
public String getLogDirectory(String nodeId) {
- return accessor.getTransactionLogDir(nodeId);
+ return accessor.getTransactionLogDirs().get(nodeId);
+ }
+
+ public Map<String, String> getLogDirectories() {
+ return accessor.getTransactionLogDirs();
}
public int getLogBufferNumPages() {
diff --git a/asterix-common/src/main/resources/schema/cluster.xsd b/asterix-common/src/main/resources/schema/cluster.xsd
index 09b71d5..5192e06 100644
--- a/asterix-common/src/main/resources/schema/cluster.xsd
+++ b/asterix-common/src/main/resources/schema/cluster.xsd
@@ -3,7 +3,8 @@
xmlns:cl="cluster" targetNamespace="cluster" elementFormDefault="qualified">
<!-- definition of simple types -->
- <xs:element name="name" type="xs:string" />
+ <xs:element name="instance_name" type="xs:string" />
+ <xs:element name="cluster_name" type="xs:string" />
<xs:element name="log_dir" type="xs:string" />
<xs:element name="txn_log_dir" type="xs:string" />
<xs:element name="id" type="xs:string" />
@@ -40,9 +41,9 @@
<xs:element ref="cl:cluster_ip" />
<xs:element ref="cl:java_home" minOccurs="0" />
<xs:element ref="cl:log_dir" minOccurs="0" />
- <xs:element ref="cl:client_port" />
- <xs:element ref="cl:cluster_port" />
- <xs:element ref="cl:http_port" />
+ <xs:element ref="cl:client_port" />
+ <xs:element ref="cl:cluster_port" />
+ <xs:element ref="cl:http_port" />
</xs:sequence>
</xs:complexType>
</xs:element>
@@ -89,7 +90,8 @@
<xs:element name="cluster">
<xs:complexType>
<xs:sequence>
- <xs:element ref="cl:name" />
+ <xs:element ref="cl:instance_name" />
+ <xs:element ref="cl:cluster_name" />
<xs:element ref="cl:username" />
<xs:element ref="cl:env" minOccurs="0" />
<xs:element ref="cl:java_home" minOccurs="0" />
@@ -100,7 +102,7 @@
<xs:element ref="cl:working_dir" />
<xs:element ref="cl:master_node" />
<xs:element ref="cl:node" maxOccurs="unbounded" />
- <xs:element ref="cl:substitute_nodes"/>
+ <xs:element ref="cl:substitute_nodes" />
</xs:sequence>
</xs:complexType>
</xs:element>
diff --git a/asterix-events/src/main/java/edu/uci/ics/asterix/event/error/VerificationUtil.java b/asterix-events/src/main/java/edu/uci/ics/asterix/event/error/VerificationUtil.java
index 5fcc5cf..c643c42 100644
--- a/asterix-events/src/main/java/edu/uci/ics/asterix/event/error/VerificationUtil.java
+++ b/asterix-events/src/main/java/edu/uci/ics/asterix/event/error/VerificationUtil.java
@@ -29,97 +29,107 @@
public class VerificationUtil {
- private static final String VERIFY_SCRIPT_PATH = AsterixEventService.getEventHome() + File.separator + "scripts" + File.separator + "verify.sh";
+ private static final String VERIFY_SCRIPT_PATH = AsterixEventService
+ .getEventHome()
+ + File.separator
+ + "scripts"
+ + File.separator
+ + "verify.sh";
- public static AsterixRuntimeState getAsterixRuntimeState(AsterixInstance instance) throws Exception {
+ public static AsterixRuntimeState getAsterixRuntimeState(
+ AsterixInstance instance) throws Exception {
- Cluster cluster = instance.getCluster();
- List<String> args = new ArrayList<String>();
- args.add(instance.getName());
- args.add(instance.getCluster().getMasterNode().getClusterIp());
- for (Node node : cluster.getNode()) {
- args.add(node.getClusterIp());
- args.add(instance.getName() + "_" + node.getId());
- }
- Thread.sleep(2000);
- String output = AsterixEventServiceUtil.executeLocalScript(VERIFY_SCRIPT_PATH, args);
- boolean ccRunning = true;
- List<String> failedNCs = new ArrayList<String>();
- String[] infoFields;
- ProcessInfo pInfo;
- List<ProcessInfo> processes = new ArrayList<ProcessInfo>();
+ Cluster cluster = instance.getCluster();
+ List<String> args = new ArrayList<String>();
+ args.add(instance.getName());
+ args.add(instance.getCluster().getMasterNode().getClusterIp());
+ for (Node node : cluster.getNode()) {
+ args.add(node.getClusterIp());
+ args.add(instance.getName() + "_" + node.getId());
+ }
+ Thread.sleep(2000);
+ String output = AsterixEventServiceUtil.executeLocalScript(
+ VERIFY_SCRIPT_PATH, args);
+ boolean ccRunning = true;
+ List<String> failedNCs = new ArrayList<String>();
+ String[] infoFields;
+ ProcessInfo pInfo;
+ List<ProcessInfo> processes = new ArrayList<ProcessInfo>();
- for (String line : output.split("\n")) {
- String nodeid = null;
- infoFields = line.split(":");
- try {
- int pid = Integer.parseInt(infoFields[3]);
- if (infoFields[0].equals("NC")) {
- nodeid = infoFields[2].split("_")[1];
- } else {
- nodeid = instance.getCluster().getMasterNode().getId();
- }
- pInfo = new ProcessInfo(infoFields[0], infoFields[1], nodeid, pid);
- processes.add(pInfo);
- } catch (Exception e) {
- if (infoFields[0].equalsIgnoreCase("CC")) {
- ccRunning = false;
- } else {
- failedNCs.add(infoFields[1]);
- }
- }
- }
- return new AsterixRuntimeState(processes, failedNCs, ccRunning);
- }
+ for (String line : output.split("\n")) {
+ String nodeid = null;
+ infoFields = line.split(":");
+ try {
+ int pid = Integer.parseInt(infoFields[3]);
+ if (infoFields[0].equals("NC")) {
+ nodeid = infoFields[2].split("_")[1];
+ } else {
+ nodeid = instance.getCluster().getMasterNode().getId();
+ }
+ pInfo = new ProcessInfo(infoFields[0], infoFields[1], nodeid,
+ pid);
+ processes.add(pInfo);
+ } catch (Exception e) {
+ if (infoFields[0].equalsIgnoreCase("CC")) {
+ ccRunning = false;
+ } else {
+ failedNCs.add(infoFields[1]);
+ }
+ }
+ }
+ return new AsterixRuntimeState(processes, failedNCs, ccRunning);
+ }
- public static void updateInstanceWithRuntimeDescription(AsterixInstance instance, AsterixRuntimeState state,
- boolean expectedRunning) {
- StringBuffer summary = new StringBuffer();
- if (expectedRunning) {
- if (!state.isCcRunning()) {
- summary.append("Cluster Controller not running at " + instance.getCluster().getMasterNode().getId()
- + "\n");
- instance.setState(State.UNUSABLE);
- }
- if (state.getFailedNCs() != null && !state.getFailedNCs().isEmpty()) {
- summary.append("Node Controller not running at the following nodes" + "\n");
- for (String failedNC : state.getFailedNCs()) {
- summary.append(failedNC + "\n");
- }
- instance.setState(State.UNUSABLE);
- }
- if (!(instance.getState().equals(State.UNUSABLE))) {
- instance.setState(State.ACTIVE);
- }
- } else {
- if (state.getProcesses() != null && state.getProcesses().size() > 0) {
- summary.append("Following process still running " + "\n");
- for (ProcessInfo pInfo : state.getProcesses()) {
- summary.append(pInfo + "\n");
- }
- instance.setState(State.UNUSABLE);
- } else {
- instance.setState(State.INACTIVE);
- }
- }
- state.setSummary(summary.toString());
- instance.setAsterixRuntimeStates(state);
- }
+ public static void updateInstanceWithRuntimeDescription(
+ AsterixInstance instance, AsterixRuntimeState state,
+ boolean expectedRunning) {
+ StringBuffer summary = new StringBuffer();
+ if (expectedRunning) {
+ if (!state.isCcRunning()) {
+ summary.append("Cluster Controller not running at "
+ + instance.getCluster().getMasterNode().getId() + "\n");
+ instance.setState(State.UNUSABLE);
+ }
+ if (state.getFailedNCs() != null && !state.getFailedNCs().isEmpty()) {
+ summary.append("Node Controller not running at the following nodes"
+ + "\n");
+ for (String failedNC : state.getFailedNCs()) {
+ summary.append(failedNC + "\n");
+ }
+ // instance.setState(State.UNUSABLE);
+ }
+ if (!(instance.getState().equals(State.UNUSABLE))) {
+ // instance.setState(State.ACTIVE);
+ }
+ } else {
+ if (state.getProcesses() != null && state.getProcesses().size() > 0) {
+ summary.append("Following process still running " + "\n");
+ for (ProcessInfo pInfo : state.getProcesses()) {
+ summary.append(pInfo + "\n");
+ }
+ // instance.setState(State.UNUSABLE);
+ } else {
+ // instance.setState(State.INACTIVE);
+ }
+ }
+ state.setSummary(summary.toString());
+ instance.setAsterixRuntimeStates(state);
+ }
- public static void verifyBackupRestoreConfiguration(String hdfsUrl, String hadoopVersion, String hdfsBackupDir)
- throws Exception {
- StringBuffer errorCheck = new StringBuffer();
- if (hdfsUrl == null || hdfsUrl.length() == 0) {
- errorCheck.append("\n HDFS Url not configured");
- }
- if (hadoopVersion == null || hadoopVersion.length() == 0) {
- errorCheck.append("\n HDFS version not configured");
- }
- if (hdfsBackupDir == null || hdfsBackupDir.length() == 0) {
- errorCheck.append("\n HDFS backup directory not configured");
- }
- if (errorCheck.length() > 0) {
- throw new Exception("Incomplete hdfs configuration" + errorCheck);
- }
- }
+ public static void verifyBackupRestoreConfiguration(String hdfsUrl,
+ String hadoopVersion, String hdfsBackupDir) throws Exception {
+ StringBuffer errorCheck = new StringBuffer();
+ if (hdfsUrl == null || hdfsUrl.length() == 0) {
+ errorCheck.append("\n HDFS Url not configured");
+ }
+ if (hadoopVersion == null || hadoopVersion.length() == 0) {
+ errorCheck.append("\n HDFS version not configured");
+ }
+ if (hdfsBackupDir == null || hdfsBackupDir.length() == 0) {
+ errorCheck.append("\n HDFS backup directory not configured");
+ }
+ if (errorCheck.length() > 0) {
+ throw new Exception("Incomplete hdfs configuration" + errorCheck);
+ }
+ }
}
diff --git a/asterix-events/src/main/java/edu/uci/ics/asterix/event/model/AsterixInstance.java b/asterix-events/src/main/java/edu/uci/ics/asterix/event/model/AsterixInstance.java
index b5e2460..87f14b0 100644
--- a/asterix-events/src/main/java/edu/uci/ics/asterix/event/model/AsterixInstance.java
+++ b/asterix-events/src/main/java/edu/uci/ics/asterix/event/model/AsterixInstance.java
@@ -30,7 +30,7 @@
private static final long serialVersionUID = 1L;
private static final int WEB_INTERFACE_PORT_DEFAULT = 19001;
-
+
public enum State {
ACTIVE,
INACTIVE,
@@ -61,7 +61,6 @@
this.asterixVersion = asterixVersion;
this.createdTimestamp = new Date();
this.backupInfo = new ArrayList<BackupInfo>();
-
}
public Date getModifiedTimestamp() {
@@ -212,4 +211,5 @@
public void setAsterixConfiguration(AsterixConfiguration asterixConfiguration) {
this.asterixConfiguration = asterixConfiguration;
}
+
}
diff --git a/asterix-events/src/main/java/edu/uci/ics/asterix/event/service/AsterixEventServiceUtil.java b/asterix-events/src/main/java/edu/uci/ics/asterix/event/service/AsterixEventServiceUtil.java
index 1bca9e9..b9ed170 100644
--- a/asterix-events/src/main/java/edu/uci/ics/asterix/event/service/AsterixEventServiceUtil.java
+++ b/asterix-events/src/main/java/edu/uci/ics/asterix/event/service/AsterixEventServiceUtil.java
@@ -95,11 +95,25 @@
JAXBException, EventException {
String modifiedZipPath = injectAsterixPropertyFile(AsterixEventService.getAsterixZip(), asterixInstance);
- modifiedZipPath = injectAsterixLogPropertyFile(modifiedZipPath, asterixInstance);
- modifiedZipPath = injectAsterixClusterConfigurationFile(modifiedZipPath, asterixInstance);
+ injectAsterixClusterConfigurationFile(modifiedZipPath, asterixInstance);
}
public static void createClusterProperties(Cluster cluster, AsterixConfiguration asterixConfiguration) {
+
+ String ccJavaOpts = null;
+ String ncJavaOpts = null;
+ for (edu.uci.ics.asterix.common.configuration.Property property : asterixConfiguration.getProperty()) {
+ if (property.getName().equalsIgnoreCase(EventUtil.CC_JAVA_OPTS)) {
+ ccJavaOpts = property.getValue();
+ } else if (property.getName().equalsIgnoreCase(EventUtil.NC_JAVA_OPTS)) {
+ ncJavaOpts = property.getValue();
+ }
+ }
+
+ poulateClusterEnvironmentProperties(cluster, ccJavaOpts, ncJavaOpts);
+ }
+
+ public static void poulateClusterEnvironmentProperties(Cluster cluster, String ccJavaOpts, String ncJavaOpts) {
List<Property> clusterProperties = null;
if (cluster.getEnv() != null && cluster.getEnv().getProperty() != null) {
clusterProperties = cluster.getEnv().getProperty();
@@ -107,13 +121,9 @@
} else {
clusterProperties = new ArrayList<Property>();
}
- for (edu.uci.ics.asterix.common.configuration.Property property : asterixConfiguration.getProperty()) {
- if (property.getName().equalsIgnoreCase(EventUtil.CC_JAVA_OPTS)) {
- clusterProperties.add(new Property(EventUtil.CC_JAVA_OPTS, property.getValue()));
- } else if (property.getName().equalsIgnoreCase(EventUtil.NC_JAVA_OPTS)) {
- clusterProperties.add(new Property(EventUtil.NC_JAVA_OPTS, property.getValue()));
- }
- }
+
+ clusterProperties.add(new Property(EventUtil.CC_JAVA_OPTS, ccJavaOpts));
+ clusterProperties.add(new Property(EventUtil.NC_JAVA_OPTS, ncJavaOpts));
clusterProperties.add(new Property("ASTERIX_HOME", cluster.getWorkingDir().getDir() + File.separator
+ "asterix"));
clusterProperties.add(new Property("LOG_DIR", cluster.getLogDir()));
@@ -136,10 +146,6 @@
cluster.setEnv(new Env(clusterProperties));
}
- public static void poulateClusterEnvironmentProperties(Cluster cluster) {
-
- }
-
private static String injectAsterixPropertyFile(String origZipFile, AsterixInstance asterixInstance)
throws IOException, JAXBException {
writeAsterixConfigurationFile(asterixInstance);
diff --git a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
index a789a20..0223700 100644
--- a/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
+++ b/asterix-external-data/src/main/java/edu/uci/ics/asterix/external/dataset/adapter/PullBasedAdapter.java
@@ -50,7 +50,6 @@
protected boolean alterRequested = false;
private Map<String, Object> modifiedConfiguration = null;
private long tupleCount = 0;
- private FeedPolicyEnforcer policyEnforcer;
public abstract IPullBasedFeedClient getFeedClient(int partition) throws Exception;
diff --git a/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/CreateCommand.java b/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/CreateCommand.java
index 7784d79..5b024ec 100644
--- a/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/CreateCommand.java
+++ b/asterix-installer/src/main/java/edu/uci/ics/asterix/installer/command/CreateCommand.java
@@ -51,13 +51,15 @@
AsterixEventServiceUtil.validateAsterixInstanceNotExists(asterixInstanceName);
CreateConfig createConfig = (CreateConfig) config;
cluster = EventUtil.getCluster(createConfig.clusterPath);
+ cluster.setInstanceName(asterixInstanceName);
asterixConfiguration = InstallerUtil.getAsterixConfiguration(createConfig.asterixConfPath);
AsterixInstance asterixInstance = AsterixEventServiceUtil.createAsterixInstance(asterixInstanceName, cluster,
asterixConfiguration);
AsterixEventServiceUtil.evaluateConflictWithOtherInstances(asterixInstance);
AsterixEventServiceUtil.createAsterixZip(asterixInstance);
AsterixEventServiceUtil.createClusterProperties(cluster, asterixConfiguration);
- AsterixEventServiceClient eventrixClient = AsterixEventService.getAsterixEventServiceClient(cluster, true, false);
+ AsterixEventServiceClient eventrixClient = AsterixEventService.getAsterixEventServiceClient(cluster, true,
+ false);
Patterns asterixBinarytrasnferPattern = PatternCreator.INSTANCE.getAsterixBinaryTransferPattern(
asterixInstanceName, cluster);
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IClusterEventsSubscriber.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IClusterEventsSubscriber.java
new file mode 100644
index 0000000..39f9927
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IClusterEventsSubscriber.java
@@ -0,0 +1,37 @@
+package edu.uci.ics.asterix.metadata.api;
+
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.util.Set;
+
+import edu.uci.ics.asterix.metadata.cluster.IClusterManagementWorkResponse;
+
+public interface IClusterEventsSubscriber {
+
+ /**
+ * @param deadNodeIds
+ * @return
+ */
+ public Set<IClusterManagementWork> notifyNodeFailure(Set<String> deadNodeIds);
+
+ /**
+ * @param joinedNodeId
+ * @return
+ */
+ public Set<IClusterManagementWork> notifyNodeJoin(String joinedNodeId);
+
+ public void notifyRequestCompletion(IClusterManagementWorkResponse response);
+
+}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IClusterManagementWork.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IClusterManagementWork.java
new file mode 100644
index 0000000..65ac354
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IClusterManagementWork.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.metadata.api;
+
+public interface IClusterManagementWork {
+
+ public enum WorkType {
+ ADD_NODE,
+ REMOVE_NODE
+ }
+
+ public WorkType getClusterManagementWorkType();
+
+ public int getWorkId();
+
+ public IClusterEventsSubscriber getSourceSubscriber();
+}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IClusterManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IClusterManager.java
index 386d89e..ea07a62 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IClusterManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IClusterManager.java
@@ -1,5 +1,7 @@
package edu.uci.ics.asterix.metadata.api;
+import java.util.Set;
+
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.event.schema.cluster.Node;
@@ -17,4 +19,20 @@
*/
public void removeNode(Node node) throws AsterixException;
+ /**
+ * @param subscriber
+ */
+ public void registerSubscriber(IClusterEventsSubscriber subscriber);
+
+ /**
+ * @param sunscriber
+ * @return
+ */
+ public boolean deregisterSubscriber(IClusterEventsSubscriber sunscriber);
+
+ /**
+ * @return
+ */
+ public Set<IClusterEventsSubscriber> getRegisteredClusterEventSubscribers();
+
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AbstractClusterManagementWork.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AbstractClusterManagementWork.java
new file mode 100644
index 0000000..bde860f
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AbstractClusterManagementWork.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.metadata.cluster;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import edu.uci.ics.asterix.metadata.api.IClusterEventsSubscriber;
+import edu.uci.ics.asterix.metadata.api.IClusterManagementWork;
+
+public abstract class AbstractClusterManagementWork implements IClusterManagementWork {
+
+ protected final IClusterEventsSubscriber subscriber;
+
+ @Override
+ public int getWorkId() {
+ return WorkIdGenerator.getNextWorkId();
+ }
+
+ public AbstractClusterManagementWork(IClusterEventsSubscriber subscriber) {
+ this.subscriber = subscriber;
+ }
+
+ private static class WorkIdGenerator {
+ private static AtomicInteger workId = new AtomicInteger(0);
+
+ public static int getNextWorkId() {
+ return workId.incrementAndGet();
+ }
+
+ }
+
+ @Override
+ public IClusterEventsSubscriber getSourceSubscriber() {
+ return subscriber;
+ }
+
+}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AddNodeWork.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AddNodeWork.java
new file mode 100644
index 0000000..4344ac8
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AddNodeWork.java
@@ -0,0 +1,23 @@
+package edu.uci.ics.asterix.metadata.cluster;
+
+import edu.uci.ics.asterix.metadata.api.IClusterEventsSubscriber;
+
+public class AddNodeWork extends AbstractClusterManagementWork {
+
+ private final int numberOfNodes;
+
+ @Override
+ public WorkType getClusterManagementWorkType() {
+ return WorkType.ADD_NODE;
+ }
+
+ public AddNodeWork(int numberOfNodes, IClusterEventsSubscriber subscriber) {
+ super(subscriber);
+ this.numberOfNodes = numberOfNodes;
+ }
+
+ public int getNumberOfNodes() {
+ return numberOfNodes;
+ }
+
+}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AddNodeWorkResponse.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AddNodeWorkResponse.java
new file mode 100644
index 0000000..fca9990
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/AddNodeWorkResponse.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.metadata.cluster;
+
+import java.util.Set;
+
+import edu.uci.ics.asterix.event.schema.cluster.Node;
+
+public class AddNodeWorkResponse extends ClusterManagementWorkResponse {
+
+ private final Set<Node> nodesAdded;
+
+ public AddNodeWorkResponse(AddNodeWork w, Status status, Set<Node> nodesAdded) {
+ super(w, status);
+ this.nodesAdded = nodesAdded;
+ }
+
+ public Set<Node> getNodesAdded() {
+ return nodesAdded;
+ }
+
+}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManagementWorkResponse.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManagementWorkResponse.java
new file mode 100644
index 0000000..e720c9f
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManagementWorkResponse.java
@@ -0,0 +1,24 @@
+package edu.uci.ics.asterix.metadata.cluster;
+
+import edu.uci.ics.asterix.metadata.api.IClusterManagementWork;
+
+public class ClusterManagementWorkResponse implements IClusterManagementWorkResponse {
+
+ protected final IClusterManagementWork work;
+
+ protected final Status status;
+
+ public ClusterManagementWorkResponse(IClusterManagementWork w, Status status) {
+ this.work = w;
+ this.status = status;
+ }
+
+ public IClusterManagementWork getWork() {
+ return work;
+ }
+
+ public Status getStatus() {
+ return status;
+ }
+
+}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManager.java
index b73b861..dc072d4 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/ClusterManager.java
@@ -2,32 +2,48 @@
import java.io.File;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.Unmarshaller;
+import edu.uci.ics.asterix.common.config.AsterixExternalProperties;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.event.management.AsterixEventServiceClient;
+import edu.uci.ics.asterix.event.model.AsterixInstance;
import edu.uci.ics.asterix.event.schema.cluster.Cluster;
import edu.uci.ics.asterix.event.schema.cluster.Node;
import edu.uci.ics.asterix.event.schema.pattern.Pattern;
import edu.uci.ics.asterix.event.schema.pattern.Patterns;
import edu.uci.ics.asterix.event.service.AsterixEventService;
+import edu.uci.ics.asterix.event.service.AsterixEventServiceUtil;
+import edu.uci.ics.asterix.event.service.ILookupService;
+import edu.uci.ics.asterix.event.service.ServiceProvider;
import edu.uci.ics.asterix.event.util.PatternCreator;
import edu.uci.ics.asterix.installer.schema.conf.Configuration;
+import edu.uci.ics.asterix.metadata.api.IClusterEventsSubscriber;
import edu.uci.ics.asterix.metadata.api.IClusterManager;
import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
import edu.uci.ics.asterix.om.util.AsterixClusterProperties;
public class ClusterManager implements IClusterManager {
+ private static final Logger LOGGER = Logger.getLogger(AsterixEventServiceClient.class.getName());
+
public static ClusterManager INSTANCE = new ClusterManager();
private static String eventsDir = System.getenv("user.dir") + File.separator + "eventrix";
private static AsterixEventServiceClient client;
+ private static ILookupService lookupService;
+
+ private static final Set<IClusterEventsSubscriber> eventSubscribers = new HashSet<IClusterEventsSubscriber>();
+
private ClusterManager() {
Cluster asterixCluster = AsterixClusterProperties.INSTANCE.getCluster();
String asterixDir = System.getProperty("user.dir") + File.separator + "asterix";
@@ -41,6 +57,19 @@
configuration = (Configuration) unmarshaller.unmarshal(configFile);
AsterixEventService.initialize(configuration, asterixDir, eventHome);
client = AsterixEventService.getAsterixEventServiceClient(AsterixClusterProperties.INSTANCE.getCluster());
+
+ lookupService = ServiceProvider.INSTANCE.getLookupService();
+ if (!lookupService.isRunning(configuration)) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Lookup service not running. Starting lookup service ...");
+ }
+ lookupService.startService(configuration);
+ } else {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Lookup service running");
+ }
+ }
+
} catch (Exception e) {
throw new IllegalStateException("Unable to initialize cluster manager" + e);
}
@@ -57,6 +86,10 @@
cluster.getNode().add(node);
client.submit(prepareNode);
+ AsterixExternalProperties externalProps = AsterixAppContextInfo.getInstance().getExternalProperties();
+ AsterixEventServiceUtil.poulateClusterEnvironmentProperties(cluster, externalProps.getCCJavaParams(),
+ externalProps.getNCJavaParams());
+
pattern.clear();
String ccHost = cluster.getMasterNode().getClusterIp();
String hostId = node.getId();
@@ -66,6 +99,12 @@
pattern.add(startNC);
Patterns startNCPattern = new Patterns(pattern);
client.submit(startNCPattern);
+
+ AsterixInstance instance = lookupService.getAsterixInstance(cluster.getInstanceName());
+ instance.getCluster().getNode().add(node);
+ instance.getCluster().getSubstituteNodes().getNode().remove(node);
+ lookupService.updateAsterixInstance(instance);
+
} catch (Exception e) {
throw new AsterixException(e);
}
@@ -79,7 +118,21 @@
private List<Pattern> getRemoveNodePattern(Node node) {
List<Pattern> pattern = new ArrayList<Pattern>();
-
return pattern;
}
+
+ @Override
+ public void registerSubscriber(IClusterEventsSubscriber subscriber) {
+ eventSubscribers.add(subscriber);
+ }
+
+ @Override
+ public boolean deregisterSubscriber(IClusterEventsSubscriber subscriber) {
+ return eventSubscribers.remove(subscriber);
+ }
+
+ @Override
+ public Set<IClusterEventsSubscriber> getRegisteredClusterEventSubscribers() {
+ return eventSubscribers;
+ }
}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/IClusterManagementWorkResponse.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/IClusterManagementWorkResponse.java
new file mode 100644
index 0000000..ae3ce7e
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/IClusterManagementWorkResponse.java
@@ -0,0 +1,11 @@
+package edu.uci.ics.asterix.metadata.cluster;
+
+public interface IClusterManagementWorkResponse {
+
+ public enum Status {
+ SUCCESS,
+ FAILURE
+ }
+
+
+}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/RemoveNodeWork.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/RemoveNodeWork.java
new file mode 100644
index 0000000..0b15df7
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/RemoveNodeWork.java
@@ -0,0 +1,25 @@
+package edu.uci.ics.asterix.metadata.cluster;
+
+import java.util.Set;
+
+import edu.uci.ics.asterix.metadata.api.IClusterEventsSubscriber;
+
+public class RemoveNodeWork extends AbstractClusterManagementWork {
+
+ private final Set<String> nodesToBeRemoved;
+
+ @Override
+ public WorkType getClusterManagementWorkType() {
+ return WorkType.REMOVE_NODE;
+ }
+
+ public RemoveNodeWork(Set<String> nodesToBeRemoved, IClusterEventsSubscriber subscriber) {
+ super(subscriber);
+ this.nodesToBeRemoved = nodesToBeRemoved;
+ }
+
+ public Set<String> getNodesToBeRemoved() {
+ return nodesToBeRemoved;
+ }
+
+}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/RemoveNodeWorkResponse.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/RemoveNodeWorkResponse.java
new file mode 100644
index 0000000..48445ef
--- /dev/null
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/cluster/RemoveNodeWorkResponse.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.metadata.cluster;
+
+import java.util.Set;
+
+public class RemoveNodeWorkResponse extends ClusterManagementWorkResponse {
+
+ private final Set<String> nodesRemoved;
+
+ public RemoveNodeWorkResponse(AddNodeWork w, Status status, Set<String> nodesRemoved) {
+ super(w, status);
+ this.nodesRemoved = nodesRemoved;
+ }
+
+ public Set<String> getNodesAdded() {
+ return nodesRemoved;
+ }
+
+}
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/BuiltinFeedPolicies.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/BuiltinFeedPolicies.java
index 9556ae0..3aa6b05 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/BuiltinFeedPolicies.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/BuiltinFeedPolicies.java
@@ -18,7 +18,7 @@
public static final FeedPolicy[] policies = new FeedPolicy[] { MISSTION_CRITICAL, ADVANCED, BASIC_MONITORED, BASIC };
- public static final FeedPolicy DEFAULT_POLICY = BASIC_MONITORED;
+ public static final FeedPolicy DEFAULT_POLICY = ADVANCED;
public static final String CONFIG_FEED_POLICY_KEY = "policy";
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedLifecycleListener.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedLifecycleListener.java
index ffa6c7e..bd154e6 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedLifecycleListener.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/feeds/FeedLifecycleListener.java
@@ -20,6 +20,7 @@
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -30,14 +31,19 @@
import edu.uci.ics.asterix.metadata.MetadataException;
import edu.uci.ics.asterix.metadata.MetadataManager;
import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
+import edu.uci.ics.asterix.metadata.api.IClusterEventsSubscriber;
+import edu.uci.ics.asterix.metadata.api.IClusterManagementWork;
+import edu.uci.ics.asterix.metadata.cluster.AddNodeWork;
+import edu.uci.ics.asterix.metadata.cluster.ClusterManager;
+import edu.uci.ics.asterix.metadata.cluster.IClusterManagementWorkResponse;
import edu.uci.ics.asterix.metadata.entities.FeedActivity;
import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
+import edu.uci.ics.asterix.metadata.feeds.FeedLifecycleListener.FeedFailure.FailureType;
import edu.uci.ics.asterix.om.util.AsterixAppContextInfo;
import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
import edu.uci.ics.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
import edu.uci.ics.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory;
-import edu.uci.ics.hyracks.api.application.IClusterLifecycleListener;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
@@ -51,9 +57,7 @@
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.api.job.JobStatus;
-//import edu.uci.ics.hyracks.api.job.JobInfo;
-
-public class FeedLifecycleListener implements IJobLifecycleListener, IClusterLifecycleListener, Serializable {
+public class FeedLifecycleListener implements IJobLifecycleListener, IClusterEventsSubscriber, Serializable {
private static final long serialVersionUID = 1L;
@@ -69,6 +73,7 @@
feedFailureNotificationHandler = new FeedFailureHandler(failureEventInbox);
new Thread(feedJobNotificationHandler).start();
new Thread(feedFailureNotificationHandler).start();
+ ClusterManager.INSTANCE.registerSubscriber(this);
}
private final FeedJobNotificationHandler feedJobNotificationHandler;
@@ -313,13 +318,7 @@
}
@Override
- public void notifyNodeJoin(String nodeId, Map<String, String> ncConfiguration) {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void notifyNodeFailure(Set<String> deadNodeIds) {
+ public Set<IClusterManagementWork> notifyNodeFailure(Set<String> deadNodeIds) {
Collection<FeedInfo> feedInfos = feedJobNotificationHandler.registeredFeeds.values();
FeedFailureReport failureReport = new FeedFailureReport();
for (FeedInfo feedInfo : feedInfos) {
@@ -342,7 +341,40 @@
}
}
}
- failureEventInbox.add(failureReport);
+ return handleFailure(failureReport);
+ }
+
+ private Set<IClusterManagementWork> handleFailure(FeedFailureReport failureReport) {
+ Set<IClusterManagementWork> work = new HashSet<IClusterManagementWork>();
+ Map<String, Map<FeedInfo, List<FailureType>>> failureMap = new HashMap<String, Map<FeedInfo, List<FailureType>>>();
+ for (Map.Entry<FeedInfo, List<FeedFailure>> entry : failureReport.failures.entrySet()) {
+ FeedInfo feedInfo = entry.getKey();
+ List<FeedFailure> feedFailures = entry.getValue();
+ for (FeedFailure feedFailure : feedFailures) {
+ switch (feedFailure.failureType) {
+ case COMPUTE_NODE:
+ case INGESTION_NODE:
+ Map<FeedInfo, List<FailureType>> failuresBecauseOfThisNode = failureMap.get(feedFailure.nodeId);
+ if (failuresBecauseOfThisNode == null) {
+ failuresBecauseOfThisNode = new HashMap<FeedInfo, List<FailureType>>();
+ failuresBecauseOfThisNode.put(feedInfo, new ArrayList<FailureType>());
+ failureMap.put(feedFailure.nodeId, failuresBecauseOfThisNode);
+ }
+ List<FailureType> feedF = failuresBecauseOfThisNode.get(feedInfo);
+ if (feedF == null) {
+ feedF = new ArrayList<FailureType>();
+ failuresBecauseOfThisNode.put(feedInfo, feedF);
+ }
+ feedF.add(feedFailure.failureType);
+ break;
+ case STORAGE_NODE:
+ }
+ }
+ }
+
+ AddNodeWork addNodesWork = new AddNodeWork(failureMap.keySet().size(), this);
+ work.add(addNodesWork);
+ return work;
}
public static class FeedFailure {
@@ -361,4 +393,16 @@
this.nodeId = nodeId;
}
}
+
+ @Override
+ public Set<IClusterManagementWork> notifyNodeJoin(String joinedNodeId) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public void notifyRequestCompletion(IClusterManagementWorkResponse response) {
+ // TODO Auto-generated method stub
+
+ }
}
diff --git a/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixClusterProperties.java b/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixClusterProperties.java
index 7daf04c..6285385 100644
--- a/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixClusterProperties.java
+++ b/asterix-om/src/main/java/edu/uci/ics/asterix/om/util/AsterixClusterProperties.java
@@ -14,12 +14,17 @@
*/
package edu.uci.ics.asterix.om.util;
+import java.io.InputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
+
import edu.uci.ics.asterix.event.schema.cluster.Cluster;
import edu.uci.ics.asterix.event.schema.cluster.Node;
@@ -29,78 +34,84 @@
public class AsterixClusterProperties {
- private static final Logger LOGGER = Logger
- .getLogger(AsterixClusterProperties.class.getName());
+ private static final Logger LOGGER = Logger.getLogger(AsterixClusterProperties.class.getName());
- private static final String IO_DEVICES = "iodevices";
+ private static final String IO_DEVICES = "iodevices";
- public static final AsterixClusterProperties INSTANCE = new AsterixClusterProperties();
+ public static final AsterixClusterProperties INSTANCE = new AsterixClusterProperties();
- private Map<String, Map<String, String>> ncConfiguration = new HashMap<String, Map<String, String>>();
+ private Map<String, Map<String, String>> ncConfiguration = new HashMap<String, Map<String, String>>();
- private final Cluster cluster;
+ public static final String CLUSTER_CONFIGURATION_FILE = "cluster.xml";
+ private final Cluster cluster;
+
+ private AsterixClusterProperties() {
+ InputStream is = this.getClass().getClassLoader().getResourceAsStream(CLUSTER_CONFIGURATION_FILE);
+ try {
+ JAXBContext ctx = JAXBContext.newInstance(Cluster.class);
+ Unmarshaller unmarshaller = ctx.createUnmarshaller();
+ cluster = (Cluster) unmarshaller.unmarshal(is);
+
+ } catch (JAXBException e) {
+ throw new IllegalStateException("Failed to read configuration file " + CLUSTER_CONFIGURATION_FILE);
+ }
+ }
- private AsterixClusterProperties() {
- cluster = null;
- }
+ public enum State {
+ ACTIVE,
+ UNUSABLE
+ }
- public enum State {
- ACTIVE, UNUSABLE
- }
+ private State state = State.UNUSABLE;
- private State state = State.UNUSABLE;
+ public void removeNCConfiguration(String nodeId) {
+ state = State.UNUSABLE;
+ ncConfiguration.remove(nodeId);
+ }
- public void removeNCConfiguration(String nodeId) {
- state = State.UNUSABLE;
- ncConfiguration.remove(nodeId);
- }
+ public void addNCConfiguration(String nodeId, Map<String, String> configuration) {
+ ncConfiguration.put(nodeId, configuration);
+ if (ncConfiguration.keySet().size() == AsterixAppContextInfo.getInstance().getMetadataProperties()
+ .getNodeNames().size()) {
+ state = State.ACTIVE;
+ }
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info(" Registering configuration parameters for node id" + nodeId);
+ }
+ }
- public void addNCConfiguration(String nodeId,
- Map<String, String> configuration) {
- ncConfiguration.put(nodeId, configuration);
- if (ncConfiguration.keySet().size() == AsterixAppContextInfo
- .getInstance().getMetadataProperties().getNodeNames().size()) {
- state = State.ACTIVE;
- }
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info(" Registering configuration parameters for node id"
- + nodeId);
- }
- }
+ /**
+ * Returns the number of IO devices configured for a Node Controller
+ *
+ * @param nodeId
+ * unique identifier of the Node Controller
+ * @return number of IO devices. -1 if the node id is not valid. A node id
+ * is not valid if it does not correspond to the set of registered
+ * Node Controllers.
+ */
+ public int getNumberOfIODevices(String nodeId) {
+ Map<String, String> ncConfig = ncConfiguration.get(nodeId);
+ if (ncConfig == null) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Configuration parameters for nodeId" + nodeId
+ + " not found. The node has not joined yet or has left.");
+ }
+ return -1;
+ }
+ return ncConfig.get(IO_DEVICES).split(",").length;
+ }
- /**
- * Returns the number of IO devices configured for a Node Controller
- *
- * @param nodeId
- * unique identifier of the Node Controller
- * @return number of IO devices. -1 if the node id is not valid. A node id
- * is not valid if it does not correspond to the set of registered
- * Node Controllers.
- */
- public int getNumberOfIODevices(String nodeId) {
- Map<String, String> ncConfig = ncConfiguration.get(nodeId);
- if (ncConfig == null) {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Configuration parameters for nodeId"
- + nodeId
- + " not found. The node has not joined yet or has left.");
- }
- return -1;
- }
- return ncConfig.get(IO_DEVICES).split(",").length;
- }
+ public State getState() {
+ return state;
+ }
- public State getState() {
- return state;
- }
+ public Cluster getCluster() {
+ return cluster;
+ }
- public Cluster getCluster() {
- return cluster;
- }
-
- public Node getAvailableSubstitutionNode() {
- List<Node> subNodes = cluster.getSubstituteNodes().getNode();
- return subNodes.isEmpty() ? null : subNodes.remove(0);
- }
+ public Node getAvailableSubstitutionNode() {
+ List<Node> subNodes = cluster.getSubstituteNodes().getNode();
+ return subNodes.isEmpty() ? null : subNodes.remove(0);
+ }
}
diff --git a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapter.java b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapter.java
index 33fa168..63ae8f2 100644
--- a/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapter.java
+++ b/asterix-tools/src/main/java/edu/uci/ics/asterix/tools/external/data/SyntheticTwitterFeedAdapter.java
@@ -29,9 +29,9 @@
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
/**
- * TPS can be configured between 1 and 20,000
+ * TPS can be configured between 1 and 20,000
+ *
* @author ramang
- *
*/
public class SyntheticTwitterFeedAdapter extends PullBasedAdapter {
@@ -101,13 +101,20 @@
private ARecordType outputRecordType;
private int partition;
private int tweetCount = 0;
+ private int tweetCountBeforeException = 0;
+ private int exceptionPeriod = -1;
- public SyntheticTwitterFeedClient(Map<String, Object> configuration, ARecordType outputRecordType,
- int partition) throws AsterixException {
+ public SyntheticTwitterFeedClient(Map<String, Object> configuration, ARecordType outputRecordType, int partition)
+ throws AsterixException {
this.outputRecordType = outputRecordType;
- String value = (String)configuration.get(KEY_DURATION);
+ String value = (String) configuration.get(KEY_DURATION);
duration = value != null ? Integer.parseInt(value) : 60;
- initializeTweetRate((String)configuration.get(KEY_TPS));
+ initializeTweetRate((String) configuration.get(KEY_TPS));
+ value = (String) configuration.get(KEY_EXCEPTION_PERIOD);
+ if (value != null) {
+ exceptionPeriod = Integer.parseInt(value);
+ }
+
InitializationInfo info = new InitializationInfo();
info.timeDurationInSecs = duration;
DataGenerator.initialize(info);
@@ -206,6 +213,12 @@
tweetCount = 0;
}
}
+ tweetCountBeforeException++;
+
+ if (tweetCountBeforeException == exceptionPeriod) {
+ tweetCountBeforeException = 0;
+ throw new AsterixException("Delibrate exception");
+ }
return InflowState.DATA_AVAILABLE;
}