[NO ISSUE][HYR] += ability to bypass work queue for high priority app messages
- use said ability for active stats responses
Change-Id: I5d4747e08a380a585d0c4a9312873ea39b80abbf
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/10964
Reviewed-by: Michael Blow <mblow@apache.org>
Reviewed-by: Till Westmann <tillw@apache.org>
Contrib: Michael Blow <mblow@apache.org>
Tested-by: Michael Blow <mblow@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
index 26b5068..b99e4f2 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
@@ -153,8 +153,8 @@
String stats = runtime.getStats();
LOGGER.debug("Sending stats response for {} ", runtimeId);
ActiveStatsResponse response = new ActiveStatsResponse(reqId, stats, null);
- ((NodeControllerService) serviceCtx.getControllerService()).sendApplicationMessageToCC(message.getCcId(),
- JavaSerializationUtils.serialize(response), null);
+ ((NodeControllerService) serviceCtx.getControllerService()).sendRealTimeApplicationMessageToCC(
+ message.getCcId(), JavaSerializationUtils.serialize(response), null);
} catch (Exception e) {
throw HyracksDataException.create(e);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
index 16e8ed8..d350f61 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
@@ -129,8 +129,16 @@
break;
case SEND_APPLICATION_MESSAGE:
CCNCFunctions.SendApplicationMessageFunction rsf = (CCNCFunctions.SendApplicationMessageFunction) fn;
- ccs.getWorkQueue().schedule(
- new ApplicationMessageWork(ccs, rsf.getMessage(), rsf.getDeploymentId(), rsf.getNodeId()));
+ ApplicationMessageWork work =
+ new ApplicationMessageWork(ccs, rsf.getMessage(), rsf.getDeploymentId(), rsf.getNodeId());
+ if (rsf.isRealTime()) {
+ final ExecutorService executor = ccs.getExecutor();
+ if (executor != null) {
+ executor.execute(work);
+ }
+ } else {
+ ccs.getWorkQueue().schedule(work);
+ }
break;
case GET_NODE_CONTROLLERS_INFO:
ccs.getWorkQueue().schedule(new GetNodeControllersInfoWork(ccs.getNodeManager(),
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
index c8106e8..1c91183 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java
@@ -66,6 +66,8 @@
void sendApplicationMessageToCC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception;
+ void sendRealTimeApplicationMessageToCC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception;
+
void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, IResultMetadata metadata, boolean emptyResult,
int partition, int nPartitions, NetworkAddress networkAddress) throws Exception;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index 6b5b5db..27a8b02 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -127,9 +127,10 @@
}
public static class SendApplicationMessageFunction extends Function {
- private static final long serialVersionUID = 1L;
- private byte[] serializedMessage;
- private DeploymentId deploymentId;
+ private static final long serialVersionUID = 2L;
+ private final byte[] serializedMessage;
+ private final DeploymentId deploymentId;
+ private final boolean realTime;
private String nodeId;
public DeploymentId getDeploymentId() {
@@ -148,9 +149,14 @@
return serializedMessage;
}
- public SendApplicationMessageFunction(byte[] data, DeploymentId deploymentId, String nodeId) {
+ public boolean isRealTime() {
+ return realTime;
+ }
+
+ public SendApplicationMessageFunction(byte[] data, DeploymentId deploymentId, boolean realTime, String nodeId) {
this.serializedMessage = data;
this.deploymentId = deploymentId;
+ this.realTime = realTime;
this.nodeId = nodeId;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 344c3fb..09dc04d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -128,7 +128,14 @@
@Override
public void sendApplicationMessageToCC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception {
- SendApplicationMessageFunction fn = new SendApplicationMessageFunction(data, deploymentId, nodeId);
+ SendApplicationMessageFunction fn = new SendApplicationMessageFunction(data, deploymentId, false, nodeId);
+ ipcHandle.send(-1, fn, null);
+ }
+
+ @Override
+ public void sendRealTimeApplicationMessageToCC(byte[] data, DeploymentId deploymentId, String nodeId)
+ throws Exception {
+ SendApplicationMessageFunction fn = new SendApplicationMessageFunction(data, deploymentId, true, nodeId);
ipcHandle.send(-1, fn, null);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index d32ee32..0b85c4e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -129,7 +129,7 @@
@Override
public void sendApplicationMessageToNC(byte[] data, DeploymentId deploymentId, String nodeId) throws Exception {
- SendApplicationMessageFunction fn = new SendApplicationMessageFunction(data, deploymentId, nodeId);
+ SendApplicationMessageFunction fn = new SendApplicationMessageFunction(data, deploymentId, false, nodeId);
ipcHandle.send(-1, fn, null);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 1356f4c..c774317 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -633,6 +633,10 @@
getClusterController(ccId).sendApplicationMessageToCC(data, deploymentId, id);
}
+ public void sendRealTimeApplicationMessageToCC(CcId ccId, byte[] data, DeploymentId deploymentId) throws Exception {
+ getClusterController(ccId).sendRealTimeApplicationMessageToCC(data, deploymentId, id);
+ }
+
public IResultPartitionManager getResultPartitionManager() {
return resultPartitionManager;
}