[ASTERIXDB-2014][HYR][CLUS] Respect disabled NCService

- Don't contact NCService on failed nodes, if NCService is disabled
- Also, don't block work queue for TriggerNCWork

Change-Id: Ib307f06d8bbcf4039480291aef566f240cadba20
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1913
Reviewed-by: Till Westmann <tillw@apache.org>
Integration-Tests: Till Westmann <tillw@apache.org>
Tested-by: Till Westmann <tillw@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index e1c218f..d32e577 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -257,7 +257,7 @@
         getNCServices().entrySet().forEach(ncService -> {
             final TriggerNCWork triggerWork = new TriggerNCWork(ClusterControllerService.this,
                     ncService.getValue().getLeft(), ncService.getValue().getRight(), ncService.getKey());
-            workQueue.schedule(triggerWork);
+            executor.submit(triggerWork);
         });
         serviceCtx.addClusterLifecycleListener(new IClusterLifecycleListener() {
             @Override
@@ -271,9 +271,11 @@
                 LOGGER.log(Level.WARNING, "Getting notified that nodes: " + deadNodeIds + " has failed");
                 for (String nodeId : deadNodeIds) {
                     Pair<String, Integer> ncService = getNCService(nodeId);
-                    final TriggerNCWork triggerWork = new TriggerNCWork(ClusterControllerService.this,
-                            ncService.getLeft(), ncService.getRight(), nodeId);
-                    workQueue.schedule(triggerWork);
+                    if (ncService.getRight() != NCConfig.NCSERVICE_PORT_DISABLED) {
+                        final TriggerNCWork triggerWork = new TriggerNCWork(ClusterControllerService.this,
+                                ncService.getLeft(), ncService.getRight(), nodeId);
+                        executor.submit(triggerWork);
+                    }
                 }
             }
         });
@@ -282,10 +284,12 @@
     private void terminateNCServices() throws Exception {
         List<ShutdownNCServiceWork> shutdownNCServiceWorks = new ArrayList<>();
         getNCServices().entrySet().forEach(ncService -> {
-            ShutdownNCServiceWork shutdownWork = new ShutdownNCServiceWork(ncService.getValue().getLeft(),
-                    ncService.getValue().getRight(), ncService.getKey());
-            workQueue.schedule(shutdownWork);
-            shutdownNCServiceWorks.add(shutdownWork);
+            if (ncService.getValue().getRight() != NCConfig.NCSERVICE_PORT_DISABLED) {
+                ShutdownNCServiceWork shutdownWork = new ShutdownNCServiceWork(ncService.getValue().getLeft(),
+                        ncService.getValue().getRight(), ncService.getKey());
+                workQueue.schedule(shutdownWork);
+                shutdownNCServiceWorks.add(shutdownWork);
+            }
         });
         for (ShutdownNCServiceWork shutdownWork : shutdownNCServiceWorks) {
             shutdownWork.sync();