[NO ISSUE][IPC] += high priority NC messaging, fail on sync send failure

Change-Id: Id1d259917fbc98693d795eca41934e2d7d55f304
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/11043
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
Reviewed-by: Till Westmann <tillw@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index cc4b25f..2891710 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -324,7 +324,8 @@
             requests.add(new ActiveStatsRequestMessage(new ActiveRuntimeId(entityId, runtimeName, i), reqId));
         }
         try {
-            List<String> responses = (List<String>) messageBroker.sendSyncRequestToNCs(reqId, ncs, requests, timeout);
+            List<String> responses =
+                    (List<String>) messageBroker.sendSyncRequestToNCs(reqId, ncs, requests, timeout, false);
             stats = formatStats(responses);
             statsTimestamp = System.currentTimeMillis();
             notifySubscribers(statsUpdatedEvent);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
index ae50880..5870d3a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
@@ -150,7 +150,7 @@
             requests.add(new StorageCleanupRequestMessage(reqId, validDatasetIds));
         }
         messageBroker.sendSyncRequestToNCs(reqId, ncs, requests,
-                TimeUnit.SECONDS.toMillis(storageGlobalCleanupTimeoutSecs));
+                TimeUnit.SECONDS.toMillis(storageGlobalCleanupTimeoutSecs), false);
     }
 
     protected MetadataTransactionContext doRecovery(ICcApplicationContext appCtx, MetadataTransactionContext mdTxnCtx)
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
index 1f9ec32..0683207 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
@@ -65,18 +65,34 @@
     }
 
     @Override
