[ASTERIXDB-2176][RT] Flexible python pathing
- Allow python path and args to be specified in config
Change-Id: I6f75b29be59090250b2ff9992279187babd97fff
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/7423
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml
index 27c0215..ef443ea 100644
--- a/asterixdb/asterix-app/pom.xml
+++ b/asterixdb/asterix-app/pom.xml
@@ -37,8 +37,8 @@
<root.dir>${basedir}/..</root.dir>
<appendedResourcesDirectory>${basedir}/src/main/appended-resources</appendedResourcesDirectory>
<sonar.sources>pom.xml,src/main/java,src/main/resources</sonar.sources>
- <pip.path>${project.build.directory}${file.separator}bin${file.separator}pip3</pip.path>
- <shiv.path>${project.build.directory}${file.separator}bin${file.separator}shiv</shiv.path>
+ <pip.path>${project.build.directory}/bin/pip3</pip.path>
+ <shiv.path>${project.build.directory}/bin/shiv</shiv.path>
</properties>
<build>
<plugins>
@@ -382,8 +382,8 @@
</os>
</activation>
<properties>
- <pip.path>${project.build.directory}${file.separator}Scripts${file.separator}pip3.exe</pip.path>
- <shiv.path>${project.build.directory}${file.separator}Scripts${file.separator}shiv.exe</shiv.path>
+ <pip.path>${project.build.directory}\Scripts\pip3.exe</pip.path>
+ <shiv.path>${project.build.directory}\Scripts\shiv.exe</shiv.path>
</properties>
</profile>
<profile>
@@ -827,9 +827,9 @@
<artifactId>commons-csv</artifactId>
</dependency>
<dependency>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpmime</artifactId>
- <scope>test</scope>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpmime</artifactId>
+ <scope>test</scope>
</dependency>
<!-- AWS -->
<dependency>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/BasicAuthServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/BasicAuthServlet.java
index d8e167f..f25d223 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/BasicAuthServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/BasicAuthServlet.java
@@ -18,35 +18,68 @@
*/
package org.apache.asterix.api.http.server;
-import static org.apache.asterix.api.http.server.ServletConstants.CREDENTIAL_MAP;
+import static org.apache.asterix.api.http.server.ServletConstants.*;
+import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
+import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hyracks.http.api.IServlet;
import org.apache.hyracks.http.api.IServletRequest;
import org.apache.hyracks.http.api.IServletResponse;
-import org.apache.hyracks.http.server.AbstractServlet;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.mindrot.jbcrypt.BCrypt;
import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
-public abstract class BasicAuthServlet extends AbstractServlet {
+public class BasicAuthServlet implements IServlet {
private static final Logger LOGGER = LogManager.getLogger();
public static String BASIC_AUTH_METHOD_NAME = "Basic";
private Base64.Decoder b64Decoder;
Map<String, String> storedCredentials;
+ Map<String, String> ephemeralCredentials;
+ private String sysAuthHeader;
+ private final IServlet delegate;
+ private ConcurrentMap<String, Object> ctx;
- protected BasicAuthServlet(ConcurrentMap<String, Object> ctx, String... paths) {
- super(ctx, paths);
+ /**
+ * Wraps {@code delegate} with HTTP basic authentication.
+ * <p>
+ * Besides the credentials read from {@code ctx} under {@code CREDENTIAL_MAP}, a random internal
+ * user/password pair is generated. Its hashed form is kept in {@code ephemeralCredentials} and the
+ * matching auth header is published in {@code ctx} under {@code SYS_AUTH_HEADER}.
+ *
+ * @param ctx servlet context; {@code CREDENTIAL_MAP} must be present because the map is
+ * dereferenced while picking the internal user name
+ * @param delegate servlet that handles requests once they are authorized
+ */
+ public BasicAuthServlet(ConcurrentMap<String, Object> ctx, IServlet delegate) {
+ this.delegate = delegate;
b64Decoder = Base64.getDecoder();
storedCredentials = (Map<String, String>) ctx.get(CREDENTIAL_MAP);
+ this.ctx = ctx;
+ // generate internal user
+ String sysUser;
+ // retry until the generated name does not collide with a stored user
+ do {
+ sysUser = generateRandomString(32);
+ } while (storedCredentials.containsKey(sysUser));
+ String sysPassword = generateRandomString(128);
+ ephemeralCredentials = Collections.singletonMap(sysUser, hashPassword(sysPassword));
+ sysAuthHeader = createAuthHeader(sysUser, sysPassword);
+ // NOTE(review): every instance overwrites this key; a second wrapper sharing ctx would replace
+ // the header that matches this instance's ephemeral credentials - confirm only one is created
+ ctx.put(SYS_AUTH_HEADER, sysAuthHeader);
+ }
+
+ /**
+ * @return the paths of the wrapped servlet
+ */
+ @Override
+ public String[] getPaths() {
+ return delegate.getPaths();
+ }
+
+ /**
+ * Initializes the wrapped servlet; this wrapper has no initialization of its own.
+ *
+ * @throws IOException if the wrapped servlet fails to initialize
+ */
+ @Override
+ public void init() throws IOException {
+ delegate.init();
+ }
+
+ /**
+ * @return the servlet context passed to the constructor
+ */
+ @Override
+ public ConcurrentMap<String, Object> ctx() {
+ return ctx;
}
@Override
@@ -56,7 +89,7 @@
if (!authorized) {
response.setStatus(HttpResponseStatus.UNAUTHORIZED);
} else {
- super.handle(request, response);
+ delegate.handle(request, response);
}
} catch (Exception e) {
LOGGER.log(Level.WARN, "Unhandled exception", e);
@@ -104,7 +137,7 @@
}
+ /**
+ * Selects the credential map a request is checked against: GET requests are checked against the
+ * generated internal credentials only, every other method against the stored credentials.
+ * NOTE(review): this now applies to any wrapped servlet, not just the UDF servlet - confirm that
+ * stored users are never expected to issue GET requests.
+ *
+ * @param request the incoming request; only its HTTP method is inspected
+ * @return map of user name to hashed password
+ */
protected Map<String, String> getStoredCredentials(IServletRequest request) {
- return storedCredentials;
+ return request.getHttpRequest().method().equals(HttpMethod.GET) ? ephemeralCredentials : storedCredentials;
}
public static String hashPassword(String password) {
@@ -116,4 +149,8 @@
byte[] encodedAuth = Base64.getEncoder().encode(auth.getBytes(StandardCharsets.ISO_8859_1));
return "Basic " + new String(encodedAuth);
}
+
+ /**
+ * Generates a random alphanumeric string for the internal user name and password.
+ * A {@link java.security.SecureRandom} is passed explicitly because the result is used as a
+ * credential; the character set is the same as {@code RandomStringUtils.randomAlphanumeric}.
+ *
+ * @param size number of characters to generate
+ * @return random string of letters and digits
+ */
+ private static String generateRandomString(int size) {
+ return RandomStringUtils.random(size, 0, 0, true, true, null, new java.security.SecureRandom());
+ }
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ServletConstants.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ServletConstants.java
index 857faf0..0eac474 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ServletConstants.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ServletConstants.java
@@ -26,6 +26,7 @@
public static final String RUNNING_QUERIES_ATTR = "org.apache.asterix.RUNINNG_QUERIES";
public static final String SERVICE_CONTEXT_ATTR = "org.apache.asterix.SERVICE_CONTEXT";
public static final String CREDENTIAL_MAP = "org.apache.asterix.CREDENTIAL_MAP";
+ public static final String SYS_AUTH_HEADER = "org.apache.asterix.SYS_AUTH_HEADER";
private ServletConstants() {
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UdfApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UdfApiServlet.java
index d5bec4f..18139f6 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UdfApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/UdfApiServlet.java
@@ -19,6 +19,7 @@
package org.apache.asterix.api.http.server;
import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR;
+import static org.apache.asterix.api.http.server.ServletConstants.SYS_AUTH_HEADER;
import static org.apache.asterix.common.functions.ExternalFunctionLanguage.JAVA;
import static org.apache.asterix.common.functions.ExternalFunctionLanguage.PYTHON;
@@ -63,7 +64,6 @@
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.output.NullWriter;
-import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.application.ICCServiceContext;
@@ -74,12 +74,12 @@
import org.apache.hyracks.control.common.work.SynchronizableWork;
import org.apache.hyracks.http.api.IServletRequest;
import org.apache.hyracks.http.api.IServletResponse;
+import org.apache.hyracks.http.server.AbstractServlet;
import org.apache.hyracks.http.server.utils.HttpUtil;
import org.apache.hyracks.util.file.FileUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpScheme;
@@ -87,22 +87,21 @@
import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
import io.netty.handler.codec.http.multipart.InterfaceHttpData;
-public class UdfApiServlet extends BasicAuthServlet {
+public class UdfApiServlet extends AbstractServlet {
private static final Logger LOGGER = LogManager.getLogger();
- private final ICcApplicationContext appCtx;
+ protected final ICcApplicationContext appCtx;
private final ClusterControllerService ccs;
private final HttpScheme httpServerProtocol;
private final int httpServerPort;
- private final ILangCompilationProvider compilationProvider;
- private final IStatementExecutorFactory statementExecutorFactory;
- private final IStorageComponentProvider componentProvider;
- private final IReceptionist receptionist;
- private final Path workingDir;
- private Map<String, String> sysCredentials;
- private String sysAuthHeader;
+ protected final ILangCompilationProvider compilationProvider;
+ protected final IStatementExecutorFactory statementExecutorFactory;
+ protected final IStorageComponentProvider componentProvider;
+ protected final IReceptionist receptionist;
+ protected final Path workingDir;
+ protected String sysAuthHeader;
public UdfApiServlet(ConcurrentMap<String, Object> ctx, String[] paths, ICcApplicationContext appCtx,
ILangCompilationProvider compilationProvider, IStatementExecutorFactory statementExecutorFactory,
@@ -124,23 +123,15 @@
@Override
public void init() throws IOException {
- super.init();
initAuth();
initStorage();
}
- private void initAuth() {
- // generate internal user
- String sysUser;
- do {
- sysUser = generateRandomString(32);
- } while (storedCredentials.containsKey(sysUser));
- String sysPassword = generateRandomString(128);
- this.sysCredentials = Collections.singletonMap(sysUser, hashPassword(sysPassword));
- this.sysAuthHeader = createAuthHeader(sysUser, sysPassword);
+ /**
+ * Reads the internal auth header from the servlet context under {@code SYS_AUTH_HEADER}.
+ * The value is null when nothing has stored that key before {@code init()} runs.
+ */
+ protected void initAuth() {
+ sysAuthHeader = (String) ctx.get(SYS_AUTH_HEADER);
}
- private void initStorage() throws IOException {
+ protected void initStorage() throws IOException {
// prepare working directory
if (Files.isDirectory(workingDir)) {
try {
@@ -154,6 +145,10 @@
}
}
+ /**
+ * Extension hook: supplies the map that {@code executeStatement} passes to the
+ * {@code RequestParameters} it builds. This implementation contributes nothing.
+ *
+ * @param request the request being served (unused here)
+ * @return an empty map
+ */
+ protected Map<String, String> additionalHttpHeadersFromRequest(IServletRequest request) {
+ return Collections.emptyMap();
+ }
+
@Override
protected void post(IServletRequest request, IServletResponse response) {
IClusterManagementWork.ClusterState clusterState = appCtx.getClusterStateManager().getState();
@@ -197,7 +192,7 @@
URI downloadURI = createDownloadURI(libraryTempFile);
CreateLibraryStatement stmt = new CreateLibraryStatement(libraryName.first, libraryName.second,
language, downloadURI, true, sysAuthHeader);
- executeStatement(stmt, requestReference);
+ executeStatement(stmt, requestReference, request);
response.setStatus(HttpResponseStatus.OK);
} catch (Exception e) {
response.setStatus(toHttpErrorStatus(e));
@@ -218,13 +213,12 @@
}
}
- private URI createDownloadURI(Path file) throws Exception {
+ /**
+ * Builds the URI under which {@code file} is exposed by this servlet.
+ *
+ * @param file file whose name becomes the last path segment
+ * @return URI made of the server scheme and port, the Hyracks client connection host, and the
+ * first servlet path (cut at {@code trims[0]}) followed by the file name
+ * @throws Exception if the Hyracks connection is unavailable or the URI is malformed
+ */
+ protected URI createDownloadURI(Path file) throws Exception {
String path = paths[0].substring(0, trims[0]) + '/' + file.getFileName();
String host = getHyracksClientConnection().getHost();
return new URI(httpServerProtocol.toString(), null, host, httpServerPort, path, null, null);
}
+ /**
+ * Handles HTTP DELETE requests. The {@code @Override} annotation is kept, as on {@code post},
+ * so the compiler checks this signature against the superclass method.
+ */
@Override
protected void delete(IServletRequest request, IServletResponse response) {
IClusterManagementWork.ClusterState clusterState = appCtx.getClusterStateManager().getState();
if (clusterState != IClusterManagementWork.ClusterState.ACTIVE) {
@@ -239,7 +233,7 @@
try {
IRequestReference requestReference = receptionist.welcome(request);
LibraryDropStatement stmt = new LibraryDropStatement(libraryName.first, libraryName.second, false);
- executeStatement(stmt, requestReference);
+ executeStatement(stmt, requestReference, request);
response.setStatus(HttpResponseStatus.OK);
} catch (Exception e) {
response.setStatus(toHttpErrorStatus(e));
@@ -250,21 +244,21 @@
}
}
- private void executeStatement(Statement statement, IRequestReference requestReference) throws Exception {
+ /**
+ * Compiles and executes a single statement, discarding any printed output.
+ *
+ * @param statement the statement to run
+ * @param requestReference reference obtained for the originating request
+ * @param request the originating HTTP request, handed to
+ * {@code additionalHttpHeadersFromRequest}
+ * @throws Exception if the Hyracks connection is unavailable or execution fails
+ */
+ protected void executeStatement(Statement statement, IRequestReference requestReference, IServletRequest request)
+ throws Exception {
SessionOutput sessionOutput = new SessionOutput(new SessionConfig(SessionConfig.OutputFormat.ADM),
new PrintWriter(NullWriter.NULL_WRITER));
ResponsePrinter printer = new ResponsePrinter(sessionOutput);
ResultProperties resultProperties = new ResultProperties(IStatementExecutor.ResultDelivery.IMMEDIATE, 1);
IRequestParameters requestParams = new RequestParameters(requestReference, "", null, resultProperties,
new IStatementExecutor.Stats(), new IStatementExecutor.StatementProperties(), null, null,
- Collections.emptyMap(), Collections.emptyMap(), false);
+ additionalHttpHeadersFromRequest(request), Collections.emptyMap(), false);
MetadataManager.INSTANCE.init();
IStatementExecutor translator = statementExecutorFactory.create(appCtx, Collections.singletonList(statement),
sessionOutput, compilationProvider, componentProvider, printer);
translator.compileAndExecute(getHyracksClientConnection(), requestParams);
}
+ /**
+ * Handles HTTP GET requests. The {@code @Override} annotation is kept, as on {@code post},
+ * so the compiler checks this signature against the superclass method.
+ */
@Override
protected void get(IServletRequest request, IServletResponse response) throws Exception {
IClusterManagementWork.ClusterState clusterState = appCtx.getClusterStateManager().getState();
if (clusterState != IClusterManagementWork.ClusterState.ACTIVE) {
@@ -287,7 +281,7 @@
readFromFile(filePath, response);
}
- private IHyracksClientConnection getHyracksClientConnection() throws Exception { // NOSONAR
+ protected IHyracksClientConnection getHyracksClientConnection() throws Exception { // NOSONAR
IHyracksClientConnection hcc = (IHyracksClientConnection) ctx.get(HYRACKS_CONNECTION_ATTR);
if (hcc == null) {
throw new RuntimeDataException(ErrorCode.PROPERTY_NOT_SET, HYRACKS_CONNECTION_ATTR);
@@ -295,13 +289,7 @@
return hcc;
}
- @Override
- protected Map<String, String> getStoredCredentials(IServletRequest request) {
- return request.getHttpRequest().method().equals(HttpMethod.GET) ? sysCredentials
- : super.getStoredCredentials(request);
- }
-
- private Pair<DataverseName, String> parseLibraryName(IServletRequest request) throws IllegalArgumentException {
+ protected Pair<DataverseName, String> parseLibraryName(IServletRequest request) throws IllegalArgumentException {
String[] path = StringUtils.split(localPath(request), '/');
int ln = path.length;
if (ln < 2) {
@@ -312,7 +300,7 @@
return new Pair<>(dataverseName, libraryName);
}
- private static ExternalFunctionLanguage getLanguageByFileExtension(String fileExtension) {
+ protected static ExternalFunctionLanguage getLanguageByFileExtension(String fileExtension) {
switch (fileExtension) {
case LibraryDescriptor.FILE_EXT_ZIP:
return JAVA;
@@ -323,7 +311,7 @@
}
}
- private HttpResponseStatus toHttpErrorStatus(Exception e) {
+ protected HttpResponseStatus toHttpErrorStatus(Exception e) {
if (e instanceof IFormattedException) {
IFormattedException fe = (IFormattedException) e;
if (ErrorCode.ASTERIX.equals(fe.getComponent())) {
@@ -337,10 +325,6 @@
return HttpResponseStatus.INTERNAL_SERVER_ERROR;
}
- private static String generateRandomString(int size) {
- return RandomStringUtils.randomAlphanumeric(size);
- }
-
protected void readFromFile(Path filePath, IServletResponse response) throws Exception {
class InputStreamGetter extends SynchronizableWork {
private InputStream is;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index e80995c..17cf664 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -39,6 +39,7 @@
import org.apache.asterix.api.http.IQueryWebServerRegistrant;
import org.apache.asterix.api.http.server.ActiveStatsApiServlet;
import org.apache.asterix.api.http.server.ApiServlet;
+import org.apache.asterix.api.http.server.BasicAuthServlet;
import org.apache.asterix.api.http.server.CcQueryCancellationServlet;
import org.apache.asterix.api.http.server.ClusterApiServlet;
import org.apache.asterix.api.http.server.ClusterControllerDetailsApiServlet;
@@ -350,9 +351,10 @@
case Servlets.ACTIVE_STATS:
return new ActiveStatsApiServlet(appCtx, ctx, paths);
case Servlets.UDF:
- return new UdfApiServlet(ctx, paths, appCtx, ccExtensionManager.getCompilationProvider(SQLPP),
- getStatementExecutorFactory(), componentProvider, server.getScheme(),
- server.getAddress().getPort());
+ return new BasicAuthServlet(ctx,
+ new UdfApiServlet(ctx, paths, appCtx, ccExtensionManager.getCompilationProvider(SQLPP),
+ getStatementExecutorFactory(), componentProvider, server.getScheme(),
+ server.getAddress().getPort()));
default:
throw new IllegalStateException(key);
}
diff --git a/asterixdb/asterix-app/src/main/resources/entrypoint.py b/asterixdb/asterix-app/src/main/resources/entrypoint.py
index bdb68e2..0917f49 100755
--- a/asterixdb/asterix-app/src/main/resources/entrypoint.py
+++ b/asterixdb/asterix-app/src/main/resources/entrypoint.py
@@ -16,8 +16,12 @@
# under the License.
import sys
-sys.path.insert(0, './site-packages/')
-sys.path.insert(len(sys.path)-1, './ipc/site-packages')
+from os import pathsep
+# Command line: argv[1] = address, argv[2] = port, argv[3] = sys.path entries joined by os.pathsep.
+addr = str(sys.argv[1])
+port = str(sys.argv[2])
+paths = sys.argv[3]
+# Entries are appended, so they are searched after everything already on sys.path.
+# NOTE(review): the removed code put './site-packages/' at index 0 instead; confirm that the
+# lower precedence for library packages is intended.
+for p in paths.split(pathsep):
+ sys.path.append(p)
from struct import *
import signal
import msgpack
@@ -239,8 +243,6 @@
self.disconnect_sock()
-addr = str(sys.argv[1])
-port = str(sys.argv[2])
wrap = Wrapper()
wrap.connect_sock(addr, port)
signal.signal(signal.SIGTERM, wrap.disconnect_sock)
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarPythonFunctionEvaluator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarPythonFunctionEvaluator.java
index c47145b..a5eccce 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarPythonFunctionEvaluator.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarPythonFunctionEvaluator.java
@@ -24,6 +24,8 @@
import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
@@ -31,7 +33,6 @@
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.WarningUtil;
-import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.common.library.ILibraryManager;
import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.external.ipc.ExternalFunctionResultRouter;
@@ -45,6 +46,7 @@
import org.apache.asterix.om.types.TypeTagUtil;
import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.config.IApplicationConfig;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.TaskAttemptId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -71,18 +73,47 @@
private final ByteBuffer outputWrapper;
private final IEvaluatorContext evaluatorContext;
private static final String ENTRYPOINT = "entrypoint.py";
+ private static final String SITE_PACKAGES = "site-packages";
private final IPointable[] argValues;
ExternalScalarPythonFunctionEvaluator(IExternalFunctionInfo finfo, IScalarEvaluatorFactory[] args,
IAType[] argTypes, IEvaluatorContext ctx, SourceLocation sourceLoc) throws HyracksDataException {
super(finfo, args, argTypes, ctx);
+ // The interpreter, its arguments and the extra sys.path entries all come from the NC configuration.
+ IApplicationConfig cfg = ctx.getServiceContext().getAppConfig();
+ String pythonPathCmd = cfg.getString(NCConfig.Option.PYTHON_CMD);
+ List<String> pythonArgs = new ArrayList<>();
+ if (pythonPathCmd == null) {
+ //if absolute path to interpreter is not specified, use environmental python
+ pythonPathCmd = "/usr/bin/env";
+ pythonArgs.add("python3");
+ }
+ File pythonPath = new File(pythonPathCmd);
+ // sys.path additions, in order: library site-packages, configured extras, bundled msgpack
+ List<String> sitePkgs = new ArrayList<>();
+ sitePkgs.add(SITE_PACKAGES);
+ String addlSitePackagesRaw = cfg.getString(NCConfig.Option.PYTHON_ADDITIONAL_PACKAGES);
+ if (addlSitePackagesRaw != null) {
+ sitePkgs.addAll(Arrays.asList(addlSitePackagesRaw.split(File.pathSeparator)));
+ }
+ if (cfg.getBoolean(NCConfig.Option.PYTHON_USE_BUNDLED_MSGPACK)) {
+ sitePkgs.add("ipc" + File.separator + SITE_PACKAGES + File.separator);
+ }
+ // configured interpreter arguments follow "python3" when the default interpreter is used
+ String[] pythonArgsRaw = cfg.getStringArray(NCConfig.Option.PYTHON_ARGS);
+ if (pythonArgsRaw != null) {
+ pythonArgs.addAll(Arrays.asList(pythonArgsRaw));
+ }
+ // sitePkgs always holds at least SITE_PACKAGES, so the joined value is never empty
+ String sitePackagesPath = String.join(File.pathSeparator, sitePkgs);
- File pythonPath = new File(ctx.getServiceContext().getAppConfig().getString(NCConfig.Option.PYTHON_HOME));
- DataverseName dataverseName = FunctionSignature.getDataverseName(finfo.getFunctionIdentifier());
try {
libraryEvaluator = PythonLibraryEvaluator.getInstance(finfo, libraryManager, router, ipcSys, pythonPath,
- ctx.getTaskContext(), ctx.getWarningCollector(), sourceLoc);
+ ctx.getTaskContext(), sitePackagesPath, pythonArgs, ctx.getWarningCollector(), sourceLoc);
} catch (IOException | AsterixException e) {
throw new HyracksDataException("Failed to initialize Python", e);
}
@@ -128,17 +159,22 @@
String module;
String clazz;
String fn;
+ String sitePkgs;
+ List<String> pythonArgs;
TaskAttemptId task;
IWarningCollector warningCollector;
SourceLocation sourceLoc;
private PythonLibraryEvaluator(JobId jobId, PythonLibraryEvaluatorId evaluatorId, IExternalFunctionInfo finfo,
- ILibraryManager libMgr, File pythonHome, ExternalFunctionResultRouter router, IPCSystem ipcSys,
- TaskAttemptId task, IWarningCollector warningCollector, SourceLocation sourceLoc) {
+ ILibraryManager libMgr, File pythonHome, String sitePkgs, List<String> pythonArgs,
+ ExternalFunctionResultRouter router, IPCSystem ipcSys, TaskAttemptId task,
+ IWarningCollector warningCollector, SourceLocation sourceLoc) {
super(jobId, evaluatorId);
this.finfo = finfo;
this.libMgr = libMgr;
this.pythonHome = pythonHome;
+ this.sitePkgs = sitePkgs;
+ this.pythonArgs = pythonArgs;
this.router = router;
this.task = task;
this.ipcSys = ipcSys;
@@ -168,8 +204,14 @@
this.clazz = clazz;
this.module = packageModule;
int port = ipcSys.getSocketAddress().getPort();
- ProcessBuilder pb = new ProcessBuilder(pythonHome.getAbsolutePath(), ENTRYPOINT,
- InetAddress.getLoopbackAddress().getHostAddress(), Integer.toString(port));
+ List<String> args = new ArrayList<>();
+ args.add(pythonHome.getAbsolutePath());
+ args.addAll(pythonArgs);
+ args.add(ENTRYPOINT);
+ args.add(InetAddress.getLoopbackAddress().getHostAddress());
+ args.add(Integer.toString(port));
+ args.add(sitePkgs);
+ ProcessBuilder pb = new ProcessBuilder(args.toArray(new String[0]));
pb.directory(new File(wd));
p = pb.start();
proto = new PythonIPCProto(p.getOutputStream(), router, ipcSys);
@@ -207,13 +249,15 @@
private static PythonLibraryEvaluator getInstance(IExternalFunctionInfo finfo, ILibraryManager libMgr,
ExternalFunctionResultRouter router, IPCSystem ipcSys, File pythonHome, IHyracksTaskContext ctx,
- IWarningCollector warningCollector, SourceLocation sourceLoc) throws IOException, AsterixException {
+ String sitePkgs, List<String> pythonArgs, IWarningCollector warningCollector, SourceLocation sourceLoc)
+ throws IOException, AsterixException {
PythonLibraryEvaluatorId evaluatorId =
new PythonLibraryEvaluatorId(finfo.getLibraryDataverseName(), finfo.getLibraryName());
PythonLibraryEvaluator evaluator = (PythonLibraryEvaluator) ctx.getStateObject(evaluatorId);
if (evaluator == null) {
evaluator = new PythonLibraryEvaluator(ctx.getJobletContext().getJobId(), evaluatorId, finfo, libMgr,
- pythonHome, router, ipcSys, ctx.getTaskAttemptId(), warningCollector, sourceLoc);
+ pythonHome, sitePkgs, pythonArgs, router, ipcSys, ctx.getTaskAttemptId(), warningCollector,
+ sourceLoc);
ctx.registerDeallocatable(evaluator);
evaluator.initialize();
ctx.setStateObject(evaluator);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryDeployPrepareOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryDeployPrepareOperatorDescriptor.java
index 8929a60..ddf3d66 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryDeployPrepareOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryDeployPrepareOperatorDescriptor.java
@@ -20,6 +20,7 @@
package org.apache.asterix.external.operators;
import static org.apache.asterix.external.library.ExternalLibraryManager.DESCRIPTOR_FILE_NAME;
+import static org.apache.hyracks.control.common.controllers.NCConfig.Option.PYTHON_USE_BUNDLED_MSGPACK;
import java.io.ByteArrayInputStream;
import java.io.File;
@@ -161,7 +162,9 @@
// shouldn't happen
throw new IOException("Unexpected file type: " + fileExt);
}
- shiv(targetFile, stageDir, contentsDir);
+ boolean extractMsgPack = ctx.getJobletContext().getServiceContext().getAppConfig()
+ .getBoolean(PYTHON_USE_BUNDLED_MSGPACK);
+ shiv(targetFile, stageDir, contentsDir, extractMsgPack);
break;
default:
// shouldn't happen
@@ -285,20 +288,22 @@
}
}
- private void shiv(FileReference sourceFile, FileReference stageDir, FileReference contentsDir)
- throws IOException {
+ /**
+ * Unpacks a Python library archive and installs the entry point script next to it.
+ * When {@code writeMsgpack} is true, the {@code msgpack.pyz} classpath resource is written to
+ * {@code stageDir}, unzipped into the {@code ipc} folder under {@code contentsDir} and then
+ * deleted. In every case {@code sourceFile} is unzipped into {@code contentsDir} and
+ * {@code entrypoint.py} is written there.
+ *
+ * @param sourceFile the library archive to unzip
+ * @param stageDir directory that temporarily holds {@code msgpack.pyz}
+ * @param contentsDir directory that receives the unpacked library
+ * @param writeMsgpack whether to extract the bundled msgpack
+ * @throws IOException if a resource is missing or a file operation fails
+ */
+ private void shiv(FileReference sourceFile, FileReference stageDir, FileReference contentsDir,
+ boolean writeMsgpack) throws IOException {
FileReference msgpack = stageDir.getChild("msgpack.pyz");
- writeShim(msgpack);
+ if (writeMsgpack) {
+ writeShim(msgpack, writeMsgpack);
+ File msgPackFolder = new File(contentsDir.getRelativePath(), "ipc");
+ FileReference msgPackFolderRef =
+ new FileReference(contentsDir.getDeviceHandle(), msgPackFolder.getPath());
+ unzip(msgpack, msgPackFolderRef);
+ Files.delete(msgpack.getFile().toPath());
+ }
unzip(sourceFile, contentsDir);
- File msgPackFolder = new File(contentsDir.getRelativePath(), "ipc");
- FileReference msgPackFolderRef =
- new FileReference(contentsDir.getDeviceHandle(), msgPackFolder.getPath());
- unzip(msgpack, msgPackFolderRef);
- writeShim(contentsDir.getChild("entrypoint.py"));
- Files.delete(msgpack.getFile().toPath());
+ writeShim(contentsDir.getChild("entrypoint.py"), false);
}
- private void writeShim(FileReference outputFile) throws IOException {
+ private boolean writeShim(FileReference outputFile, boolean optional) throws IOException {
InputStream is = getClass().getClassLoader().getResourceAsStream(outputFile.getFile().getName());
if (is == null) {
throw new IOException("Classpath does not contain necessary Python resources!");
@@ -308,6 +313,7 @@
} finally {
is.close();
}
+ return true;
}
private void writeDescriptor(FileReference descFile, LibraryDescriptor desc) throws IOException {
@@ -338,4 +344,4 @@
}
};
}
-}
\ No newline at end of file
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
index 510db51..ce299b0 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
@@ -18,6 +18,7 @@
*/
package org.apache.hyracks.control.common.controllers;
+import static org.apache.hyracks.control.common.config.OptionTypes.BOOLEAN;
import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER;
import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER_BYTE_UNIT;
import static org.apache.hyracks.control.common.config.OptionTypes.LONG;
@@ -91,7 +92,10 @@
KEY_STORE_PASSWORD(STRING, (String) null),
IO_WORKERS_PER_PARTITION(POSITIVE_INTEGER, 2),
IO_QUEUE_SIZE(POSITIVE_INTEGER, 10),
- PYTHON_HOME(STRING, "/usr/bin/python3");
+ PYTHON_CMD(STRING, (String) null),
+ PYTHON_ADDITIONAL_PACKAGES(STRING, (String) null),
+ PYTHON_USE_BUNDLED_MSGPACK(BOOLEAN, true),
+ PYTHON_ARGS(STRING_ARRAY, (String[]) null);
private final IOptionType parser;
private final String defaultValueDescription;
@@ -224,8 +228,14 @@
return "Number of threads per partition used to write and read from storage";
case IO_QUEUE_SIZE:
return "Length of the queue used for requests to write and read";
- case PYTHON_HOME:
- return "Path to python interpreter";
+ // Python options: the evaluator adds the library path first, then the additional
+ // packages, then the bundled msgpack path; the texts below describe that order.
+ case PYTHON_CMD:
+ return "Absolute path to python interpreter. Defaults to environmental Python3";
+ case PYTHON_ADDITIONAL_PACKAGES:
+ return "List of additional paths, separated by a path separator character, to add to sys.path "
+ + "behind the library package path and ahead of the bundled msgpack path";
+ case PYTHON_USE_BUNDLED_MSGPACK:
+ return "True to include bundled msgpack on Python sys.path, false to use system-provided msgpack";
+ case PYTHON_ARGS:
+ return "Python args to pass to Python interpreter";
default:
throw new IllegalStateException("Not yet implemented: " + this);
}