[ASTERIXDB-2019][CLUS] Update cluster state on partitions changes
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Set the cluster to UNUSABLE when no partitions are registered
- Update cluster state after partitions register/de-register
- Reject unregistered nodes queries on CC
- Avoid NPE when trying to send to a node that was de-registered
Change-Id: I7d11733a1dcd86136e157d80517bff4abcfc776b
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1918
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
index fc0c1ff..9faa9e9 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
@@ -92,6 +92,11 @@
ClusterControllerService ccSrv = (ClusterControllerService) ccSrvContext.getControllerService();
CCApplication ccApp = (CCApplication) ccSrv.getApplication();
CCMessageBroker messageBroker = (CCMessageBroker) ccSrvContext.getMessageBroker();
+ final String rejectionReason = getRejectionReason(ccSrv);
+ if (rejectionReason != null) {
+ sendRejection(rejectionReason, messageBroker);
+ return;
+ }
CCExtensionManager ccExtMgr = (CCExtensionManager) ccAppCtx.getExtensionManager();
ILangCompilationProvider compilationProvider = ccExtMgr.getCompilationProvider(lang);
IStorageComponentProvider storageComponentProvider = ccAppCtx.getStorageComponentProvider();
@@ -100,16 +105,9 @@
ccSrv.getExecutor().submit(() -> {
ExecuteStatementResponseMessage responseMsg = new ExecuteStatementResponseMessage(requestMessageId);
-
try {
- final IClusterManagementWork.ClusterState clusterState = ClusterStateManager.INSTANCE.getState();
- if (clusterState != IClusterManagementWork.ClusterState.ACTIVE) {
- throw new IllegalStateException("Cannot execute request, cluster is " + clusterState);
- }
-
IParser parser = compilationProvider.getParserFactory().createParser(statementsText);
List<Statement> statements = parser.parse();
-
StringWriter outWriter = new StringWriter(256);
PrintWriter outPrinter = new PrintWriter(outWriter);
SessionOutput.ResultDecorator resultPrefix = ResultUtil.createPreResultDecorator();
@@ -148,6 +146,27 @@
});
}
+ private String getRejectionReason(ClusterControllerService ccSrv) {
+ if (ccSrv.getNodeManager().getNodeControllerState(requestNodeId) == null) {
+ return "Node is not registerted with the CC";
+ }
+ final IClusterManagementWork.ClusterState clusterState = ClusterStateManager.INSTANCE.getState();
+ if (clusterState != IClusterManagementWork.ClusterState.ACTIVE) {
+ return "Cannot execute request, cluster is " + clusterState;
+ }
+ return null;
+ }
+
+ private void sendRejection(String reason, CCMessageBroker messageBroker) {
+ ExecuteStatementResponseMessage responseMsg = new ExecuteStatementResponseMessage(requestMessageId);
+ responseMsg.setError(new Exception(reason));
+ try {
+ messageBroker.sendApplicationMessageToNC(responseMsg, requestNodeId);
+ } catch (Exception e) {
+ LOGGER.log(Level.WARNING, e.toString(), e);
+ }
+ }
+
@Override
public String toString() {
return String.format("%s(id=%s, from=%s): %s", getClass().getSimpleName(), requestMessageId, requestNodeId,
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
index de2ca11..0eade41 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
@@ -69,7 +69,13 @@
public void sendApplicationMessageToNC(INcAddressedMessage msg, String nodeId) throws Exception {
INodeManager nodeManager = ccs.getNodeManager();
NodeControllerState state = nodeManager.getNodeControllerState(nodeId);
- state.getNodeController().sendApplicationMessageToNC(JavaSerializationUtils.serialize(msg), null, nodeId);
+ if (state != null) {
+ state.getNodeController().sendApplicationMessageToNC(JavaSerializationUtils.serialize(msg), null, nodeId);
+ } else {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Couldn't send message to unregistered node (" + nodeId + ")");
+ }
+ }
}
@Override
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
index a5686fd..30675cd 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -100,11 +100,20 @@
/**
* Register the specified node partitions with the specified nodeId with this cluster state manager
+ * then calls {@link IClusterStateManager#refreshState()}
+ *
+ * @param nodeId
+ * @param nodePartitions
+ * @throws AsterixException
*/
void registerNodePartitions(String nodeId, ClusterPartition[] nodePartitions) throws AsterixException;
/**
* De-register the specified node's partitions from this cluster state manager
+ * then calls {@link IClusterStateManager#refreshState()}
+ *
+ * @param nodeId
+ * @throws HyracksDataException
*/
- void deregisterNodePartitions(String nodeId);
+ void deregisterNodePartitions(String nodeId) throws HyracksDataException;
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 4717a7b..8156a23 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -154,6 +154,12 @@
@Override
public synchronized void refreshState() throws HyracksDataException {
resetClusterPartitionConstraint();
+ if (clusterPartitions.isEmpty()) {
+ LOGGER.info("Cluster does not have any registered partitions");
+ setState(ClusterState.UNUSABLE);
+ return;
+ }
+
for (ClusterPartition p : clusterPartitions.values()) {
if (!p.isActive()) {
setState(ClusterState.UNUSABLE);
@@ -368,10 +374,16 @@
clusterPartitions.put(nodePartition.getPartitionId(), nodePartition);
}
node2PartitionsMap.put(nodeId, nodePartitions);
+ //TODO fix exception propagation from refreshState
+ try {
+ refreshState();
+ } catch (HyracksDataException e) {
+ throw new AsterixException(e);
+ }
}
@Override
- public synchronized void deregisterNodePartitions(String nodeId) {
+ public synchronized void deregisterNodePartitions(String nodeId) throws HyracksDataException {
ClusterPartition[] nodePartitions = node2PartitionsMap.remove(nodeId);
if (nodePartitions == null) {
LOGGER.info("deregisterNodePartitions unknown node " + nodeId + " (already removed?)");
@@ -382,6 +394,7 @@
for (ClusterPartition nodePartition : nodePartitions) {
clusterPartitions.remove(nodePartition.getPartitionId());
}
+ refreshState();
}
}