-    public void sendApplicationMessageToNC(INcAddressedMessage msg, String nodeId) throws Exception {
+    public boolean sendApplicationMessageToNC(INcAddressedMessage msg, String nodeId) throws Exception {
+        return sendMessage(msg, nodeId, false);
+    }
+
+    @Override
+    public boolean sendRealTimeApplicationMessageToNC(INcAddressedMessage msg, String nodeId) throws Exception {
+        return sendMessage(msg, nodeId, true);
+    }
+
+    private boolean sendMessage(INcAddressedMessage msg, String nodeId, boolean realTime) throws Exception {
         INodeManager nodeManager = ccs.getNodeManager();
         NodeControllerState state = nodeManager.getNodeControllerState(nodeId);
         if (msg instanceof ICcIdentifiedMessage) {
             ((ICcIdentifiedMessage) msg).setCcId(ccs.getCcId());
         }
         if (state != null) {
-            state.getNodeController().sendApplicationMessageToNC(JavaSerializationUtils.serialize(msg), null, nodeId);
+            byte[] payload = JavaSerializationUtils.serialize(msg);
+            if (realTime) {
+                state.getNodeController().sendRealTimeApplicationMessageToNC(payload, null, nodeId);
+            } else {
+                state.getNodeController().sendApplicationMessageToNC(payload, null, nodeId);
+            }
+            return true;
         } else {
             if (LOGGER.isWarnEnabled()) {
                 LOGGER.warn("Couldn't send message to unregistered node (" + nodeId + ")");
             }
+            return false;
         }
     }
 
@@ -87,7 +103,7 @@
 
     @Override
     public Object sendSyncRequestToNCs(long reqId, List<String> ncs, List<? extends INcAddressedMessage> requests,
-            long timeout) throws Exception {
+            long timeout, boolean realTime) throws Exception {
         MutableInt numRequired = new MutableInt(0);
         MutablePair<MutableInt, MutablePair<ResponseState, Object>> pair =
                 MutablePair.of(numRequired, MutablePair.of(ResponseState.UNINITIALIZED, UNINITIALIZED));
@@ -101,7 +117,10 @@
                     if (!(message instanceof ICcIdentifiedMessage)) {
                         throw new IllegalStateException("sync request message not cc identified: " + message);
                     }
-                    sendApplicationMessageToNC(message, nc);
+                    if (!(realTime ? sendRealTimeApplicationMessageToNC(message, nc)
+                            : sendApplicationMessageToNC(message, nc))) {
+                        throw new RuntimeDataException(ErrorCode.ILLEGAL_STATE, "unable to send sync message to " + nc);
+                    }
                 }
                 long time = System.currentTimeMillis();
                 while (pair.getLeft().getValue() > 0) {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
index 208686c..e628bc7 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
@@ -36,7 +36,16 @@
      * @param nodeId
      * @throws Exception
      */
-    void sendApplicationMessageToNC(INcAddressedMessage msg, String nodeId) throws Exception;
+    boolean sendApplicationMessageToNC(INcAddressedMessage msg, String nodeId) throws Exception;
+
+    /**
+     * Sends the passed message to the specified {@code nodeId}
+     *
+     * @param msg
+     * @param nodeId
+     * @throws Exception
+     */
+    boolean sendRealTimeApplicationMessageToNC(INcAddressedMessage msg, String nodeId) throws Exception;
 
     /**
      * Sends the passed requests to all NCs and wait for the response
@@ -44,10 +53,11 @@
      * @param ncs
      * @param requests
      * @param timeout
+     * @param realTime
      * @throws Exception
      */
     Object sendSyncRequestToNCs(long reqId, List<String> ncs, List<? extends INcAddressedMessage> requests,
-            long timeout) throws Exception;
+            long timeout, boolean realTime) throws Exception;
 
     /**
      * respond to a sync request
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
index 6198acc..b64f779 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
@@ -51,7 +51,7 @@
                     response.setException(new Exception("One or more nodes has not reported max resource id."));
                 }
             }
-            broker.sendApplicationMessageToNC(response, src);
+            broker.sendRealTimeApplicationMessageToNC(response, src);
         } catch (Exception e) {
             throw HyracksDataException.create(e);
         }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockRequest.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockRequest.java
index 4e1c3b1..2d9acf4 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockRequest.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockRequest.java
@@ -51,7 +51,7 @@
             ICCMessageBroker broker = (ICCMessageBroker) appCtx.getServiceContext().getMessageBroker();
             long startingId = appCtx.getTxnIdFactory().getIdBlock(blockSizeRequested);
             TxnIdBlockResponse response = new TxnIdBlockResponse(startingId, blockSizeRequested);
-            broker.sendApplicationMessageToNC(response, nodeId);
+            broker.sendRealTimeApplicationMessageToNC(response, nodeId);
         } catch (Exception e) {
             throw HyracksDataException.create(e);
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
index 42a0d66..a754baf 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
@@ -66,6 +66,8 @@
 
     void sendApplicationMessageToNC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception;
 
+    void sendRealTimeApplicationMessageToNC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception;
+
     void takeThreadDump(String requestId) throws Exception;
 
     /**
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index 0b85c4e..ff5dd33 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -134,6 +134,13 @@
     }
 
     @Override
+    public void sendRealTimeApplicationMessageToNC(byte[] data, DeploymentId deploymentId, String nodeId)
+            throws Exception {
+        SendApplicationMessageFunction fn = new SendApplicationMessageFunction(data, deploymentId, true, nodeId);
+        ipcHandle.send(-1, fn, null);
+    }
+
+    @Override
     public void takeThreadDump(String requestId) throws Exception {
         ThreadDumpRequestFunction fn = new ThreadDumpRequestFunction(requestId, ccId);
         ipcHandle.send(-1, fn, null);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
index e4c1d30..08a18f9 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
@@ -61,8 +61,13 @@
         switch (fn.getFunctionId()) {
             case SEND_APPLICATION_MESSAGE:
                 CCNCFunctions.SendApplicationMessageFunction amf = (CCNCFunctions.SendApplicationMessageFunction) fn;
-                ncs.getWorkQueue().schedule(
-                        new ApplicationMessageWork(ncs, amf.getMessage(), amf.getDeploymentId(), amf.getNodeId()));
+                ApplicationMessageWork amfw =
+                        new ApplicationMessageWork(ncs, amf.getMessage(), amf.getDeploymentId(), amf.getNodeId());
+                if (amf.isRealTime()) {
+                    ncs.getExecutor().submit(amfw);
+                } else {
+                    ncs.getWorkQueue().schedule(amfw);
+                }
                 return;
             case START_TASKS:
                 CCNCFunctions.StartTasksFunction stf = (CCNCFunctions.StartTasksFunction) fn;