[NO ISSUE][IPC] += sendRealTimeMessageTo[Primary]CC to INCMessageBroker
Change-Id: I0d8a8e2018a1839f20a7c1b601c26c8a3502ef5d
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/11026
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
index 26f6524..1b216ea 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
@@ -79,6 +79,16 @@
}
@Override
+ public void sendRealTimeMessageToCC(CcId ccId, ICcAddressedMessage message) throws Exception {
+ ncs.sendRealTimeApplicationMessageToCC(ccId, JavaSerializationUtils.serialize(message), null);
+ }
+
+ @Override
+ public void sendRealTimeMessageToPrimaryCC(ICcAddressedMessage message) throws Exception {
+ sendRealTimeMessageToCC(ncs.getPrimaryCcId(), message);
+ }
+
+ @Override
public void sendMessageToNC(String nodeId, INcAddressedMessage message) throws Exception {
IChannelControlBlock messagingChannel = ncs.getMessagingNetworkManager().getMessagingChannel(nodeId);
sendMessageToChannel(messagingChannel, message);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
index 6bd58a9..88905fd 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
@@ -29,7 +29,7 @@
* @param message
* @throws Exception
*/
- public void sendMessageToPrimaryCC(ICcAddressedMessage message) throws Exception;
+ void sendMessageToPrimaryCC(ICcAddressedMessage message) throws Exception;
/**
* Sends application message from this NC to the CC.
@@ -37,7 +37,23 @@
* @param message
* @throws Exception
*/
- public void sendMessageToCC(CcId ccId, ICcAddressedMessage message) throws Exception;
+ void sendMessageToCC(CcId ccId, ICcAddressedMessage message) throws Exception;
+
+ /**
+ * Sends high-priority application message from this NC to the primary CC.
+ *
+ * @param message
+ * @throws Exception
+ */
+ void sendRealTimeMessageToPrimaryCC(ICcAddressedMessage message) throws Exception;
+
+ /**
+ * Sends high-priority application message from this NC to the CC.
+ *
+ * @param message
+ * @throws Exception
+ */
+ void sendRealTimeMessageToCC(CcId ccId, ICcAddressedMessage message) throws Exception;
/**
* Sends application message from this NC to another NC.
@@ -45,14 +61,14 @@
* @param message
* @throws Exception
*/
- public void sendMessageToNC(String nodeId, INcAddressedMessage message) throws Exception;
+ void sendMessageToNC(String nodeId, INcAddressedMessage message) throws Exception;
/**
* Queue a message to this {@link INCMessageBroker} for processing
*
* @param msg
*/
- public void queueReceivedMessage(INcAddressedMessage msg);
+ void queueReceivedMessage(INcAddressedMessage msg);
/**
* Creates and registers a Future for a message that will be send through this broker