[ASTERIXDB-2597] Load UDFs via HTTP
- POST existing UDF format to /admin/udf/$DATAVERSE/$LIBNAME
- DELETE against that URL to remove UDFs
Change-Id: I6be9fef54c010bdb32f5c78af9b973f9843f442f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3386
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ian Maxon <imaxon@uci.edu>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UdfApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UdfApiServlet.java
new file mode 100644
index 0000000..d293ee8
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UdfApiServlet.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.api.http.server;
+
+import java.io.File;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.asterix.app.external.ExternalLibraryUtils;
+import org.apache.asterix.app.message.DeleteUdfMessage;
+import org.apache.asterix.app.message.LoadUdfMessage;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.deployment.DeploymentId;
+import org.apache.hyracks.http.api.IServletRequest;
+import org.apache.hyracks.http.api.IServletResponse;
+import org.apache.hyracks.http.server.AbstractServlet;
+import org.apache.hyracks.util.file.FileUtil;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.QueryStringDecoder;
+
+public class UdfApiServlet extends AbstractServlet {
+
+ private static final Logger LOGGER = LogManager.getLogger();
+ private final ICcApplicationContext appCtx;
+ private final ICCMessageBroker broker;
+ public static final String UDF_TMP_DIR_PREFIX = "udf_temp";
+ public static final int UDF_RESPONSE_TIMEOUT = 5000;
+
+ public UdfApiServlet(ICcApplicationContext appCtx, ConcurrentMap<String, Object> ctx, String... paths) {
+ super(ctx, paths);
+ this.appCtx = appCtx;
+ this.broker = (ICCMessageBroker) appCtx.getServiceContext().getMessageBroker();
+ }
+
+ private String[] getResource(FullHttpRequest req) throws IllegalArgumentException {
+ String[] path = new QueryStringDecoder(req.uri()).path().split("/");
+ if (path.length != 5) {
+ throw new IllegalArgumentException("Invalid resource.");
+ }
+ String resourceName = path[path.length - 1];
+ String dataverseName = path[path.length - 2];
+ return new String[] { resourceName, dataverseName };
+ }
+
+ @Override
+ protected void post(IServletRequest request, IServletResponse response) {
+ FullHttpRequest req = request.getHttpRequest();
+ String[] resourceNames;
+ try {
+ resourceNames = getResource(req);
+ } catch (IllegalArgumentException e) {
+ response.setStatus(HttpResponseStatus.BAD_REQUEST);
+ return;
+ }
+ String resourceName = resourceNames[0];
+ String dataverse = resourceNames[1];
+ File udf = null;
+ try {
+ File workingDir = new File(appCtx.getServiceContext().getServerCtx().getBaseDir().getAbsolutePath(),
+ UDF_TMP_DIR_PREFIX);
+ if (!workingDir.exists()) {
+ FileUtil.forceMkdirs(workingDir);
+ }
+ udf = File.createTempFile(resourceName, ".zip", workingDir);
+ try (RandomAccessFile raf = new RandomAccessFile(udf, "rw")) {
+ ByteBuf reqContent = req.content();
+ raf.setLength(reqContent.readableBytes());
+ FileChannel fc = raf.getChannel();
+ ByteBuffer content = reqContent.nioBuffer();
+ while (content.hasRemaining()) {
+ fc.write(content);
+ }
+ }
+ IHyracksClientConnection hcc = appCtx.getHcc();
+ DeploymentId udfName = new DeploymentId(dataverse + "." + resourceName);
+ ClassLoader cl = appCtx.getLibraryManager().getLibraryClassLoader(dataverse, resourceName);
+ if (cl != null) {
+ deleteUdf(dataverse, resourceName);
+ }
+ hcc.deployBinary(udfName, Arrays.asList(udf.toString()), true);
+ ExternalLibraryUtils.setUpExternaLibrary(appCtx.getLibraryManager(), false,
+ FileUtil.joinPath(appCtx.getServiceContext().getServerCtx().getBaseDir().getAbsolutePath(),
+ "applications", udfName.toString()));
+
+ long reqId = broker.newRequestId();
+ List<INcAddressedMessage> requests = new ArrayList<>();
+ List<String> ncs = new ArrayList<>(appCtx.getClusterStateManager().getParticipantNodes());
+ ncs.forEach(s -> requests.add(new LoadUdfMessage(dataverse, resourceName, reqId)));
+ broker.sendSyncRequestToNCs(reqId, ncs, requests, UDF_RESPONSE_TIMEOUT);
+ } catch (Exception e) {
+ response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
+ LOGGER.error(e);
+ return;
+ } finally {
+ if (udf != null) {
+ udf.delete();
+ }
+ }
+ response.setStatus(HttpResponseStatus.OK);
+
+ }
+
+ private void deleteUdf(String dataverse, String resourceName) throws Exception {
+ DeleteUdfMessage msg = new DeleteUdfMessage(dataverse, resourceName);
+ for (String nc : appCtx.getClusterStateManager().getParticipantNodes()) {
+ broker.sendApplicationMessageToNC(msg, nc);
+ }
+ appCtx.getLibraryManager().deregisterLibraryClassLoader(dataverse, resourceName);
+ appCtx.getHcc().unDeployBinary(new DeploymentId(resourceName));
+ }
+
+ @Override
+ protected void delete(IServletRequest request, IServletResponse response) {
+ String[] resourceNames;
+ try {
+ resourceNames = getResource(request.getHttpRequest());
+ } catch (IllegalArgumentException e) {
+ response.setStatus(HttpResponseStatus.BAD_REQUEST);
+ return;
+ }
+ String resourceName = resourceNames[0];
+ String dataverse = resourceNames[1];
+ try {
+ deleteUdf(dataverse, resourceName);
+ } catch (Exception e) {
+ response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
+ LOGGER.error(e);
+ return;
+ }
+ response.setStatus(HttpResponseStatus.OK);
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java
index 0eac212..a989941 100755
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java
@@ -22,6 +22,7 @@
import java.io.FilenameFilter;
import java.io.IOException;
import java.net.URL;
+import java.net.URLClassLoader;
import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -60,8 +61,8 @@
private ExternalLibraryUtils() {
}
- public static void setUpExternaLibraries(ILibraryManager externalLibraryManager, boolean isMetadataNode)
- throws Exception {
+ public static void setUpExternaLibrary(ILibraryManager externalLibraryManager, boolean isMetadataNode,
+ String libraryPath) throws Exception {
// start by un-installing removed libraries (Metadata Node only)
Map<String, List<String>> uninstalledLibs = null;
if (isMetadataNode) {
@@ -69,18 +70,30 @@
}
// get the directory of the to be installed libraries
- File installLibDir = getLibraryInstallDir();
+ String[] pathSplit = libraryPath.split("\\.");
+ String[] dvSplit = pathSplit[pathSplit.length - 2].split("/");
+ String dataverse = dvSplit[dvSplit.length - 1];
+ String name = pathSplit[pathSplit.length - 1].trim();
+ File installLibDir = new File(libraryPath);
+
// directory exists?
if (installLibDir.exists()) {
- // get the list of files in the directory
- for (File dataverseDir : installLibDir.listFiles(File::isDirectory)) {
- for (File libraryDir : dataverseDir.listFiles(File::isDirectory)) {
- // For each file (library), register classloader and configure its parameter.
- // If current node is Metadata Node, add the library to metadata.
- registerClassLoader(externalLibraryManager, dataverseDir.getName(), libraryDir.getName());
- configureLibrary(externalLibraryManager, dataverseDir.getName(), libraryDir, uninstalledLibs,
- isMetadataNode);
- }
+ registerClassLoader(externalLibraryManager, dataverse, name, libraryPath);
+ configureLibrary(externalLibraryManager, dataverse, name, installLibDir, uninstalledLibs, isMetadataNode);
+ }
+ }
+
+ public static void setUpInstalledLibraries(ILibraryManager externalLibraryManager, boolean isMetadataNode,
+ File appDir) throws Exception {
+ File[] libs = appDir.listFiles(new FilenameFilter() {
+ @Override
+ public boolean accept(File dir, String name) {
+ return dir.isDirectory();
+ }
+ });
+ if (libs != null) {
+ for (File lib : libs) {
+ setUpExternaLibrary(externalLibraryManager, isMetadataNode, lib.getAbsolutePath());
}
}
}
@@ -134,7 +147,7 @@
* @throws RemoteException
* @throws ACIDException
*/
- protected static boolean uninstallLibrary(String dataverse, String libraryName)
+ public static boolean uninstallLibrary(String dataverse, String libraryName)
throws AsterixException, RemoteException, ACIDException {
MetadataTransactionContext mdTxnCtx = null;
try {
@@ -270,10 +283,9 @@
* failure in installing an element does not effect installation of other
* libraries.
*/
- protected static void configureLibrary(ILibraryManager libraryManager, String dataverse, final File libraryDir,
- Map<String, List<String>> uninstalledLibs, boolean isMetadataNode) throws Exception {
+ protected static void configureLibrary(ILibraryManager libraryManager, String dataverse, String libraryName,
+ final File libraryDir, Map<String, List<String>> uninstalledLibs, boolean isMetadataNode) throws Exception {
- String libraryName = libraryDir.getName().trim();
String[] libraryDescriptors = libraryDir.list((dir, name) -> name.endsWith(".xml"));
if (libraryDescriptors == null) {
@@ -303,15 +315,15 @@
* register the library class loader with the external library manager
*
* @param dataverse
- * @param libraryName
+ * @param libraryPath
* @throws Exception
*/
- protected static void registerClassLoader(ILibraryManager externalLibraryManager, String dataverse,
- String libraryName) throws Exception {
+ protected static void registerClassLoader(ILibraryManager externalLibraryManager, String dataverse, String name,
+ String libraryPath) throws Exception {
// get the class loader
- ClassLoader classLoader = getLibraryClassLoader(dataverse, libraryName);
+ URLClassLoader classLoader = getLibraryClassLoader(dataverse, name, libraryPath);
// register it with the external library manager
- externalLibraryManager.registerLibraryClassLoader(dataverse, libraryName, classLoader);
+ externalLibraryManager.registerLibraryClassLoader(dataverse, name, classLoader);
}
/**
@@ -331,22 +343,23 @@
/**
* Get the class loader for the library
*
+ * @param libraryPath
* @param dataverse
- * @param libraryName
* @return
* @throws Exception
*/
- private static ClassLoader getLibraryClassLoader(String dataverse, String libraryName) throws Exception {
+ private static URLClassLoader getLibraryClassLoader(String dataverse, String name, String libraryPath)
+ throws Exception {
// Get a reference to the library directory
- File installDir = getLibraryInstallDir();
+ File installDir = new File(libraryPath);
if (LOGGER.isInfoEnabled()) {
- LOGGER.info("Installing lirbary " + libraryName + " in dataverse " + dataverse + "."
- + " Install Directory: " + installDir.getAbsolutePath());
+ LOGGER.info("Installing lirbary " + name + " in dataverse " + dataverse + "." + " Install Directory: "
+ + installDir.getAbsolutePath());
}
// get a reference to the specific library dir
- File libDir =
- new File(installDir.getAbsolutePath() + File.separator + dataverse + File.separator + libraryName);
+ File libDir = installDir;
+
FilenameFilter jarFileFilter = new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
@@ -388,9 +401,9 @@
}
if (LOGGER.isInfoEnabled()) {
- StringBuilder logMesg = new StringBuilder("Classpath for library " + libraryName + "\n");
+ StringBuilder logMesg = new StringBuilder("Classpath for library " + dataverse + ": ");
for (URL url : urls) {
- logMesg.append(url.getFile() + "\n");
+ logMesg.append(url.getFile() + File.pathSeparatorChar);
}
LOGGER.info(logMesg.toString());
}
@@ -400,14 +413,6 @@
}
/**
- * @return the directory "System.getProperty("app.home", System.getProperty("user.home")/lib/udfs"
- */
- protected static File getLibraryInstallDir() {
- return new File(System.getProperty("app.home", System.getProperty("user.home")) + File.separator + "lib"
- + File.separator + "udfs");
- }
-
- /**
* @return the directory "System.getProperty("app.home", System.getProperty("user.home")/lib/udfs/uninstall"
*/
protected static File getLibraryUninstallDir() {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalUDFLibrarian.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalUDFLibrarian.java
index 7246925..86588ea 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalUDFLibrarian.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalUDFLibrarian.java
@@ -19,112 +19,58 @@
package org.apache.asterix.app.external;
import java.io.File;
-import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.rmi.RemoteException;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.List;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipFile;
+import java.net.URL;
-import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.library.ILibraryManager;
-import org.apache.commons.compress.utils.IOUtils;
-import org.apache.commons.io.FileUtils;
-import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.ClientProtocolException;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.FileEntity;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.hyracks.api.exceptions.HyracksException;
@SuppressWarnings("squid:S134")
public class ExternalUDFLibrarian implements IExternalUDFLibrarian {
- // The following list includes a library manager for the CC
- // and library managers for NCs (one-per-NC).
- private final List<ILibraryManager> libraryManagers;
+ private HttpClient hc;
+ private String host;
+ private int port;
- public ExternalUDFLibrarian(List<ILibraryManager> libraryManagers) {
- this.libraryManagers = libraryManagers;
+ public ExternalUDFLibrarian(String host, int port) {
+ hc = new DefaultHttpClient();
+ this.host = host;
+ this.port = port;
}
- public static void removeLibraryDir() throws IOException {
- File installLibDir = ExternalLibraryUtils.getLibraryInstallDir();
- FileUtils.deleteQuietly(installLibDir);
+ public ExternalUDFLibrarian() {
+ this("localhost", 19002);
}
- public static void unzip(String sourceFile, String outputDir) throws IOException {
- if (System.getProperty("os.name").toLowerCase().startsWith("win")) {
- try (ZipFile zipFile = new ZipFile(sourceFile)) {
- Enumeration<? extends ZipEntry> entries = zipFile.entries();
- while (entries.hasMoreElements()) {
- ZipEntry entry = entries.nextElement();
- File entryDestination = new File(outputDir, entry.getName());
- if (!entry.isDirectory()) {
- entryDestination.getParentFile().mkdirs();
- try (InputStream in = zipFile.getInputStream(entry);
- OutputStream out = new FileOutputStream(entryDestination)) {
- IOUtils.copy(in, out);
- }
- }
- }
- }
- } else {
- Process process = new ProcessBuilder("unzip", "-d", outputDir, sourceFile).start();
- try {
- process.waitFor();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IOException(e);
- }
+ @Override
+ public void install(String dataverse, String libName, String libPath) throws Exception {
+ URL url = new URL("http", host, port, "/admin/udf/" + dataverse + "/" + libName);
+ HttpPost post = new HttpPost(url.toString());
+ post.setEntity(new FileEntity(new File(libPath), "application/octet-stream"));
+ HttpResponse response = hc.execute(post);
+ response.getEntity().consumeContent();
+ if (response.getStatusLine().getStatusCode() != 200) {
+ throw new HyracksException(response.getStatusLine().toString());
}
}
@Override
- public void install(String dvName, String libName, String libPath) throws Exception {
- // get the directory of the to be installed libraries
- File installLibDir = ExternalLibraryUtils.getLibraryInstallDir();
- // directory exists?
- if (!installLibDir.exists()) {
- installLibDir.mkdir();
- }
- // copy the library file into the directory
- File destinationDir =
- new File(installLibDir.getAbsolutePath() + File.separator + dvName + File.separator + libName);
- FileUtils.deleteQuietly(destinationDir);
- destinationDir.mkdirs();
- try {
- unzip(libPath, destinationDir.getAbsolutePath());
- } catch (Exception e) {
-
- throw new Exception("Couldn't unzip the file: " + libPath, e);
- }
-
- for (ILibraryManager libraryManager : libraryManagers) {
- ExternalLibraryUtils.registerClassLoader(libraryManager, dvName, libName);
- ExternalLibraryUtils.configureLibrary(libraryManager, dvName, destinationDir, new HashMap<>(),
- libraryManagers.indexOf(libraryManager) != 0);
+ public void uninstall(String dataverse, String libName)
+ throws IOException, ClientProtocolException, AsterixException {
+ URL url = new URL("http", host, port, "/admin/udf/" + dataverse + "/" + libName);
+ HttpDelete del = new HttpDelete(url.toString());
+ HttpResponse response = hc.execute(del);
+ response.getEntity().consumeContent();
+ if (response.getStatusLine().getStatusCode() != 200) {
+ throw new AsterixException(response.getStatusLine().toString());
}
}
- @Override
- public void uninstall(String dvName, String libName) throws RemoteException, AsterixException, ACIDException {
- ExternalLibraryUtils.uninstallLibrary(dvName, libName);
- for (ILibraryManager libraryManager : libraryManagers) {
- libraryManager.deregisterLibraryClassLoader(dvName, libName);
- }
- }
-
- public void cleanup() throws AsterixException, RemoteException, ACIDException {
- for (ILibraryManager libraryManager : libraryManagers) {
- List<Pair<String, String>> libs = libraryManager.getAllLibraries();
- for (Pair<String, String> dvAndLib : libs) {
- ExternalLibraryUtils.uninstallLibrary(dvAndLib.first, dvAndLib.second);
- libraryManager.deregisterLibraryClassLoader(dvAndLib.first, dvAndLib.second);
- }
- }
- // get the directory of the to be installed libraries
- File installLibDir = ExternalLibraryUtils.getLibraryInstallDir();
- FileUtils.deleteQuietly(installLibDir);
- }
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/IExternalUDFLibrarian.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/IExternalUDFLibrarian.java
index 9a17444..a7a668a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/IExternalUDFLibrarian.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/IExternalUDFLibrarian.java
@@ -19,13 +19,12 @@
package org.apache.asterix.app.external;
import java.io.IOException;
-import java.rmi.RemoteException;
-import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.http.client.ClientProtocolException;
public interface IExternalUDFLibrarian {
- public void install(String dvName, String libName, String libPath) throws IOException, Exception;
+ void install(String dataverse, String libName, String libPath) throws Exception;
- public void uninstall(String dvName, String libName) throws RemoteException, AsterixException, ACIDException;
+ void uninstall(String dataverse, String libName) throws IOException, ClientProtocolException, AsterixException;
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/DeleteUdfMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/DeleteUdfMessage.java
new file mode 100644
index 0000000..ca7b9db
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/DeleteUdfMessage.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import org.apache.asterix.app.external.ExternalLibraryUtils;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.library.ILibraryManager;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class DeleteUdfMessage implements INcAddressedMessage {
+
+ private static final long serialVersionUID = -3129473321451281271L;
+ private final String dataverseName;
+ private final String libraryName;
+ private static final Logger LOGGER = LogManager.getLogger();
+
+ public DeleteUdfMessage(String dataverseName, String libraryName) {
+ this.dataverseName = dataverseName;
+ this.libraryName = libraryName;
+ }
+
+ @Override
+ public void handle(INcApplicationContext appCtx) {
+ ILibraryManager mgr = appCtx.getLibraryManager();
+ String mdNodeName = appCtx.getMetadataProperties().getMetadataNodeName();
+ String nodeName = appCtx.getServiceContext().getNodeId();
+ boolean isMdNode = mdNodeName.equals(nodeName);
+ try {
+ if (isMdNode) {
+ ExternalLibraryUtils.uninstallLibrary(dataverseName, libraryName);
+ }
+ mgr.deregisterLibraryClassLoader(dataverseName, libraryName);
+ } catch (Exception e) {
+ LOGGER.error("Unable to un-deploy UDF", e);
+ }
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/LoadUdfMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/LoadUdfMessage.java
new file mode 100644
index 0000000..66714bf
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/LoadUdfMessage.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import org.apache.asterix.app.external.ExternalLibraryUtils;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.library.ILibraryManager;
+import org.apache.asterix.common.messaging.CcIdentifiedMessage;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.hyracks.util.file.FileUtil;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class LoadUdfMessage extends CcIdentifiedMessage implements INcAddressedMessage {
+
+ private final String dataverseName;
+ private final String libraryName;
+ private static final Logger LOGGER = LogManager.getLogger();
+
+ private static final long serialVersionUID = -4529473341458281271L;
+ private final long reqId;
+
+ public LoadUdfMessage(String dataverseName, String libraryName, long reqId) {
+ this.dataverseName = dataverseName;
+ this.libraryName = libraryName;
+ this.reqId = reqId;
+ }
+
+ @Override
+ public void handle(INcApplicationContext appCtx) {
+ ILibraryManager mgr = appCtx.getLibraryManager();
+ String mdNodeName = appCtx.getMetadataProperties().getMetadataNodeName();
+ String nodeName = appCtx.getServiceContext().getNodeId();
+ INCMessageBroker broker = (INCMessageBroker) appCtx.getServiceContext().getMessageBroker();
+ boolean isMdNode = mdNodeName.equals(nodeName);
+ try {
+ ExternalLibraryUtils.setUpExternaLibrary(mgr, isMdNode,
+ FileUtil.joinPath(appCtx.getServiceContext().getServerCtx().getBaseDir().getAbsolutePath(),
+ "applications", dataverseName + "." + libraryName));
+ broker.sendMessageToPrimaryCC(new UdfResponseMessage(reqId, null));
+ } catch (Exception e) {
+ try {
+ broker.sendMessageToPrimaryCC(new UdfResponseMessage(reqId, e));
+ } catch (Exception f) {
+ LOGGER.error("Unable to send failure response to CC", f);
+ }
+ }
+
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UdfResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UdfResponseMessage.java
new file mode 100644
index 0000000..29a91c5
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UdfResponseMessage.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import java.util.ArrayList;
+
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
+import org.apache.asterix.common.messaging.api.INcResponse;
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class UdfResponseMessage implements ICcAddressedMessage, INcResponse {
+
+ private static final long serialVersionUID = -4520773141458281271L;
+ private final long reqId;
+ private final Exception failure;
+
+ public UdfResponseMessage(long reqId, Exception failure) {
+ this.reqId = reqId;
+ this.failure = failure;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void setResult(MutablePair<ICCMessageBroker.ResponseState, Object> result) {
+ ICCMessageBroker.ResponseState responseState = result.getLeft();
+ if (failure != null) {
+ result.setLeft(ICCMessageBroker.ResponseState.FAILURE);
+ result.setRight(failure);
+ return;
+ }
+ switch (responseState) {
+ case UNINITIALIZED:
+ // First to arrive
+ result.setRight(new ArrayList<String>());
+ // No failure, change state to success
+ result.setLeft(ICCMessageBroker.ResponseState.SUCCESS);
+ // Fallthrough
+ case SUCCESS:
+ break;
+ default:
+ break;
+
+ }
+ }
+
+ @Override
+ public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
+ ICCMessageBroker broker = (ICCMessageBroker) appCtx.getServiceContext().getMessageBroker();
+ broker.respond(reqId, this);
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java
index 8cfeb12..1ca2b78 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java
@@ -38,7 +38,8 @@
public void perform(CcId ccId, IControllerService cs) throws HyracksDataException {
INcApplicationContext appContext = (INcApplicationContext) cs.getApplicationContext();
try {
- ExternalLibraryUtils.setUpExternaLibraries(appContext.getLibraryManager(), metadataNode);
+ ExternalLibraryUtils.setUpInstalledLibraries(appContext.getLibraryManager(), metadataNode,
+ cs.getContext().getServerCtx().getAppDir());
} catch (Exception e) {
throw HyracksDataException.create(e);
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 5998599..e2fbe35 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -47,6 +47,7 @@
import org.apache.asterix.api.http.server.RebalanceApiServlet;
import org.apache.asterix.api.http.server.ServletConstants;
import org.apache.asterix.api.http.server.ShutdownApiServlet;
+import org.apache.asterix.api.http.server.UdfApiServlet;
import org.apache.asterix.api.http.server.VersionApiServlet;
import org.apache.asterix.app.active.ActiveNotificationHandler;
import org.apache.asterix.app.cc.CCExtensionManager;
@@ -150,7 +151,7 @@
ReplicationProperties repProp =
new ReplicationProperties(PropertiesAccessor.getInstance(ccServiceCtx.getAppConfig()));
INcLifecycleCoordinator lifecycleCoordinator = createNcLifeCycleCoordinator(repProp.isReplicationEnabled());
- ExternalLibraryUtils.setUpExternaLibraries(libraryManager, false);
+ ExternalLibraryUtils.setUpInstalledLibraries(libraryManager, false, ccServiceCtx.getServerCtx().getAppDir());
componentProvider = new StorageComponentProvider();
ccExtensionManager = new CCExtensionManager(new ArrayList<>(getExtensions()));
@@ -261,6 +262,7 @@
addServlet(jsonAPIServer, Servlets.CLUSTER_STATE_CC_DETAIL); // must not precede add of CLUSTER_STATE
addServlet(jsonAPIServer, Servlets.DIAGNOSTICS);
addServlet(jsonAPIServer, Servlets.ACTIVE_STATS);
+ addServlet(jsonAPIServer, Servlets.UDF);
return jsonAPIServer;
}
@@ -313,6 +315,8 @@
return new DiagnosticsApiServlet(appCtx, ctx, paths);
case Servlets.ACTIVE_STATS:
return new ActiveStatsApiServlet(appCtx, ctx, paths);
+ case Servlets.UDF:
+ return new UdfApiServlet(appCtx, ctx, paths);
default:
throw new IllegalStateException(String.valueOf(key));
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index 2c25b36..ccbab3c 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -198,8 +198,7 @@
public void init(boolean deleteOldInstanceData, String externalLibPath, String confDir) throws Exception {
List<ILibraryManager> libraryManagers = new ArrayList<>();
- ExternalUDFLibrarian librarian = new ExternalUDFLibrarian(libraryManagers);
- librarian.cleanup();
+ ExternalUDFLibrarian librarian = new ExternalUDFLibrarian();
init(deleteOldInstanceData, confDir);
if (externalLibPath != null && externalLibPath.length() != 0) {
libraryManagers.add(((ICcApplicationContext) cc.getApplicationContext()).getLibraryManager());
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index ffbddb6..ccbf5ec 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -26,7 +26,6 @@
import java.util.List;
import java.util.Map;
-import org.apache.asterix.app.external.ExternalUDFLibrarian;
import org.apache.asterix.app.nc.NCAppRuntimeContext;
import org.apache.asterix.app.nc.TransactionSubsystem;
import org.apache.asterix.common.config.DatasetConfig.IndexType;
@@ -151,8 +150,6 @@
try {
File outdir = new File(PATH_ACTUAL);
outdir.mkdirs();
- // remove library directory
- ExternalUDFLibrarian.removeLibraryDir();
ExecutionTestUtil.setUp(cleanupOnStart,
testConfigFileName == null ? TEST_CONFIG_FILE_NAME : testConfigFileName,
ExecutionTestUtil.integrationUtil, runHDFS, options);
@@ -167,7 +164,6 @@
}
public void deInit(boolean cleanupOnStop) throws Exception {
- ExternalUDFLibrarian.removeLibraryDir();
ExecutionTestUtil.tearDown(cleanupOnStop, runHDFS);
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index 5da5199..d05ae3e 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -1105,7 +1105,7 @@
lines = statement.split("\n");
String lastLine = lines[lines.length - 1];
String[] command = lastLine.trim().split(" ");
- if (command.length < 3) {
+ if (command.length < 2) {
throw new Exception("invalid library format");
}
String dataverse = command[1];
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
index 75eccfd..20420ab 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
@@ -33,7 +33,6 @@
import java.util.List;
import org.apache.asterix.app.external.ExternalUDFLibrarian;
-import org.apache.asterix.common.library.ILibraryManager;
import org.apache.asterix.common.utils.StorageConstants;
import org.apache.asterix.test.common.TestExecutor;
import org.apache.asterix.testframework.context.TestCaseContext;
@@ -69,10 +68,8 @@
testExecutor = executor;
File outdir = new File(PATH_ACTUAL);
outdir.mkdirs();
- List<ILibraryManager> libraryManagers =
- ExecutionTestUtil.setUp(cleanupOnStart, configFile, integrationUtil, startHdfs, null);
- ExternalUDFLibrarian.removeLibraryDir();
- librarian = new ExternalUDFLibrarian(libraryManagers);
+ ExecutionTestUtil.setUp(cleanupOnStart, configFile, integrationUtil, startHdfs, null);
+ librarian = new ExternalUDFLibrarian();
testExecutor.setLibrarian(librarian);
if (repeat != 1) {
System.out.println("FYI: each test will be run " + repeat + " times.");
@@ -86,7 +83,6 @@
// Check whether there are leaked threads.
checkThreadLeaks();
} finally {
- ExternalUDFLibrarian.removeLibraryDir();
ExecutionTestUtil.tearDown(cleanupOnStop);
integrationUtil.removeTestStorageFiles();
if (!badTestCases.isEmpty()) {
@@ -126,9 +122,6 @@
if (repeat > 1) {
System.err.print("[" + i + "/" + repeat + "] ");
}
- if (librarian != null) {
- librarian.cleanup();
- }
testExecutor.executeTest(PATH_ACTUAL, tcCtx, null, false, ExecutionTestUtil.FailedGroup);
try {
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/type_validation/type_validation.0.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/type_validation/type_validation.0.ddl.sqlpp
new file mode 100644
index 0000000..76cc70d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/type_validation/type_validation.0.ddl.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+DROP DATAVERSE externallibtest if exists;
+CREATE DATAVERSE externallibtest;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibraryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibraryManager.java
index ec02692..c1598d9 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibraryManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibraryManager.java
@@ -19,6 +19,7 @@
package org.apache.asterix.common.library;
+import java.net.URLClassLoader;
import java.util.List;
import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -29,12 +30,10 @@
/**
* Registers the library class loader with the external library manager.
* <code>dataverseName</code> and <code>libraryName</code> uniquely identifies a class loader.
- *
- * @param dataverseName
* @param libraryName
* @param classLoader
*/
- void registerLibraryClassLoader(String dataverseName, String libraryName, ClassLoader classLoader)
+ void registerLibraryClassLoader(String dataverseName, String libraryName, URLClassLoader classLoader)
throws HyracksDataException;
/**
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
index d5aa5d1..93a959f 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
@@ -36,6 +36,7 @@
public static final String ACTIVE_STATS = "/admin/active/*";
public static final String STORAGE = "/admin/storage/*";
public static final String NET_DIAGNOSTICS = "/admin/net/*";
+ public static final String UDF = "/admin/udf/*";
private Servlets() {
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java
index b9ce2bd..60c8bfd 100755
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.external.library;
+import java.io.IOException;
+import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -28,14 +30,17 @@
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.common.library.ILibraryManager;
import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
public class ExternalLibraryManager implements ILibraryManager {
- private final Map<String, ClassLoader> libraryClassLoaders = new HashMap<>();
+ private final Map<String, URLClassLoader> libraryClassLoaders = new HashMap<>();
private final Map<String, List<String>> externalFunctionParameters = new HashMap<>();
+ private static final Logger LOGGER = LogManager.getLogger();
@Override
- public void registerLibraryClassLoader(String dataverseName, String libraryName, ClassLoader classLoader)
+ public void registerLibraryClassLoader(String dataverseName, String libraryName, URLClassLoader classLoader)
throws RuntimeDataException {
String key = getKey(dataverseName, libraryName);
synchronized (libraryClassLoaders) {
@@ -59,7 +64,13 @@
public void deregisterLibraryClassLoader(String dataverseName, String libraryName) {
String key = getKey(dataverseName, libraryName);
synchronized (libraryClassLoaders) {
- if (libraryClassLoaders.get(key) != null) {
+ URLClassLoader cl = libraryClassLoaders.get(key);
+ if (cl != null) {
+ try {
+ cl.close();
+ } catch (IOException e) {
+ LOGGER.error("Unable to close UDF classloader!", e);
+ }
libraryClassLoaders.remove(key);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IServerContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IServerContext.java
new file mode 100644
index 0000000..b997a6e
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IServerContext.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.application;
+
+import java.io.File;
+
+public interface IServerContext {
+ enum ServerType {
+ CLUSTER_CONTROLLER,
+ NODE_CONTROLLER,
+ }
+
+ ServerType getServerType();
+
+ File getBaseDir();
+
+ File getAppDir();
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IServiceContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IServiceContext.java
index 6effee3..0a99c45 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IServiceContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IServiceContext.java
@@ -69,4 +69,6 @@
default IPersistedResourceRegistry getPersistedResourceRegistry() {
throw new UnsupportedOperationException();
}
+
+ IServerContext getServerCtx();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
index 0ee3658..e813141 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksClientInterfaceFunctions.java
@@ -386,10 +386,12 @@
private static final long serialVersionUID = 1L;
private final List<URL> binaryURLs;
private final DeploymentId deploymentId;
+ private final boolean extractFromArchive;
- public CliDeployBinaryFunction(List<URL> binaryURLs, DeploymentId deploymentId) {
+ public CliDeployBinaryFunction(List<URL> binaryURLs, DeploymentId deploymentId, boolean isExtractFromArchive) {
this.binaryURLs = binaryURLs;
this.deploymentId = deploymentId;
+ this.extractFromArchive = isExtractFromArchive;
}
@Override
@@ -404,6 +406,10 @@
public DeploymentId getDeploymentId() {
return deploymentId;
}
+
+ public boolean isExtractFromArchive() {
+ return extractFromArchive;
+ }
}
public static class CliUnDeployBinaryFunction extends Function {
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
index 89f2ad4..f295119 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientConnection.java
@@ -159,12 +159,22 @@
void waitForCompletion(JobId jobId) throws Exception;
/**
- * Deploy the user-defined jars to the cluster
+ * Deploy files to the cluster
*
- * @param jars
- * a list of user-defined jars
+ * @param files
+ * a list of file paths
*/
- DeploymentId deployBinary(List<String> jars) throws Exception;
+ DeploymentId deployBinary(List<String> files) throws Exception;
+
+ /**
+ * Deploy files to the cluster
+ *
+ * @param files
+ * a list of file paths
+ * @param deploymentId
+ * the id used to uniquely identify this set of files for management
+ */
+ void deployBinary(DeploymentId deploymentId, List<String> files, boolean extractFromArchive) throws Exception;
/**
* undeploy a certain deployment
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
index 4cc47d2..053276f 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/IHyracksClientInterface.java
@@ -57,7 +57,8 @@
public ClusterTopology getClusterTopology() throws Exception;
- public void deployBinary(List<URL> binaryURLs, DeploymentId deploymentId) throws Exception;
+ public void deployBinary(List<URL> binaryURLs, DeploymentId deploymentId, boolean extractFromArchive)
+ throws Exception;
public void unDeployBinary(DeploymentId deploymentId) throws Exception;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
index 67e9599..a78c269 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClientInterfaceIPCI.java
@@ -165,7 +165,7 @@
HyracksClientInterfaceFunctions.CliDeployBinaryFunction dbf =
(HyracksClientInterfaceFunctions.CliDeployBinaryFunction) fn;
ccs.getWorkQueue().schedule(new CliDeployBinaryWork(ccs, dbf.getBinaryURLs(), dbf.getDeploymentId(),
- new IPCResponder<>(handle, mid)));
+ dbf.isExtractFromArchive(), new IPCResponder<>(handle, mid)));
break;
case CLI_UNDEPLOY_BINARY:
HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction udbf =
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CliDeployBinaryWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CliDeployBinaryWork.java
index 4962607..53f9360 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CliDeployBinaryWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CliDeployBinaryWork.java
@@ -47,13 +47,15 @@
private List<URL> binaryURLs;
private DeploymentId deploymentId;
private IPCResponder<DeploymentId> callback;
+ private boolean extractFromArchive;
public CliDeployBinaryWork(ClusterControllerService ncs, List<URL> binaryURLs, DeploymentId deploymentId,
- IPCResponder<DeploymentId> callback) {
+ boolean extractFromArchive, IPCResponder<DeploymentId> callback) {
this.ccs = ncs;
this.binaryURLs = binaryURLs;
this.deploymentId = deploymentId;
this.callback = callback;
+ this.extractFromArchive = extractFromArchive;
}
@Override
@@ -66,7 +68,7 @@
* Deploy for the cluster controller
*/
DeploymentUtils.deploy(deploymentId, binaryURLs, ccs.getContext().getJobSerializerDeserializerContainer(),
- ccs.getServerContext(), false);
+ ccs.getServerContext(), false, extractFromArchive);
/**
* Deploy for the node controllers
@@ -82,7 +84,7 @@
* deploy binaries to each node controller
*/
for (NodeControllerState ncs : nodeManager.getAllNodeControllerStates()) {
- ncs.getNodeController().deployBinary(deploymentId, binaryURLs);
+ ncs.getNodeController().deployBinary(deploymentId, binaryURLs, extractFromArchive);
}
ccs.getExecutor().execute(new Runnable() {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ServiceContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ServiceContext.java
index 99fc76b..b6d32a9 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ServiceContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ServiceContext.java
@@ -21,16 +21,16 @@
import java.io.Serializable;
import java.util.concurrent.ThreadFactory;
+import org.apache.hyracks.api.application.IServerContext;
import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.config.IApplicationConfig;
import org.apache.hyracks.api.io.IPersistedResourceRegistry;
import org.apache.hyracks.api.job.IJobSerializerDeserializerContainer;
import org.apache.hyracks.api.job.JobSerializerDeserializerContainer;
import org.apache.hyracks.api.messages.IMessageBroker;
-import org.apache.hyracks.control.common.context.ServerContext;
public abstract class ServiceContext implements IServiceContext {
- protected final ServerContext serverCtx;
+ protected final IServerContext serverCtx;
protected final IApplicationConfig appConfig;
protected ThreadFactory threadFactory;
protected Serializable distributedState;
@@ -38,7 +38,7 @@
protected IJobSerializerDeserializerContainer jobSerDeContainer = new JobSerializerDeserializerContainer();
protected IPersistedResourceRegistry persistedResourceRegistry;
- public ServiceContext(ServerContext serverCtx, IApplicationConfig appConfig, ThreadFactory threadFactory) {
+ public ServiceContext(IServerContext serverCtx, IApplicationConfig appConfig, ThreadFactory threadFactory) {
this.serverCtx = serverCtx;
this.appConfig = appConfig;
this.threadFactory = threadFactory;
@@ -88,4 +88,9 @@
public IPersistedResourceRegistry getPersistedResourceRegistry() {
return persistedResourceRegistry;
}
+
+ @Override
+ public IServerContext getServerCtx() {
+ return serverCtx;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
index 42a0d66..7f172b6 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java
@@ -51,7 +51,7 @@
void reportPartitionAvailability(PartitionId pid, NetworkAddress networkAddress) throws Exception;
- void deployBinary(DeploymentId deploymentId, List<URL> url) throws Exception;
+ void deployBinary(DeploymentId deploymentId, List<URL> url, boolean extractFromArchive) throws Exception;
void undeployBinary(DeploymentId deploymentId) throws Exception;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/context/ServerContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/context/ServerContext.java
index ff037f0..ef2777d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/context/ServerContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/context/ServerContext.java
@@ -20,18 +20,18 @@
import java.io.File;
-public class ServerContext {
- public enum ServerType {
- CLUSTER_CONTROLLER,
- NODE_CONTROLLER,
- }
+import org.apache.hyracks.api.application.IServerContext;
+
+public class ServerContext implements IServerContext {
private final ServerType type;
private final File baseDir;
+ private final File appDir;
public ServerContext(ServerType type, File baseDir) {
this.type = type;
this.baseDir = baseDir;
+ this.appDir = new File(baseDir, "applications");
}
public ServerType getServerType() {
@@ -41,4 +41,8 @@
public File getBaseDir() {
return baseDir;
}
+
+ public File getAppDir() {
+ return appDir;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/deployment/ClassLoaderJobSerializerDeserializer.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/deployment/ClassLoaderJobSerializerDeserializer.java
index 31e6e39..f0d7828 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/deployment/ClassLoaderJobSerializerDeserializer.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/deployment/ClassLoaderJobSerializerDeserializer.java
@@ -73,7 +73,7 @@
});
try {
if (classLoader == null) {
- /** crate a new classloader */
+ /** create a new classloader */
URL[] urls = binaryURLs.toArray(new URL[binaryURLs.size()]);
classLoader = new MutableURLClassLoader(urls, this.getClass().getClassLoader());
} else {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/deployment/DeploymentUtils.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/deployment/DeploymentUtils.java
index a079d97..b900591 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/deployment/DeploymentUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/deployment/DeploymentUtils.java
@@ -26,7 +26,10 @@
import java.io.OutputStream;
import java.net.URL;
import java.util.ArrayList;
+import java.util.Enumeration;
import java.util.List;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipFile;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
@@ -41,6 +44,8 @@
import org.apache.hyracks.api.job.IJobSerializerDeserializerContainer;
import org.apache.hyracks.api.util.JavaSerializationUtils;
import org.apache.hyracks.control.common.context.ServerContext;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
/**
* A utility class which is in charge of the actual work of deployments.
@@ -50,6 +55,7 @@
public class DeploymentUtils {
private static final String DEPLOYMENT = "applications";
+ private static final Logger LOGGER = LogManager.getLogger();
/**
* undeploy an existing deployment
@@ -86,13 +92,12 @@
* @param container
* the container of serailizer/deserializer
* @param ctx
- * the ServerContext
- * @param isNC
+ * the ServerContext * @param isNC
* true is NC/false is CC
* @throws HyracksException
*/
public static void deploy(DeploymentId deploymentId, List<URL> urls, IJobSerializerDeserializerContainer container,
- ServerContext ctx, boolean isNC) throws HyracksException {
+ ServerContext ctx, boolean isNC, boolean extractFromArchive) throws HyracksException {
IJobSerializerDeserializer jobSerDe = container.getJobSerializerDeserializer(deploymentId);
if (jobSerDe == null) {
jobSerDe = new ClassLoaderJobSerializerDeserializer();
@@ -101,7 +106,11 @@
String rootDir = ctx.getBaseDir().toString();
String deploymentDir = rootDir.endsWith(File.separator) ? rootDir + DEPLOYMENT + File.separator + deploymentId
: rootDir + File.separator + DEPLOYMENT + File.separator + deploymentId;
- jobSerDe.addClassPathURLs(downloadURLs(urls, deploymentDir, isNC));
+ if (extractFromArchive) {
+ downloadURLs(urls, deploymentDir, isNC, true);
+ } else {
+ jobSerDe.addClassPathURLs(downloadURLs(urls, deploymentDir, isNC, false));
+ }
}
/**
@@ -176,7 +185,8 @@
* @return a list of local file URLs
* @throws HyracksException
*/
- private static List<URL> downloadURLs(List<URL> urls, String deploymentDir, boolean isNC) throws HyracksException {
+ private static List<URL> downloadURLs(List<URL> urls, String deploymentDir, boolean isNC,
+ boolean extractFromArchive) throws HyracksException {
//retry 10 times at maximum for downloading binaries
int retryCount = 10;
int tried = 0;
@@ -208,14 +218,44 @@
is.close();
}
}
+ if (extractFromArchive) {
+ unzip(targetFile.getAbsolutePath(), deploymentDir);
+ }
downloadedFileURLs.add(targetFile.toURI().toURL());
}
return downloadedFileURLs;
} catch (Exception e) {
- e.printStackTrace();
+ LOGGER.error("Unable to fetch binaries from URL", e);
trace = e;
}
}
throw HyracksException.create(trace);
}
+
+ public static void unzip(String sourceFile, String outputDir) throws IOException {
+ try (ZipFile zipFile = new ZipFile(sourceFile)) {
+ Enumeration<? extends ZipEntry> entries = zipFile.entries();
+ List<File> createdFiles = new ArrayList<>();
+ while (entries.hasMoreElements()) {
+ ZipEntry entry = entries.nextElement();
+ File entryDestination = new File(outputDir, entry.getName());
+ if (!entry.isDirectory()) {
+ entryDestination.getParentFile().mkdirs();
+ try (InputStream in = zipFile.getInputStream(entry);
+ OutputStream out = new FileOutputStream(entryDestination)) {
+ createdFiles.add(entryDestination);
+ IOUtils.copy(in, out);
+ } catch (IOException e) {
+ for (File f : createdFiles) {
+ if (!f.delete()) {
+ LOGGER.error("Couldn't clean up file after failed archive extraction: "
+ + f.getAbsolutePath());
+ }
+ }
+ throw e;
+ }
+ }
+ }
+ }
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index 2ba4768..43b9003 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -1176,11 +1176,14 @@
private final List<URL> binaryURLs;
private final DeploymentId deploymentId;
+ private final boolean extractFromArchive;
- public DeployBinaryFunction(DeploymentId deploymentId, List<URL> binaryURLs, CcId ccId) {
+ public DeployBinaryFunction(DeploymentId deploymentId, List<URL> binaryURLs, CcId ccId,
+ boolean isExtractFromArchive) {
super(ccId);
this.binaryURLs = binaryURLs;
this.deploymentId = deploymentId;
+ this.extractFromArchive = isExtractFromArchive;
}
@Override
@@ -1195,6 +1198,10 @@
public DeploymentId getDeploymentId() {
return deploymentId;
}
+
+ public boolean isExtractFromArchive() {
+ return extractFromArchive;
+ }
}
public static class UnDeployBinaryFunction extends CCIdentifiedFunction {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
index d32ee32..ea71bc9 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java
@@ -92,8 +92,9 @@
}
@Override
- public void deployBinary(DeploymentId deploymentId, List<URL> binaryURLs) throws Exception {
- DeployBinaryFunction rpaf = new DeployBinaryFunction(deploymentId, binaryURLs, ccId);
+ public void deployBinary(DeploymentId deploymentId, List<URL> binaryURLs, boolean extractFromArchive)
+ throws Exception {
+ DeployBinaryFunction rpaf = new DeployBinaryFunction(deploymentId, binaryURLs, ccId, extractFromArchive);
ipcHandle.send(-1, rpaf, null);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
index 3bc9710..836c624 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java
@@ -100,8 +100,8 @@
case DEPLOY_BINARY:
CCNCFunctions.DeployBinaryFunction dbf = (CCNCFunctions.DeployBinaryFunction) fn;
- ncs.getWorkQueue()
- .schedule(new DeployBinaryWork(ncs, dbf.getDeploymentId(), dbf.getBinaryURLs(), dbf.getCcId()));
+ ncs.getWorkQueue().schedule(new DeployBinaryWork(ncs, dbf.getDeploymentId(), dbf.getBinaryURLs(),
+ dbf.getCcId(), dbf.isExtractFromArchive()));
return;
case UNDEPLOY_BINARY:
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 517169b..bf07c20 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -222,7 +222,7 @@
deployedJobSpecActivityClusterGraphMap = new Hashtable<>();
timer = new Timer(true);
serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER,
- new File(new File(NodeControllerService.class.getName()), id));
+ new File(ioManager.getWorkspacePath(0), id));
getNodeControllerInfosAcceptor = new MutableObject<>();
memoryManager =
new MemoryManager((long) (memoryMXBean.getHeapMemoryUsage().getMax() * MEMORY_FUDGE_FACTOR));
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
index 14404d2..c335312 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
@@ -346,6 +346,10 @@
return dev.createFileRef(waPath + File.separator + waf.getName());
}
+ public String getWorkspacePath(int index) {
+ return workspaces.get(index) != null ? workspaces.get(index).getWorkspace() : null;
+ }
+
@Override
public void sync(IFileHandle fileHandle, boolean metadata) throws HyracksDataException {
try {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployBinaryWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployBinaryWork.java
index dfda463..586b539 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployBinaryWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/DeployBinaryWork.java
@@ -42,12 +42,15 @@
private NodeControllerService ncs;
private List<URL> binaryURLs;
private final CcId ccId;
+ private final boolean extractFromArchive;
- public DeployBinaryWork(NodeControllerService ncs, DeploymentId deploymentId, List<URL> binaryURLs, CcId ccId) {
+ public DeployBinaryWork(NodeControllerService ncs, DeploymentId deploymentId, List<URL> binaryURLs, CcId ccId,
+ boolean extractFromArchive) {
this.deploymentId = deploymentId;
this.ncs = ncs;
this.binaryURLs = binaryURLs;
this.ccId = ccId;
+ this.extractFromArchive = extractFromArchive;
}
@Override
@@ -55,7 +58,7 @@
DeploymentStatus status;
try {
DeploymentUtils.deploy(deploymentId, binaryURLs, ncs.getContext().getJobSerializerDeserializerContainer(),
- ncs.getServerContext(), true);
+ ncs.getServerContext(), true, extractFromArchive);
status = DeploymentStatus.SUCCEED;
} catch (Exception e) {
status = DeploymentStatus.FAIL;
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksClientInterfaceRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksClientInterfaceRemoteProxy.java
index 82d7a3b..840fa24 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksClientInterfaceRemoteProxy.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksClientInterfaceRemoteProxy.java
@@ -144,9 +144,11 @@
}
@Override
- public void deployBinary(List<URL> binaryURLs, DeploymentId deploymentId) throws Exception {
+ public void deployBinary(List<URL> binaryURLs, DeploymentId deploymentId, boolean extractFromArchive)
+ throws Exception {
HyracksClientInterfaceFunctions.CliDeployBinaryFunction dbf =
- new HyracksClientInterfaceFunctions.CliDeployBinaryFunction(binaryURLs, deploymentId);
+ new HyracksClientInterfaceFunctions.CliDeployBinaryFunction(binaryURLs, deploymentId,
+ extractFromArchive);
rpci.call(ipcHandle, dbf);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java
index 29d12ce..fa6f2f0 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/HyracksConnection.java
@@ -214,21 +214,28 @@
}
@Override
- public DeploymentId deployBinary(List<String> jars) throws Exception {
+ public DeploymentId deployBinary(List<String> files) throws Exception {
/** generate a deployment id */
DeploymentId deploymentId = new DeploymentId(UUID.randomUUID().toString());
+ deployBinary(deploymentId, files, false);
+ return deploymentId;
+ }
+
+ @Override
+ public void deployBinary(DeploymentId deploymentId, List<String> files, boolean extractFromArchive)
+ throws Exception {
List<URL> binaryURLs = new ArrayList<>();
- if (jars != null && !jars.isEmpty()) {
+ if (files != null && !files.isEmpty()) {
CloseableHttpClient hc = new DefaultHttpClient();
try {
- /** upload jars through a http client one-by-one to the CC server */
- for (String jar : jars) {
- int slashIndex = jar.lastIndexOf('/');
- String fileName = jar.substring(slashIndex + 1);
+ /** upload files through a http client one-by-one to the CC server */
+ for (String file : files) {
+ int slashIndex = file.lastIndexOf('/');
+ String fileName = file.substring(slashIndex + 1);
String url = "http://" + ccHost + ":" + ccInfo.getWebPort() + "/applications/"
+ deploymentId.toString() + "&" + fileName;
HttpPut put = new HttpPut(url);
- put.setEntity(new FileEntity(new File(jar), "application/octet-stream"));
+ put.setEntity(new FileEntity(new File(file), "application/octet-stream"));
HttpResponse response = hc.execute(put);
response.getEntity().consumeContent();
if (response.getStatusLine().getStatusCode() != 200) {
@@ -243,8 +250,7 @@
}
}
/** deploy the URLs to the CC and NCs */
- hci.deployBinary(binaryURLs, deploymentId);
- return deploymentId;
+ hci.deployBinary(binaryURLs, deploymentId, extractFromArchive);
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestNCServiceContext.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestNCServiceContext.java
index 4417795..63126ab 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestNCServiceContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestNCServiceContext.java
@@ -22,6 +22,7 @@
import java.util.concurrent.ThreadFactory;
import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.application.IServerContext;
import org.apache.hyracks.api.application.IStateDumpHandler;
import org.apache.hyracks.api.comm.IChannelInterfaceFactory;
import org.apache.hyracks.api.config.IApplicationConfig;
@@ -152,4 +153,9 @@
public Object getApplicationContext() {
return appCtx;
}
+
+ @Override
+ public IServerContext getServerCtx() {
+ return null;
+ }
}