Merge "Merge branch 'gerrit/mad-hatter'" into cheshire-cat
diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml
index e309132..1f22924 100644
--- a/asterixdb/asterix-app/pom.xml
+++ b/asterixdb/asterix-app/pom.xml
@@ -170,7 +170,7 @@
<executions>
<execution>
<id>venv</id>
- <phase>${pyro-shim.stage}</phase>
+ <phase>${prepare-env.stage}</phase>
<goals>
<goal>exec</goal>
</goals>
@@ -187,7 +187,7 @@
</execution>
<execution>
<id>shiv-install</id>
- <phase>${pyro-shim.stage}</phase>
+ <phase>${prepare-env.stage}</phase>
<goals>
<goal>exec</goal>
</goals>
@@ -209,7 +209,7 @@
</execution>
<execution>
<id>shiv-msgpack-shim</id>
- <phase>${pyro-shim.stage}</phase>
+ <phase>${shim.stage}</phase>
<goals>
<goal>exec</goal>
</goals>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractNCUdfServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractNCUdfServlet.java
new file mode 100644
index 0000000..ced9b40
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractNCUdfServlet.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.api.http.server;
+
+import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR;
+import static org.apache.asterix.common.functions.ExternalFunctionLanguage.JAVA;
+import static org.apache.asterix.common.functions.ExternalFunctionLanguage.PYTHON;
+
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.OpenOption;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.common.functions.ExternalFunctionLanguage;
+import org.apache.asterix.common.library.LibraryDescriptor;
+import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.exceptions.IFormattedException;
+import org.apache.hyracks.control.common.work.SynchronizableWork;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.http.api.IServletRequest;
+import org.apache.hyracks.http.api.IServletResponse;
+import org.apache.hyracks.http.server.AbstractServlet;
+import org.apache.hyracks.http.server.utils.HttpUtil;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpScheme;
+
+public abstract class AbstractNCUdfServlet extends AbstractServlet {
+
+ INcApplicationContext appCtx;
+ INCServiceContext srvCtx;
+
+ protected final IApplicationContext plainAppCtx;
+ private final HttpScheme httpServerProtocol;
+ private final int httpServerPort;
+
+ public AbstractNCUdfServlet(ConcurrentMap<String, Object> ctx, String[] paths, IApplicationContext appCtx,
+ HttpScheme httpServerProtocol, int httpServerPort) {
+
+ super(ctx, paths);
+ this.plainAppCtx = appCtx;
+ this.httpServerProtocol = httpServerProtocol;
+ this.httpServerPort = httpServerPort;
+ }
+
+ void readFromFile(Path filePath, IServletResponse response, String contentType, OpenOption opt) throws Exception {
+ class InputStreamGetter extends SynchronizableWork {
+ private InputStream is;
+
+ @Override
+ protected void doRun() throws Exception {
+ if (opt != null) {
+ is = Files.newInputStream(filePath, opt);
+ } else {
+ is = Files.newInputStream(filePath);
+ }
+ }
+ }
+
+ InputStreamGetter r = new InputStreamGetter();
+ ((NodeControllerService) srvCtx.getControllerService()).getWorkQueue().scheduleAndSync(r);
+
+ if (r.is == null) {
+ response.setStatus(HttpResponseStatus.NOT_FOUND);
+ return;
+ }
+ try {
+ response.setStatus(HttpResponseStatus.OK);
+ HttpUtil.setContentType(response, contentType);
+ IOUtils.copyLarge(r.is, response.outputStream());
+ } finally {
+ r.is.close();
+ }
+ }
+
+ URI createDownloadURI(Path file) throws Exception {
+ String path = paths[0].substring(0, trims[0]) + '/' + file.getFileName();
+ String host = getHyracksClientConnection().getHost();
+ return new URI(httpServerProtocol.toString(), null, host, httpServerPort, path, null, null);
+ }
+
+ IHyracksClientConnection getHyracksClientConnection() throws Exception { // NOSONAR
+ IHyracksClientConnection hcc = (IHyracksClientConnection) ctx.get(HYRACKS_CONNECTION_ATTR);
+ if (hcc == null) {
+ throw new RuntimeDataException(ErrorCode.PROPERTY_NOT_SET, HYRACKS_CONNECTION_ATTR);
+ }
+ return hcc;
+ }
+
+ Pair<DataverseName, String> parseLibraryName(IServletRequest request) throws IllegalArgumentException {
+ String[] path = StringUtils.split(localPath(request), '/');
+ int ln = path.length;
+ if (ln < 2) {
+ return null;
+ }
+ String libraryName = path[ln - 1];
+ DataverseName dataverseName = DataverseName.create(Arrays.asList(path), 0, ln - 1);
+ return new Pair<>(dataverseName, libraryName);
+ }
+
+ static ExternalFunctionLanguage getLanguageByFileExtension(String fileExtension) {
+ switch (fileExtension) {
+ case LibraryDescriptor.FILE_EXT_ZIP:
+ return JAVA;
+ case LibraryDescriptor.FILE_EXT_PYZ:
+ return PYTHON;
+ default:
+ return null;
+ }
+ }
+
+ HttpResponseStatus toHttpErrorStatus(Exception e) {
+ if (e instanceof IFormattedException) {
+ IFormattedException fe = (IFormattedException) e;
+ if (ErrorCode.ASTERIX.equals(fe.getComponent())) {
+ switch (fe.getErrorCode()) {
+ case ErrorCode.UNKNOWN_DATAVERSE:
+ case ErrorCode.UNKNOWN_LIBRARY:
+ return HttpResponseStatus.NOT_FOUND;
+ }
+ }
+ }
+ return HttpResponseStatus.INTERNAL_SERVER_ERROR;
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/BasicAuthServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/BasicAuthServlet.java
index f25d223..e062cdc 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/BasicAuthServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/BasicAuthServlet.java
@@ -28,6 +28,7 @@
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.http.api.IServlet;
import org.apache.hyracks.http.api.IServletRequest;
import org.apache.hyracks.http.api.IServletResponse;
@@ -47,24 +48,31 @@
private Base64.Decoder b64Decoder;
Map<String, String> storedCredentials;
Map<String, String> ephemeralCredentials;
- private String sysAuthHeader;
private final IServlet delegate;
private ConcurrentMap<String, Object> ctx;
- public BasicAuthServlet(ConcurrentMap<String, Object> ctx, IServlet delegate) {
+ public BasicAuthServlet(ConcurrentMap<String, Object> ctx, IServlet delegate, Map<String, String> storedCredentials,
+ Map<String, String> ephemeralCredentials) {
+ this.ctx = ctx;
this.delegate = delegate;
b64Decoder = Base64.getDecoder();
- storedCredentials = (Map<String, String>) ctx.get(CREDENTIAL_MAP);
- this.ctx = ctx;
+ this.storedCredentials = storedCredentials;
+ this.ephemeralCredentials = ephemeralCredentials;
+ }
+
+ public static Pair<Map<String, String>, Map<String, String>> generateSysAuthHeader(
+ ConcurrentMap<String, Object> ctx) {
+ Map<String, String> storedCredentials = (Map<String, String>) ctx.get(CREDENTIAL_MAP);
// generate internal user
String sysUser;
do {
sysUser = generateRandomString(32);
} while (storedCredentials.containsKey(sysUser));
String sysPassword = generateRandomString(128);
- ephemeralCredentials = Collections.singletonMap(sysUser, hashPassword(sysPassword));
- sysAuthHeader = createAuthHeader(sysUser, sysPassword);
+ Map<String, String> ephemeralCredentials = Collections.singletonMap(sysUser, hashPassword(sysPassword));
+ String sysAuthHeader = createAuthHeader(sysUser, sysPassword);
ctx.put(SYS_AUTH_HEADER, sysAuthHeader);
+ return new Pair<>(storedCredentials, ephemeralCredentials);
}
@Override
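Aside: the refactoring above moves the one-time creation of the internal ("ephemeral") user into the static helper generateSysAuthHeader so that several servlets can share the same stored and ephemeral credential maps, while the ready-made Basic auth header is published in the servlet context under SYS_AUTH_HEADER. Below is a minimal, self-contained sketch of that header construction using only the JDK; the class and method names (EphemeralAuthSketch, randomToken, basicAuthHeader) are illustrative stand-ins and not code from this patch.

    import java.nio.charset.StandardCharsets;
    import java.security.SecureRandom;
    import java.util.Base64;

    public final class EphemeralAuthSketch {

        private static final SecureRandom RANDOM = new SecureRandom();
        private static final String ALPHABET =
                "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";

        // Generate a random alphanumeric token of the requested length,
        // similar in spirit to the servlet's generateRandomString helper.
        static String randomToken(int length) {
            StringBuilder sb = new StringBuilder(length);
            for (int i = 0; i < length; i++) {
                sb.append(ALPHABET.charAt(RANDOM.nextInt(ALPHABET.length())));
            }
            return sb.toString();
        }

        // Build the value of an HTTP Basic "Authorization" header from a user/password pair.
        static String basicAuthHeader(String user, String password) {
            String token = Base64.getEncoder()
                    .encodeToString((user + ":" + password).getBytes(StandardCharsets.UTF_8));
            return "Basic " + token;
        }

        public static void main(String[] args) {
            String sysUser = randomToken(32);
            String sysPassword = randomToken(128);
            System.out.println(basicAuthHeader(sysUser, sysPassword));
        }
    }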
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfApiServlet.java
index 7af7fd4..fc08f49 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfApiServlet.java
@@ -18,19 +18,19 @@
*/
package org.apache.asterix.api.http.server;
-import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR;
import static org.apache.asterix.api.http.server.ServletConstants.SYS_AUTH_HEADER;
-import static org.apache.asterix.common.functions.ExternalFunctionLanguage.JAVA;
-import static org.apache.asterix.common.functions.ExternalFunctionLanguage.PYTHON;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
-import java.util.Arrays;
+import java.security.DigestOutputStream;
+import java.security.MessageDigest;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
@@ -43,33 +43,25 @@
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.api.IReceptionist;
import org.apache.asterix.common.api.IRequestReference;
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.common.functions.ExternalFunctionLanguage;
-import org.apache.asterix.common.library.LibraryDescriptor;
import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
import org.apache.asterix.common.messaging.api.INCMessageBroker;
import org.apache.asterix.common.messaging.api.MessageFuture;
import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.compiler.provider.ILangCompilationProvider;
+import org.apache.asterix.external.util.ExternalLibraryUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.api.application.INCServiceContext;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.exceptions.IFormattedException;
-import org.apache.hyracks.control.common.work.SynchronizableWork;
-import org.apache.hyracks.control.nc.NodeControllerService;
import org.apache.hyracks.http.api.IServletRequest;
import org.apache.hyracks.http.api.IServletResponse;
-import org.apache.hyracks.http.server.AbstractServlet;
import org.apache.hyracks.http.server.utils.HttpUtil;
import org.apache.hyracks.util.file.FileUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import io.netty.buffer.ByteBufInputStream;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpScheme;
@@ -77,14 +69,7 @@
import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
import io.netty.handler.codec.http.multipart.InterfaceHttpData;
-public class NCUdfApiServlet extends AbstractServlet {
-
- INcApplicationContext appCtx;
- INCServiceContext srvCtx;
-
- protected final IApplicationContext plainAppCtx;
- private final HttpScheme httpServerProtocol;
- private final int httpServerPort;
+public class NCUdfApiServlet extends AbstractNCUdfServlet {
protected final ILangCompilationProvider compilationProvider;
protected final IReceptionist receptionist;
@@ -96,13 +81,9 @@
public NCUdfApiServlet(ConcurrentMap<String, Object> ctx, String[] paths, IApplicationContext appCtx,
ILangCompilationProvider compilationProvider, HttpScheme httpServerProtocol, int httpServerPort) {
-
- super(ctx, paths);
- this.plainAppCtx = appCtx;
+ super(ctx, paths, appCtx, httpServerProtocol, httpServerPort);
this.compilationProvider = compilationProvider;
this.receptionist = appCtx.getReceptionist();
- this.httpServerProtocol = httpServerProtocol;
- this.httpServerPort = httpServerPort;
}
@Override
@@ -137,12 +118,12 @@
}
private void doCreate(DataverseName dataverseName, String libraryName, ExternalFunctionLanguage language,
- URI downloadURI, boolean replaceIfExists, String sysAuthHeader, IRequestReference requestReference,
- IServletRequest request, IServletResponse response) throws Exception {
+ String hash, URI downloadURI, boolean replaceIfExists, String sysAuthHeader,
+ IRequestReference requestReference, IServletRequest request, IServletResponse response) throws Exception {
INCMessageBroker ncMb = (INCMessageBroker) srvCtx.getMessageBroker();
MessageFuture responseFuture = ncMb.registerMessageFuture();
CreateLibraryRequestMessage req = new CreateLibraryRequestMessage(srvCtx.getNodeId(),
- responseFuture.getFutureId(), dataverseName, libraryName, language, downloadURI, replaceIfExists,
+ responseFuture.getFutureId(), dataverseName, libraryName, language, hash, downloadURI, replaceIfExists,
sysAuthHeader, requestReference, additionalHttpHeadersFromRequest(request));
sendMessage(req, responseFuture, requestReference, request, response);
}
@@ -197,7 +178,7 @@
response.setStatus(HttpResponseStatus.BAD_REQUEST);
return;
}
- readFromFile(filePath, response);
+ readFromFile(filePath, response, HttpUtil.ContentType.APPLICATION_OCTET_STREAM, null);
}
@Override
@@ -209,6 +190,7 @@
return;
}
Path libraryTempFile = null;
+ FileOutputStream libTmpOut = null;
HttpPostRequestDecoder requestDecoder = new HttpPostRequestDecoder(httpRequest);
try {
if (!requestDecoder.hasNext() || requestDecoder.getBodyHttpDatas().size() != 1) {
@@ -234,9 +216,15 @@
LOGGER.debug("Created temporary file " + libraryTempFile + " for library " + libraryName.first + "."
+ libraryName.second);
}
- fileUpload.renameTo(libraryTempFile.toFile());
+ MessageDigest digest = MessageDigest.getInstance("MD5");
+ libTmpOut = new FileOutputStream(libraryTempFile.toFile());
+ OutputStream outStream = new DigestOutputStream(libTmpOut, digest);
+ InputStream uploadInput = new ByteBufInputStream(((FileUpload) httpData).getByteBuf());
+ IOUtils.copyLarge(uploadInput, outStream);
+ outStream.close();
URI downloadURI = createDownloadURI(libraryTempFile);
- doCreate(libraryName.first, libraryName.second, language, downloadURI, true, sysAuthHeader,
+ doCreate(libraryName.first, libraryName.second, language,
+ ExternalLibraryUtils.digestToHexString(digest), downloadURI, true, sysAuthHeader,
requestReference, request, response);
response.setStatus(HttpResponseStatus.OK);
} catch (Exception e) {
@@ -250,6 +238,9 @@
requestDecoder.destroy();
if (libraryTempFile != null) {
try {
+ if (libTmpOut != null) {
+ libTmpOut.close();
+ }
Files.deleteIfExists(libraryTempFile);
} catch (IOException e) {
LOGGER.warn("Could not delete temporary file " + libraryTempFile, e);
@@ -258,20 +249,6 @@
}
}
- private URI createDownloadURI(Path file) throws Exception {
- String path = paths[0].substring(0, trims[0]) + '/' + file.getFileName();
- String host = getHyracksClientConnection().getHost();
- return new URI(httpServerProtocol.toString(), null, host, httpServerPort, path, null, null);
- }
-
- private IHyracksClientConnection getHyracksClientConnection() throws Exception { // NOSONAR
- IHyracksClientConnection hcc = (IHyracksClientConnection) ctx.get(HYRACKS_CONNECTION_ATTR);
- if (hcc == null) {
- throw new RuntimeDataException(ErrorCode.PROPERTY_NOT_SET, HYRACKS_CONNECTION_ATTR);
- }
- return hcc;
- }
-
@Override
protected void delete(IServletRequest request, IServletResponse response) {
Pair<DataverseName, String> libraryName = parseLibraryName(request);
@@ -292,65 +269,4 @@
}
}
- private void readFromFile(Path filePath, IServletResponse response) throws Exception {
- class InputStreamGetter extends SynchronizableWork {
- private InputStream is;
-
- @Override
- protected void doRun() throws Exception {
- is = Files.newInputStream(filePath);
- }
- }
-
- InputStreamGetter r = new InputStreamGetter();
- ((NodeControllerService) srvCtx.getControllerService()).getWorkQueue().scheduleAndSync(r);
-
- if (r.is == null) {
- response.setStatus(HttpResponseStatus.NOT_FOUND);
- return;
- }
- try {
- response.setStatus(HttpResponseStatus.OK);
- HttpUtil.setContentType(response, "application/octet-stream");
- IOUtils.copyLarge(r.is, response.outputStream());
- } finally {
- r.is.close();
- }
- }
-
- private Pair<DataverseName, String> parseLibraryName(IServletRequest request) throws IllegalArgumentException {
- String[] path = StringUtils.split(localPath(request), '/');
- int ln = path.length;
- if (ln < 2) {
- return null;
- }
- String libraryName = path[ln - 1];
- DataverseName dataverseName = DataverseName.create(Arrays.asList(path), 0, ln - 1);
- return new Pair<>(dataverseName, libraryName);
- }
-
- private static ExternalFunctionLanguage getLanguageByFileExtension(String fileExtension) {
- switch (fileExtension) {
- case LibraryDescriptor.FILE_EXT_ZIP:
- return JAVA;
- case LibraryDescriptor.FILE_EXT_PYZ:
- return PYTHON;
- default:
- return null;
- }
- }
-
- private HttpResponseStatus toHttpErrorStatus(Exception e) {
- if (e instanceof IFormattedException) {
- IFormattedException fe = (IFormattedException) e;
- if (ErrorCode.ASTERIX.equals(fe.getComponent())) {
- switch (fe.getErrorCode()) {
- case ErrorCode.UNKNOWN_DATAVERSE:
- case ErrorCode.UNKNOWN_LIBRARY:
- return HttpResponseStatus.NOT_FOUND;
- }
- }
- }
- return HttpResponseStatus.INTERNAL_SERVER_ERROR;
- }
}
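Aside: the upload path above now streams the multipart body through a java.security.DigestOutputStream, so an MD5 hash of the library package is computed while the temporary file is written, and the hex form of that digest (produced by ExternalLibraryUtils.digestToHexString in the patch) travels with the CreateLibrary request. A self-contained sketch of the same copy-while-hashing pattern, using only the JDK with a hand-rolled hex encoder in place of the patch's helper:

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.security.DigestOutputStream;
    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;

    public final class DigestingCopySketch {

        // Copy 'in' to 'target', feeding every written byte into an MD5 digest,
        // and return the digest as a lower-case hex string.
        static String copyWithMd5(InputStream in, Path target)
                throws IOException, NoSuchAlgorithmException {
            MessageDigest digest = MessageDigest.getInstance("MD5");
            try (OutputStream out = new DigestOutputStream(Files.newOutputStream(target), digest)) {
                in.transferTo(out);
            }
            StringBuilder hex = new StringBuilder();
            for (byte b : digest.digest()) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        }
    }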
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfRecoveryServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfRecoveryServlet.java
new file mode 100644
index 0000000..2c29d14
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCUdfRecoveryServlet.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.api.http.server;
+
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.external.library.ExternalLibraryManager;
+import org.apache.hyracks.http.api.IServletRequest;
+import org.apache.hyracks.http.api.IServletResponse;
+import org.apache.hyracks.http.server.utils.HttpUtil;
+
+import io.netty.handler.codec.http.HttpScheme;
+
+public class NCUdfRecoveryServlet extends AbstractNCUdfServlet {
+
+ ExternalLibraryManager libraryManager;
+
+ public static final String GET_ALL_UDF_ENDPOINT = "/all";
+
+ public NCUdfRecoveryServlet(ConcurrentMap<String, Object> ctx, String[] paths, IApplicationContext appCtx,
+ HttpScheme httpServerProtocol, int httpServerPort) {
+ super(ctx, paths, appCtx, httpServerProtocol, httpServerPort);
+ }
+
+ @Override
+ public void init() {
+ appCtx = (INcApplicationContext) plainAppCtx;
+ srvCtx = this.appCtx.getServiceContext();
+ this.libraryManager = (ExternalLibraryManager) appCtx.getLibraryManager();
+ }
+
+ @Override
+ protected void get(IServletRequest request, IServletResponse response) throws Exception {
+ String localPath = localPath(request);
+ if (localPath.equals(GET_ALL_UDF_ENDPOINT)) {
+ Path zippedLibs = libraryManager.zipAllLibs();
+ readFromFile(zippedLibs, response, HttpUtil.ContentType.APPLICATION_ZIP,
+ StandardOpenOption.DELETE_ON_CLOSE);
+ }
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryJobUtils.java
similarity index 98%
rename from asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtil.java
rename to asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryJobUtils.java
index dd1b736..6c6691b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryJobUtils.java
@@ -45,9 +45,9 @@
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
-public class ExternalLibraryUtil {
+public class ExternalLibraryJobUtils {
- private ExternalLibraryUtil() {
+ private ExternalLibraryJobUtils() {
}
public static Triple<JobSpecification, JobSpecification, JobSpecification> buildCreateLibraryJobSpec(
@@ -137,4 +137,5 @@
}
return splits.toArray(new FileSplit[0]);
}
-}
\ No newline at end of file
+
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CreateLibraryRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CreateLibraryRequestMessage.java
index 818a098..f23b6e4 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CreateLibraryRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CreateLibraryRequestMessage.java
@@ -32,24 +32,26 @@
final DataverseName dataverseName;
final String libraryName;
final ExternalFunctionLanguage lang;
+ final String hash;
final URI location;
final boolean replaceIfExists;
final String authToken;
private static final long serialVersionUID = 1L;
public CreateLibraryRequestMessage(String nodeRequestId, long requestMessageId, DataverseName dataverseName,
- String libraryName, ExternalFunctionLanguage lang, URI location, boolean replaceIfExists, String authToken,
- IRequestReference requestReference, Map<String, String> additionalParams) {
+ String libraryName, ExternalFunctionLanguage lang, String hash, URI location, boolean replaceIfExists,
+ String authToken, IRequestReference requestReference, Map<String, String> additionalParams) {
super(nodeRequestId, requestMessageId, requestReference, additionalParams);
this.dataverseName = dataverseName;
this.libraryName = libraryName;
this.lang = lang;
+ this.hash = hash;
this.location = location;
this.replaceIfExists = replaceIfExists;
this.authToken = authToken;
}
protected Statement produceStatement() {
- return new CreateLibraryStatement(dataverseName, libraryName, lang, location, replaceIfExists, authToken);
+ return new CreateLibraryStatement(dataverseName, libraryName, lang, hash, location, replaceIfExists, authToken);
}
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 8ac13be..e058d39 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -259,7 +259,7 @@
NodeControllerService ncs = (NodeControllerService) getServiceContext().getControllerService();
FileReference appDir =
ioManager.resolveAbsolutePath(getServiceContext().getServerCtx().getAppDir().getAbsolutePath());
- libraryManager = new ExternalLibraryManager(ncs, persistedResourceRegistry, appDir);
+ libraryManager = new ExternalLibraryManager(ncs, persistedResourceRegistry, appDir, ioManager);
libraryManager.initialize(resetStorageData);
/*
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/RetrieveLibrariesTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/RetrieveLibrariesTask.java
new file mode 100644
index 0000000..18d303c
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/RetrieveLibrariesTask.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.nc.task;
+
+import static org.apache.asterix.api.http.server.NCUdfRecoveryServlet.GET_ALL_UDF_ENDPOINT;
+import static org.apache.asterix.common.utils.Servlets.UDF_RECOVERY;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+
+import org.apache.asterix.common.api.INCLifecycleTask;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.library.ILibraryManager;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.control.CcId;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.util.file.FileUtil;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class RetrieveLibrariesTask implements INCLifecycleTask {
+
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOGGER = LogManager.getLogger();
+ private final List<Pair<URI, String>> nodes;
+
+ public RetrieveLibrariesTask(List<Pair<URI, String>> nodes) {
+ this.nodes = nodes;
+ if (nodes.size() <= 0) {
+ throw new IllegalArgumentException("No nodes specified to retrieve from");
+ }
+ }
+
+ @Override
+ public void perform(CcId ccId, IControllerService cs) throws HyracksDataException {
+ INcApplicationContext appContext = (INcApplicationContext) cs.getApplicationContext();
+ boolean success = false;
+ for (Pair<URI, String> referenceNode : nodes) {
+ try {
+ LOGGER.info("Retrieving UDFs from " + referenceNode.getFirst().getHost());
+ retrieveLibrary(referenceNode.getFirst(), referenceNode.getSecond(), appContext);
+ success = true;
+ break;
+ } catch (HyracksDataException e) {
+ LOGGER.error("Unable to retrieve UDFs from: " + referenceNode.getFirst() + ", trying another node.", e);
+ }
+ }
+ if (!success) {
+ LOGGER.error("Unable to retrieve UDFs from any participant node");
+ throw HyracksDataException.create(ErrorCode.TIMEOUT);
+ }
+ }
+
+ private void retrieveLibrary(URI baseURI, String authToken, INcApplicationContext appContext)
+ throws HyracksDataException {
+ ILibraryManager libraryManager = appContext.getLibraryManager();
+ FileReference distributionDir = appContext.getLibraryManager().getDistributionDir();
+ URI libraryURI = getNCUdfRetrievalURL(baseURI);
+ try {
+ FileUtil.forceMkdirs(distributionDir.getFile());
+ Path targetFile = Files.createTempFile(Paths.get(distributionDir.getAbsolutePath()), "all_", ".zip");
+ FileReference targetFileRef = distributionDir.getChild(targetFile.getFileName().toString());
+ libraryManager.download(targetFileRef, authToken, libraryURI);
+ Path outputDirPath = libraryManager.getStorageDir().getFile().toPath().toAbsolutePath().normalize();
+ FileReference outPath = appContext.getIoManager().resolveAbsolutePath(outputDirPath.toString());
+ libraryManager.unzip(targetFileRef, outPath);
+ } catch (IOException e) {
+ LOGGER.error("Unable to retrieve UDFs from " + libraryURI.toString() + " before timeout");
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ public URI getNCUdfRetrievalURL(URI baseURL) {
+ String endpoint = UDF_RECOVERY.substring(0, UDF_RECOVERY.length() - 1) + GET_ALL_UDF_ENDPOINT;
+ URIBuilder builder = new URIBuilder(baseURL).setPath(endpoint);
+ try {
+ return builder.build();
+ } catch (URISyntaxException e) {
+ LOGGER.error("Could not find URL for NC recovery", e);
+ }
+ return null;
+ }
+
+ @Override
+ public String toString() {
+ return "{ \"class\" : \"" + getClass().getSimpleName() + "\" }";
+ }
+}
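Aside: getNCUdfRetrievalURL above derives the recovery endpoint by trimming the trailing '*' from the /admin/libraryrecovery/* servlet pattern and appending the /all endpoint before resolving it against a node's base address. A rough JDK-only sketch of that derivation follows; the 127.0.0.1:19004 address is a hypothetical example, and java.net.URI stands in for the URIBuilder used in the patch.

    import java.net.URI;

    public final class RecoveryUriSketch {

        // Drop the trailing '*' from the servlet pattern, append the endpoint name,
        // and resolve the result against the node controller's base URI.
        static URI recoveryUri(URI base, String servletPattern, String endpoint) {
            String path = servletPattern.substring(0, servletPattern.length() - 1)
                    + endpoint.substring(1);
            return base.resolve(path);
        }

        public static void main(String[] args) {
            URI base = URI.create("http://127.0.0.1:19004/"); // hypothetical NC API address
            System.out.println(recoveryUri(base, "/admin/libraryrecovery/*", "/all"));
            // prints http://127.0.0.1:19004/admin/libraryrecovery/all
        }
    }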
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
index d22e9fc..c4e4f82 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
@@ -18,14 +18,20 @@
*/
package org.apache.asterix.app.replication;
+import static org.apache.asterix.api.http.server.ServletConstants.SYS_AUTH_HEADER;
+import static org.apache.asterix.common.config.ExternalProperties.Option.NC_API_PORT;
import static org.apache.hyracks.api.exceptions.ErrorCode.NODE_FAILED;
import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@@ -34,6 +40,7 @@
import org.apache.asterix.app.nc.task.ExportMetadataNodeTask;
import org.apache.asterix.app.nc.task.LocalRecoveryTask;
import org.apache.asterix.app.nc.task.MetadataBootstrapTask;
+import org.apache.asterix.app.nc.task.RetrieveLibrariesTask;
import org.apache.asterix.app.nc.task.StartLifecycleComponentsTask;
import org.apache.asterix.app.nc.task.StartReplicationServiceTask;
import org.apache.asterix.app.nc.task.UpdateNodeStatusTask;
@@ -42,6 +49,7 @@
import org.apache.asterix.app.replication.message.NCLifecycleTaskReportMessage;
import org.apache.asterix.app.replication.message.RegistrationTasksRequestMessage;
import org.apache.asterix.app.replication.message.RegistrationTasksResponseMessage;
+import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
import org.apache.asterix.common.api.INCLifecycleTask;
import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.cluster.IClusterStateManager;
@@ -53,14 +61,20 @@
import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.replication.messaging.ReplicaFailedMessage;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.application.ICCServiceContext;
import org.apache.hyracks.api.client.NodeStatus;
+import org.apache.hyracks.api.config.IOption;
import org.apache.hyracks.api.control.IGatekeeper;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.common.controllers.NCConfig;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import io.netty.handler.codec.http.HttpScheme;
+
public class NcLifecycleCoordinator implements INcLifecycleCoordinator {
private static final Logger LOGGER = LogManager.getLogger();
@@ -70,12 +84,14 @@
protected final ICCMessageBroker messageBroker;
private final boolean replicationEnabled;
private final IGatekeeper gatekeeper;
+ Map<String, Map<String, Object>> nodeSecretsMap;
public NcLifecycleCoordinator(ICCServiceContext serviceCtx, boolean replicationEnabled) {
this.messageBroker = (ICCMessageBroker) serviceCtx.getMessageBroker();
this.replicationEnabled = replicationEnabled;
this.gatekeeper =
((ClusterControllerService) serviceCtx.getControllerService()).getApplication().getGatekeeper();
+ this.nodeSecretsMap = new HashMap<>();
}
@Override
@@ -121,6 +137,7 @@
private void process(RegistrationTasksRequestMessage msg) throws HyracksDataException {
final String nodeId = msg.getNodeId();
+ nodeSecretsMap.put(nodeId, msg.getSecrets());
List<INCLifecycleTask> tasks = buildNCRegTasks(msg.getNodeId(), msg.getNodeStatus(), msg.getState());
RegistrationTasksResponseMessage response = new RegistrationTasksResponseMessage(nodeId, tasks);
try {
@@ -193,12 +210,12 @@
}
}
- protected List<INCLifecycleTask> buildIdleNcRegTasks(String nodeId, boolean metadataNode, SystemState state) {
+ protected List<INCLifecycleTask> buildIdleNcRegTasks(String newNodeId, boolean metadataNode, SystemState state) {
final List<INCLifecycleTask> tasks = new ArrayList<>();
tasks.add(new UpdateNodeStatusTask(NodeStatus.BOOTING));
if (state == SystemState.CORRUPTED) {
// need to perform local recovery for node partitions
- LocalRecoveryTask rt = new LocalRecoveryTask(Arrays.stream(clusterManager.getNodePartitions(nodeId))
+ LocalRecoveryTask rt = new LocalRecoveryTask(Arrays.stream(clusterManager.getNodePartitions(newNodeId))
.map(ClusterPartition::getPartitionId).collect(Collectors.toSet()));
tasks.add(rt);
}
@@ -210,6 +227,16 @@
}
tasks.add(new CheckpointTask());
tasks.add(new StartLifecycleComponentsTask());
+ if (isLibraryFetchEnabled() && clusterManager.getState() == ClusterState.ACTIVE) {
+ Set<String> nodes = clusterManager.getParticipantNodes(true);
+ if (nodes.size() > 0) {
+ try {
+ tasks.add(nodesToLibraryTask(newNodeId, nodes));
+ } catch (HyracksDataException e) {
+ LOGGER.error("Could not construct library recovery task", e);
+ }
+ }
+ }
if (metadataNode) {
tasks.add(new ExportMetadataNodeTask(true));
tasks.add(new BindMetadataNodeTask());
@@ -227,6 +254,23 @@
}
}
+ protected String getNCAuthToken(String node) {
+ return (String) nodeSecretsMap.get(node).get(SYS_AUTH_HEADER);
+ }
+
+ protected URI constructNCRecoveryUri(String nodeId) throws HyracksDataException {
+ Map<IOption, Object> nodeConfig = clusterManager.getNcConfiguration().get(nodeId);
+ String host = (String) nodeConfig.get(NCConfig.Option.PUBLIC_ADDRESS);
+ int port = (Integer) nodeConfig.get(NC_API_PORT);
+ URIBuilder builder = new URIBuilder().setScheme(HttpScheme.HTTP.toString()).setHost(host).setPort(port);
+ try {
+ return builder.build();
+ } catch (URISyntaxException e) {
+ LOGGER.error("Could not find URL for NC recovery", e);
+ throw HyracksDataException.create(e);
+ }
+ }
+
private void requestMetadataNodeTakeover(String node) throws HyracksDataException {
MetadataNodeRequestMessage msg =
new MetadataNodeRequestMessage(true, clusterManager.getMetadataPartition().getPartitionId());
@@ -237,6 +281,23 @@
}
}
+ protected RetrieveLibrariesTask nodesToLibraryTask(String newNodeId, Set<String> referenceNodes)
+ throws HyracksDataException {
+ List<Pair<URI, String>> referenceNodeLocAndAuth = new ArrayList<>();
+ for (String node : referenceNodes) {
+ referenceNodeLocAndAuth.add(new Pair<>(constructNCRecoveryUri(node), getNCAuthToken(node)));
+ }
+ return getRetrieveLibrariesTask(referenceNodeLocAndAuth);
+ }
+
+ protected RetrieveLibrariesTask getRetrieveLibrariesTask(List<Pair<URI, String>> referenceNodeLocAndAuth) {
+ return new RetrieveLibrariesTask(referenceNodeLocAndAuth);
+ }
+
+ protected boolean isLibraryFetchEnabled() {
+ return true;
+ }
+
private void notifyFailedReplica(IClusterStateManager clusterManager, String nodeID,
InetSocketAddress replicaAddress) {
LOGGER.info("notify replica failure of nodeId {} at {}", nodeID, replicaAddress);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
index 3c7182d..c2cc63c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
@@ -18,6 +18,9 @@
*/
package org.apache.asterix.app.replication.message;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
import org.apache.asterix.common.messaging.api.INCMessageBroker;
@@ -34,22 +37,25 @@
public class RegistrationTasksRequestMessage implements INCLifecycleMessage, ICcAddressedMessage {
private static final Logger LOGGER = LogManager.getLogger();
- private static final long serialVersionUID = 1L;
- private final SystemState state;
- private final String nodeId;
- private final NodeStatus nodeStatus;
+ private static final long serialVersionUID = 2L;
+ protected final SystemState state;
+ protected final String nodeId;
+ protected final NodeStatus nodeStatus;
+ protected final Map<String, Object> secrets;
- public RegistrationTasksRequestMessage(String nodeId, NodeStatus nodeStatus, SystemState state) {
+ public RegistrationTasksRequestMessage(String nodeId, NodeStatus nodeStatus, SystemState state,
+ Map<String, Object> secretsEphemeral) {
this.state = state;
this.nodeId = nodeId;
this.nodeStatus = nodeStatus;
+ this.secrets = new HashMap<>(secretsEphemeral);
}
- public static void send(CcId ccId, NodeControllerService cs, NodeStatus nodeStatus, SystemState systemState)
- throws HyracksDataException {
+ public static void send(CcId ccId, NodeControllerService cs, NodeStatus nodeStatus, SystemState systemState,
+ Map<String, Object> secretsEphemeral) throws HyracksDataException {
try {
RegistrationTasksRequestMessage msg =
- new RegistrationTasksRequestMessage(cs.getId(), nodeStatus, systemState);
+ new RegistrationTasksRequestMessage(cs.getId(), nodeStatus, systemState, secretsEphemeral);
((INCMessageBroker) cs.getContext().getMessageBroker()).sendMessageToCC(ccId, msg);
} catch (Exception e) {
LOGGER.log(Level.ERROR, "Unable to send RegistrationTasksRequestMessage to CC", e);
@@ -79,4 +85,7 @@
return MessageType.REGISTRATION_TASKS_REQUEST;
}
-}
\ No newline at end of file
+ public Map<String, Object> getSecrets() {
+ return secrets;
+ }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 40a3c10..d62c151 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -49,7 +49,7 @@
import org.apache.asterix.app.active.ActiveEntityEventsListener;
import org.apache.asterix.app.active.ActiveNotificationHandler;
import org.apache.asterix.app.active.FeedEventsListener;
-import org.apache.asterix.app.external.ExternalLibraryUtil;
+import org.apache.asterix.app.external.ExternalLibraryJobUtils;
import org.apache.asterix.app.result.ExecutionError;
import org.apache.asterix.app.result.ResultHandle;
import org.apache.asterix.app.result.ResultReader;
@@ -1539,7 +1539,7 @@
// #. prepare jobs which will drop corresponding libraries.
List<Library> libraries = MetadataManager.INSTANCE.getDataverseLibraries(mdTxnCtx, dataverseName);
for (Library library : libraries) {
- jobsToExecute.add(ExternalLibraryUtil.buildDropLibraryJobSpec(dataverseName, library.getName(),
+ jobsToExecute.add(ExternalLibraryJobUtils.buildDropLibraryJobSpec(dataverseName, library.getName(),
metadataProvider));
}
@@ -2430,16 +2430,17 @@
CreateLibraryStatement cls = (CreateLibraryStatement) stmt;
DataverseName dataverseName = getActiveDataverseName(cls.getDataverseName());
String libraryName = cls.getLibraryName();
+ String libraryHash = cls.getHash();
lockUtil.createLibraryBegin(lockManager, metadataProvider.getLocks(), dataverseName, libraryName);
try {
- doCreateLibrary(metadataProvider, dataverseName, libraryName, cls, hcc);
+ doCreateLibrary(metadataProvider, dataverseName, libraryName, libraryHash, cls, hcc);
} finally {
metadataProvider.getLocks().unlock();
}
}
private void doCreateLibrary(MetadataProvider metadataProvider, DataverseName dataverseName, String libraryName,
- CreateLibraryStatement cls, IHyracksClientConnection hcc) throws Exception {
+ String libraryHash, CreateLibraryStatement cls, IHyracksClientConnection hcc) throws Exception {
JobUtils.ProgressState progress = ProgressState.NO_PROGRESS;
boolean prepareJobSuccessful = false;
JobSpecification abortJobSpec = null;
@@ -2461,7 +2462,7 @@
// #. add/update library with PendingAddOp
Library libraryPendingAdd =
- new Library(dataverseName, libraryName, language.name(), MetadataUtil.PENDING_ADD_OP);
+ new Library(dataverseName, libraryName, language.name(), libraryHash, MetadataUtil.PENDING_ADD_OP);
if (existingLibrary == null) {
MetadataManager.INSTANCE.addLibrary(mdTxnCtx, libraryPendingAdd);
} else {
@@ -2470,7 +2471,7 @@
// #. prepare to create library artifacts in NC.
Triple<JobSpecification, JobSpecification, JobSpecification> jobSpecs =
- ExternalLibraryUtil.buildCreateLibraryJobSpec(dataverseName, libraryName, language,
+ ExternalLibraryJobUtils.buildCreateLibraryJobSpec(dataverseName, libraryName, language,
cls.getLocation(), cls.getAuthToken(), metadataProvider);
JobSpecification prepareJobSpec = jobSpecs.first;
JobSpecification commitJobSpec = jobSpecs.second;
@@ -2490,7 +2491,8 @@
bActiveTxn = true;
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- Library newLibrary = new Library(dataverseName, libraryName, language.name(), MetadataUtil.PENDING_NO_OP);
+ Library newLibrary =
+ new Library(dataverseName, libraryName, language.name(), libraryHash, MetadataUtil.PENDING_NO_OP);
MetadataManager.INSTANCE.updateLibrary(mdTxnCtx, newLibrary);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -2511,8 +2513,8 @@
} else if (existingLibrary == null) {
// 'commit' job failed for a new library -> try removing the library
try {
- JobSpecification dropLibraryJobSpec = ExternalLibraryUtil.buildDropLibraryJobSpec(dataverseName,
- libraryName, metadataProvider);
+ JobSpecification dropLibraryJobSpec = ExternalLibraryJobUtils
+ .buildDropLibraryJobSpec(dataverseName, libraryName, metadataProvider);
runJob(hcc, dropLibraryJobSpec, jobFlags);
} catch (Exception e2) {
e.addSuppressed(e2);
@@ -2592,12 +2594,12 @@
// #. mark the existing library as PendingDropOp
// do drop instead of update because drop will fail if the library is used by functions/adapters
MetadataManager.INSTANCE.dropLibrary(mdTxnCtx, dataverseName, libraryName);
- MetadataManager.INSTANCE.addLibrary(mdTxnCtx,
- new Library(dataverseName, libraryName, library.getLanguage(), MetadataUtil.PENDING_DROP_OP));
+ MetadataManager.INSTANCE.addLibrary(mdTxnCtx, new Library(dataverseName, libraryName, library.getLanguage(),
+ library.getHash(), MetadataUtil.PENDING_DROP_OP));
// #. drop library artifacts in NCs.
JobSpecification jobSpec =
- ExternalLibraryUtil.buildDropLibraryJobSpec(dataverseName, libraryName, metadataProvider);
+ ExternalLibraryJobUtils.buildDropLibraryJobSpec(dataverseName, libraryName, metadataProvider);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
bActiveTxn = false;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index fd5ecb5..b0cb4de 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -19,16 +19,19 @@
package org.apache.asterix.hyracks.bootstrap;
import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR;
+import static org.apache.asterix.api.http.server.ServletConstants.SYS_AUTH_HEADER;
import static org.apache.asterix.common.utils.Servlets.QUERY_RESULT;
import static org.apache.asterix.common.utils.Servlets.QUERY_SERVICE;
import static org.apache.asterix.common.utils.Servlets.QUERY_STATUS;
import static org.apache.asterix.common.utils.Servlets.UDF;
+import static org.apache.asterix.common.utils.Servlets.UDF_RECOVERY;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -38,6 +41,7 @@
import org.apache.asterix.api.http.server.BasicAuthServlet;
import org.apache.asterix.api.http.server.NCQueryServiceServlet;
import org.apache.asterix.api.http.server.NCUdfApiServlet;
+import org.apache.asterix.api.http.server.NCUdfRecoveryServlet;
import org.apache.asterix.api.http.server.NetDiagnosticsApiServlet;
import org.apache.asterix.api.http.server.QueryResultApiServlet;
import org.apache.asterix.api.http.server.QueryStatusApiServlet;
@@ -83,6 +87,7 @@
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
+import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.client.NodeStatus;
import org.apache.hyracks.api.config.IConfigManager;
@@ -106,11 +111,12 @@
public class NCApplication extends BaseNCApplication {
private static final Logger LOGGER = LogManager.getLogger();
protected NCExtensionManager ncExtensionManager;
- private INcApplicationContext runtimeContext;
+ protected INcApplicationContext runtimeContext;
private String nodeId;
private boolean stopInitiated;
- private boolean startupCompleted;
+ protected boolean startupCompleted;
protected WebManager webManager;
+ private HttpServer apiServer;
@Override
public void registerConfig(IConfigManager configManager) {
@@ -203,8 +209,8 @@
final ExternalProperties externalProperties = getApplicationContext().getExternalProperties();
final HttpServerConfig config =
HttpServerConfigBuilder.custom().setMaxRequestSize(externalProperties.getMaxWebRequestSize()).build();
- HttpServer apiServer = new HttpServer(webManager.getBosses(), webManager.getWorkers(),
- externalProperties.getNcApiPort(), config);
+ apiServer = new HttpServer(webManager.getBosses(), webManager.getWorkers(), externalProperties.getNcApiPort(),
+ config);
apiServer.setAttribute(ServletConstants.SERVICE_CONTEXT_ATTR, ncServiceCtx);
apiServer.setAttribute(HYRACKS_CONNECTION_ATTR, getApplicationContext().getHcc());
apiServer.addServlet(new StorageApiServlet(apiServer.ctx(), getApplicationContext(), Servlets.STORAGE));
@@ -217,9 +223,15 @@
apiServer.setAttribute(ServletConstants.CREDENTIAL_MAP,
parseCredentialMap(((NodeControllerService) ncServiceCtx.getControllerService()).getConfiguration()
.getCredentialFilePath()));
+ Pair<Map<String, String>, Map<String, String>> auth = BasicAuthServlet.generateSysAuthHeader(apiServer.ctx());
apiServer.addServlet(new BasicAuthServlet(apiServer.ctx(),
new NCUdfApiServlet(apiServer.ctx(), new String[] { UDF }, getApplicationContext(),
- sqlppCompilationProvider, apiServer.getScheme(), apiServer.getAddress().getPort())));
+ sqlppCompilationProvider, apiServer.getScheme(), apiServer.getAddress().getPort()),
+ auth.getFirst(), auth.getSecond()));
+ apiServer.addServlet(new BasicAuthServlet(
+ apiServer.ctx(), new NCUdfRecoveryServlet(apiServer.ctx(), new String[] { UDF_RECOVERY },
+ getApplicationContext(), apiServer.getScheme(), apiServer.getAddress().getPort()),
+ auth.getFirst(), auth.getSecond()));
apiServer.addServlet(new QueryStatusApiServlet(apiServer.ctx(), getApplicationContext(), QUERY_STATUS));
apiServer.addServlet(new QueryResultApiServlet(apiServer.ctx(), getApplicationContext(), QUERY_RESULT));
webManager.add(apiServer);
@@ -283,8 +295,11 @@
final NodeStatus currentStatus = ncs.getNodeStatus();
final SystemState systemState = isPendingStartupTasks(currentStatus, ncs.getPrimaryCcId(), ccId)
? getCurrentSystemState() : SystemState.HEALTHY;
+ final Map httpSecrets =
+ apiServer != null ? Collections.singletonMap(SYS_AUTH_HEADER, apiServer.ctx().get(SYS_AUTH_HEADER))
+ : Collections.emptyMap();
RegistrationTasksRequestMessage.send(ccId, (NodeControllerService) ncServiceCtx.getControllerService(),
- currentStatus, systemState);
+ currentStatus, systemState, httpSecrets);
}
@Override
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExternalPythonFunctionIT.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExternalPythonFunctionIT.java
index ac2bc6b..d72d494 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExternalPythonFunctionIT.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExternalPythonFunctionIT.java
@@ -20,10 +20,16 @@
package org.apache.asterix.test.runtime;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.test.common.TestExecutor;
import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.hyracks.control.nc.NodeControllerService;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -40,7 +46,9 @@
@BeforeClass
public static void setUp() throws Exception {
- LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, new TestExecutor());
+ final TestExecutor testExecutor = new TestExecutor();
+ LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, testExecutor);
+ setNcEndpoints(testExecutor);
}
@AfterClass
@@ -63,4 +71,17 @@
public void test() throws Exception {
LangExecutionUtil.test(tcCtx);
}
+
+ private static void setNcEndpoints(TestExecutor testExecutor) {
+ final NodeControllerService[] ncs = ExecutionTestUtil.integrationUtil.ncs;
+ final Map<String, InetSocketAddress> ncEndPoints = new HashMap<>();
+ final String ip = InetAddress.getLoopbackAddress().getHostAddress();
+ for (NodeControllerService nc : ncs) {
+ final String nodeId = nc.getId();
+ final INcApplicationContext appCtx = (INcApplicationContext) nc.getApplicationContext();
+ int apiPort = appCtx.getExternalProperties().getNcApiPort();
+ ncEndPoints.put(nodeId, InetSocketAddress.createUnresolved(ip, apiPort));
+ }
+ testExecutor.setNcEndPoints(ncEndPoints);
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibraryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibraryManager.java
index 6d1c059..ef390f4 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibraryManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibraryManager.java
@@ -19,9 +19,15 @@
package org.apache.asterix.common.library;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.security.MessageDigest;
+
import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.external.ipc.ExternalFunctionResultRouter;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.ipc.impl.IPCSystem;
@@ -33,6 +39,8 @@
// deployment helpers
+ FileReference getStorageDir();
+
FileReference getLibraryDir(DataverseName dataverseName, String libraryName) throws HyracksDataException;
FileReference getDistributionDir();
@@ -44,4 +52,10 @@
ExternalFunctionResultRouter getRouter();
IPCSystem getIPCI();
+
+ MessageDigest download(FileReference targetFile, String authToken, URI libLocation) throws HyracksException;
+
+ void unzip(FileReference sourceFile, FileReference outputDir) throws IOException;
+
+ void writeAndForce(FileReference outputFile, InputStream dataStream, byte[] copyBuf) throws IOException;
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/LibraryDescriptor.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/LibraryDescriptor.java
index 72bde09..6f128dc 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/LibraryDescriptor.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/LibraryDescriptor.java
@@ -31,9 +31,10 @@
*/
public class LibraryDescriptor implements IJsonSerializable {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
private static final String FIELD_LANGUAGE = "lang";
+ private static final String FIELD_HASH = "hash";
public static final String FILE_EXT_ZIP = "zip";
@@ -44,23 +45,32 @@
*/
private final ExternalFunctionLanguage lang;
- public LibraryDescriptor(ExternalFunctionLanguage language) {
+ private final String hash;
+
+ public LibraryDescriptor(ExternalFunctionLanguage language, String hash) {
this.lang = language;
+ this.hash = hash;
}
public ExternalFunctionLanguage getLanguage() {
return lang;
}
+ public String getHash() {
+ return hash;
+ }
+
public JsonNode toJson(IPersistedResourceRegistry registry) {
ObjectNode jsonNode = registry.getClassIdentifier(LibraryDescriptor.class, serialVersionUID);
jsonNode.put(FIELD_LANGUAGE, lang.name());
+ jsonNode.put(FIELD_HASH, hash);
return jsonNode;
}
public static IJsonSerializable fromJson(IPersistedResourceRegistry registry, JsonNode json) {
String langText = json.get(FIELD_LANGUAGE).asText();
ExternalFunctionLanguage lang = ExternalFunctionLanguage.valueOf(langText);
- return new LibraryDescriptor(lang);
+ String hash = json.get(FIELD_HASH).asText();
+ return new LibraryDescriptor(lang, hash);
}
}
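Aside: because LibraryDescriptor is persisted as JSON, adding the hash field goes hand in hand with bumping serialVersionUID and writing/reading one extra property. A minimal Jackson sketch of that round trip is shown below; DescriptorJsonSketch and its helpers are illustrative only, since the real class serializes through IPersistedResourceRegistry.

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.node.ObjectNode;

    public final class DescriptorJsonSketch {

        private static final ObjectMapper MAPPER = new ObjectMapper();

        // Serialize a (language, hash) pair the way the descriptor's toJson does:
        // one field per property on a flat object node.
        static ObjectNode toJson(String lang, String hash) {
            ObjectNode node = MAPPER.createObjectNode();
            node.put("lang", lang);
            node.put("hash", hash);
            return node;
        }

        // Read the pair back, as fromJson does once the registry has resolved the class.
        static String[] fromJson(JsonNode json) {
            return new String[] { json.get("lang").asText(), json.get("hash").asText() };
        }
    }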
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
index 69e9267..5edc186 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
@@ -36,6 +36,7 @@
public static final String STORAGE = "/admin/storage/*";
public static final String NET_DIAGNOSTICS = "/admin/net/*";
public static final String UDF = "/admin/udf/*";
+ public static final String UDF_RECOVERY = "/admin/libraryrecovery/*";
private Servlets() {
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java
index 2ae8612..c5b9b53 100755
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java
@@ -21,15 +21,35 @@
import static com.fasterxml.jackson.databind.MapperFeature.SORT_PROPERTIES_ALPHABETICALLY;
import static com.fasterxml.jackson.databind.SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.net.URI;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.SimpleFileVisitor;
import java.nio.file.StandardCopyOption;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.security.DigestOutputStream;
+import java.security.KeyStore;
+import java.security.MessageDigest;
+import java.util.Enumeration;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipFile;
+
+import javax.net.ssl.SSLContext;
import org.apache.asterix.common.functions.ExternalFunctionLanguage;
import org.apache.asterix.common.library.ILibrary;
@@ -37,12 +57,30 @@
import org.apache.asterix.common.library.LibraryDescriptor;
import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.external.ipc.ExternalFunctionResultRouter;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.compress.archivers.zip.ZipArchiveEntry;
+import org.apache.commons.compress.archivers.zip.ZipArchiveOutputStream;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHeaders;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.ssl.SSLContexts;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.api.io.IPersistedResourceRegistry;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
+import org.apache.hyracks.api.network.INetworkSecurityConfig;
+import org.apache.hyracks.api.network.INetworkSecurityManager;
import org.apache.hyracks.api.util.IoUtil;
import org.apache.hyracks.control.common.work.AbstractWork;
import org.apache.hyracks.control.nc.NodeControllerService;
@@ -77,6 +115,8 @@
public static final String DISTRIBUTION_DIR = "dist";
+ private static final int DOWNLOAD_RETRY_COUNT = 10;
+
private static final Logger LOGGER = LogManager.getLogger(ExternalLibraryManager.class);
private final NodeControllerService ncs;
@@ -91,8 +131,11 @@
private final Map<Pair<DataverseName, String>, ILibrary> libraries = new HashMap<>();
private IPCSystem pythonIPC;
private final ExternalFunctionResultRouter router;
+ private final IIOManager ioManager;
+ private boolean sslEnabled;
- public ExternalLibraryManager(NodeControllerService ncs, IPersistedResourceRegistry reg, FileReference appDir) {
+ public ExternalLibraryManager(NodeControllerService ncs, IPersistedResourceRegistry reg, FileReference appDir,
+ IIOManager ioManager) {
this.ncs = ncs;
this.reg = reg;
baseDir = appDir.getChild(LIBRARY_MANAGER_BASE_DIR_NAME);
@@ -103,6 +146,8 @@
trashDirPath = trashDir.getFile().toPath().normalize();
objectMapper = createObjectMapper();
router = new ExternalFunctionResultRouter();
+ this.sslEnabled = ncs.getConfiguration().isSslEnabled();
+ this.ioManager = ioManager;
}
public void initialize(boolean resetStorageData) throws HyracksDataException {
@@ -170,6 +215,11 @@
}
}
+ @Override
+ public FileReference getStorageDir() {
+ return storageDir;
+ }
+
private FileReference getDataverseDir(DataverseName dataverseName) throws HyracksDataException {
return getChildFileRef(storageDir, dataverseName.getCanonicalForm());
}
@@ -208,10 +258,7 @@
throw new HyracksDataException("Cannot find library: " + dataverseName + '.' + libraryName);
}
try {
- FileReference descFile = libRevDir.getChild(DESCRIPTOR_FILE_NAME);
- byte[] descData = Files.readAllBytes(descFile.getFile().toPath());
- LibraryDescriptor desc = deserializeLibraryDescriptor(descData);
- ExternalFunctionLanguage libLang = desc.getLanguage();
+ ExternalFunctionLanguage libLang = getLibraryDescriptor(libRevDir).getLanguage();
switch (libLang) {
case JAVA:
return new JavaLibrary(libContentsDir.getFile());
@@ -240,6 +287,13 @@
return (LibraryDescriptor) reg.deserialize(jsonNode);
}
+ private LibraryDescriptor getLibraryDescriptor(FileReference revDir) throws IOException {
+ FileReference descFile = revDir.getChild(DESCRIPTOR_FILE_NAME);
+ byte[] descData = Files.readAllBytes(descFile.getFile().toPath());
+ return deserializeLibraryDescriptor(descData);
+ }
+
private FileReference findLibraryRevDir(DataverseName dataverseName, String libraryName)
throws HyracksDataException {
FileReference libraryBaseDir = getLibraryDir(dataverseName, libraryName);
@@ -277,6 +331,42 @@
return new Pair<>(dataverseName, libraryName);
}
+ public Path zipAllLibs() throws IOException {
+ byte[] copyBuf = new byte[4096];
+ Path outDir = Paths.get(baseDir.getAbsolutePath(), DISTRIBUTION_DIR);
+ FileUtil.forceMkdirs(outDir.toFile());
+ Path outZip = Files.createTempFile(outDir, "all_", ".zip");
+ try (FileOutputStream out = new FileOutputStream(outZip.toFile());
+ ZipArchiveOutputStream zipOut = new ZipArchiveOutputStream(out)) {
+ Files.walkFileTree(storageDirPath, new SimpleFileVisitor<Path>() {
+ @Override
+ public FileVisitResult visitFile(Path currPath, BasicFileAttributes attrs) throws IOException {
+ ZipArchiveEntry e =
+ new ZipArchiveEntry(currPath.toFile(), storageDirPath.relativize(currPath).toString());
+ zipOut.putArchiveEntry(e);
+ try (FileInputStream fileRead = new FileInputStream(currPath.toFile())) {
+ IOUtils.copyLarge(fileRead, zipOut, copyBuf);
+ zipOut.closeArchiveEntry();
+ }
+ return FileVisitResult.CONTINUE;
+ }
+
+ @Override
+ public FileVisitResult preVisitDirectory(Path currPath, BasicFileAttributes attrs) throws IOException {
+ if (currPath.equals(storageDirPath)) {
+ return FileVisitResult.CONTINUE;
+ }
+ ZipArchiveEntry e =
+ new ZipArchiveEntry(currPath.toFile(), storageDirPath.relativize(currPath).toString());
+ zipOut.putArchiveEntry(e);
+ return FileVisitResult.CONTINUE;
+ }
+ });
+ zipOut.finish();
+ }
+ return outZip;
+ }
+
@Override
public void dropLibraryPath(FileReference fileRef) throws HyracksDataException {
// does not flush any directories
@@ -338,4 +428,156 @@
}
}
}
+
+ @Override
+ public MessageDigest download(FileReference targetFile, String authToken, URI libLocation) throws HyracksException {
+ try {
+ targetFile.getFile().createNewFile();
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ IFileHandle fHandle = ioManager.open(targetFile, IIOManager.FileReadWriteMode.READ_WRITE,
+ IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+
+ MessageDigest digest = DigestUtils.getDigest("MD5");
+ try {
+ CloseableHttpClient httpClient = newClient();
+ try {
+ // retry up to DOWNLOAD_RETRY_COUNT times when downloading the binary
+ HttpGet request = new HttpGet(libLocation);
+ request.setHeader(HttpHeaders.AUTHORIZATION, authToken);
+ int tried = 0;
+ Exception trace = null;
+ while (tried < DOWNLOAD_RETRY_COUNT) {
+ tried++;
+ CloseableHttpResponse response = null;
+ try {
+ response = httpClient.execute(request);
+ if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
+ throw new IOException("Http Error: " + response.getStatusLine().getStatusCode());
+ }
+ HttpEntity e = response.getEntity();
+ if (e == null) {
+ throw new IOException("No response");
+ }
+ WritableByteChannel outChannel = ioManager.newWritableChannel(fHandle);
+ OutputStream outStream = new DigestOutputStream(Channels.newOutputStream(outChannel), digest);
+ e.writeTo(outStream);
+ outStream.flush();
+ ioManager.sync(fHandle, true);
+ return digest;
+ } catch (IOException e) {
+ LOGGER.error("Unable to download library", e);
+ trace = e;
+ try {
+ ioManager.truncate(fHandle, 0);
+ digest.reset();
+ } catch (IOException e2) {
+ throw HyracksDataException.create(e2);
+ }
+ } finally {
+ if (response != null) {
+ try {
+ response.close();
+ } catch (IOException e) {
+ LOGGER.warn("Failed to close", e);
+ }
+ }
+ }
+ }
+
+ throw HyracksDataException.create(trace);
+ } finally {
+ try {
+ httpClient.close();
+ } catch (IOException e) {
+ LOGGER.warn("Failed to close", e);
+ }
+ }
+ } finally {
+ try {
+ ioManager.close(fHandle);
+ } catch (HyracksDataException e) {
+ LOGGER.warn("Failed to close", e);
+ }
+ }
+ }
+
+ @Override
+ public void unzip(FileReference sourceFile, FileReference outputDir) throws IOException {
+ boolean logTraceEnabled = LOGGER.isTraceEnabled();
+ Set<Path> newDirs = new HashSet<>();
+ Path outputDirPath = outputDir.getFile().toPath().toAbsolutePath().normalize();
+ try (ZipFile zipFile = new ZipFile(sourceFile.getFile())) {
+ Enumeration<? extends ZipEntry> entries = zipFile.entries();
+ byte[] writeBuf = new byte[4096];
+ while (entries.hasMoreElements()) {
+ ZipEntry entry = entries.nextElement();
+ if (entry.isDirectory()) {
+ continue;
+ }
+ Path entryOutputPath = outputDirPath.resolve(entry.getName()).toAbsolutePath().normalize();
+ if (!entryOutputPath.startsWith(outputDirPath)) {
+ throw new IOException("Malformed ZIP archive: " + entry.getName());
+ }
+ Path entryOutputDir = entryOutputPath.getParent();
+ Files.createDirectories(entryOutputDir);
+ // remember new directories so we can flush them later
+ for (Path p = entryOutputDir; !p.equals(outputDirPath); p = p.getParent()) {
+ newDirs.add(p);
+ }
+ try (InputStream in = zipFile.getInputStream(entry)) {
+ FileReference entryOutputFileRef = ioManager.resolveAbsolutePath(entryOutputPath.toString());
+ if (logTraceEnabled) {
+ LOGGER.trace("Extracting file {}", entryOutputFileRef);
+ }
+ writeAndForce(entryOutputFileRef, in, writeBuf);
+ }
+ }
+ }
+ for (Path newDir : newDirs) {
+ IoUtil.flushDirectory(newDir);
+ }
+ }
+
+ @Override
+ public void writeAndForce(FileReference outputFile, InputStream dataStream, byte[] copyBuffer) throws IOException {
+ outputFile.getFile().createNewFile();
+ IFileHandle fHandle = ioManager.open(outputFile, IIOManager.FileReadWriteMode.READ_WRITE,
+ IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+ try {
+ WritableByteChannel outChannel = ioManager.newWritableChannel(fHandle);
+ OutputStream outputStream = Channels.newOutputStream(outChannel);
+ IOUtils.copyLarge(dataStream, outputStream, copyBuffer);
+ outputStream.flush();
+ ioManager.sync(fHandle, true);
+ } finally {
+ ioManager.close(fHandle);
+ }
+ }
+
+ //TODO: this should probably be static so it could be reused somewhere else, or made such that the trust store is not
+ // reloaded from disk on every client initialization?
+ private CloseableHttpClient newClient() {
+ if (sslEnabled) {
+ try {
+ final INetworkSecurityManager networkSecurityManager = ncs.getNetworkSecurityManager();
+ final INetworkSecurityConfig configuration = networkSecurityManager.getConfiguration();
+ KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
+ try (FileInputStream trustStoreFile = new FileInputStream(configuration.getTrustStoreFile())) {
+ trustStore.load(trustStoreFile, configuration.getKeyStorePassword().toCharArray());
+ }
+ SSLContext sslcontext = SSLContexts.custom().loadTrustMaterial(trustStore, null).build();
+ SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(sslcontext,
+ new String[] { "TLSv1.2" }, null, SSLConnectionSocketFactory.getDefaultHostnameVerifier());
+ return HttpClients.custom().setSSLSocketFactory(sslsf).build();
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ } else {
+ return HttpClients.createDefault();
+ }
+ }
+
}
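
One detail worth noting in the unzip implementation above is the normalize-and-startsWith check that rejects archive entries which would escape the output directory (the "zip slip" case). The same guard, pulled out as a standalone sketch with hypothetical names:

    import java.io.IOException;
    import java.nio.file.Path;

    final class ZipSlipGuard {
        // Resolve an entry name against the extraction root and refuse any path that escapes it.
        static Path resolveSafely(Path outputDir, String entryName) throws IOException {
            Path root = outputDir.toAbsolutePath().normalize();
            Path target = root.resolve(entryName).toAbsolutePath().normalize();
            if (!target.startsWith(root)) {
                throw new IOException("Malformed ZIP archive: " + entryName);
            }
            return target;
        }
    }
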
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryDeployPrepareOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryDeployPrepareOperatorDescriptor.java
index ddf3d66..4f91b1a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryDeployPrepareOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryDeployPrepareOperatorDescriptor.java
@@ -26,39 +26,21 @@
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
-import java.io.OutputStream;
import java.net.URI;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.util.Enumeration;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipFile;
+import java.security.MessageDigest;
import org.apache.asterix.common.functions.ExternalFunctionLanguage;
import org.apache.asterix.common.library.LibraryDescriptor;
import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.external.library.ExternalLibraryManager;
+import org.apache.asterix.external.util.ExternalLibraryUtils;
import org.apache.commons.io.FilenameUtils;
-import org.apache.commons.io.IOUtils;
-import org.apache.http.HttpEntity;
-import org.apache.http.HttpHeaders;
-import org.apache.http.HttpStatus;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.io.IFileHandle;
-import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
import org.apache.hyracks.util.file.FileUtil;
import org.apache.logging.log4j.LogManager;
@@ -68,8 +50,6 @@
private static final long serialVersionUID = 1L;
- private static final int DOWNLOAD_RETRY_COUNT = 10;
-
private static final Logger LOGGER = LogManager.getLogger(LibraryDeployPrepareOperatorDescriptor.class);
private final ExternalFunctionLanguage language;
@@ -89,7 +69,7 @@
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
return new AbstractLibraryNodePushable(ctx) {
- private byte[] copyBuffer;
+ private final byte[] copyBuf = new byte[4096];
@Override
protected void execute() throws IOException {
@@ -139,8 +119,7 @@
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Downloading library from {} into {}", libLocation, targetFile);
}
- download(targetFile);
-
+ MessageDigest digest = libraryManager.download(targetFile, authToken, libLocation);
// extract from the archive
FileReference contentsDir = stageDir.getChild(ExternalLibraryManager.CONTENTS_DIR_NAME);
mkdir(contentsDir);
@@ -155,7 +134,7 @@
// shouldn't happen
throw new IOException("Unexpected file type: " + fileExt);
}
- unzip(targetFile, contentsDir);
+ libraryManager.unzip(targetFile, contentsDir);
break;
case PYTHON:
if (!LibraryDescriptor.FILE_EXT_PYZ.equals(fileExt)) {
@@ -176,172 +155,45 @@
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Writing library descriptor into {}", targetDescFile);
}
- writeDescriptor(targetDescFile, new LibraryDescriptor(language));
+ writeDescriptor(targetDescFile,
+ new LibraryDescriptor(language, ExternalLibraryUtils.digestToHexString(digest)));
flushDirectory(contentsDir);
flushDirectory(stageDir);
}
- private void download(FileReference targetFile) throws HyracksException {
- try {
- targetFile.getFile().createNewFile();
- } catch (IOException e) {
- throw HyracksDataException.create(e);
- }
- IFileHandle fHandle = ioManager.open(targetFile, IIOManager.FileReadWriteMode.READ_WRITE,
- IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
- try {
- CloseableHttpClient httpClient = HttpClientBuilder.create().build();
- try {
- // retry 10 times at maximum for downloading binaries
- HttpGet request = new HttpGet(libLocation);
- request.setHeader(HttpHeaders.AUTHORIZATION, authToken);
- int tried = 0;
- Exception trace = null;
- while (tried < DOWNLOAD_RETRY_COUNT) {
- tried++;
- CloseableHttpResponse response = null;
- try {
- response = httpClient.execute(request);
- if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
- throw new IOException("Http Error: " + response.getStatusLine().getStatusCode());
- }
- HttpEntity e = response.getEntity();
- if (e == null) {
- throw new IOException("No response");
- }
- WritableByteChannel outChannel = ioManager.newWritableChannel(fHandle);
- OutputStream outStream = Channels.newOutputStream(outChannel);
- e.writeTo(outStream);
- outStream.flush();
- ioManager.sync(fHandle, true);
- return;
- } catch (IOException e) {
- LOGGER.error("Unable to download library", e);
- trace = e;
- try {
- ioManager.truncate(fHandle, 0);
- } catch (IOException e2) {
- throw HyracksDataException.create(e2);
- }
- } finally {
- if (response != null) {
- try {
- response.close();
- } catch (IOException e) {
- LOGGER.warn("Failed to close", e);
- }
- }
- }
- }
-
- throw HyracksDataException.create(trace);
- } finally {
- try {
- httpClient.close();
- } catch (IOException e) {
- LOGGER.warn("Failed to close", e);
- }
- }
- } finally {
- try {
- ioManager.close(fHandle);
- } catch (HyracksDataException e) {
- LOGGER.warn("Failed to close", e);
- }
- }
- }
-
- private void unzip(FileReference sourceFile, FileReference outputDir) throws IOException {
- boolean logTraceEnabled = LOGGER.isTraceEnabled();
- Set<Path> newDirs = new HashSet<>();
- Path outputDirPath = outputDir.getFile().toPath().toAbsolutePath().normalize();
- try (ZipFile zipFile = new ZipFile(sourceFile.getFile())) {
- Enumeration<? extends ZipEntry> entries = zipFile.entries();
- while (entries.hasMoreElements()) {
- ZipEntry entry = entries.nextElement();
- if (entry.isDirectory()) {
- continue;
- }
- Path entryOutputPath = outputDirPath.resolve(entry.getName()).toAbsolutePath().normalize();
- if (!entryOutputPath.startsWith(outputDirPath)) {
- throw new IOException("Malformed ZIP archive: " + entry.getName());
- }
- Path entryOutputDir = entryOutputPath.getParent();
- Files.createDirectories(entryOutputDir);
- // remember new directories so we can flush them later
- for (Path p = entryOutputDir; !p.equals(outputDirPath); p = p.getParent()) {
- newDirs.add(p);
- }
- try (InputStream in = zipFile.getInputStream(entry)) {
- FileReference entryOutputFileRef =
- ioManager.resolveAbsolutePath(entryOutputPath.toString());
- if (logTraceEnabled) {
- LOGGER.trace("Extracting file {}", entryOutputFileRef);
- }
- writeAndForce(entryOutputFileRef, in);
- }
- }
- }
- for (Path newDir : newDirs) {
- flushDirectory(newDir);
- }
- }
-
private void shiv(FileReference sourceFile, FileReference stageDir, FileReference contentsDir,
boolean writeMsgpack) throws IOException {
FileReference msgpack = stageDir.getChild("msgpack.pyz");
if (writeMsgpack) {
- writeShim(msgpack, writeMsgpack);
+ writeShim(msgpack);
File msgPackFolder = new File(contentsDir.getRelativePath(), "ipc");
FileReference msgPackFolderRef =
new FileReference(contentsDir.getDeviceHandle(), msgPackFolder.getPath());
- unzip(msgpack, msgPackFolderRef);
+ libraryManager.unzip(msgpack, msgPackFolderRef);
Files.delete(msgpack.getFile().toPath());
}
- unzip(sourceFile, contentsDir);
- writeShim(contentsDir.getChild("entrypoint.py"), false);
+ libraryManager.unzip(sourceFile, contentsDir);
+ writeShim(contentsDir.getChild("entrypoint.py"));
}
- private boolean writeShim(FileReference outputFile, boolean optional) throws IOException {
+ private void writeShim(FileReference outputFile) throws IOException {
InputStream is = getClass().getClassLoader().getResourceAsStream(outputFile.getFile().getName());
if (is == null) {
throw new IOException("Classpath does not contain necessary Python resources!");
}
try {
- writeAndForce(outputFile, is);
+ libraryManager.writeAndForce(outputFile, is, copyBuf);
} finally {
is.close();
}
- return true;
}
private void writeDescriptor(FileReference descFile, LibraryDescriptor desc) throws IOException {
byte[] bytes = libraryManager.serializeLibraryDescriptor(desc);
- writeAndForce(descFile, new ByteArrayInputStream(bytes));
+ libraryManager.writeAndForce(descFile, new ByteArrayInputStream(bytes), copyBuf);
}
- private void writeAndForce(FileReference outputFile, InputStream dataStream) throws IOException {
- outputFile.getFile().createNewFile();
- IFileHandle fHandle = ioManager.open(outputFile, IIOManager.FileReadWriteMode.READ_WRITE,
- IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
- try {
- WritableByteChannel outChannel = ioManager.newWritableChannel(fHandle);
- OutputStream outputStream = Channels.newOutputStream(outChannel);
- IOUtils.copyLarge(dataStream, outputStream, getCopyBuffer());
- outputStream.flush();
- ioManager.sync(fHandle, true);
- } finally {
- ioManager.close(fHandle);
- }
- }
-
- private byte[] getCopyBuffer() {
- if (copyBuffer == null) {
- copyBuffer = new byte[4096];
- }
- return copyBuffer;
- }
};
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalLibraryUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalLibraryUtils.java
new file mode 100644
index 0000000..6e3be21
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalLibraryUtils.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.security.MessageDigest;
+
+import org.apache.hyracks.util.bytes.HexPrinter;
+
+public class ExternalLibraryUtils {
+
+ private ExternalLibraryUtils() {
+
+ }
+
+ public static String digestToHexString(MessageDigest digest) throws IOException {
+ byte[] hashBytes = digest.digest();
+ StringWriter hashBuilder = new StringWriter();
+ HexPrinter.printHexString(hashBytes, 0, hashBytes.length, hashBuilder);
+ return hashBuilder.toString();
+ }
+}
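
A possible way to exercise digestToHexString outside the deployment path, assuming Java 11+ (for OutputStream.nullOutputStream) and using a hypothetical demo class: stream a file through a DigestInputStream and print the hex form of the resulting digest, which is the same format the descriptor stores.

    import java.io.InputStream;
    import java.io.OutputStream;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.security.DigestInputStream;
    import java.security.MessageDigest;
    import org.apache.asterix.external.util.ExternalLibraryUtils;

    public final class DigestDemo {
        public static void main(String[] args) throws Exception {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            try (InputStream in = new DigestInputStream(Files.newInputStream(Paths.get(args[0])), md5)) {
                in.transferTo(OutputStream.nullOutputStream()); // drain the stream; the digest updates as bytes pass through
            }
            System.out.println(ExternalLibraryUtils.digestToHexString(md5)); // hex digest of the file contents
        }
    }
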
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateLibraryStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateLibraryStatement.java
index ebdeeef0..d832175 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateLibraryStatement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateLibraryStatement.java
@@ -32,18 +32,20 @@
private final DataverseName dataverseName;
private final String libraryName;
private final ExternalFunctionLanguage lang;
+ private final String hash;
private final URI location;
private final boolean replaceIfExists;
private final String authToken;
public CreateLibraryStatement(DataverseName dataverseName, String libraryName, ExternalFunctionLanguage lang,
- URI location, boolean replaceIfExists, String authToken) {
+ String hash, URI location, boolean replaceIfExists, String authToken) {
this.dataverseName = dataverseName;
this.libraryName = libraryName;
this.lang = lang;
this.location = location;
this.replaceIfExists = replaceIfExists;
this.authToken = authToken;
+ this.hash = hash;
}
public DataverseName getDataverseName() {
@@ -58,6 +60,10 @@
return lang;
}
+ public String getHash() {
+ return hash;
+ }
+
public URI getLocation() {
return location;
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java
index 0bf8c3d..8da01aa 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java
@@ -172,7 +172,7 @@
}
public void dropLibrary(DataverseName dataverseName, String libraryName) {
- Library library = new Library(dataverseName, libraryName, null, MetadataUtil.PENDING_NO_OP);
+ Library library = new Library(dataverseName, libraryName, null, null, MetadataUtil.PENDING_NO_OP);
droppedCache.addLibraryIfNotExists(library);
logAndApply(new MetadataLogicalOperation(library, false));
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java
index 8430f44..10f5047 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataRecordTypes.java
@@ -75,6 +75,7 @@
public static final String FIELD_NAME_IS_PRIMARY = "IsPrimary";
public static final String FIELD_NAME_KIND = "Kind";
public static final String FIELD_NAME_LANGUAGE = "Language";
+ public static final String FIELD_NAME_HASH = "MD5Hash";
public static final String FIELD_NAME_LIBRARY_DATAVERSE_NAME = "LibraryDataverseName";
public static final String FIELD_NAME_LIBRARY_NAME = "LibraryName";
public static final String FIELD_NAME_LAST_REFRESH_TIME = "LastRefreshTime";
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Library.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Library.java
index 494b5f1..4a6a512 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Library.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Library.java
@@ -29,12 +29,14 @@
private final DataverseName dataverse;
private final String name;
private final String language;
+ private final String hash;
private final int pendingOp;
- public Library(DataverseName dataverseName, String libraryName, String language, int pendingOp) {
+ public Library(DataverseName dataverseName, String libraryName, String language, String hash, int pendingOp) {
this.dataverse = dataverseName;
this.name = libraryName;
this.language = language;
+ this.hash = hash;
this.pendingOp = pendingOp;
}
@@ -50,6 +52,10 @@
return language;
}
+ public String getHash() {
+ return hash;
+ }
+
public int getPendingOp() {
return pendingOp;
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/LibraryTupleTranslator.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/LibraryTupleTranslator.java
index 4792398..03abec8 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/LibraryTupleTranslator.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/LibraryTupleTranslator.java
@@ -19,6 +19,7 @@
package org.apache.asterix.metadata.entitytupletranslators;
+import static org.apache.asterix.metadata.bootstrap.MetadataRecordTypes.FIELD_NAME_HASH;
import static org.apache.asterix.metadata.bootstrap.MetadataRecordTypes.FIELD_NAME_LANGUAGE;
import static org.apache.asterix.metadata.bootstrap.MetadataRecordTypes.FIELD_NAME_PENDING_OP;
@@ -68,7 +69,10 @@
String language = languageIdx >= 0 ? ((AString) libraryRecord.getValueByPos(languageIdx)).getStringValue()
: ExternalFunctionLanguage.JAVA.name();
- return new Library(dataverseName, libraryName, language, pendingOp);
+ int hashIdx = libraryRecordType.getFieldIndex(FIELD_NAME_HASH);
+ String hash = hashIdx >= 0 ? ((AString) libraryRecord.getValueByPos(hashIdx)).getStringValue() : null;
+
+ return new Library(dataverseName, libraryName, language, hash, pendingOp);
}
@Override
@@ -119,6 +123,7 @@
protected void writeOpenFields(Library library) throws HyracksDataException {
writeLanguage(library);
writePendingOp(library);
+ writeHash(library);
}
private void writeLanguage(Library library) throws HyracksDataException {
@@ -133,6 +138,18 @@
recordBuilder.addField(fieldName, fieldValue);
}
+ private void writeHash(Library library) throws HyracksDataException {
+ String hash = library.getHash();
+
+ fieldName.reset();
+ aString.setValue(FIELD_NAME_HASH);
+ stringSerde.serialize(aString, fieldName.getDataOutput());
+ fieldValue.reset();
+ aString.setValue(hash);
+ stringSerde.serialize(aString, fieldValue.getDataOutput());
+ recordBuilder.addField(fieldName, fieldValue);
+ }
+
private void writePendingOp(Library library) throws HyracksDataException {
int pendingOp = library.getPendingOp();
diff --git a/asterixdb/asterix-server/pom.xml b/asterixdb/asterix-server/pom.xml
index 7a3206c..459f370 100644
--- a/asterixdb/asterix-server/pom.xml
+++ b/asterixdb/asterix-server/pom.xml
@@ -38,6 +38,8 @@
<properties>
<root.dir>${basedir}/..</root.dir>
+ <pip.path>${project.build.directory}/bin/pip3</pip.path>
+ <shiv.path>${project.build.directory}/bin/shiv</shiv.path>
</properties>
<build>
@@ -469,6 +471,94 @@
</usedDependencies>
</configuration>
</plugin>
+ <!-- TODO: this is an ugly workaround for not being able to use the proper directory in our integration tests on Jenkins -->
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>venv</id>
+ <phase>${prepare-env.stage}</phase>
+ <goals>
+ <goal>exec</goal>
+ </goals>
+ <configuration>
+ <!--suppress UnresolvedMavenProperty -->
+ <executable>${python.path}</executable>
+ <workingDirectory>${project.build.directory}</workingDirectory>
+ <arguments>
+ <argument>-m</argument>
+ <argument>venv</argument>
+ <argument>${project.build.directory}</argument>
+ </arguments>
+ </configuration>
+ </execution>
+ <execution>
+ <id>shiv-install</id>
+ <phase>${prepare-env.stage}</phase>
+ <goals>
+ <goal>exec</goal>
+ </goals>
+ <configuration>
+ <executable>${pip.path}</executable>
+ <workingDirectory>${project.build.directory}</workingDirectory>
+ <arguments>
+ <argument>install</argument>
+ <argument>--exists-action</argument>
+ <argument>w</argument>
+ <argument>--upgrade</argument>
+ <argument>shiv</argument>
+ </arguments>
+ <environmentVariables>
+ <VIRTUALENV>${project.build.directory}</VIRTUALENV>
+ <PATH>${project.build.directory}${path.separator}${env.PATH}${file.separator}bin</PATH>
+ </environmentVariables>
+ </configuration>
+ </execution>
+ <execution>
+ <id>shiv-msgpack-shim</id>
+ <phase>${shim.stage}</phase>
+ <goals>
+ <goal>exec</goal>
+ </goals>
+ <configuration>
+ <executable>${shiv.path}</executable>
+ <workingDirectory>${project.build.directory}</workingDirectory>
+ <arguments>
+ <argument>-o </argument>
+ <argument>${project.build.directory}${file.separator}classes${file.separator}msgpack.pyz</argument>
+ <argument>msgpack</argument>
+ </arguments>
+ <environmentVariables>
+ <VIRTUALENV>${project.build.directory}</VIRTUALENV>
+ <PATH>${project.build.directory}${path.separator}${env.PATH}${file.separator}bin</PATH>
+ </environmentVariables>
+ </configuration>
+ </execution>
+ <execution>
+ <id>shiv-test-lib</id>
+ <phase>${pytestlib.stage}</phase>
+ <goals>
+ <goal>exec</goal>
+ </goals>
+ <configuration>
+ <executable>${shiv.path}</executable>
+ <workingDirectory>${project.build.directory}</workingDirectory>
+ <arguments>
+ <argument>-o </argument>
+ <argument>${project.build.directory}${file.separator}TweetSent.pyz</argument>
+ <argument>--site-packages</argument>
+ <argument>${project.build.directory}${file.separator}..${file.separator}..${file.separator}asterix-app${file.separator}src${file.separator}test${file.separator}resources${file.separator}TweetSent</argument>
+ <argument>scikit-learn</argument>
+ </arguments>
+ <environmentVariables>
+ <VIRTUALENV>${project.build.directory}</VIRTUALENV>
+ <PATH>${project.build.directory}${path.separator}${env.PATH}${file.separator}bin</PATH>
+ </environmentVariables>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
<pluginManagement>
<plugins>
@@ -501,6 +591,18 @@
</build>
<profiles>
<profile>
+ <id>windows.python.envs</id>
+ <activation>
+ <os>
+ <family>Windows</family>
+ </os>
+ </activation>
+ <properties>
+ <pip.path>${project.build.directory}\Scripts\pip3.exe</pip.path>
+ <shiv.path>${project.build.directory}\Scripts\shiv.exe</shiv.path>
+ </properties>
+ </profile>
+ <profile>
<id>opt-modules</id>
<activation>
<file>
@@ -744,5 +846,17 @@
<artifactId>kite-data-core</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpcore</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpmime</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/asterixdb/pom.xml b/asterixdb/pom.xml
index 3cd433c..5cc17af 100644
--- a/asterixdb/pom.xml
+++ b/asterixdb/pom.xml
@@ -75,7 +75,8 @@
<itest.excludes>${global.itest.excludes}</itest.excludes>
<license.stage>compile</license.stage>
<resource.stage>process-classes</resource.stage>
- <pyro-shim.stage>none</pyro-shim.stage>
+ <prepare-env.stage>none</prepare-env.stage>
+ <shim.stage>none</shim.stage>
<pytestlib.stage>none</pytestlib.stage>
<!-- Versions under dependencymanagement or used in many projects via properties -->
@@ -664,7 +665,25 @@
</file>
</activation>
<properties>
- <pyro-shim.stage>process-classes</pyro-shim.stage>
+ <prepare-env.stage>process-classes</prepare-env.stage>
+ <shim.stage>process-classes</shim.stage>
+ <pytestlib.stage>generate-test-resources</pytestlib.stage>
+ <global.itest.excludes/>
+ </properties>
+ </profile>
+ <profile>
+ <id>python-udf-test-only</id>
+ <activation>
+ <property>
+ <name>no.shim</name>
+ </property>
+ <file>
+ <exists>${python.path}</exists>
+ </file>
+ </activation>
+ <properties>
+ <prepare-env.stage>process-classes</prepare-env.stage>
+ <shim.stage>none</shim.stage>
<pytestlib.stage>generate-test-resources</pytestlib.stage>
<global.itest.excludes/>
</properties>
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
index 787e823..01baa69 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/utils/HttpUtil.java
@@ -239,6 +239,8 @@
public static final String IMG_PNG = "image/png";
public static final String TEXT_HTML = "text/html";
public static final String TEXT_PLAIN = "text/plain";
+ public static final String APPLICATION_ZIP = "application/zip";
+ public static final String APPLICATION_OCTET_STREAM = "application/octet-stream";
private ContentType() {
}