[NO ISSUE][HYR] += ability to bypass work queue for high priority app messages

- use said ability for active stats responses

Change-Id: I5d4747e08a380a585d0c4a9312873ea39b80abbf
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/10964
Reviewed-by: Michael Blow <mblow@apache.org>
Reviewed-by: Till Westmann <tillw@apache.org>
Contrib: Michael Blow <mblow@apache.org>
Tested-by: Michael Blow <mblow@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
index 26b5068..b99e4f2 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
@@ -153,8 +153,8 @@
             String stats = runtime.getStats();
             LOGGER.debug("Sending stats response for {} ", runtimeId);
             ActiveStatsResponse response = new ActiveStatsResponse(reqId, stats, null);
-            ((NodeControllerService) serviceCtx.getControllerService()).sendApplicationMessageToCC(message.getCcId(),
-                    JavaSerializationUtils.serialize(response), null);
+            ((NodeControllerService) serviceCtx.getControllerService()).sendRealTimeApplicationMessageToCC(
+                    message.getCcId(), JavaSerializationUtils.serialize(response), null);
         } catch (Exception e) {
             throw HyracksDataException.create(e);
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
index 16e8ed8..d350f61 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
@@ -129,8 +129,16 @@
                 break;
             case SEND_APPLICATION_MESSAGE:
                 CCNCFunctions.SendApplicationMessageFunction rsf = (CCNCFunctions.SendApplicationMessageFunction) fn;
-                ccs.getWorkQueue().schedule(
-                        new ApplicationMessageWork(ccs, rsf.getMessage(), rsf.getDeploymentId(), rsf.getNodeId()));
+                ApplicationMessageWork work =
+                        new ApplicationMessageWork(ccs, rsf.getMessage(), rsf.getDeploymentId(), rsf.getNodeId());
+                if (rsf.isRealTime()) {
+                    final ExecutorService executor = ccs.getExecutor();
+                    if (executor != null) {
+                        executor.execute(work);
+                    }
+                } else {
+                    ccs.getWorkQueue().schedule(work);
+                }
                 break;
             case GET_NODE_CONTROLLERS_INFO:
                 ccs.getWorkQueue().schedule(new GetNodeControllersInfoWork(ccs.getNodeManager(),
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
index c8106e8..1c91183 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
@@ -66,6 +66,8 @@
 
     void sendApplicationMessageToCC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception;
 
+    void sendRealTimeApplicationMessageToCC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception;
+
     void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, IResultMetadata metadata, boolean emptyResult,
             int partition, int nPartitions, NetworkAddress networkAddress) throws Exception;
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index 6b5b5db..27a8b02 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -127,9 +127,10 @@
     }
 
     public static class SendApplicationMessageFunction extends Function {
-        private static final long serialVersionUID = 1L;
-        private byte[] serializedMessage;
-        private DeploymentId deploymentId;
+        private static final long serialVersionUID = 2L;
+        private final byte[] serializedMessage;
+        private final DeploymentId deploymentId;
+        private final boolean realTime;
         private String nodeId;
 
         public DeploymentId getDeploymentId() {
@@ -148,9 +149,14 @@
             return serializedMessage;
         }
 
-        public SendApplicationMessageFunction(byte[] data, DeploymentId deploymentId, String nodeId) {
+        public boolean isRealTime() {
+            return realTime;
+        }
+
+        public SendApplicationMessageFunction(byte[] data, DeploymentId deploymentId, boolean realTime, String nodeId) {
             this.serializedMessage = data;
             this.deploymentId = deploymentId;
+            this.realTime = realTime;
             this.nodeId = nodeId;
         }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 344c3fb..09dc04d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -128,7 +128,14 @@
 
     @Override
     public void sendApplicationMessageToCC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception {
-        SendApplicationMessageFunction fn = new SendApplicationMessageFunction(data, deploymentId, nodeId);
+        SendApplicationMessageFunction fn = new SendApplicationMessageFunction(data, deploymentId, false, nodeId);
+        ipcHandle.send(-1, fn, null);
+    }
+
+    @Override
+    public void sendRealTimeApplicationMessageToCC(byte[] data, DeploymentId deploymentId, String nodeId)
+            throws Exception {
+        SendApplicationMessageFunction fn = new SendApplicationMessageFunction(data, deploymentId, true, nodeId);
         ipcHandle.send(-1, fn, null);
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index d32ee32..0b85c4e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -129,7 +129,7 @@
 
     @Override
     public void sendApplicationMessageToNC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception {
-        SendApplicationMessageFunction fn = new SendApplicationMessageFunction(data, deploymentId, nodeId);
+        SendApplicationMessageFunction fn = new SendApplicationMessageFunction(data, deploymentId, false, nodeId);
         ipcHandle.send(-1, fn, null);
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 1356f4c..c774317 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -633,6 +633,10 @@
         getClusterController(ccId).sendApplicationMessageToCC(data, deploymentId, id);
     }
 
+    public void sendRealTimeApplicationMessageToCC(CcId ccId, byte[] data, DeploymentId deploymentId) throws Exception {
+        getClusterController(ccId).sendRealTimeApplicationMessageToCC(data, deploymentId, id);
+    }
+
     public IResultPartitionManager getResultPartitionManager() {
         return resultPartitionManager;
     }