[NO ISSUE][NET] Split delivery of messages and exceptions
- user model changes: no
- storage format changes: no
- interface changes: yes
Change-Id: I5a97e1eb1e2a3ec207591b3d5b8b7f1949a80fbc
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/4025
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
index a78c269..25474769 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
@@ -64,7 +64,7 @@
}
@Override
- public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, Exception exception) {
+ public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload) {
HyracksClientInterfaceFunctions.Function fn = (HyracksClientInterfaceFunctions.Function) payload;
switch (fn.getFunctionId()) {
case GET_CLUSTER_CONTROLLER_INFO:
@@ -200,4 +200,14 @@
}
}
}
+
+ @Override
+ public void onError(IIPCHandle handle, long mid, long rmid, Exception exception) {
+ LOGGER.info("exception in/or processing message", exception);
+ try {
+ handle.send(mid, null, exception);
+ } catch (IPCException e) {
+ LOGGER.warn("error sending exception response", e);
+ }
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
index 0e4ad41..d263cc0 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
@@ -60,8 +60,7 @@
}
@Override
- public void deliverIncomingMessage(final IIPCHandle handle, long mid, long rmid, Object payload,
- Exception exception) {
+ public void deliverIncomingMessage(final IIPCHandle handle, long mid, long rmid, Object payload) {
CCNCFunctions.Function fn = (Function) payload;
switch (fn.getFunctionId()) {
case REGISTER_NODE:
@@ -170,6 +169,11 @@
}
}
+ @Override
+ public void onError(IIPCHandle handle, long mid, long rmid, Exception exception) {
+ LOGGER.info("exception in/or processing message", exception);
+ }
+
private static void processNodeHeartbeat(ClusterControllerService ccs, CCNCFunctions.Function fn) {
final ExecutorService executor = ccs.getExecutor();
if (executor != null) {
@@ -177,4 +181,4 @@
executor.execute(new NodeHeartbeatWork(ccs, nhf.getNodeId(), nhf.getHeartbeatData(), nhf.getNcAddress()));
}
}
-}
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
index 836c624..df08c04 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
@@ -37,12 +37,15 @@
import org.apache.hyracks.control.nc.work.UndeployJobSpecWork;
import org.apache.hyracks.ipc.api.IIPCHandle;
import org.apache.hyracks.ipc.api.IIPCI;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
/**
* Interprocess communication in a node controller
* This class must be refactored with each function carrying its own implementation
*/
final class NodeControllerIPCI implements IIPCI {
+ private static final Logger LOGGER = LogManager.getLogger();
private final NodeControllerService ncs;
/**
@@ -53,8 +56,7 @@
}
@Override
- public void deliverIncomingMessage(final IIPCHandle handle, long mid, long rmid, Object payload,
- Exception exception) {
+ public void deliverIncomingMessage(final IIPCHandle handle, long mid, long rmid, Object payload) {
CCNCFunctions.Function fn = (CCNCFunctions.Function) payload;
switch (fn.getFunctionId()) {
case SEND_APPLICATION_MESSAGE:
@@ -150,4 +152,9 @@
}
}
+
+ @Override
+ public void onError(IIPCHandle handle, long mid, long rmid, Exception exception) {
+ LOGGER.info("exception in/or processing message", exception);
+ }
}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCI.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCI.java
index 02698fa..bf1bc33 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCI.java
@@ -21,7 +21,6 @@
/**
* The interprocess communication interface that handles communication between different processes across the cluster
*/
-@FunctionalInterface
public interface IIPCI {
/**
@@ -34,8 +33,19 @@
* the request message id (if the message is a response to a request)
* @param payload
* the message payload
- * @param exception
- * an exception if the message was an error message
*/
- void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, Exception exception);
+ void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload);
+
+ /**
+ * handles an error message, or failure to unmarshall the message
+ * @param handle
+ * the message IPC handle
+ * @param mid
+ * the message id
+ * @param rmid
+ * the request message id (if the message is a response to a request)
+ * @param exception
+ * an exception
+ */
+ void onError(IIPCHandle handle, long mid, long rmid, Exception exception);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/RPCInterface.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/RPCInterface.java
index 7dae541..fd98b5c 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/RPCInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/RPCInterface.java
@@ -31,7 +31,7 @@
public Object call(IIPCHandle handle, Object request) throws Exception {
Request req;
long mid;
- synchronized (this) {
+ synchronized (reqMap) {
req = new Request(handle, this);
mid = handle.send(-1, request, null);
reqMap.put(mid, req);
@@ -40,21 +40,23 @@
}
@Override
- public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, Exception exception) {
+ public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload) {
Request req;
- synchronized (this) {
+ synchronized (reqMap) {
req = reqMap.remove(rmid);
}
assert req != null;
- if (exception != null) {
- req.setException(exception);
- } else {
- req.setResult(payload);
- }
+ req.setResult(payload);
}
- protected synchronized void removeRequest(Request r) {
- reqMap.remove(r);
+ @Override
+ public void onError(IIPCHandle handle, long mid, long rmid, Exception exception) {
+ Request req;
+ synchronized (reqMap) {
+ req = reqMap.remove(rmid);
+ }
+ assert req != null;
+ req.setException(exception);
}
private static class Request {
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
index 8d90ba3..4a19a33 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
@@ -29,13 +29,8 @@
import org.apache.hyracks.ipc.api.IPCPerformanceCounters;
import org.apache.hyracks.ipc.api.IPayloadSerializerDeserializer;
import org.apache.hyracks.ipc.exceptions.IPCException;
-import org.apache.logging.log4j.Level;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
public class IPCSystem {
- private static final Logger LOGGER = LogManager.getLogger();
-
private final IPCConnectionManager cMgr;
private final IIPCI ipci;
@@ -101,15 +96,11 @@
void deliverIncomingMessage(final Message message) {
long mid = message.getMessageId();
long rmid = message.getRequestMessageId();
- Object payload = null;
- Exception exception = null;
if (message.getFlag() == Message.ERROR) {
- exception = (Exception) message.getPayload();
- LOGGER.log(Level.INFO, "Exception in message", exception);
+ ipci.onError(message.getIPCHandle(), mid, rmid, (Exception) message.getPayload());
} else {
- payload = message.getPayload();
+ ipci.deliverIncomingMessage(message.getIPCHandle(), mid, rmid, message.getPayload());
}
- ipci.deliverIncomingMessage(message.getIPCHandle(), mid, rmid, payload, exception);
}
IPCConnectionManager getConnectionManager() {
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/test/java/org/apache/hyracks/ipc/tests/IPCTest.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/test/java/org/apache/hyracks/ipc/tests/IPCTest.java
index 00bd761..0d8d69b 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/test/java/org/apache/hyracks/ipc/tests/IPCTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/test/java/org/apache/hyracks/ipc/tests/IPCTest.java
@@ -47,7 +47,7 @@
IIPCHandle handle = client.getHandle(serverAddr, 0);
for (int i = 0; i < 100; ++i) {
- Assert.assertEquals(rpci.call(handle, Integer.valueOf(i)), Integer.valueOf(2 * i));
+ Assert.assertEquals(rpci.call(handle, i), 2 * i);
}
try {
@@ -62,27 +62,24 @@
final Executor executor = Executors.newCachedThreadPool();
IIPCI ipci = new IIPCI() {
@Override
- public void deliverIncomingMessage(final IIPCHandle handle, final long mid, long rmid, final Object payload,
- Exception exception) {
- executor.execute(new Runnable() {
- @Override
- public void run() {
- Object result = null;
- Exception exception = null;
+ public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload) {
+ executor.execute(() -> {
+ try {
+ handle.send(mid, (int) payload * 2, null);
+ } catch (Exception e) {
try {
- Integer i = (Integer) payload;
- result = i.intValue() * 2;
- } catch (Exception e) {
- exception = e;
- }
- try {
- handle.send(mid, result, exception);
- } catch (IPCException e) {
- e.printStackTrace();
+ handle.send(mid, null, e);
+ } catch (IPCException e1) {
+ e1.printStackTrace();
}
}
});
}
+
+ @Override
+ public void onError(IIPCHandle handle, long mid, long rmid, Exception exception) {
+ exception.printStackTrace();
+ }
};
return new IPCSystem(new InetSocketAddress("127.0.0.1", 0), PlainSocketChannelFactory.INSTANCE, ipci,
new JavaSerializationBasedPayloadSerializerDeserializer());