[NO ISSUE] Fix UDF delete message
The delete method for UDFs is using the async message format,
but this isn't correct. It should use the synchronous request
form and require a response, just as the udf load does.
Change-Id: I4c18e62bdca2fe6b9239d740b9040171b799a3a7
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3487
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ian Maxon <imaxon@uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UdfApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UdfApiServlet.java
index d293ee8..79b78a8 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UdfApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UdfApiServlet.java
@@ -130,10 +130,11 @@
}
private void deleteUdf(String dataverse, String resourceName) throws Exception {
- DeleteUdfMessage msg = new DeleteUdfMessage(dataverse, resourceName);
- for (String nc : appCtx.getClusterStateManager().getParticipantNodes()) {
- broker.sendApplicationMessageToNC(msg, nc);
- }
+ long reqId = broker.newRequestId();
+ List<INcAddressedMessage> requests = new ArrayList<>();
+ List<String> ncs = new ArrayList<>(appCtx.getClusterStateManager().getParticipantNodes());
+ ncs.forEach(s -> requests.add(new DeleteUdfMessage(dataverse, resourceName, reqId)));
+ broker.sendSyncRequestToNCs(reqId, ncs, requests, UDF_RESPONSE_TIMEOUT);
appCtx.getLibraryManager().deregisterLibraryClassLoader(dataverse, resourceName);
appCtx.getHcc().unDeployBinary(new DeploymentId(resourceName));
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AbstractUdfMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AbstractUdfMessage.java
new file mode 100644
index 0000000..90bd2a6
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AbstractUdfMessage.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.library.ILibraryManager;
+import org.apache.asterix.common.messaging.CcIdentifiedMessage;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public abstract class AbstractUdfMessage extends CcIdentifiedMessage implements INcAddressedMessage {
+
+ protected final String dataverseName;
+ protected final String libraryName;
+ protected static final Logger LOGGER = LogManager.getLogger();
+
+ private static final long serialVersionUID = 2L;
+
+ private final long reqId;
+
+ public AbstractUdfMessage(String dataverseName, String libraryName, long reqId) {
+ this.dataverseName = dataverseName;
+ this.libraryName = libraryName;
+ this.reqId = reqId;
+ }
+
+ @Override
+ public void handle(INcApplicationContext appCtx) {
+ ILibraryManager mgr = appCtx.getLibraryManager();
+ String mdNodeName = appCtx.getMetadataProperties().getMetadataNodeName();
+ String nodeName = appCtx.getServiceContext().getNodeId();
+ INCMessageBroker broker = (INCMessageBroker) appCtx.getServiceContext().getMessageBroker();
+ boolean isMdNode = mdNodeName.equals(nodeName);
+ try {
+ handleAction(mgr, isMdNode, appCtx);
+ broker.sendMessageToCC(getCcId(), new UdfResponseMessage(reqId, null));
+ } catch (Exception e) {
+ try {
+ LOGGER.error("Error in UDF distribution", e);
+ broker.sendMessageToPrimaryCC(new UdfResponseMessage(reqId, e));
+ } catch (Exception f) {
+ LOGGER.error("Unable to send failure response to CC", f);
+ }
+ }
+
+ }
+
+ protected abstract void handleAction(ILibraryManager mgr, boolean isMdNode, INcApplicationContext appCtx)
+ throws Exception;
+
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/DeleteUdfMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/DeleteUdfMessage.java
index ca7b9db..efbc9c1 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/DeleteUdfMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/DeleteUdfMessage.java
@@ -21,35 +21,20 @@
import org.apache.asterix.app.external.ExternalLibraryUtils;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.library.ILibraryManager;
-import org.apache.asterix.common.messaging.api.INcAddressedMessage;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-public class DeleteUdfMessage implements INcAddressedMessage {
+public class DeleteUdfMessage extends AbstractUdfMessage {
- private static final long serialVersionUID = -3129473321451281271L;
- private final String dataverseName;
- private final String libraryName;
- private static final Logger LOGGER = LogManager.getLogger();
+ private static final long serialVersionUID = 2L;
- public DeleteUdfMessage(String dataverseName, String libraryName) {
- this.dataverseName = dataverseName;
- this.libraryName = libraryName;
+ public DeleteUdfMessage(String dataverseName, String libraryName, long reqId) {
+ super(dataverseName, libraryName, reqId);
}
@Override
- public void handle(INcApplicationContext appCtx) {
- ILibraryManager mgr = appCtx.getLibraryManager();
- String mdNodeName = appCtx.getMetadataProperties().getMetadataNodeName();
- String nodeName = appCtx.getServiceContext().getNodeId();
- boolean isMdNode = mdNodeName.equals(nodeName);
- try {
- if (isMdNode) {
- ExternalLibraryUtils.uninstallLibrary(dataverseName, libraryName);
- }
- mgr.deregisterLibraryClassLoader(dataverseName, libraryName);
- } catch (Exception e) {
- LOGGER.error("Unable to un-deploy UDF", e);
+ protected void handleAction(ILibraryManager mgr, boolean isMdNode, INcApplicationContext appCtx) throws Exception {
+ if (isMdNode) {
+ ExternalLibraryUtils.uninstallLibrary(dataverseName, libraryName);
}
+ mgr.deregisterLibraryClassLoader(dataverseName, libraryName);
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/LoadUdfMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/LoadUdfMessage.java
index 66714bf..600603b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/LoadUdfMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/LoadUdfMessage.java
@@ -21,47 +21,20 @@
import org.apache.asterix.app.external.ExternalLibraryUtils;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.library.ILibraryManager;
-import org.apache.asterix.common.messaging.CcIdentifiedMessage;
-import org.apache.asterix.common.messaging.api.INCMessageBroker;
-import org.apache.asterix.common.messaging.api.INcAddressedMessage;
import org.apache.hyracks.util.file.FileUtil;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-public class LoadUdfMessage extends CcIdentifiedMessage implements INcAddressedMessage {
+public class LoadUdfMessage extends AbstractUdfMessage {
- private final String dataverseName;
- private final String libraryName;
- private static final Logger LOGGER = LogManager.getLogger();
-
- private static final long serialVersionUID = -4529473341458281271L;
- private final long reqId;
+ private static final long serialVersionUID = 2L;
public LoadUdfMessage(String dataverseName, String libraryName, long reqId) {
- this.dataverseName = dataverseName;
- this.libraryName = libraryName;
- this.reqId = reqId;
+ super(dataverseName, libraryName, reqId);
}
@Override
- public void handle(INcApplicationContext appCtx) {
- ILibraryManager mgr = appCtx.getLibraryManager();
- String mdNodeName = appCtx.getMetadataProperties().getMetadataNodeName();
- String nodeName = appCtx.getServiceContext().getNodeId();
- INCMessageBroker broker = (INCMessageBroker) appCtx.getServiceContext().getMessageBroker();
- boolean isMdNode = mdNodeName.equals(nodeName);
- try {
- ExternalLibraryUtils.setUpExternaLibrary(mgr, isMdNode,
- FileUtil.joinPath(appCtx.getServiceContext().getServerCtx().getBaseDir().getAbsolutePath(),
- "applications", dataverseName + "." + libraryName));
- broker.sendMessageToPrimaryCC(new UdfResponseMessage(reqId, null));
- } catch (Exception e) {
- try {
- broker.sendMessageToPrimaryCC(new UdfResponseMessage(reqId, e));
- } catch (Exception f) {
- LOGGER.error("Unable to send failure response to CC", f);
- }
- }
-
+ protected void handleAction(ILibraryManager mgr, boolean isMdNode, INcApplicationContext appCtx) throws Exception {
+ ExternalLibraryUtils.setUpExternaLibrary(mgr, isMdNode,
+ FileUtil.joinPath(appCtx.getServiceContext().getServerCtx().getBaseDir().getAbsolutePath(),
+ "applications", dataverseName + "." + libraryName));
}
}