[NO ISSUE][IPC] += high priority NC messaging, fail on sync send failure
Change-Id: Id1d259917fbc98693d795eca41934e2d7d55f304
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/11043
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
Reviewed-by: Till Westmann <tillw@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index cc4b25f..2891710 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -324,7 +324,8 @@
requests.add(new ActiveStatsRequestMessage(new ActiveRuntimeId(entityId, runtimeName, i), reqId));
}
try {
- List<String> responses = (List<String>) messageBroker.sendSyncRequestToNCs(reqId, ncs, requests, timeout);
+ List<String> responses =
+ (List<String>) messageBroker.sendSyncRequestToNCs(reqId, ncs, requests, timeout, false);
stats = formatStats(responses);
statsTimestamp = System.currentTimeMillis();
notifySubscribers(statsUpdatedEvent);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
index ae50880..5870d3a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
@@ -150,7 +150,7 @@
requests.add(new StorageCleanupRequestMessage(reqId, validDatasetIds));
}
messageBroker.sendSyncRequestToNCs(reqId, ncs, requests,
- TimeUnit.SECONDS.toMillis(storageGlobalCleanupTimeoutSecs));
+ TimeUnit.SECONDS.toMillis(storageGlobalCleanupTimeoutSecs), false);
}
protected MetadataTransactionContext doRecovery(ICcApplicationContext appCtx, MetadataTransactionContext mdTxnCtx)
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
index 1f9ec32..0683207 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
@@ -65,18 +65,34 @@
}
@Override
- public void sendApplicationMessageToNC(INcAddressedMessage msg, String nodeId) throws Exception {
+ public boolean sendApplicationMessageToNC(INcAddressedMessage msg, String nodeId) throws Exception {
+ return sendMessage(msg, nodeId, false);
+ }
+
+ @Override
+ public boolean sendRealTimeApplicationMessageToNC(INcAddressedMessage msg, String nodeId) throws Exception {
+ return sendMessage(msg, nodeId, true);
+ }
+
+ private boolean sendMessage(INcAddressedMessage msg, String nodeId, boolean realTime) throws Exception {
INodeManager nodeManager = ccs.getNodeManager();
NodeControllerState state = nodeManager.getNodeControllerState(nodeId);
if (msg instanceof ICcIdentifiedMessage) {
((ICcIdentifiedMessage) msg).setCcId(ccs.getCcId());
}
if (state != null) {
- state.getNodeController().sendApplicationMessageToNC(JavaSerializationUtils.serialize(msg), null, nodeId);
+ byte[] payload = JavaSerializationUtils.serialize(msg);
+ if (realTime) {
+ state.getNodeController().sendRealTimeApplicationMessageToNC(payload, null, nodeId);
+ } else {
+ state.getNodeController().sendApplicationMessageToNC(payload, null, nodeId);
+ }
+ return true;
} else {
if (LOGGER.isWarnEnabled()) {
LOGGER.warn("Couldn't send message to unregistered node (" + nodeId + ")");
}
+ return false;
}
}
@@ -87,7 +103,7 @@
@Override
public Object sendSyncRequestToNCs(long reqId, List<String> ncs, List<? extends INcAddressedMessage> requests,
- long timeout) throws Exception {
+ long timeout, boolean realTime) throws Exception {
MutableInt numRequired = new MutableInt(0);
MutablePair<MutableInt, MutablePair<ResponseState, Object>> pair =
MutablePair.of(numRequired, MutablePair.of(ResponseState.UNINITIALIZED, UNINITIALIZED));
@@ -101,7 +117,10 @@
if (!(message instanceof ICcIdentifiedMessage)) {
throw new IllegalStateException("sync request message not cc identified: " + message);
}
- sendApplicationMessageToNC(message, nc);
+ if (!(realTime ? sendRealTimeApplicationMessageToNC(message, nc)
+ : sendApplicationMessageToNC(message, nc))) {
+ throw new RuntimeDataException(ErrorCode.ILLEGAL_STATE, "unable to send sync message to " + nc);
+ }
}
long time = System.currentTimeMillis();
while (pair.getLeft().getValue() > 0) {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
index 208686c..e628bc7 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
@@ -36,7 +36,16 @@
* @param nodeId
* @throws Exception
*/
- void sendApplicationMessageToNC(INcAddressedMessage msg, String nodeId) throws Exception;
+ boolean sendApplicationMessageToNC(INcAddressedMessage msg, String nodeId) throws Exception;
+
+ /**
+ * Sends the passed message to the specified {@code nodeId}
+ *
+ * @param msg
+ * @param nodeId
+ * @throws Exception
+ */
+ boolean sendRealTimeApplicationMessageToNC(INcAddressedMessage msg, String nodeId) throws Exception;
/**
* Sends the passed requests to all NCs and wait for the response
@@ -44,10 +53,11 @@
* @param ncs
* @param requests
* @param timeout
+ * @param realTime
* @throws Exception
*/
Object sendSyncRequestToNCs(long reqId, List<String> ncs, List<? extends INcAddressedMessage> requests,
- long timeout) throws Exception;
+ long timeout, boolean realTime) throws Exception;
/**
* respond to a sync request
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
index 6198acc..b64f779 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
@@ -51,7 +51,7 @@
response.setException(new Exception("One or more nodes has not reported max resource id."));
}
}
- broker.sendApplicationMessageToNC(response, src);
+ broker.sendRealTimeApplicationMessageToNC(response, src);
} catch (Exception e) {
throw HyracksDataException.create(e);
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockRequest.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockRequest.java
index 4e1c3b1..2d9acf4 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockRequest.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/TxnIdBlockRequest.java
@@ -51,7 +51,7 @@
ICCMessageBroker broker = (ICCMessageBroker) appCtx.getServiceContext().getMessageBroker();
long startingId = appCtx.getTxnIdFactory().getIdBlock(blockSizeRequested);
TxnIdBlockResponse response = new TxnIdBlockResponse(startingId, blockSizeRequested);
- broker.sendApplicationMessageToNC(response, nodeId);
+ broker.sendRealTimeApplicationMessageToNC(response, nodeId);
} catch (Exception e) {
throw HyracksDataException.create(e);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
index 42a0d66..a754baf 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
@@ -66,6 +66,8 @@
void sendApplicationMessageToNC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception;
+ void sendRealTimeApplicationMessageToNC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception;
+
void takeThreadDump(String requestId) throws Exception;
/**
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index 0b85c4e..ff5dd33 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -134,6 +134,13 @@
}
@Override
+ public void sendRealTimeApplicationMessageToNC(byte[] data, DeploymentId deploymentId, String nodeId)
+ throws Exception {
+ SendApplicationMessageFunction fn = new SendApplicationMessageFunction(data, deploymentId, true, nodeId);
+ ipcHandle.send(-1, fn, null);
+ }
+
+ @Override
public void takeThreadDump(String requestId) throws Exception {
ThreadDumpRequestFunction fn = new ThreadDumpRequestFunction(requestId, ccId);
ipcHandle.send(-1, fn, null);
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
index e4c1d30..08a18f9 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
@@ -61,8 +61,13 @@
switch (fn.getFunctionId()) {
case SEND_APPLICATION_MESSAGE:
CCNCFunctions.SendApplicationMessageFunction amf = (CCNCFunctions.SendApplicationMessageFunction) fn;
- ncs.getWorkQueue().schedule(
- new ApplicationMessageWork(ncs, amf.getMessage(), amf.getDeploymentId(), amf.getNodeId()));
+ ApplicationMessageWork amfw =
+ new ApplicationMessageWork(ncs, amf.getMessage(), amf.getDeploymentId(), amf.getNodeId());
+ if (amf.isRealTime()) {
+ ncs.getExecutor().submit(amfw);
+ } else {
+ ncs.getWorkQueue().schedule(amfw);
+ }
return;
case START_TASKS:
CCNCFunctions.StartTasksFunction stf = (CCNCFunctions.StartTasksFunction) fn;