[ASTERIXDB-2019][CLUS] Update cluster state on partitions changes

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Set the cluster to UNUSABLE when no partitions are registered
- Update cluster state after partitions register/de-register
- Reject unregistered nodes queries on CC
- Avoid NPE when trying to send to a node that was de-registered

Change-Id: I7d11733a1dcd86136e157d80517bff4abcfc776b
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1918
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
index fc0c1ff..9faa9e9 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
@@ -92,6 +92,11 @@
         ClusterControllerService ccSrv = (ClusterControllerService) ccSrvContext.getControllerService();
         CCApplication ccApp = (CCApplication) ccSrv.getApplication();
         CCMessageBroker messageBroker = (CCMessageBroker) ccSrvContext.getMessageBroker();
+        final String rejectionReason = getRejectionReason(ccSrv);
+        if (rejectionReason != null) {
+            sendRejection(rejectionReason, messageBroker);
+            return;
+        }
         CCExtensionManager ccExtMgr = (CCExtensionManager) ccAppCtx.getExtensionManager();
         ILangCompilationProvider compilationProvider = ccExtMgr.getCompilationProvider(lang);
         IStorageComponentProvider storageComponentProvider = ccAppCtx.getStorageComponentProvider();
@@ -100,16 +105,9 @@
 
         ccSrv.getExecutor().submit(() -> {
             ExecuteStatementResponseMessage responseMsg = new ExecuteStatementResponseMessage(requestMessageId);
-
             try {
-                final IClusterManagementWork.ClusterState clusterState = ClusterStateManager.INSTANCE.getState();
-                if (clusterState != IClusterManagementWork.ClusterState.ACTIVE) {
-                    throw new IllegalStateException("Cannot execute request, cluster is " + clusterState);
-                }
-
                 IParser parser = compilationProvider.getParserFactory().createParser(statementsText);
                 List<Statement> statements = parser.parse();
-
                 StringWriter outWriter = new StringWriter(256);
                 PrintWriter outPrinter = new PrintWriter(outWriter);
                 SessionOutput.ResultDecorator resultPrefix = ResultUtil.createPreResultDecorator();
@@ -148,6 +146,27 @@
         });
     }
 
+    private String getRejectionReason(ClusterControllerService ccSrv) {
+        if (ccSrv.getNodeManager().getNodeControllerState(requestNodeId) == null) {
+            return "Node is not registerted with the CC";
+        }
+        final IClusterManagementWork.ClusterState clusterState = ClusterStateManager.INSTANCE.getState();
+        if (clusterState != IClusterManagementWork.ClusterState.ACTIVE) {
+            return "Cannot execute request, cluster is " + clusterState;
+        }
+        return null;
+    }
+
+    private void sendRejection(String reason, CCMessageBroker messageBroker) {
+        ExecuteStatementResponseMessage responseMsg = new ExecuteStatementResponseMessage(requestMessageId);
+        responseMsg.setError(new Exception(reason));
+        try {
+            messageBroker.sendApplicationMessageToNC(responseMsg, requestNodeId);
+        } catch (Exception e) {
+            LOGGER.log(Level.WARNING, e.toString(), e);
+        }
+    }
+
     @Override
     public String toString() {
         return String.format("%s(id=%s, from=%s): %s", getClass().getSimpleName(), requestMessageId, requestNodeId,
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
index de2ca11..0eade41 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
@@ -69,7 +69,13 @@
     public void sendApplicationMessageToNC(INcAddressedMessage msg, String nodeId) throws Exception {
         INodeManager nodeManager = ccs.getNodeManager();
         NodeControllerState state = nodeManager.getNodeControllerState(nodeId);
-        state.getNodeController().sendApplicationMessageToNC(JavaSerializationUtils.serialize(msg), null, nodeId);
+        if (state != null) {
+            state.getNodeController().sendApplicationMessageToNC(JavaSerializationUtils.serialize(msg), null, nodeId);
+        } else {
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.warning("Couldn't send message to unregistered node (" + nodeId + ")");
+            }
+        }
     }
 
     @Override
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
index a5686fd..30675cd 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -100,11 +100,20 @@
 
     /**
      * Register the specified node partitions with the specified nodeId with this cluster state manager
+     * then calls {@link IClusterStateManager#refreshState()}
+     *
+     * @param nodeId
+     * @param nodePartitions
+     * @throws AsterixException
      */
     void registerNodePartitions(String nodeId, ClusterPartition[] nodePartitions) throws AsterixException;
 
     /**
      * De-register the specified node's partitions from this cluster state manager
+     * then calls {@link IClusterStateManager#refreshState()}
+     *
+     * @param nodeId
+     * @throws HyracksDataException
      */
-    void deregisterNodePartitions(String nodeId);
+    void deregisterNodePartitions(String nodeId) throws HyracksDataException;
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 4717a7b..8156a23 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -154,6 +154,12 @@
     @Override
     public synchronized void refreshState() throws HyracksDataException {
         resetClusterPartitionConstraint();
+        if (clusterPartitions.isEmpty()) {
+            LOGGER.info("Cluster does not have any registered partitions");
+            setState(ClusterState.UNUSABLE);
+            return;
+        }
+
         for (ClusterPartition p : clusterPartitions.values()) {
             if (!p.isActive()) {
                 setState(ClusterState.UNUSABLE);
@@ -368,10 +374,16 @@
             clusterPartitions.put(nodePartition.getPartitionId(), nodePartition);
         }
         node2PartitionsMap.put(nodeId, nodePartitions);
+        //TODO fix exception propagation from refreshState
+        try {
+            refreshState();
+        } catch (HyracksDataException e) {
+            throw new AsterixException(e);
+        }
     }
 
     @Override
-    public synchronized void deregisterNodePartitions(String nodeId) {
+    public synchronized void deregisterNodePartitions(String nodeId) throws HyracksDataException {
         ClusterPartition[] nodePartitions = node2PartitionsMap.remove(nodeId);
         if (nodePartitions == null) {
             LOGGER.info("deregisterNodePartitions unknown node " + nodeId + " (already removed?)");
@@ -382,6 +394,7 @@
             for (ClusterPartition nodePartition : nodePartitions) {
                 clusterPartitions.remove(nodePartition.getPartitionId());
             }
+            refreshState();
         }
     }