[ASTERIXDB-3034][RT] Fenced UDFs
- user model changes: yes
- storage format changes: no
- interface changes: yes
Details:
- Allow UDFs to be run via domain socket activated
systemd services . This makes it so the UDF is run
as a different user than the NC process itself
Change-Id: Ibeb6228f2dc8edbf642e61cd5633c71913e18972
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/16364
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml
index 51ede69..8262ce6 100644
--- a/asterixdb/asterix-app/pom.xml
+++ b/asterixdb/asterix-app/pom.xml
@@ -388,7 +388,7 @@
<profile>
<id>asterix-gerrit-asterix-app</id>
<properties>
- <test.excludes>**/SqlppExecutionWithCancellationTest.java,**/DmlTest.java,**/RepeatedTest.java,**/SqlppExecutionTest.java,**/AqlExecutionTest.java,**/*Compression*Test.java,**/*Ssl*Test.java</test.excludes>
+ <test.excludes>**/SqlppExecutionWithCancellationTest.java,**/DmlTest.java,**/RepeatedTest.java,**/SqlppExecutionTest.java,**/AqlExecutionTest.java,**/*Compression*Test.java,**/*Ssl*Test.java,**/Podman*.java</test.excludes>
<itest.excludes>**/*.java</itest.excludes>
</properties>
<build>
diff --git a/asterixdb/asterix-app/src/main/resources/entrypoint.py b/asterixdb/asterix-app/src/main/resources/entrypoint.py
index 7bad7ef..918596c 100755
--- a/asterixdb/asterix-app/src/main/resources/entrypoint.py
+++ b/asterixdb/asterix-app/src/main/resources/entrypoint.py
@@ -168,6 +168,7 @@
def quit(self):
self.alive = False
+ self.disconnect_sock()
return True
def handle_call(self):
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index 86d8ed4..e8c2c1d 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -43,6 +43,7 @@
import org.apache.asterix.common.library.ILibraryManager;
import org.apache.asterix.hyracks.bootstrap.CCApplication;
import org.apache.asterix.hyracks.bootstrap.NCApplication;
+import org.apache.asterix.lang.common.util.ExpressionUtils;
import org.apache.asterix.test.dataflow.TestLsmIoOpCallbackFactory;
import org.apache.asterix.test.dataflow.TestPrimaryIndexOperationTrackerFactory;
import org.apache.commons.io.FileUtils;
@@ -132,13 +133,13 @@
cc = new ClusterControllerService(ccConfig, ccApplication);
nodeNames = ccConfig.getConfigManager().getNodeNames();
- if (deleteOldInstanceData) {
+ if (deleteOldInstanceData && nodeNames != null) {
deleteTransactionLogs();
removeTestStorageFiles();
deleteCCFiles();
}
final List<NodeControllerService> nodeControllers = new ArrayList<>();
- for (String nodeId : nodeNames) {
+ for (String nodeId : ExpressionUtils.emptyIfNull(nodeNames)) {
// mark this NC as virtual, so that the CC doesn't try to start via NCService...
configManager.set(nodeId, NCConfig.Option.NCSERVICE_PORT, NCConfig.NCSERVICE_PORT_DISABLED);
final INCApplication ncApplication = createNCApplication();
@@ -303,7 +304,7 @@
stopCC(false);
- if (deleteOldInstanceData) {
+ if (deleteOldInstanceData && nodeNames != null) {
deleteTransactionLogs();
removeTestStorageFiles();
deleteCCFiles();
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
index 408882d..d704d8e 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
@@ -33,6 +33,7 @@
import java.util.List;
import org.apache.asterix.app.external.ExternalUDFLibrarian;
+import org.apache.asterix.app.external.IExternalUDFLibrarian;
import org.apache.asterix.common.utils.StorageConstants;
import org.apache.asterix.test.common.TestExecutor;
import org.apache.asterix.testframework.context.TestCaseContext;
@@ -65,11 +66,17 @@
}
public static void setUp(String configFile, TestExecutor executor, boolean startHdfs) throws Exception {
+ setUp(configFile, executor, startHdfs, false, new ExternalUDFLibrarian());
+ }
+
+ public static void setUp(String configFile, TestExecutor executor, boolean startHdfs, boolean disableLangExec,
+ IExternalUDFLibrarian librarian) throws Exception {
testExecutor = executor;
File outdir = new File(PATH_ACTUAL);
outdir.mkdirs();
- ExecutionTestUtil.setUp(cleanupOnStart, configFile, integrationUtil, startHdfs, null);
- librarian = new ExternalUDFLibrarian();
+ if (!disableLangExec) {
+ ExecutionTestUtil.setUp(cleanupOnStart, configFile, integrationUtil, startHdfs, null);
+ }
testExecutor.setLibrarian(librarian);
if (repeat != 1) {
System.out.println("FYI: each test will be run " + repeat + " times.");
@@ -151,7 +158,9 @@
NodeControllerService[] ncs = integrationUtil.ncs;
// Checks that dataset files are uniformly distributed across each io device.
for (NodeControllerService nc : ncs) {
- checkNcStore(nc);
+ if (nc != null) {
+ checkNcStore(nc);
+ }
}
}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_it_python.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_it_python.xml
index 686ede2..284c2fd 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_it_python.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_it_python.xml
@@ -52,15 +52,7 @@
<test-case FilePath="external-library" check-warnings="true">
<compilation-unit name="py_function_error">
<output-dir compare="Clean-JSON">py_function_error</output-dir>
- <expected-warn>ASX0201: External UDF returned exception. Returned exception was: Traceback (most recent call last):
- File "entrypoint.py", line 181, in handle_call
- result[0].append(self.next_tuple(*arg, key=self.mid))
- File "entrypoint.py", line 99, in next_tuple
- return self.wrapped_fns[key](*args)
- File "site-packages/roundtrip.py", line 32, in warning
- raise ArithmeticError("oof")
-ArithmeticError: oof
- (in line 28, at column 1)</expected-warn>
+ <expected-warn>ArithmeticError: oof</expected-warn>
</compilation-unit>
</test-case>
<test-case FilePath="external-library">
@@ -76,8 +68,8 @@
<test-case FilePath="external-library" check-warnings="true">
<compilation-unit name="crash">
<output-dir compare="Text">crash</output-dir>
- <expected-warn>ASX0201: External UDF returned exception. Returned exception was: Function externallibtest:crash#0 failed to execute (in line 23, at column 1)</expected-warn>
- <expected-warn>ASX0201: External UDF returned exception. Returned exception was: java.io.IOException: Python process exited with code: 1 (in line 23, at column 1)</expected-warn>
+ <expected-warn>ASX0201: External UDF returned exception.</expected-warn>
+ <expected-warn>ASX0201: External UDF returned exception.</expected-warn>
</compilation-unit>
</test-case>
<test-case FilePath="external-library">
diff --git a/asterixdb/asterix-docker/pom.xml b/asterixdb/asterix-docker/pom.xml
deleted file mode 100644
index 6c54337..0000000
--- a/asterixdb/asterix-docker/pom.xml
+++ /dev/null
@@ -1,68 +0,0 @@
-<!--
- ! Licensed to the Apache Software Foundation (ASF) under one
- ! or more contributor license agreements. See the NOTICE file
- ! distributed with this work for additional information
- ! regarding copyright ownership. The ASF licenses this file
- ! to you under the Apache License, Version 2.0 (the
- ! "License"); you may not use this file except in compliance
- ! with the License. You may obtain a copy of the License at
- !
- ! http://www.apache.org/licenses/LICENSE-2.0
- !
- ! Unless required by applicable law or agreed to in writing,
- ! software distributed under the License is distributed on an
- ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ! KIND, either express or implied. See the License for the
- ! specific language governing permissions and limitations
- ! under the License.
- !-->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <artifactId>apache-asterixdb</artifactId>
- <groupId>org.apache.asterix</groupId>
- <version>0.9.8-SNAPSHOT</version>
- </parent>
- <artifactId>asterix-docker</artifactId>
-
- <properties>
- <root.dir>${basedir}/..</root.dir>
- </properties>
-
- <licenses>
- <license>
- <name>Apache License, Version 2.0</name>
- <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
- <distribution>repo</distribution>
- <comments>A business-friendly OSS license</comments>
- </license>
- </licenses>
-
- <profiles>
- <profile>
- <id>docker</id>
- <build>
- <plugins>
- <plugin>
- <groupId>com.spotify</groupId>
- <artifactId>docker-maven-plugin</artifactId>
- <version>0.2.11</version>
- <configuration>
- <imageName>asterixdb/demo</imageName>
- <dockerDirectory>docker</dockerDirectory>
- <resources>
- <resource>
- <targetPath>/</targetPath>
- <directory>../asterix-server/target/</directory>
- <include>asterix-server-${project.version}-binary-assembly.zip</include>
- </resource>
- </resources>
- </configuration>
- </plugin>
- </plugins>
- </build>
- </profile>
- </profiles>
-
-</project>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalLangIPCProto.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalLangIPCProto.java
new file mode 100644
index 0000000..35e5961
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalLangIPCProto.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.api;
+
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.ipc.MessageType;
+import org.apache.asterix.external.library.msgpack.MsgPackPointableVisitor;
+import org.apache.asterix.om.pointables.AFlatValuePointable;
+import org.apache.asterix.om.pointables.AListVisitablePointable;
+import org.apache.asterix.om.pointables.ARecordVisitablePointable;
+import org.apache.asterix.om.pointables.PointableAllocator;
+import org.apache.asterix.om.pointables.base.IVisitablePointable;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.EnumDeserializer;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.types.TypeTagUtil;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+
+public interface IExternalLangIPCProto {
+ static void visitValueRef(IAType type, DataOutput out, IValueReference valueReference,
+ PointableAllocator pointableAllocator, MsgPackPointableVisitor pointableVisitor, boolean visitNull)
+ throws IOException {
+ IVisitablePointable pointable;
+ switch (type.getTypeTag()) {
+ case OBJECT:
+ pointable = pointableAllocator.allocateRecordValue(type);
+ pointable.set(valueReference);
+ pointableVisitor.visit((ARecordVisitablePointable) pointable, pointableVisitor.getTypeInfo(type, out));
+ break;
+ case ARRAY:
+ case MULTISET:
+ pointable = pointableAllocator.allocateListValue(type);
+ pointable.set(valueReference);
+ pointableVisitor.visit((AListVisitablePointable) pointable, pointableVisitor.getTypeInfo(type, out));
+ break;
+ case ANY:
+ ATypeTag rtTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER
+ .deserialize(valueReference.getByteArray()[valueReference.getStartOffset()]);
+ IAType rtType = TypeTagUtil.getBuiltinTypeByTag(rtTypeTag);
+ visitValueRef(rtType, out, valueReference, pointableAllocator, pointableVisitor, visitNull);
+ break;
+ case MISSING:
+ case NULL:
+ if (!visitNull) {
+ return;
+ }
+ default:
+ pointable = pointableAllocator.allocateFieldValue(type);
+ pointable.set(valueReference);
+ pointableVisitor.visit((AFlatValuePointable) pointable, pointableVisitor.getTypeInfo(type, out));
+ break;
+ }
+ }
+
+ void start();
+
+ void helo() throws IOException, AsterixException;
+
+ long init(String module, String clazz, String fn) throws IOException, AsterixException;
+
+ ByteBuffer call(long functionId, IAType[] argTypes, IValueReference[] argValues, boolean nullCall)
+ throws IOException, AsterixException;
+
+ ByteBuffer callMulti(long key, ArrayBackedValueStorage args, int numTuples) throws IOException, AsterixException;
+
+ //For future use with interpreter reuse between jobs.
+ void quit() throws HyracksDataException;
+
+ void receiveMsg() throws IOException, AsterixException;
+
+ void sendHeader(long key, int msgLen) throws IOException;
+
+ void sendMsg(ArrayBackedValueStorage content) throws IOException;
+
+ void sendMsg() throws IOException;
+
+ MessageType getResponseType();
+
+ long getRouteId();
+
+ DataOutputStream getSockOut();
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/ILibraryEvaluator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/ILibraryEvaluator.java
new file mode 100644
index 0000000..8c6538b
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/ILibraryEvaluator.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.api;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.om.functions.IExternalFunctionInfo;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.api.resources.IDeallocatable;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+
+public interface ILibraryEvaluator extends IDeallocatable {
+
+ void start() throws IOException, AsterixException;
+
+ long initialize(IExternalFunctionInfo finfo) throws IOException, AsterixException;
+
+ ByteBuffer call(long id, IAType[] argTypes, IValueReference[] valueReferences, boolean nullCall) throws IOException;
+
+ ByteBuffer callMulti(long id, ArrayBackedValueStorage arguments, int numTuples) throws IOException;
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/AbstractPythonIPCProto.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/AbstractPythonIPCProto.java
new file mode 100644
index 0000000..00d1dcc
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/AbstractPythonIPCProto.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.ipc;
+
+import static org.apache.hyracks.ipc.impl.Message.HEADER_SIZE;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.api.IExternalLangIPCProto;
+import org.apache.asterix.external.library.msgpack.MsgPackPointableVisitor;
+import org.apache.asterix.om.pointables.PointableAllocator;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.ipc.impl.Message;
+import org.msgpack.core.MessagePack;
+import org.msgpack.core.MessageUnpacker;
+import org.msgpack.core.buffer.ArrayBufferInput;
+
+public abstract class AbstractPythonIPCProto {
+ public static final int HEADER_SIZE_LEN_INCLUSIVE = 21;
+ protected final PythonMessageBuilder messageBuilder;
+ protected final DataOutputStream sockOut;
+ protected final ArrayBufferInput unpackerInput;
+ protected final MessageUnpacker unpacker;
+ protected final ArrayBackedValueStorage argsStorage;
+ protected final PointableAllocator pointableAllocator;
+ protected final MsgPackPointableVisitor pointableVisitor;
+ private final ByteBuffer headerBuffer = ByteBuffer.allocate(HEADER_SIZE_LEN_INCLUSIVE);
+ protected ByteBuffer recvBuffer = ByteBuffer.allocate(32768);
+ protected long routeId;
+ protected Pair<ByteBuffer, Exception> bufferBox;
+ protected long maxFunctionId;
+
+ public AbstractPythonIPCProto(OutputStream sockOut) {
+ messageBuilder = new PythonMessageBuilder();
+ this.sockOut = new DataOutputStream(sockOut);
+ this.maxFunctionId = 0L;
+ unpackerInput = new ArrayBufferInput(new byte[0]);
+ unpacker = MessagePack.newDefaultUnpacker(unpackerInput);
+ this.argsStorage = new ArrayBackedValueStorage();
+ this.pointableAllocator = new PointableAllocator();
+ this.pointableVisitor = new MsgPackPointableVisitor();
+ }
+
+ public void helo() throws IOException, AsterixException {
+ recvBuffer.clear();
+ recvBuffer.position(0);
+ recvBuffer.limit(0);
+ messageBuilder.reset();
+ messageBuilder.hello();
+ sendHeader(routeId, messageBuilder.getLength());
+ sendMsg();
+ receiveMsg();
+ if (getResponseType() != MessageType.HELO) {
+ throw HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.ILLEGAL_STATE,
+ "Expected HELO, recieved " + getResponseType().name());
+ }
+ }
+
+ public long init(String module, String clazz, String fn) throws IOException, AsterixException {
+ long functionId = maxFunctionId++;
+ recvBuffer.clear();
+ recvBuffer.position(0);
+ recvBuffer.limit(0);
+ messageBuilder.reset();
+ messageBuilder.init(module, clazz, fn);
+ sendHeader(functionId, messageBuilder.getLength());
+ sendMsg();
+ receiveMsg();
+ if (getResponseType() != MessageType.INIT_RSP) {
+ throw HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.ILLEGAL_STATE,
+ "Expected INIT_RSP, recieved " + getResponseType().name());
+ }
+ return functionId;
+ }
+
+ public ByteBuffer call(long functionId, IAType[] argTypes, IValueReference[] argValues, boolean nullCall)
+ throws IOException, AsterixException {
+ recvBuffer.clear();
+ recvBuffer.position(0);
+ recvBuffer.limit(0);
+ messageBuilder.reset();
+ argsStorage.reset();
+ for (int i = 0; i < argTypes.length; i++) {
+ IExternalLangIPCProto.visitValueRef(argTypes[i], argsStorage.getDataOutput(), argValues[i],
+ pointableAllocator, pointableVisitor, nullCall);
+ }
+ int len = argsStorage.getLength() + 5;
+ sendHeader(functionId, len);
+ messageBuilder.call(argValues.length, len);
+ sendMsg(argsStorage);
+ receiveMsg();
+ if (getResponseType() != MessageType.CALL_RSP) {
+ throw HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.ILLEGAL_STATE,
+ "Expected CALL_RSP, recieved " + getResponseType().name());
+ }
+ return recvBuffer;
+ }
+
+ public ByteBuffer callMulti(long key, ArrayBackedValueStorage args, int numTuples)
+ throws IOException, AsterixException {
+ recvBuffer.clear();
+ recvBuffer.position(0);
+ recvBuffer.limit(0);
+ messageBuilder.reset();
+ int len = args.getLength() + 4;
+ sendHeader(key, len);
+ messageBuilder.callMulti(0, numTuples);
+ sendMsg(args);
+ receiveMsg();
+ if (getResponseType() != MessageType.CALL_RSP) {
+ throw HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.ILLEGAL_STATE,
+ "Expected CALL_RSP, recieved " + getResponseType().name());
+ }
+ return recvBuffer;
+ }
+
+ public void quit() throws HyracksDataException {
+ messageBuilder.quit();
+ }
+
+ public abstract void receiveMsg() throws IOException, AsterixException;
+
+ public void sendHeader(long key, int msgLen) throws IOException {
+ headerBuffer.clear();
+ headerBuffer.position(0);
+ headerBuffer.putInt(HEADER_SIZE + Integer.BYTES + msgLen);
+ headerBuffer.putLong(key);
+ headerBuffer.putLong(routeId);
+ headerBuffer.put(Message.NORMAL);
+ sockOut.write(headerBuffer.array(), 0, HEADER_SIZE + Integer.BYTES);
+ sockOut.flush();
+ }
+
+ public void sendMsg(ArrayBackedValueStorage content) throws IOException {
+ sockOut.write(messageBuilder.getBuf().array(), messageBuilder.getBuf().arrayOffset(),
+ messageBuilder.getBuf().position());
+ sockOut.write(content.getByteArray(), content.getStartOffset(), content.getLength());
+ sockOut.flush();
+ }
+
+ public void sendMsg() throws IOException {
+ sockOut.write(messageBuilder.getBuf().array(), messageBuilder.getBuf().arrayOffset(),
+ messageBuilder.getBuf().position());
+ sockOut.flush();
+ }
+
+ public MessageType getResponseType() {
+ return messageBuilder.type;
+ }
+
+ public long getRouteId() {
+ return routeId;
+ }
+
+ public DataOutputStream getSockOut() {
+ return sockOut;
+ }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonDomainSocketProto.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonDomainSocketProto.java
new file mode 100644
index 0000000..89f240a
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonDomainSocketProto.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.ipc;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.api.IExternalLangIPCProto;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.msgpack.core.MessagePack;
+
+public class PythonDomainSocketProto extends AbstractPythonIPCProto implements IExternalLangIPCProto {
+ private final String wd;
+ SocketChannel chan;
+ private ByteBuffer headerBuffer;
+ private ProcessHandle pid;
+ public static final int HYR_HEADER_SIZE = 21; // 4 (sz) + 8 (mid) + 8 (rmid) + 1 (flags)
+ public static final int HYR_HEADER_SIZE_NOSZ = 17; // 8 + 8 + 1
+
+ public PythonDomainSocketProto(OutputStream sockOut, SocketChannel chan, String wd) {
+ super(sockOut);
+ this.chan = chan;
+ this.wd = wd;
+ headerBuffer = ByteBuffer.allocate(HYR_HEADER_SIZE);
+ }
+
+ @Override
+ public void start() {
+ }
+
+ @Override
+ public void helo() throws IOException, AsterixException {
+ recvBuffer.clear();
+ recvBuffer.position(0);
+ recvBuffer.limit(0);
+ messageBuilder.reset();
+ messageBuilder.helloDS(wd);
+ sendHeader(routeId, messageBuilder.getLength());
+ sendMsg(true);
+ receiveMsg(true);
+ byte pidType = recvBuffer.get();
+ if (pidType != MessagePack.Code.UINT32 && pidType != MessagePack.Code.UINT16) {
+ throw AsterixException.create(ErrorCode.EXTERNAL_UDF_EXCEPTION,
+ "Returned pid type is incorrect: " + pidType);
+ }
+ switch (pidType) {
+ case MessagePack.Code.UINT32:
+ pid = ProcessHandle.of(recvBuffer.getInt()).get();
+ break;
+ case MessagePack.Code.UINT16:
+ pid = ProcessHandle.of(recvBuffer.getShort()).get();
+ break;
+ case MessagePack.Code.UINT8:
+ pid = ProcessHandle.of(recvBuffer.get()).get();
+ break;
+ default:
+ throw AsterixException.create(ErrorCode.EXTERNAL_UDF_EXCEPTION,
+ "Returned pid type is incorrect: " + pidType);
+ }
+ if (getResponseType() != MessageType.HELO) {
+ throw HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.ILLEGAL_STATE,
+ "Expected HELO, recieved " + getResponseType().name());
+ }
+ }
+
+ @Override
+ public void sendMsg() throws IOException {
+ sendMsg(false);
+ }
+
+ @Override
+ public void sendMsg(ArrayBackedValueStorage args) throws IOException {
+ sendMsg(false, args);
+ }
+
+ public void sendMsg(boolean sendIfDead) throws IOException {
+ if (!sendIfDead && (pid == null || !pid.isAlive())) {
+ return;
+ }
+ super.sendMsg();
+ }
+
+ public void sendMsg(boolean sendIfDead, ArrayBackedValueStorage args) throws IOException {
+ if (!sendIfDead && (pid == null || !pid.isAlive())) {
+ return;
+ }
+ super.sendMsg(args);
+ }
+
+ @Override
+ public void receiveMsg() throws IOException, AsterixException {
+ receiveMsg(false);
+ }
+
+ public void receiveMsg(boolean sendIfDead) throws IOException, AsterixException {
+ if (!sendIfDead && (pid == null || !pid.isAlive())) {
+ throw new AsterixException("Python process exited unexpectedly");
+ }
+ readFully(headerBuffer.capacity(), headerBuffer);
+ if (headerBuffer.remaining() < Integer.BYTES) {
+ recvBuffer.limit(0);
+ throw new AsterixException("Python process exited unexpectedly");
+ }
+ int msgSz = headerBuffer.getInt() - HYR_HEADER_SIZE_NOSZ;
+ if (recvBuffer.capacity() < msgSz) {
+ recvBuffer = ByteBuffer.allocate(((msgSz / 32768) + 1) * 32768);
+ }
+ readFully(msgSz, recvBuffer);
+ messageBuilder.readHead(recvBuffer);
+ if (messageBuilder.type == MessageType.ERROR) {
+ unpackerInput.reset(recvBuffer.array(), recvBuffer.position() + recvBuffer.arrayOffset(),
+ recvBuffer.remaining());
+ unpacker.reset(unpackerInput);
+ throw new AsterixException(unpacker.unpackString().replace('\0', ' '));
+ }
+ }
+
+ private void readFully(int msgSz, ByteBuffer buf) throws IOException, AsterixException {
+ buf.limit(msgSz);
+ buf.clear();
+ int read;
+ int size = msgSz;
+ while (size > 0) {
+ read = chan.read(buf);
+ if (read < 0) {
+ throw new AsterixException("Socket closed");
+ }
+ size -= read;
+ }
+ buf.flip();
+ }
+
+ @Override
+ public void quit() throws HyracksDataException {
+ messageBuilder.quit();
+ }
+
+ public ProcessHandle getPid() {
+ return pid;
+ }
+
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonIPCProto.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonIPCProto.java
deleted file mode 100644
index c803517..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonIPCProto.java
+++ /dev/null
@@ -1,290 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.ipc;
-
-import static org.apache.hyracks.ipc.impl.Message.HEADER_SIZE;
-
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.external.library.msgpack.MsgPackPointableVisitor;
-import org.apache.asterix.om.pointables.AFlatValuePointable;
-import org.apache.asterix.om.pointables.AListVisitablePointable;
-import org.apache.asterix.om.pointables.ARecordVisitablePointable;
-import org.apache.asterix.om.pointables.PointableAllocator;
-import org.apache.asterix.om.pointables.base.IVisitablePointable;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.EnumDeserializer;
-import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.types.TypeTagUtil;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.data.std.api.IValueReference;
-import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
-import org.apache.hyracks.ipc.impl.Message;
-import org.msgpack.core.MessagePack;
-import org.msgpack.core.MessageUnpacker;
-import org.msgpack.core.buffer.ArrayBufferInput;
-
-public class PythonIPCProto {
-
- private final PythonMessageBuilder messageBuilder;
- private final DataOutputStream sockOut;
- private final ByteBuffer headerBuffer = ByteBuffer.allocate(21);
- private ByteBuffer recvBuffer = ByteBuffer.allocate(32768);
- private final ExternalFunctionResultRouter router;
- private long routeId;
- private Pair<ByteBuffer, Exception> bufferBox;
- private final Process pythonProc;
- private long maxFunctionId;
- private final ArrayBufferInput unpackerInput;
- private final MessageUnpacker unpacker;
- private final ArrayBackedValueStorage argsStorage;
- private final PointableAllocator pointableAllocator;
- private final MsgPackPointableVisitor pointableVisitor;
-
- public PythonIPCProto(OutputStream sockOut, ExternalFunctionResultRouter router, Process pythonProc) {
- this.sockOut = new DataOutputStream(sockOut);
- messageBuilder = new PythonMessageBuilder();
- this.router = router;
- this.pythonProc = pythonProc;
- this.maxFunctionId = 0L;
- unpackerInput = new ArrayBufferInput(new byte[0]);
- unpacker = MessagePack.newDefaultUnpacker(unpackerInput);
- this.argsStorage = new ArrayBackedValueStorage();
- this.pointableAllocator = new PointableAllocator();
- this.pointableVisitor = new MsgPackPointableVisitor();
- }
-
- public void start() {
- Pair<Long, Pair<ByteBuffer, Exception>> keyAndBufferBox = router.insertRoute(recvBuffer);
- this.routeId = keyAndBufferBox.getFirst();
- this.bufferBox = keyAndBufferBox.getSecond();
- }
-
- public void helo() throws IOException, AsterixException {
- recvBuffer.clear();
- recvBuffer.position(0);
- recvBuffer.limit(0);
- messageBuilder.reset();
- messageBuilder.hello();
- sendHeader(routeId, messageBuilder.getLength());
- sendMsg();
- receiveMsg();
- if (getResponseType() != MessageType.HELO) {
- throw HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.ILLEGAL_STATE,
- "Expected HELO, recieved " + getResponseType().name());
- }
- }
-
- public long init(String module, String clazz, String fn) throws IOException, AsterixException {
- long functionId = maxFunctionId++;
- recvBuffer.clear();
- recvBuffer.position(0);
- recvBuffer.limit(0);
- messageBuilder.reset();
- messageBuilder.init(module, clazz, fn);
- sendHeader(functionId, messageBuilder.getLength());
- sendMsg();
- receiveMsg();
- if (getResponseType() != MessageType.INIT_RSP) {
- throw HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.ILLEGAL_STATE,
- "Expected INIT_RSP, recieved " + getResponseType().name());
- }
- return functionId;
- }
-
- public ByteBuffer call(long functionId, IAType[] argTypes, IValueReference[] argValues, boolean nullCall)
- throws IOException, AsterixException {
- recvBuffer.clear();
- recvBuffer.position(0);
- recvBuffer.limit(0);
- messageBuilder.reset();
- argsStorage.reset();
- for (int i = 0; i < argTypes.length; i++) {
- visitValueRef(argTypes[i], argsStorage.getDataOutput(), argValues[i], pointableAllocator, pointableVisitor,
- nullCall);
- }
- int len = argsStorage.getLength() + 5;
- sendHeader(functionId, len);
- messageBuilder.call(argValues.length, len);
- sendMsg(argsStorage);
- receiveMsg();
- if (getResponseType() != MessageType.CALL_RSP) {
- throw HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.ILLEGAL_STATE,
- "Expected CALL_RSP, recieved " + getResponseType().name());
- }
- return recvBuffer;
- }
-
- public ByteBuffer callMulti(long key, ArrayBackedValueStorage args, int numTuples)
- throws IOException, AsterixException {
- recvBuffer.clear();
- recvBuffer.position(0);
- recvBuffer.limit(0);
- messageBuilder.reset();
- int len = args.getLength() + 4;
- sendHeader(key, len);
- messageBuilder.callMulti(0, numTuples);
- sendMsg(args);
- receiveMsg();
- if (getResponseType() != MessageType.CALL_RSP) {
- throw HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.ILLEGAL_STATE,
- "Expected CALL_RSP, recieved " + getResponseType().name());
- }
- return recvBuffer;
- }
-
- //For future use with interpreter reuse between jobs.
- public void quit() throws HyracksDataException {
- messageBuilder.quit();
- router.removeRoute(routeId);
- }
-
- public void receiveMsg() throws IOException, AsterixException {
- Exception except;
- try {
- synchronized (bufferBox) {
- while ((bufferBox.getFirst().limit() == 0 || bufferBox.getSecond() != null) && pythonProc.isAlive()) {
- bufferBox.wait(100);
- }
- }
- except = router.getAndRemoveException(routeId);
- if (!pythonProc.isAlive()) {
- except = new IOException("Python process exited with code: " + pythonProc.exitValue());
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new AsterixException(ErrorCode.EXTERNAL_UDF_EXCEPTION, e);
- }
- if (except != null) {
- throw new AsterixException(except);
- }
- if (bufferBox.getFirst() != recvBuffer) {
- recvBuffer = bufferBox.getFirst();
- }
- messageBuilder.readHead(recvBuffer);
- if (messageBuilder.type == MessageType.ERROR) {
- unpackerInput.reset(recvBuffer.array(), recvBuffer.position() + recvBuffer.arrayOffset(),
- recvBuffer.remaining());
- unpacker.reset(unpackerInput);
- throw new AsterixException(unpacker.unpackString());
- }
- }
-
- public void sendHeader(long key, int msgLen) throws IOException {
- headerBuffer.clear();
- headerBuffer.position(0);
- headerBuffer.putInt(HEADER_SIZE + Integer.BYTES + msgLen);
- headerBuffer.putLong(key);
- headerBuffer.putLong(routeId);
- headerBuffer.put(Message.NORMAL);
- sockOut.write(headerBuffer.array(), 0, HEADER_SIZE + Integer.BYTES);
- sockOut.flush();
- }
-
- public void sendMsg(ArrayBackedValueStorage content) throws IOException {
- sockOut.write(messageBuilder.getBuf().array(), messageBuilder.getBuf().arrayOffset(),
- messageBuilder.getBuf().position());
- sockOut.write(content.getByteArray(), content.getStartOffset(), content.getLength());
- sockOut.flush();
- }
-
- public void sendMsg() throws IOException {
- sockOut.write(messageBuilder.getBuf().array(), messageBuilder.getBuf().arrayOffset(),
- messageBuilder.getBuf().position());
- sockOut.flush();
- }
-
- public MessageType getResponseType() {
- return messageBuilder.type;
- }
-
- public long getRouteId() {
- return routeId;
- }
-
- public DataOutputStream getSockOut() {
- return sockOut;
- }
-
- public static void visitValueRef(IAType type, DataOutput out, IValueReference valueReference,
- PointableAllocator pointableAllocator, MsgPackPointableVisitor pointableVisitor, boolean visitNull)
- throws IOException {
- IVisitablePointable pointable;
- switch (type.getTypeTag()) {
- case OBJECT:
- pointable = pointableAllocator.allocateRecordValue(type);
- pointable.set(valueReference);
- pointableVisitor.visit((ARecordVisitablePointable) pointable, pointableVisitor.getTypeInfo(type, out));
- break;
- case ARRAY:
- case MULTISET:
- pointable = pointableAllocator.allocateListValue(type);
- pointable.set(valueReference);
- pointableVisitor.visit((AListVisitablePointable) pointable, pointableVisitor.getTypeInfo(type, out));
- break;
- case ANY:
- ATypeTag rtTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER
- .deserialize(valueReference.getByteArray()[valueReference.getStartOffset()]);
- IAType rtType = TypeTagUtil.getBuiltinTypeByTag(rtTypeTag);
- switch (rtTypeTag) {
- case OBJECT:
- pointable = pointableAllocator.allocateRecordValue(rtType);
- pointable.set(valueReference);
- pointableVisitor.visit((ARecordVisitablePointable) pointable,
- pointableVisitor.getTypeInfo(rtType, out));
- break;
- case ARRAY:
- case MULTISET:
- pointable = pointableAllocator.allocateListValue(rtType);
- pointable.set(valueReference);
- pointableVisitor.visit((AListVisitablePointable) pointable,
- pointableVisitor.getTypeInfo(rtType, out));
- break;
- case MISSING:
- case NULL:
- if (!visitNull) {
- return;
- }
- default:
- pointable = pointableAllocator.allocateFieldValue(rtType);
- pointable.set(valueReference);
- pointableVisitor.visit((AFlatValuePointable) pointable,
- pointableVisitor.getTypeInfo(rtType, out));
- break;
- }
- break;
- case MISSING:
- case NULL:
- if (!visitNull) {
- return;
- }
- default:
- pointable = pointableAllocator.allocateFieldValue(type);
- pointable.set(valueReference);
- pointableVisitor.visit((AFlatValuePointable) pointable, pointableVisitor.getTypeInfo(type, out));
- break;
- }
- }
-
-}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonMessageBuilder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonMessageBuilder.java
index 5429657..20f8306 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonMessageBuilder.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonMessageBuilder.java
@@ -82,6 +82,16 @@
buf.put(serAddr);
}
+ public void helloDS(String modulePath) throws IOException {
+ this.type = MessageType.HELO;
+ // sum(string lengths) + 2 from fix array tag and message type
+ dataLength = PythonMessageBuilder.getStringLength(modulePath) + 2;
+ packHeader();
+ MessagePackUtils.packFixArrayHeader(buf, (byte) 2);
+ MessagePackUtils.packStr(buf, "HELLO");
+ MessagePackUtils.packStr(buf, modulePath);
+ }
+
public void quit() throws HyracksDataException {
this.type = MessageType.QUIT;
dataLength = getStringLength("QUIT");
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonTCPSocketProto.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonTCPSocketProto.java
new file mode 100644
index 0000000..7fd3de4
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonTCPSocketProto.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.ipc;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.api.IExternalLangIPCProto;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class PythonTCPSocketProto extends AbstractPythonIPCProto
+ implements org.apache.asterix.external.api.IExternalLangIPCProto {
+
+ private final ExternalFunctionResultRouter router;
+ private final Process proc;
+
+ public PythonTCPSocketProto(OutputStream sockOut, ExternalFunctionResultRouter router, Process pythonProc) {
+ super(sockOut);
+ this.router = router;
+ this.proc = pythonProc;
+ }
+
+ @Override
+ public void start() {
+ Pair<Long, Pair<ByteBuffer, Exception>> keyAndBufferBox = router.insertRoute(recvBuffer);
+ this.routeId = keyAndBufferBox.getFirst();
+ this.bufferBox = keyAndBufferBox.getSecond();
+ }
+
+ @Override
+ public void quit() throws HyracksDataException {
+ messageBuilder.quit();
+ router.removeRoute(routeId);
+ }
+
+ @Override
+ public void receiveMsg() throws IOException, AsterixException {
+ Exception except;
+ try {
+ synchronized (bufferBox) {
+ while ((bufferBox.getFirst().limit() == 0 || bufferBox.getSecond() != null) && proc.isAlive()) {
+ bufferBox.wait(100);
+ }
+ }
+ except = router.getAndRemoveException(routeId);
+ if (!proc.isAlive()) {
+ except = new IOException("Python process exited with code: " + proc.exitValue());
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new AsterixException(ErrorCode.EXTERNAL_UDF_EXCEPTION, e);
+ }
+ if (except != null) {
+ throw new AsterixException(except);
+ }
+ if (bufferBox.getFirst() != recvBuffer) {
+ recvBuffer = bufferBox.getFirst();
+ }
+ messageBuilder.readHead(recvBuffer);
+ if (messageBuilder.type == MessageType.ERROR) {
+ unpackerInput.reset(recvBuffer.array(), recvBuffer.position() + recvBuffer.arrayOffset(),
+ recvBuffer.remaining());
+ unpacker.reset(unpackerInput);
+ throw new AsterixException(unpacker.unpackString());
+ }
+ }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/AbstractLibrarySocketEvaluator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/AbstractLibrarySocketEvaluator.java
new file mode 100644
index 0000000..6fcfdcf
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/AbstractLibrarySocketEvaluator.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.library;
+
+import static org.apache.asterix.common.exceptions.ErrorCode.EXTERNAL_UDF_EXCEPTION;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.api.IExternalLangIPCProto;
+import org.apache.asterix.external.api.ILibraryEvaluator;
+import org.apache.asterix.om.functions.IExternalFunctionInfo;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.api.dataflow.TaskAttemptId;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.api.exceptions.Warning;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.std.base.AbstractStateObject;
+
+public abstract class AbstractLibrarySocketEvaluator extends AbstractStateObject implements ILibraryEvaluator {
+
+ protected IExternalLangIPCProto proto;
+ protected TaskAttemptId task;
+ protected IWarningCollector warningCollector;
+ protected SourceLocation sourceLoc;
+
+ public AbstractLibrarySocketEvaluator(JobId jobId, PythonLibraryEvaluatorId evaluatorId, TaskAttemptId task,
+ IWarningCollector warningCollector, SourceLocation sourceLoc) {
+ super(jobId, evaluatorId);
+ this.task = task;
+ this.warningCollector = warningCollector;
+ this.sourceLoc = sourceLoc;
+ }
+
+ @Override
+ public long initialize(IExternalFunctionInfo finfo) throws IOException, AsterixException {
+ List<String> externalIdents = finfo.getExternalIdentifier();
+ String packageModule = externalIdents.get(0);
+ String clazz;
+ String fn;
+ String externalIdent1 = externalIdents.get(1);
+ int idx = externalIdent1.lastIndexOf('.');
+ if (idx >= 0) {
+ clazz = externalIdent1.substring(0, idx);
+ fn = externalIdent1.substring(idx + 1);
+ } else {
+ clazz = null;
+ fn = externalIdent1;
+ }
+ return proto.init(packageModule, clazz, fn);
+ }
+
+ @Override
+ public ByteBuffer call(long id, IAType[] argTypes, IValueReference[] valueReferences, boolean nullCall)
+ throws IOException {
+ ByteBuffer ret = null;
+ try {
+ ret = proto.call(id, argTypes, valueReferences, nullCall);
+ } catch (AsterixException e) {
+ if (warningCollector.shouldWarn()) {
+ warningCollector.warn(Warning.of(sourceLoc, EXTERNAL_UDF_EXCEPTION, e.getMessage()));
+ }
+ }
+ return ret;
+ }
+
+ @Override
+ public ByteBuffer callMulti(long id, ArrayBackedValueStorage arguments, int numTuples) throws IOException {
+ ByteBuffer ret = null;
+ try {
+ ret = proto.callMulti(id, arguments, numTuples);
+ } catch (AsterixException e) {
+ if (warningCollector.shouldWarn()) {
+ warningCollector.warn(Warning.of(sourceLoc, EXTERNAL_UDF_EXCEPTION, e.getMessage()));
+ }
+ }
+ return ret;
+ }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarPythonFunctionEvaluator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarPythonFunctionEvaluator.java
index 94a4dd2..fb8d761 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarPythonFunctionEvaluator.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarPythonFunctionEvaluator.java
@@ -28,6 +28,7 @@
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.api.ILibraryEvaluator;
import org.apache.asterix.external.library.msgpack.MessageUnpackerToADM;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.om.functions.IExternalFunctionInfo;
@@ -49,7 +50,7 @@
class ExternalScalarPythonFunctionEvaluator extends ExternalScalarFunctionEvaluator {
- private final PythonLibraryEvaluator libraryEvaluator;
+ private final ILibraryEvaluator libraryEvaluator;
private final ArrayBackedValueStorage resultBuffer = new ArrayBackedValueStorage();
private final ByteBuffer argHolder;
@@ -115,7 +116,7 @@
return;
}
try {
- ByteBuffer res = libraryEvaluator.callPython(fnId, argTypes, argValues, nullCall);
+ ByteBuffer res = libraryEvaluator.call(fnId, argTypes, argValues, nullCall);
resultBuffer.reset();
wrap(res, resultBuffer.getDataOutput());
} catch (Exception e) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryDomainSocketEvaluator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryDomainSocketEvaluator.java
new file mode 100644
index 0000000..056aa9a
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryDomainSocketEvaluator.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.library;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandle;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.MethodType;
+import java.lang.invoke.VarHandle;
+import java.net.ProtocolFamily;
+import java.net.SocketAddress;
+import java.net.StandardProtocolFamily;
+import java.nio.channels.Channels;
+import java.nio.channels.SocketChannel;
+import java.nio.file.Path;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.library.ILibraryManager;
+import org.apache.asterix.external.ipc.PythonDomainSocketProto;
+import org.apache.asterix.om.functions.IExternalFunctionInfo;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.TaskAttemptId;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.control.common.controllers.NCConfig;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class PythonLibraryDomainSocketEvaluator extends AbstractLibrarySocketEvaluator {
+
+ private final ILibraryManager libMgr;
+ private final Path sockPath;
+ SocketChannel chan;
+ ProcessHandle pid;
+ private static final Logger LOGGER = LogManager.getLogger(ExternalLibraryManager.class);
+
+ public PythonLibraryDomainSocketEvaluator(JobId jobId, PythonLibraryEvaluatorId evaluatorId, ILibraryManager libMgr,
+ TaskAttemptId task, IWarningCollector warningCollector, SourceLocation sourceLoc, Path sockPath) {
+ super(jobId, evaluatorId, task, warningCollector, sourceLoc);
+ this.libMgr = libMgr;
+ this.sockPath = sockPath;
+ }
+
+ public void start() throws IOException, AsterixException {
+ PythonLibraryEvaluatorId fnId = (PythonLibraryEvaluatorId) id;
+ PythonLibrary library =
+ (PythonLibrary) libMgr.getLibrary(fnId.getLibraryDataverseName(), fnId.getLibraryName());
+ String wd = library.getFile().getAbsolutePath();
+ MethodHandles.Lookup lookup = MethodHandles.lookup();
+ SocketAddress sockAddr;
+ try {
+ VarHandle sockEnum = lookup.in(StandardProtocolFamily.class)
+ .findStaticVarHandle(StandardProtocolFamily.class, "UNIX", StandardProtocolFamily.class);
+ Class domainSock = Class.forName("java.net.UnixDomainSocketAddress");
+ MethodType unixDomainSockAddrType = MethodType.methodType(domainSock, Path.class);
+ MethodHandle unixDomainSockAddr = lookup.findStatic(domainSock, "of", unixDomainSockAddrType);
+ MethodType sockChanMethodType = MethodType.methodType(SocketChannel.class, ProtocolFamily.class);
+ MethodHandle sockChanOpen = lookup.findStatic(SocketChannel.class, "open", sockChanMethodType);
+ sockAddr = ((SocketAddress) unixDomainSockAddr.invoke(sockPath));
+ chan = (SocketChannel) sockChanOpen.invoke(sockEnum.get());
+ } catch (Throwable e) {
+ throw HyracksDataException.create(ErrorCode.LOCAL_NETWORK_ERROR, e);
+ }
+ chan.connect(sockAddr);
+ proto = new PythonDomainSocketProto(Channels.newOutputStream(chan), chan, wd);
+ proto.start();
+ proto.helo();
+ this.pid = ((PythonDomainSocketProto) proto).getPid();
+ }
+
+ @Override
+ public void deallocate() {
+ try {
+ if (proto != null) {
+ proto.quit();
+ }
+ if (chan != null) {
+ chan.close();
+ }
+ } catch (IOException e) {
+ LOGGER.error("Caught exception exiting Python UDF:", e);
+ }
+ if (pid != null && pid.isAlive()) {
+ LOGGER.error("Python UDF " + pid.pid() + " did not exit as expected.");
+ }
+ }
+
+ static PythonLibraryDomainSocketEvaluator getInstance(IExternalFunctionInfo finfo, ILibraryManager libMgr,
+ IHyracksTaskContext ctx, IWarningCollector warningCollector, SourceLocation sourceLoc)
+ throws IOException, AsterixException {
+ PythonLibraryEvaluatorId evaluatorId = new PythonLibraryEvaluatorId(finfo.getLibraryDataverseName(),
+ finfo.getLibraryName(), Thread.currentThread());
+ PythonLibraryDomainSocketEvaluator evaluator =
+ (PythonLibraryDomainSocketEvaluator) ctx.getStateObject(evaluatorId);
+ if (evaluator == null) {
+ Path sockPath = Path.of(ctx.getJobletContext().getServiceContext().getAppConfig()
+ .getString(NCConfig.Option.PYTHON_DS_PATH));
+ evaluator = new PythonLibraryDomainSocketEvaluator(ctx.getJobletContext().getJobId(), evaluatorId, libMgr,
+ ctx.getTaskAttemptId(), warningCollector, sourceLoc, sockPath);
+ ctx.getJobletContext().registerDeallocatable(evaluator);
+ evaluator.start();
+ ctx.setStateObject(evaluator);
+ }
+ return evaluator;
+ }
+
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryEvaluator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryEvaluator.java
deleted file mode 100644
index f82b30d..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryEvaluator.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.library;
-
-import static org.apache.asterix.common.exceptions.ErrorCode.EXTERNAL_UDF_EXCEPTION;
-import static org.msgpack.core.MessagePack.Code.ARRAY16;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.library.ILibraryManager;
-import org.apache.asterix.external.ipc.ExternalFunctionResultRouter;
-import org.apache.asterix.external.ipc.PythonIPCProto;
-import org.apache.asterix.external.library.msgpack.MessagePackUtils;
-import org.apache.asterix.om.functions.IExternalFunctionInfo;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.EnumDeserializer;
-import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.types.TypeTagUtil;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.TaskAttemptId;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.exceptions.IWarningCollector;
-import org.apache.hyracks.api.exceptions.SourceLocation;
-import org.apache.hyracks.api.exceptions.Warning;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.resources.IDeallocatable;
-import org.apache.hyracks.data.std.api.IValueReference;
-import org.apache.hyracks.data.std.primitive.TaggedValuePointable;
-import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
-import org.apache.hyracks.dataflow.std.base.AbstractStateObject;
-import org.apache.hyracks.ipc.impl.IPCSystem;
-
-public class PythonLibraryEvaluator extends AbstractStateObject implements IDeallocatable {
-
- public static final String ENTRYPOINT = "entrypoint.py";
- public static final String SITE_PACKAGES = "site-packages";
-
- private Process p;
- private ILibraryManager libMgr;
- private File pythonHome;
- private PythonIPCProto proto;
- private ExternalFunctionResultRouter router;
- private IPCSystem ipcSys;
- private String sitePkgs;
- private List<String> pythonArgs;
- private Map<String, String> pythonEnv;
- private TaskAttemptId task;
- private IWarningCollector warningCollector;
- private SourceLocation sourceLoc;
-
- public PythonLibraryEvaluator(JobId jobId, PythonLibraryEvaluatorId evaluatorId, ILibraryManager libMgr,
- File pythonHome, String sitePkgs, List<String> pythonArgs, Map<String, String> pythonEnv,
- ExternalFunctionResultRouter router, IPCSystem ipcSys, TaskAttemptId task,
- IWarningCollector warningCollector, SourceLocation sourceLoc) {
- super(jobId, evaluatorId);
- this.libMgr = libMgr;
- this.pythonHome = pythonHome;
- this.sitePkgs = sitePkgs;
- this.pythonArgs = pythonArgs;
- this.pythonEnv = pythonEnv;
- this.router = router;
- this.task = task;
- this.ipcSys = ipcSys;
- this.warningCollector = warningCollector;
- this.sourceLoc = sourceLoc;
- }
-
- private void initialize() throws IOException, AsterixException {
- PythonLibraryEvaluatorId fnId = (PythonLibraryEvaluatorId) id;
- PythonLibrary library =
- (PythonLibrary) libMgr.getLibrary(fnId.getLibraryDataverseName(), fnId.getLibraryName());
- String wd = library.getFile().getAbsolutePath();
- int port = ipcSys.getSocketAddress().getPort();
- List<String> args = new ArrayList<>();
- args.add(pythonHome.getAbsolutePath());
- args.addAll(pythonArgs);
- args.add(ENTRYPOINT);
- args.add(InetAddress.getLoopbackAddress().getHostAddress());
- args.add(Integer.toString(port));
- args.add(sitePkgs);
- ProcessBuilder pb = new ProcessBuilder(args.toArray(new String[0]));
- pb.environment().putAll(pythonEnv);
- pb.directory(new File(wd));
- p = pb.start();
- proto = new PythonIPCProto(p.getOutputStream(), router, p);
- proto.start();
- proto.helo();
- }
-
- public long initialize(IExternalFunctionInfo finfo) throws IOException, AsterixException {
- List<String> externalIdents = finfo.getExternalIdentifier();
- String packageModule = externalIdents.get(0);
- String clazz;
- String fn;
- String externalIdent1 = externalIdents.get(1);
- int idx = externalIdent1.lastIndexOf('.');
- if (idx >= 0) {
- clazz = externalIdent1.substring(0, idx);
- fn = externalIdent1.substring(idx + 1);
- } else {
- clazz = null;
- fn = externalIdent1;
- }
- return proto.init(packageModule, clazz, fn);
- }
-
- public ByteBuffer callPython(long id, IAType[] argTypes, IValueReference[] valueReferences, boolean nullCall)
- throws IOException {
- ByteBuffer ret = null;
- try {
- ret = proto.call(id, argTypes, valueReferences, nullCall);
- } catch (AsterixException e) {
- if (warningCollector.shouldWarn()) {
- warningCollector.warn(Warning.of(sourceLoc, EXTERNAL_UDF_EXCEPTION, e.getMessage()));
- }
- }
- return ret;
- }
-
- public ByteBuffer callPythonMulti(long id, ArrayBackedValueStorage arguments, int numTuples) throws IOException {
- ByteBuffer ret = null;
- try {
- ret = proto.callMulti(id, arguments, numTuples);
- } catch (AsterixException e) {
- if (warningCollector.shouldWarn()) {
- warningCollector.warn(Warning.of(sourceLoc, EXTERNAL_UDF_EXCEPTION, e.getMessage()));
- }
- }
- return ret;
- }
-
- @Override
- public void deallocate() {
- if (p != null) {
- boolean dead = false;
- try {
- p.destroy();
- dead = p.waitFor(100, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- //gonna kill it anyway
- }
- if (!dead) {
- p.destroyForcibly();
- }
- }
- router.removeRoute(proto.getRouteId());
- }
-
- public static ATypeTag peekArgument(IAType type, IValueReference valueReference) throws HyracksDataException {
- ATypeTag tag = type.getTypeTag();
- if (tag == ATypeTag.ANY) {
- TaggedValuePointable pointy = TaggedValuePointable.FACTORY.createPointable();
- pointy.set(valueReference);
- ATypeTag rtTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(pointy.getTag());
- IAType rtType = TypeTagUtil.getBuiltinTypeByTag(rtTypeTag);
- return MessagePackUtils.peekUnknown(rtType);
- } else {
- return MessagePackUtils.peekUnknown(type);
- }
- }
-
- public static void setVoidArgument(ArrayBackedValueStorage argHolder) throws IOException {
- argHolder.getDataOutput().writeByte(ARRAY16);
- argHolder.getDataOutput().writeShort((short) 0);
- }
-
- public static PythonLibraryEvaluator getInstance(IExternalFunctionInfo finfo, ILibraryManager libMgr,
- ExternalFunctionResultRouter router, IPCSystem ipcSys, File pythonHome, IHyracksTaskContext ctx,
- String sitePkgs, List<String> pythonArgs, Map<String, String> pythonEnv, IWarningCollector warningCollector,
- SourceLocation sourceLoc) throws IOException, AsterixException {
- PythonLibraryEvaluatorId evaluatorId = new PythonLibraryEvaluatorId(finfo.getLibraryDataverseName(),
- finfo.getLibraryName(), Thread.currentThread());
- PythonLibraryEvaluator evaluator = (PythonLibraryEvaluator) ctx.getStateObject(evaluatorId);
- if (evaluator == null) {
- evaluator = new PythonLibraryEvaluator(ctx.getJobletContext().getJobId(), evaluatorId, libMgr, pythonHome,
- sitePkgs, pythonArgs, pythonEnv, router, ipcSys, ctx.getTaskAttemptId(), warningCollector,
- sourceLoc);
- ctx.getJobletContext().registerDeallocatable(evaluator);
- evaluator.initialize();
- ctx.setStateObject(evaluator);
- }
- return evaluator;
- }
-
-}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryEvaluatorFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryEvaluatorFactory.java
index 06c9bc9..63a6ec3 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryEvaluatorFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryEvaluatorFactory.java
@@ -18,10 +18,12 @@
*/
package org.apache.asterix.external.library;
-import static org.apache.asterix.external.library.PythonLibraryEvaluator.SITE_PACKAGES;
+import static org.apache.asterix.external.library.PythonLibraryTCPSocketEvaluator.SITE_PACKAGES;
import java.io.File;
import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -31,8 +33,10 @@
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.library.ILibraryManager;
+import org.apache.asterix.external.api.ILibraryEvaluator;
import org.apache.asterix.external.ipc.ExternalFunctionResultRouter;
import org.apache.asterix.om.functions.IExternalFunctionInfo;
+import org.apache.commons.lang3.SystemUtils;
import org.apache.hyracks.api.config.IApplicationConfig;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.SourceLocation;
@@ -40,83 +44,116 @@
import org.apache.hyracks.ipc.impl.IPCSystem;
public class PythonLibraryEvaluatorFactory {
- private final ILibraryManager libraryManager;
- private final IPCSystem ipcSys;
- private final File pythonPath;
- private final IHyracksTaskContext ctx;
- private final ExternalFunctionResultRouter router;
- private final String sitePackagesPath;
- private final List<String> pythonArgs;
- private final Map<String, String> pythonEnv;
+
+ private ILibraryManager libraryManager;
+ private IPCSystem ipcSys;
+ private File pythonPath;
+ private IHyracksTaskContext ctx;
+ private ExternalFunctionResultRouter router;
+ private String sitePackagesPath;
+ private List<String> pythonArgs;
+ private Map<String, String> pythonEnv;
+
+ private boolean domainSockEnable;
public PythonLibraryEvaluatorFactory(IHyracksTaskContext ctx) throws AsterixException {
this.ctx = ctx;
+ String dsPath =
+ ctx.getJobletContext().getServiceContext().getAppConfig().getString(NCConfig.Option.PYTHON_DS_PATH);
+ config(dsPath == null ? null : Path.of(dsPath));
libraryManager = ((INcApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext())
.getLibraryManager();
- router = libraryManager.getRouter();
- ipcSys = libraryManager.getIPCI();
- IApplicationConfig appCfg = ctx.getJobletContext().getServiceContext().getAppConfig();
- String pythonPathCmd = appCfg.getString(NCConfig.Option.PYTHON_CMD);
- boolean findPython = appCfg.getBoolean(NCConfig.Option.PYTHON_CMD_AUTOLOCATE);
- pythonArgs = new ArrayList<>();
- if (pythonPathCmd == null) {
- if (findPython) {
- //if absolute path to interpreter is not specified, try to use environmental python
- pythonPathCmd = "/usr/bin/env";
- pythonArgs.add("python3");
- } else {
- throw AsterixException.create(ErrorCode.EXTERNAL_UDF_EXCEPTION, "Python interpreter not specified, and "
- + NCConfig.Option.PYTHON_CMD_AUTOLOCATE.ini() + " is false");
- }
- }
- pythonEnv = new HashMap<>();
- String[] envRaw = appCfg.getStringArray((NCConfig.Option.PYTHON_ENV));
- if (envRaw != null) {
- for (String rawEnvArg : envRaw) {
- //TODO: i think equals is shared among all unixes and windows. but it needs verification
- if (rawEnvArg.length() < 1) {
- continue;
- }
- String[] rawArgSplit = rawEnvArg.split("(?<!\\\\)=", 2);
- if (rawArgSplit.length < 2) {
+ if (!domainSockEnable) {
+ router = libraryManager.getRouter();
+ ipcSys = libraryManager.getIPCI();
+ IApplicationConfig appCfg = ctx.getJobletContext().getServiceContext().getAppConfig();
+ String pythonPathCmd = appCfg.getString(NCConfig.Option.PYTHON_CMD);
+ boolean findPython = appCfg.getBoolean(NCConfig.Option.PYTHON_CMD_AUTOLOCATE);
+ pythonArgs = new ArrayList<>();
+ if (pythonPathCmd == null) {
+ if (findPython) {
+ //if absolute path to interpreter is not specified, try to use environmental python
+ pythonPathCmd = "/usr/bin/env";
+ pythonArgs.add("python3");
+ } else {
throw AsterixException.create(ErrorCode.EXTERNAL_UDF_EXCEPTION,
- "Invalid environment variable format detected.");
- }
- pythonEnv.put(rawArgSplit[0], rawArgSplit[1]);
- }
- }
- pythonPath = new File(pythonPathCmd);
- List<String> sitePkgs = new ArrayList<>();
- sitePkgs.add(SITE_PACKAGES);
- String[] addlSitePackages = appCfg.getStringArray((NCConfig.Option.PYTHON_ADDITIONAL_PACKAGES));
- for (String sitePkg : addlSitePackages) {
- if (sitePkg.length() > 0) {
- sitePkgs.add(sitePkg);
- }
- }
- if (appCfg.getBoolean(NCConfig.Option.PYTHON_USE_BUNDLED_MSGPACK)) {
- sitePkgs.add("ipc" + File.separator + SITE_PACKAGES + File.separator);
- }
- String[] pythonArgsRaw = appCfg.getStringArray(NCConfig.Option.PYTHON_ARGS);
- if (pythonArgsRaw != null) {
- for (String arg : pythonArgsRaw) {
- if (arg.length() > 0) {
- pythonArgs.add(arg);
+ "Python interpreter not specified or domain socket not found, and "
+ + NCConfig.Option.PYTHON_CMD_AUTOLOCATE.ini() + " is false");
}
}
+ pythonEnv = new HashMap<>();
+ String[] envRaw = appCfg.getStringArray((NCConfig.Option.PYTHON_ENV));
+ if (envRaw != null) {
+ for (String rawEnvArg : envRaw) {
+ //TODO: i think equals is shared among all unixes and windows. but it needs verification
+ if (rawEnvArg.length() < 1) {
+ continue;
+ }
+ String[] rawArgSplit = rawEnvArg.split("(?<!\\\\)=", 2);
+ if (rawArgSplit.length < 2) {
+ throw AsterixException.create(ErrorCode.EXTERNAL_UDF_EXCEPTION,
+ "Invalid environment variable format detected.");
+ }
+ pythonEnv.put(rawArgSplit[0], rawArgSplit[1]);
+ }
+ }
+ pythonPath = new File(pythonPathCmd);
+ List<String> sitePkgs = new ArrayList<>();
+ sitePkgs.add(SITE_PACKAGES);
+ String[] addlSitePackages = appCfg.getStringArray((NCConfig.Option.PYTHON_ADDITIONAL_PACKAGES));
+ for (String sitePkg : addlSitePackages) {
+ if (sitePkg.length() > 0) {
+ sitePkgs.add(sitePkg);
+ }
+ }
+ if (appCfg.getBoolean(NCConfig.Option.PYTHON_USE_BUNDLED_MSGPACK)) {
+ sitePkgs.add("ipc" + File.separator + SITE_PACKAGES + File.separator);
+ }
+ String[] pythonArgsRaw = appCfg.getStringArray(NCConfig.Option.PYTHON_ARGS);
+ if (pythonArgsRaw != null) {
+ for (String arg : pythonArgsRaw) {
+ if (arg.length() > 0) {
+ pythonArgs.add(arg);
+ }
+ }
+ }
+ StringBuilder sitePackagesPathBuilder = new StringBuilder();
+ for (int i = 0; i < sitePkgs.size() - 1; i++) {
+ sitePackagesPathBuilder.append(sitePkgs.get(i));
+ sitePackagesPathBuilder.append(File.pathSeparator);
+ }
+ sitePackagesPathBuilder.append(sitePkgs.get(sitePkgs.size() - 1));
+ sitePackagesPath = sitePackagesPathBuilder.toString();
}
- StringBuilder sitePackagesPathBuilder = new StringBuilder();
- for (int i = 0; i < sitePkgs.size() - 1; i++) {
- sitePackagesPathBuilder.append(sitePkgs.get(i));
- sitePackagesPathBuilder.append(File.pathSeparator);
- }
- sitePackagesPathBuilder.append(sitePkgs.get(sitePkgs.size() - 1));
- sitePackagesPath = sitePackagesPathBuilder.toString();
}
- public PythonLibraryEvaluator getEvaluator(IExternalFunctionInfo fnInfo, SourceLocation sourceLoc)
+ public ILibraryEvaluator getEvaluator(IExternalFunctionInfo fnInfo, SourceLocation sourceLoc)
throws IOException, AsterixException {
- return PythonLibraryEvaluator.getInstance(fnInfo, libraryManager, router, ipcSys, pythonPath, ctx,
- sitePackagesPath, pythonArgs, pythonEnv, ctx.getWarningCollector(), sourceLoc);
+ if (domainSockEnable) {
+ return PythonLibraryDomainSocketEvaluator.getInstance(fnInfo, libraryManager, ctx,
+ ctx.getWarningCollector(), sourceLoc);
+ } else {
+ return PythonLibraryTCPSocketEvaluator.getInstance(fnInfo, libraryManager, router, ipcSys, pythonPath, ctx,
+ sitePackagesPath, pythonArgs, pythonEnv, ctx.getWarningCollector(), sourceLoc);
+ }
+ }
+
+ private void config(Path sockPath) throws AsterixException {
+ if (sockPath == null) {
+ domainSockEnable = false;
+ return;
+ }
+ Runtime rt = Runtime.getRuntime();
+ if (rt.version().feature() >= 17 && SystemUtils.IS_OS_LINUX) {
+ if (Files.exists(sockPath)) {
+ domainSockEnable = true;
+ } else {
+ throw AsterixException.create(ErrorCode.EXTERNAL_UDF_EXCEPTION,
+ "Domain socket was not found at specified path");
+ }
+ } else {
+ throw AsterixException.create(ErrorCode.EXTERNAL_UDF_EXCEPTION,
+ "Domain socket path specified, but Java version is below 17 or OS is not Linux");
+ }
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryTCPSocketEvaluator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryTCPSocketEvaluator.java
new file mode 100644
index 0000000..385d738
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryTCPSocketEvaluator.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.library;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.library.ILibraryManager;
+import org.apache.asterix.external.ipc.ExternalFunctionResultRouter;
+import org.apache.asterix.external.ipc.PythonTCPSocketProto;
+import org.apache.asterix.om.functions.IExternalFunctionInfo;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.TaskAttemptId;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.ipc.impl.IPCSystem;
+
+public class PythonLibraryTCPSocketEvaluator extends AbstractLibrarySocketEvaluator {
+
+ public static final String ENTRYPOINT = "entrypoint.py";
+ public static final String SITE_PACKAGES = "site-packages";
+
+ private Process p;
+ private ILibraryManager libMgr;
+ private File pythonHome;
+ private ExternalFunctionResultRouter router;
+ private IPCSystem ipcSys;
+ private String sitePkgs;
+ private List<String> pythonArgs;
+ private Map<String, String> pythonEnv;
+
+ public PythonLibraryTCPSocketEvaluator(JobId jobId, PythonLibraryEvaluatorId evaluatorId, ILibraryManager libMgr,
+ File pythonHome, String sitePkgs, List<String> pythonArgs, Map<String, String> pythonEnv,
+ ExternalFunctionResultRouter router, IPCSystem ipcSys, TaskAttemptId task,
+ IWarningCollector warningCollector, SourceLocation sourceLoc) {
+ super(jobId, evaluatorId, task, warningCollector, sourceLoc);
+ this.libMgr = libMgr;
+ this.pythonHome = pythonHome;
+ this.sitePkgs = sitePkgs;
+ this.pythonArgs = pythonArgs;
+ this.pythonEnv = pythonEnv;
+ this.router = router;
+ this.ipcSys = ipcSys;
+ }
+
+ @Override
+ public void start() throws IOException, AsterixException {
+ PythonLibraryEvaluatorId fnId = (PythonLibraryEvaluatorId) id;
+ PythonLibrary library =
+ (PythonLibrary) libMgr.getLibrary(fnId.getLibraryDataverseName(), fnId.getLibraryName());
+ String wd = library.getFile().getAbsolutePath();
+ int port = ipcSys.getSocketAddress().getPort();
+ List<String> args = new ArrayList<>();
+ args.add(pythonHome.getAbsolutePath());
+ args.addAll(pythonArgs);
+ args.add(ENTRYPOINT);
+ args.add(InetAddress.getLoopbackAddress().getHostAddress());
+ args.add(Integer.toString(port));
+ args.add(sitePkgs);
+ ProcessBuilder pb = new ProcessBuilder(args.toArray(new String[0]));
+ pb.environment().putAll(pythonEnv);
+ pb.directory(new File(wd));
+ p = pb.start();
+ proto = new PythonTCPSocketProto(p.getOutputStream(), router, p);
+ proto.start();
+ proto.helo();
+ }
+
+ @Override
+ public void deallocate() {
+ if (p != null) {
+ boolean dead = false;
+ try {
+ p.destroy();
+ dead = p.waitFor(100, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ //gonna kill it anyway
+ }
+ if (!dead) {
+ p.destroyForcibly();
+ }
+ }
+ router.removeRoute(proto.getRouteId());
+ }
+
+ static PythonLibraryTCPSocketEvaluator getInstance(IExternalFunctionInfo finfo, ILibraryManager libMgr,
+ ExternalFunctionResultRouter router, IPCSystem ipcSys, File pythonHome, IHyracksTaskContext ctx,
+ String sitePkgs, List<String> pythonArgs, Map<String, String> pythonEnv, IWarningCollector warningCollector,
+ SourceLocation sourceLoc) throws IOException, AsterixException {
+ PythonLibraryEvaluatorId evaluatorId = new PythonLibraryEvaluatorId(finfo.getLibraryDataverseName(),
+ finfo.getLibraryName(), Thread.currentThread());
+ PythonLibraryTCPSocketEvaluator evaluator = (PythonLibraryTCPSocketEvaluator) ctx.getStateObject(evaluatorId);
+ if (evaluator == null) {
+ evaluator = new PythonLibraryTCPSocketEvaluator(ctx.getJobletContext().getJobId(), evaluatorId, libMgr,
+ pythonHome, sitePkgs, pythonArgs, pythonEnv, router, ipcSys, ctx.getTaskAttemptId(),
+ warningCollector, sourceLoc);
+ ctx.getJobletContext().registerDeallocatable(evaluator);
+ evaluator.start();
+ ctx.setStateObject(evaluator);
+ }
+ return evaluator;
+ }
+
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalAssignBatchRuntimeFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalAssignBatchRuntimeFactory.java
index 741dad2..5f8a3f0 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalAssignBatchRuntimeFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalAssignBatchRuntimeFactory.java
@@ -33,12 +33,13 @@
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.asterix.external.ipc.PythonIPCProto;
-import org.apache.asterix.external.library.PythonLibraryEvaluator;
+import org.apache.asterix.external.api.IExternalLangIPCProto;
+import org.apache.asterix.external.api.ILibraryEvaluator;
import org.apache.asterix.external.library.PythonLibraryEvaluatorFactory;
import org.apache.asterix.external.library.msgpack.MessageUnpackerToADM;
import org.apache.asterix.external.library.msgpack.MsgPackPointableVisitor;
import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.om.functions.IExternalFunctionDescriptor;
import org.apache.asterix.om.pointables.PointableAllocator;
import org.apache.asterix.om.types.ATypeTag;
@@ -50,6 +51,7 @@
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.Warning;
+import org.apache.hyracks.data.std.primitive.TaggedValuePointable;
import org.apache.hyracks.data.std.primitive.VoidPointable;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
@@ -87,7 +89,7 @@
private ArrayBackedValueStorage outputWrapper;
private List<ArrayBackedValueStorage> argHolders;
ArrayTupleBuilder tupleBuilder;
- private List<Pair<Long, PythonLibraryEvaluator>> libraryEvaluators;
+ private List<Pair<Long, ILibraryEvaluator>> libraryEvaluators;
private ATypeTag[][] nullCalls;
private int[] numCalls;
private VoidPointable ref;
@@ -97,6 +99,7 @@
private MessageUnpackerToADM unpackerToADM;
private PointableAllocator pointableAllocator;
private MsgPackPointableVisitor pointableVisitor;
+ private TaggedValuePointable anyPointer;
@Override
public void open() throws HyracksDataException {
@@ -109,7 +112,7 @@
try {
PythonLibraryEvaluatorFactory evalFactory = new PythonLibraryEvaluatorFactory(ctx);
for (IExternalFunctionDescriptor fnDesc : fnDescs) {
- PythonLibraryEvaluator eval = evalFactory.getEvaluator(fnDesc.getFunctionInfo(), sourceLoc);
+ ILibraryEvaluator eval = evalFactory.getEvaluator(fnDesc.getFunctionInfo(), sourceLoc);
long id = eval.initialize(fnDesc.getFunctionInfo());
libraryEvaluators.add(new Pair<>(id, eval));
}
@@ -133,6 +136,7 @@
unpackerToADM = new MessageUnpackerToADM();
pointableAllocator = new PointableAllocator();
pointableVisitor = new MsgPackPointableVisitor();
+ anyPointer = TaggedValuePointable.FACTORY.createPointable();
}
private void resetBuffers(int numTuples, int[] numCalls) {
@@ -177,8 +181,12 @@
int numEntries = unpacker.unpackArrayHeader();
for (int j = 0; j < numEntries; j++) {
if (ctx.getWarningCollector().shouldWarn()) {
- ctx.getWarningCollector().warn(Warning.of(sourceLoc,
- ErrorCode.EXTERNAL_UDF_EXCEPTION, unpacker.unpackString()));
+ //TODO: in domain socket mode, a NUL can appear at the end of the stacktrace strings.
+ // this should probably not happen but warnings with control characters should
+ // also be properly escaped
+ ctx.getWarningCollector()
+ .warn(Warning.of(sourceLoc, ErrorCode.EXTERNAL_UDF_EXCEPTION,
+ unpacker.unpackString().replace('\0', ' ')));
}
}
} catch (MessagePackException e) {
@@ -211,8 +219,8 @@
for (int colIdx = 0; colIdx < cols.length; colIdx++) {
ref.set(buffer.array(), tRef.getFieldStart(cols[colIdx]),
tRef.getFieldLength(cols[colIdx]));
- ATypeTag argumentPresence = PythonLibraryEvaluator
- .peekArgument(fnDescs[func].getArgumentTypes()[colIdx], ref);
+ ATypeTag argumentPresence = ExternalDataUtils
+ .peekArgument(fnDescs[func].getArgumentTypes()[colIdx], ref, anyPointer);
argumentStatus = handleNullMatrix(func, t, argumentPresence, argumentStatus);
}
}
@@ -224,7 +232,7 @@
for (int colIdx = 0; colIdx < cols.length; colIdx++) {
ref.set(buffer.array(), tRef.getFieldStart(cols[colIdx]),
tRef.getFieldLength(cols[colIdx]));
- PythonIPCProto.visitValueRef(fnDescs[func].getArgumentTypes()[colIdx],
+ IExternalLangIPCProto.visitValueRef(fnDescs[func].getArgumentTypes()[colIdx],
argHolders.get(func).getDataOutput(), ref, pointableAllocator,
pointableVisitor, fnDescs[func].getFunctionInfo().getNullCall());
}
@@ -232,21 +240,25 @@
numCalls[func]--;
}
if (cols.length == 0) {
- PythonLibraryEvaluator.setVoidArgument(argHolders.get(func));
+ ExternalDataUtils.setVoidArgument(argHolders.get(func));
}
}
}
//TODO: maybe this could be done in parallel for each unique library evaluator?
for (int argHolderIdx = 0; argHolderIdx < argHolders.size(); argHolderIdx++) {
- Pair<Long, PythonLibraryEvaluator> fnEval = libraryEvaluators.get(argHolderIdx);
- ByteBuffer columnResult = fnEval.getSecond().callPythonMulti(fnEval.getFirst(),
+ Pair<Long, ILibraryEvaluator> fnEval = libraryEvaluators.get(argHolderIdx);
+ ByteBuffer columnResult = fnEval.getSecond().callMulti(fnEval.getFirst(),
argHolders.get(argHolderIdx), numCalls[argHolderIdx]);
if (columnResult != null) {
Pair<ByteBuffer, Counter> resultholder = batchResults.get(argHolderIdx);
- if (resultholder.getFirst().capacity() < columnResult.capacity()) {
- ByteBuffer realloc = ctx.reallocateFrame(resultholder.getFirst(),
- columnResult.capacity() * 2, false);
+ if (resultholder.getFirst().capacity() < columnResult.remaining()) {
+ ByteBuffer realloc =
+ ctx.reallocateFrame(resultholder.getFirst(),
+ ctx.getInitialFrameSize()
+ * ((columnResult.remaining() / ctx.getInitialFrameSize()) + 1),
+ false);
+ realloc.limit(columnResult.limit());
resultholder.setFirst(realloc);
}
ByteBuffer resultBuf = resultholder.getFirst();
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 62dc074..5bf5844 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -35,6 +35,7 @@
import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.validateAzureDataLakeProperties;
import static org.apache.asterix.external.util.google.gcs.GCSUtils.validateProperties;
import static org.apache.asterix.runtime.evaluators.functions.StringEvaluatorUtils.RESERVED_REGEX_CHARS;
+import static org.msgpack.core.MessagePack.Code.ARRAY16;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
@@ -66,11 +67,15 @@
import org.apache.asterix.external.api.IRecordReaderFactory;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
import org.apache.asterix.external.library.JavaLibrary;
+import org.apache.asterix.external.library.msgpack.MessagePackUtils;
import org.apache.asterix.external.util.ExternalDataConstants.ParquetOptions;
import org.apache.asterix.external.util.aws.s3.S3Utils;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.AUnionType;
+import org.apache.asterix.om.types.EnumDeserializer;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.types.TypeTagUtil;
import org.apache.asterix.runtime.evaluators.common.NumberUtils;
import org.apache.asterix.runtime.projection.DataProjectionInfo;
import org.apache.asterix.runtime.projection.FunctionCallInformation;
@@ -78,6 +83,9 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.TaggedValuePointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.dataflow.common.data.parsers.BooleanParserFactory;
import org.apache.hyracks.dataflow.common.data.parsers.DoubleParserFactory;
import org.apache.hyracks.dataflow.common.data.parsers.FloatParserFactory;
@@ -386,7 +394,8 @@
/**
* Fills the configuration of the external dataset and its adapter with default values if not provided by user.
*
- * @param configuration external data configuration
+ * @param configuration
+ * external data configuration
*/
public static void defaultConfiguration(Map<String, String> configuration) {
String format = configuration.get(ExternalDataConstants.KEY_FORMAT);
@@ -408,8 +417,10 @@
* Prepares the configuration of the external data and its adapter by filling the information required by
* adapters and parsers.
*
- * @param adapterName adapter name
- * @param configuration external data configuration
+ * @param adapterName
+ * adapter name
+ * @param configuration
+ * external data configuration
*/
public static void prepare(String adapterName, Map<String, String> configuration) {
if (!configuration.containsKey(ExternalDataConstants.KEY_READER)) {
@@ -431,7 +442,8 @@
* Normalizes the values of certain parameters of the adapter configuration. This should happen before persisting
* the metadata (e.g. when creating external datasets or feeds) and when creating an adapter factory.
*
- * @param configuration external data configuration
+ * @param configuration
+ * external data configuration
*/
public static void normalize(Map<String, String> configuration) {
// normalize the "format" parameter
@@ -451,8 +463,10 @@
/**
* Validates the parameter values of the adapter configuration. This should happen after normalizing the values.
*
- * @param configuration external data configuration
- * @throws HyracksDataException HyracksDataException
+ * @param configuration
+ * external data configuration
+ * @throws HyracksDataException
+ * HyracksDataException
*/
public static void validate(Map<String, String> configuration) throws HyracksDataException {
String format = configuration.get(ExternalDataConstants.KEY_FORMAT);
@@ -514,7 +528,8 @@
* Validates adapter specific external dataset properties. Specific properties for different adapters should be
* validated here
*
- * @param configuration properties
+ * @param configuration
+ * properties
*/
public static void validateAdapterSpecificProperties(Map<String, String> configuration, SourceLocation srcLoc,
IWarningCollector collector, IApplicationContext appCtx) throws CompilationException {
@@ -542,7 +557,8 @@
/**
* Regex matches all the provided patterns against the provided path
*
- * @param path path to check against
+ * @param path
+ * path to check against
* @return {@code true} if all patterns match, {@code false} otherwise
*/
public static boolean matchPatterns(List<Matcher> matchers, String path) {
@@ -557,7 +573,8 @@
/**
* Converts the wildcard to proper regex
*
- * @param pattern wildcard pattern to convert
+ * @param pattern
+ * wildcard pattern to convert
* @return regex expression
*/
public static String patternToRegex(String pattern) {
@@ -646,7 +663,8 @@
/**
* Adjusts the prefix (if needed) and returns it
*
- * @param configuration configuration
+ * @param configuration
+ * configuration
*/
public static String getPrefix(Map<String, String> configuration) {
return getPrefix(configuration, true);
@@ -661,8 +679,10 @@
}
/**
- * @param configuration configuration map
- * @throws CompilationException Compilation exception
+ * @param configuration
+ * configuration map
+ * @throws CompilationException
+ * Compilation exception
*/
public static void validateIncludeExclude(Map<String, String> configuration) throws CompilationException {
// Ensure that include and exclude are not provided at the same time + ensure valid format or property
@@ -746,8 +766,10 @@
/**
* Validate Parquet dataset's declared type and configuration
*
- * @param properties external dataset configuration
- * @param datasetRecordType dataset declared type
+ * @param properties
+ * external dataset configuration
+ * @param datasetRecordType
+ * dataset declared type
*/
public static void validateParquetTypeAndConfiguration(Map<String, String> properties,
ARecordType datasetRecordType) throws CompilationException {
@@ -780,7 +802,8 @@
/**
* Serialize {@link ARecordType} as Base64 string to pass it to {@link org.apache.hadoop.conf.Configuration}
*
- * @param expectedType expected type
+ * @param expectedType
+ * expected type
* @return the expected type as Base64 string
*/
private static String serializeExpectedTypeToString(ARecordType expectedType) throws IOException {
@@ -799,7 +822,8 @@
* Serialize {@link FunctionCallInformation} map as Base64 string to pass it to
* {@link org.apache.hadoop.conf.Configuration}
*
- * @param functionCallInfoMap function information map
+ * @param functionCallInfoMap
+ * function information map
* @return function information map as Base64 string
*/
static String serializeFunctionCallInfoToString(Map<String, FunctionCallInformation> functionCallInfoMap)
@@ -830,4 +854,22 @@
public static Optional<String> getFirstNotNull(Map<String, String> configuration, String... parameters) {
return Arrays.stream(parameters).filter(field -> configuration.get(field) != null).findFirst();
}
+
+ public static ATypeTag peekArgument(IAType type, IValueReference valueReference, TaggedValuePointable pointy)
+ throws HyracksDataException {
+ ATypeTag tag = type.getTypeTag();
+ if (tag == ATypeTag.ANY) {
+ pointy.set(valueReference);
+ ATypeTag rtTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(pointy.getTag());
+ IAType rtType = TypeTagUtil.getBuiltinTypeByTag(rtTypeTag);
+ return MessagePackUtils.peekUnknown(rtType);
+ } else {
+ return MessagePackUtils.peekUnknown(type);
+ }
+ }
+
+ public static void setVoidArgument(ArrayBackedValueStorage argHolder) throws IOException {
+ argHolder.getDataOutput().writeByte(ARRAY16);
+ argHolder.getDataOutput().writeShort((short) 0);
+ }
}
diff --git a/asterixdb/asterix-docker/docker/.gitattributes b/asterixdb/asterix-podman/docker/.gitattributes
similarity index 100%
rename from asterixdb/asterix-docker/docker/.gitattributes
rename to asterixdb/asterix-podman/docker/.gitattributes
diff --git a/asterixdb/asterix-docker/docker/Dockerfile b/asterixdb/asterix-podman/docker/Dockerfile
similarity index 100%
rename from asterixdb/asterix-docker/docker/Dockerfile
rename to asterixdb/asterix-podman/docker/Dockerfile
diff --git a/asterixdb/asterix-docker/docker/asterix-configuration.xml b/asterixdb/asterix-podman/docker/asterix-configuration.xml
similarity index 100%
rename from asterixdb/asterix-docker/docker/asterix-configuration.xml
rename to asterixdb/asterix-podman/docker/asterix-configuration.xml
diff --git a/asterixdb/asterix-docker/docker/fbm.adm b/asterixdb/asterix-podman/docker/fbm.adm
similarity index 100%
rename from asterixdb/asterix-docker/docker/fbm.adm
rename to asterixdb/asterix-podman/docker/fbm.adm
diff --git a/asterixdb/asterix-docker/docker/fbu.adm b/asterixdb/asterix-podman/docker/fbu.adm
similarity index 100%
rename from asterixdb/asterix-docker/docker/fbu.adm
rename to asterixdb/asterix-podman/docker/fbu.adm
diff --git a/asterixdb/asterix-docker/docker/supervisord.conf b/asterixdb/asterix-podman/docker/supervisord.conf
similarity index 100%
rename from asterixdb/asterix-docker/docker/supervisord.conf
rename to asterixdb/asterix-podman/docker/supervisord.conf
diff --git a/asterixdb/asterix-docker/docker/twm.adm b/asterixdb/asterix-podman/docker/twm.adm
similarity index 100%
rename from asterixdb/asterix-docker/docker/twm.adm
rename to asterixdb/asterix-podman/docker/twm.adm
diff --git a/asterixdb/asterix-docker/docker/twu.adm b/asterixdb/asterix-podman/docker/twu.adm
similarity index 100%
rename from asterixdb/asterix-docker/docker/twu.adm
rename to asterixdb/asterix-podman/docker/twu.adm
diff --git a/asterixdb/asterix-podman/pom.xml b/asterixdb/asterix-podman/pom.xml
new file mode 100644
index 0000000..3d32518
--- /dev/null
+++ b/asterixdb/asterix-podman/pom.xml
@@ -0,0 +1,156 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements. See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership. The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License. You may obtain a copy of the License at
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied. See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>apache-asterixdb</artifactId>
+ <groupId>org.apache.asterix</groupId>
+ <version>0.9.8-SNAPSHOT</version>
+ </parent>
+ <artifactId>asterix-podman</artifactId>
+ <dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>asterix-server</artifactId>
+ <version>${project.version}</version>
+ <type>deb</type>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.asterix</groupId>
+ <artifactId>asterix-app</artifactId>
+ <version>${project.version}</version>
+ <classifier>tests</classifier>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.asterix</groupId>
+ <artifactId>asterix-test-framework</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>testcontainers</artifactId>
+ <version>1.17.1</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <properties>
+ <root.dir>${basedir}/..</root.dir>
+ </properties>
+
+ <licenses>
+ <license>
+ <name>Apache License, Version 2.0</name>
+ <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+ <distribution>repo</distribution>
+ <comments>A business-friendly OSS license</comments>
+ </license>
+ </licenses>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ <configuration>
+ <excludes combine.children="append">
+ <exclude>src/test/resources/setup.sh</exclude>
+ <exclude>src/test/resources/passwd</exclude>
+ <exclude>src/test/resources/socktest/Containerfile</exclude>
+ <exclude>src/test/resources/testenv.conf</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ <profiles>
+ <profile>
+ <id>podman.tests</id>
+ <properties>
+ <test.excludes>**/*.java</test.excludes>
+ <itest.includes>**/PodmanPythonFunctionIT.java</itest.includes>
+ <failIfNoTests>false</failIfNoTests>
+ </properties>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>nl.lexemmens</groupId>
+ <artifactId>podman-maven-plugin</artifactId>
+ <version>1.8.0</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>build</goal>
+ </goals>
+ <phase>generate-test-resources</phase>
+ </execution>
+ </executions>
+ <configuration>
+ <skipAuth>true</skipAuth>
+ <images>
+ <image>
+ <name>asterixdb/socktest</name>
+ <build>
+ <pull>false</pull>
+ <createLatestTag>true</createLatestTag>
+ <containerFileDir>src/test/resources/socktest</containerFileDir>
+ </build>
+ </image>
+ </images>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-resources-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>copy-external-data-resources</id>
+ <phase>generate-resources</phase>
+ <goals>
+ <goal>copy-resources</goal>
+ </goals>
+ <configuration>
+ <outputDirectory>target/</outputDirectory>
+ <overwrite>true</overwrite>
+ <resources>
+ <resource>
+ <directory>../asterix-server/target</directory>
+ <includes>
+ <include>asterix-server*.deb</include>
+ </includes>
+ </resource>
+ </resources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+
+</project>
diff --git a/asterixdb/asterix-podman/src/test/java/org/apache/asterix/test/podman/PodmanPythonFunctionIT.java b/asterixdb/asterix-podman/src/test/java/org/apache/asterix/test/podman/PodmanPythonFunctionIT.java
new file mode 100644
index 0000000..f0f89cd
--- /dev/null
+++ b/asterixdb/asterix-podman/src/test/java/org/apache/asterix/test/podman/PodmanPythonFunctionIT.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.podman;
+
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.test.runtime.ExecutionTestUtil;
+import org.apache.asterix.test.runtime.LangExecutionUtil;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.testcontainers.DockerClientFactory;
+import org.testcontainers.containers.BindMode;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import com.github.dockerjava.api.DockerClient;
+
+/**
+ * Runs the Python UDF tests within a container using domain sockets.
+ */
+@RunWith(Parameterized.class)
+public class PodmanPythonFunctionIT {
+ public static final DockerImageName ASTERIX_IMAGE = DockerImageName.parse("asterixdb/socktest");
+ @ClassRule
+ public static GenericContainer<?> asterix = new GenericContainer(ASTERIX_IMAGE).withExposedPorts(19004, 5006, 19002)
+ .withFileSystemBind("../asterix-app/", "/var/tmp/asterix-app/", BindMode.READ_WRITE);
+ protected static final String TEST_CONFIG_FILE_NAME = "../asterix-app/src/test/resources/cc.conf";
+ private static final boolean cleanupOnStop = true;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ final TestExecutor testExecutor = new TestExecutor(
+ List.of(InetSocketAddress.createUnresolved(asterix.getHost(), asterix.getMappedPort(19002))));
+ asterix.execInContainer("/opt/setup.sh");
+ LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, testExecutor, false, true, new PodmanUDFLibrarian(asterix));
+ setEndpoints(testExecutor);
+ testExecutor.waitForClusterActive(60, TimeUnit.SECONDS);
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ try {
+ } finally {
+ ExecutionTestUtil.tearDown(cleanupOnStop);
+ DockerClient dc = DockerClientFactory.instance().client();
+ dc.removeImageCmd(ASTERIX_IMAGE.asCanonicalNameString()).withForce(true).exec();
+ }
+ }
+
+ @Parameters(name = "PodmanPythonFunctionIT {index}: {0}")
+ public static Collection<Object[]> tests() throws Exception {
+ return LangExecutionUtil.tests("only_sqlpp.xml", "testsuite_it_python.xml",
+ "../asterix-app/src/test/resources/runtimets");
+ }
+
+ protected TestCaseContext tcCtx;
+
+ public PodmanPythonFunctionIT(TestCaseContext tcCtx) {
+ this.tcCtx = tcCtx;
+ }
+
+ @Test
+ public void test() throws Exception {
+ LangExecutionUtil.test(tcCtx);
+ }
+
+ private static void setEndpoints(TestExecutor testExecutor) {
+ final Map<String, InetSocketAddress> ncEndPoints = new HashMap<>();
+ final String ip = asterix.getHost();
+ final String nodeId = "asterix_nc";
+ int apiPort = asterix.getMappedPort(19004);
+ ncEndPoints.put(nodeId, InetSocketAddress.createUnresolved(ip, apiPort));
+ testExecutor.setNcEndPoints(ncEndPoints);
+ }
+}
diff --git a/asterixdb/asterix-podman/src/test/java/org/apache/asterix/test/podman/PodmanUDFLibrarian.java b/asterixdb/asterix-podman/src/test/java/org/apache/asterix/test/podman/PodmanUDFLibrarian.java
new file mode 100644
index 0000000..025f607
--- /dev/null
+++ b/asterixdb/asterix-podman/src/test/java/org/apache/asterix/test/podman/PodmanUDFLibrarian.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.podman;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.asterix.app.external.IExternalUDFLibrarian;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.json.JsonReadFeature;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class PodmanUDFLibrarian implements IExternalUDFLibrarian {
+ final GenericContainer<?> asterix;
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ public PodmanUDFLibrarian(GenericContainer asterix) {
+ OBJECT_MAPPER.configure(JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS.mappedFeature(), true);
+ this.asterix = asterix;
+ }
+
+ @Override
+ public void install(URI path, String type, String libPath, Pair<String, String> credentials) throws Exception {
+ Container.ExecResult curlResult = null;
+ int retryCt = 0;
+ while (retryCt < 10) {
+ try {
+ curlResult = asterix.execInContainer("curl", "--no-progress-meter", "-X", "POST", "-u",
+ credentials.first + ":" + credentials.second, "-F",
+ "data=@" + "/var/tmp/asterix-app/" + libPath, "-F", "type=" + type,
+ "http://localhost:19004" + path.getRawPath());
+ handleResponse(curlResult);
+ return;
+ } catch (RuntimeException e) {
+ retryCt++;
+ if (retryCt > 9)
+ throw e;
+ }
+ }
+ }
+
+ @Override
+ public void uninstall(URI path, Pair<String, String> credentials) throws IOException, AsterixException {
+ try {
+ Container.ExecResult curlResult = asterix.execInContainer("curl", "-X", "DELETE", "-u",
+ credentials.first + ":" + credentials.second, "http://localhost:19004" + path.getPath());
+ handleResponse(curlResult);
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private void handleResponse(Container.ExecResult result) throws AsterixException, JsonProcessingException {
+ if (result.getExitCode() != 0) {
+ throw new AsterixException(result.getStderr());
+ }
+ JsonNode resp = OBJECT_MAPPER.readTree(result.getStdout().replace('\0', ' '));
+ if (resp.has("error")) {
+ throw new AsterixException(resp.get("error").toString());
+ }
+ return;
+ }
+}
diff --git a/asterixdb/asterix-podman/src/test/resources/cc.conf b/asterixdb/asterix-podman/src/test/resources/cc.conf
new file mode 100644
index 0000000..e4cbd73
--- /dev/null
+++ b/asterixdb/asterix-podman/src/test/resources/cc.conf
@@ -0,0 +1,36 @@
+; Licensed to the Apache Software Foundation (ASF) under one
+; or more contributor license agreements. See the NOTICE file
+; distributed with this work for additional information
+; regarding copyright ownership. The ASF licenses this file
+; to you under the Apache License, Version 2.0 (the
+; "License"); you may not use this file except in compliance
+; with the License. You may obtain a copy of the License at
+;
+; http://www.apache.org/licenses/LICENSE-2.0
+;
+; Unless required by applicable law or agreed to in writing,
+; software distributed under the License is distributed on an
+; "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+; KIND, either express or implied. See the License for the
+; specific language governing permissions and limitations
+; under the License.
+
+[nc/asterix_nc1]
+txn.log.dir=/opt/apache-asterixdb/data/txnlog
+core.dump.dir=/opt/apache-asterixdb/logs/coredump
+iodevices=/opt/apache-asterixdb/data/
+nc.api.port=19004
+
+[nc]
+address=127.0.0.1
+command=asterixnc
+credential.file=/opt/apache-asterixdb/etc/passwd
+jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5006
+python.ds.path = /tmp/pyudf.socket
+
+[cc]
+address = 127.0.0.1
+
+[common]
+log.level = INFO
+log.dir = /opt/apache-asterixdb/logs/
diff --git a/asterixdb/asterix-podman/src/test/resources/passwd b/asterixdb/asterix-podman/src/test/resources/passwd
new file mode 100644
index 0000000..a1ea5b0
--- /dev/null
+++ b/asterixdb/asterix-podman/src/test/resources/passwd
@@ -0,0 +1 @@
+admin:$2a$12$JxgDzf/uOn1NS2Y3exhrDOf7JY/eUHQH7HeH90s5Ye2gALoO0FsQy
diff --git a/asterixdb/asterix-podman/src/test/resources/setup.sh b/asterixdb/asterix-podman/src/test/resources/setup.sh
new file mode 100644
index 0000000..e3523aa
--- /dev/null
+++ b/asterixdb/asterix-podman/src/test/resources/setup.sh
@@ -0,0 +1,8 @@
+#!/bin/bash
+cd /var/tmp/asterix-app/
+shiv -o target/TweetSent.pyz --site-packages src/test/resources/TweetSent scikit-learn
+cp -a /var/tmp/asterix-app/data/classifications /opt/apache-asterixdb/data/
+cp -a /var/tmp/asterix-app/data/twitter /opt/apache-asterixdb/data/
+cp -a /var/tmp/asterix-app/data/big-object /opt/apache-asterixdb/data/
+mkdir -p /opt/apache-asterixdb/target/data/
+cp -a /var/tmp/asterix-app/target/data/big-object /opt/apache-asterixdb/target/data/
\ No newline at end of file
diff --git a/asterixdb/asterix-podman/src/test/resources/socktest/Containerfile b/asterixdb/asterix-podman/src/test/resources/socktest/Containerfile
new file mode 100644
index 0000000..a7546d5
--- /dev/null
+++ b/asterixdb/asterix-podman/src/test/resources/socktest/Containerfile
@@ -0,0 +1,17 @@
+FROM ubuntu:22.04
+RUN apt -y update
+RUN DEBIAN_FRONTEND=noninteractive TZ=Etc/UTC apt -y install systemd openjdk-17-jre-headless unzip wget curl python3-pip python3-venv python3-systemd
+RUN pip3 install shiv msgpack
+COPY target/asterix-server_*all.deb .
+RUN dpkg -i asterix-server*.deb
+COPY src/test/resources/cc.conf /opt/apache-asterixdb/cc.conf
+COPY src/test/resources/passwd /opt/apache-asterixdb/etc/passwd
+RUN mkdir -p /etc/systemd/system/pyudf@.service.d/
+COPY src/test/resources/testenv.conf /etc/systemd/system/pyudf@.service.d/
+COPY src/test/resources/setup.sh /opt
+RUN chmod +x /opt/setup.sh
+RUN systemctl enable asterix-nc asterix-cc pyudf.socket
+
+EXPOSE 19001 19002 19004
+
+CMD [ "/lib/systemd/systemd" ]
diff --git a/asterixdb/asterix-podman/src/test/resources/testenv.conf b/asterixdb/asterix-podman/src/test/resources/testenv.conf
new file mode 100644
index 0000000..0c2f182
--- /dev/null
+++ b/asterixdb/asterix-podman/src/test/resources/testenv.conf
@@ -0,0 +1,3 @@
+[Service]
+Environment="FOO=BAR=BAZ"
+Environment="BAR=BAZ"
diff --git a/asterixdb/asterix-server/pom.xml b/asterixdb/asterix-server/pom.xml
index 6c2a05a..e1d0964 100644
--- a/asterixdb/asterix-server/pom.xml
+++ b/asterixdb/asterix-server/pom.xml
@@ -978,7 +978,7 @@
<plugin>
<artifactId>jdeb</artifactId>
<groupId>org.vafer</groupId>
- <version>1.5</version>
+ <version>1.8</version>
<executions>
<execution>
<phase>package</phase>
@@ -988,26 +988,36 @@
<configuration>
<dataSet>
<data>
- <src>${project.build.directory}/${project.build.finalName}-binary-assembly/apache-asterixdb-${project.version}/</src>
- <excludes>bin/**</excludes>
+ <src>${project.build.directory}/${project.build.finalName}-binary-assembly/apache-asterixdb-${project.version}</src>
<type>directory</type>
<mapper>
<type>perm</type>
- <prefix>/opt/apache-asterixdb-${project.version}/</prefix>
- <user>asterixdb</user>
- <group>asterixdb</group>
+ <prefix>/opt/apache-asterixdb/</prefix>
+ <user>root</user>
+ <group>root</group>
+ <filemode>755</filemode>
+ </mapper>
+ </data>
+ <data>
+ <type>file</type>
+ <src>src/deb/systemd/cc.conf</src>
+ <mapper>
+ <prefix>/opt/apache-asterixdb/</prefix>
+ <type>perm</type>
+ <user>root</user>
+ <group>root</group>
<filemode>644</filemode>
</mapper>
</data>
<data>
- <src>${project.build.directory}/${project.build.finalName}-binary-assembly/apache-asterixdb-${project.version}/bin</src>
- <type>directory</type>
+ <type>file</type>
+ <src>src/deb/udf_listener.py</src>
<mapper>
+ <prefix>/opt/apache-asterixdb/bin</prefix>
<type>perm</type>
- <prefix>/opt/apache-asterixdb-${project.version}/bin</prefix>
- <user>asterixdb</user>
- <group>asterixdb</group>
- <filemode>754</filemode>
+ <user>root</user>
+ <group>root</group>
+ <filemode>555</filemode>
</mapper>
</data>
<data>
@@ -1030,6 +1040,39 @@
<group>root</group>
</mapper>
</data>
+ <data>
+ <type>file</type>
+ <src>src/deb/systemd/pyudf.socket</src>
+ <mapper>
+ <prefix>/lib/systemd/system</prefix>
+ <type>perm</type>
+ <user>root</user>
+ <group>root</group>
+ </mapper>
+ </data>
+ <data>
+ <type>file</type>
+ <src>src/deb/systemd/pyudf@.service</src>
+ <mapper>
+ <prefix>/lib/systemd/system</prefix>
+ <type>perm</type>
+ <user>root</user>
+ <group>root</group>
+ </mapper>
+ </data>
+ <data>
+ <type>template</type>
+ <paths>
+ <path>/opt/apache-asterixdb/logs</path>
+ <path>/opt/apache-asterixdb/data</path>
+ </paths>
+ <mapper>
+ <type>perm</type>
+ <user>asterixdb</user>
+ <group>asterixdb</group>
+ <filemode>750</filemode>
+ </mapper>
+ </data>
</dataSet>
</configuration>
</execution>
diff --git a/asterixdb/asterix-server/src/deb/control/control b/asterixdb/asterix-server/src/deb/control/control
index 1f6c213..77bbd1d 100644
--- a/asterixdb/asterix-server/src/deb/control/control
+++ b/asterixdb/asterix-server/src/deb/control/control
@@ -17,8 +17,7 @@
Section: databases
Priority: extra
Architecture: all
-Depends: jdk (>= 1.8)
+Depends: java17-runtime-headless
Maintainer: Ian Maxon <ian@maxons.email>
Description: Apache AsterixDB - a scalable, open source Big Data Management System (BDMS)
-Distribution: development
-Depends: default-jre | java8-runtime
+Distribution: development
\ No newline at end of file
diff --git a/asterixdb/asterix-server/src/deb/control/postinst b/asterixdb/asterix-server/src/deb/control/postinst
index 896ca28..fe5c912 100644
--- a/asterixdb/asterix-server/src/deb/control/postinst
+++ b/asterixdb/asterix-server/src/deb/control/postinst
@@ -13,5 +13,4 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-adduser --system --group --quiet --home /opt/apache-asterixdb/ \
---no-create-home --disabled-login --force-badname asterixdb
+chmod -R 755 /opt/apache-asterixdb/
\ No newline at end of file
diff --git a/asterixdb/asterix-server/src/deb/control/preinst b/asterixdb/asterix-server/src/deb/control/preinst
index 4509c90..8d14847 100644
--- a/asterixdb/asterix-server/src/deb/control/preinst
+++ b/asterixdb/asterix-server/src/deb/control/preinst
@@ -13,3 +13,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+adduser --system --group --quiet --home /opt/apache-asterixdb/ \
+--no-create-home --disabled-login --force-badname asterixdb
+adduser --system --group --quiet --home /opt/apache-asterixdb/ \
+--no-create-home --disabled-login --force-badname asterixdb-udf
\ No newline at end of file
diff --git a/asterixdb/asterix-server/src/deb/systemd/asterix-cc.service b/asterixdb/asterix-server/src/deb/systemd/asterix-cc.service
index 9711fba..2a52e2d 100644
--- a/asterixdb/asterix-server/src/deb/systemd/asterix-cc.service
+++ b/asterixdb/asterix-server/src/deb/systemd/asterix-cc.service
@@ -19,8 +19,9 @@
[Service]
Type=simple
User=asterixdb
-ExecStart=/opt/apache-asterixdb/bin/asterixcc --config-file /opt/apache-asterixdb/cc.conf
+ExecStart=/opt/apache-asterixdb/bin/asterixcc -config-file "/opt/apache-asterixdb/cc.conf"
Restart=on-abort
+WorkingDirectory=/opt/apache-asterixdb
[Install]
WantedBy=multi-user.target
diff --git a/asterixdb/asterix-server/src/deb/systemd/asterix-nc.service b/asterixdb/asterix-server/src/deb/systemd/asterix-nc.service
index bfe6296..e09d8e8 100644
--- a/asterixdb/asterix-server/src/deb/systemd/asterix-nc.service
+++ b/asterixdb/asterix-server/src/deb/systemd/asterix-nc.service
@@ -21,6 +21,7 @@
User=asterixdb
ExecStart=/opt/apache-asterixdb/bin/asterixncservice
Restart=on-abort
+WorkingDirectory=/opt/apache-asterixdb
[Install]
WantedBy=multi-user.target
diff --git a/asterixdb/asterix-server/src/deb/systemd/cc.conf b/asterixdb/asterix-server/src/deb/systemd/cc.conf
new file mode 100644
index 0000000..0af967a
--- /dev/null
+++ b/asterixdb/asterix-server/src/deb/systemd/cc.conf
@@ -0,0 +1,33 @@
+; Licensed to the Apache Software Foundation (ASF) under one
+; or more contributor license agreements. See the NOTICE file
+; distributed with this work for additional information
+; regarding copyright ownership. The ASF licenses this file
+; to you under the Apache License, Version 2.0 (the
+; "License"); you may not use this file except in compliance
+; with the License. You may obtain a copy of the License at
+;
+; http://www.apache.org/licenses/LICENSE-2.0
+;
+; Unless required by applicable law or agreed to in writing,
+; software distributed under the License is distributed on an
+; "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+; KIND, either express or implied. See the License for the
+; specific language governing permissions and limitations
+; under the License.
+
+[nc/asterix_nc1]
+txn.log.dir=/opt/apache-asterixdb/data/txnlog
+core.dump.dir=/opt/apache-asterixdb/logs/coredump
+iodevices=/opt/apache-asterixdb/data/
+nc.api.port=19004
+
+[nc]
+address=127.0.0.1
+command=asterixnc
+
+[cc]
+address = 127.0.0.1
+
+[common]
+log.level = INFO
+log.dir = /opt/apache-asterixdb/logs/
diff --git a/asterixdb/asterix-server/src/deb/systemd/pyudf.socket b/asterixdb/asterix-server/src/deb/systemd/pyudf.socket
new file mode 100644
index 0000000..4e731db
--- /dev/null
+++ b/asterixdb/asterix-server/src/deb/systemd/pyudf.socket
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+[Unit]
+Description=AsterixDB UDF Domain Socket
+PartOf=asterixdb_udf.service
+
+[Socket]
+ListenStream=/tmp/pyudf.socket
+SocketMode=0660
+SocketUser=asterixdb-udf
+SocketGroup=asterixdb
+Accept=true
+DeferAcceptSec=1
+
+[Install]
+WantedBy=sockets.target
\ No newline at end of file
diff --git a/asterixdb/asterix-server/src/deb/systemd/pyudf@.service b/asterixdb/asterix-server/src/deb/systemd/pyudf@.service
new file mode 100644
index 0000000..9856142
--- /dev/null
+++ b/asterixdb/asterix-server/src/deb/systemd/pyudf@.service
@@ -0,0 +1,30 @@
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+[Unit]
+Description=AsterixDB UDF Executor Service
+After=network.target pyudf.socket
+Requires=pyudf.socket
+
+[Service]
+User=asterixdb-udf
+Type=simple
+ExecStart=/usr/bin/python3 /opt/apache-asterixdb/bin/udf_listener.py
+TimeoutStopSec=5
+StandardError=journal
+StandardError=journal
+
+[Install]
+WantedBy=default.target
\ No newline at end of file
diff --git a/asterixdb/asterix-server/src/deb/udf_listener.py b/asterixdb/asterix-server/src/deb/udf_listener.py
new file mode 100644
index 0000000..03874b2
--- /dev/null
+++ b/asterixdb/asterix-server/src/deb/udf_listener.py
@@ -0,0 +1,283 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import sys
+from systemd.daemon import listen_fds
+from os import chdir
+from os import getcwd
+from os import getpid
+from struct import *
+import signal
+import msgpack
+import socket
+import traceback
+from importlib import import_module
+from pathlib import Path
+from enum import IntEnum
+from io import BytesIO
+
+
+PROTO_VERSION = 1
+HEADER_SZ = 8 + 8 + 1
+REAL_HEADER_SZ = 4 + 8 + 8 + 1
+FRAMESZ = 32768
+
+
+class MessageType(IntEnum):
+ HELO = 0
+ QUIT = 1
+ INIT = 2
+ INIT_RSP = 3
+ CALL = 4
+ CALL_RSP = 5
+ ERROR = 6
+
+
+class MessageFlags(IntEnum):
+ NORMAL = 0
+ INITIAL_REQ = 1
+ INITIAL_ACK = 2
+ ERROR = 3
+
+
+class Wrapper(object):
+ wrapped_module = None
+ wrapped_class = None
+ wrapped_fn = None
+ sz = None
+ mid = None
+ rmid = None
+ flag = None
+ resp = None
+ unpacked_msg = None
+ msg_type = None
+ packer = msgpack.Packer(autoreset=False, use_bin_type=False)
+ unpacker = msgpack.Unpacker(raw=False)
+ response_buf = BytesIO()
+ stdin_buf = BytesIO()
+ wrapped_fns = {}
+ alive = True
+ readbuf = bytearray(FRAMESZ)
+ readview = memoryview(readbuf)
+
+
+ def init(self, module_name, class_name, fn_name):
+ self.wrapped_module = import_module(module_name)
+ # do not allow modules to be called that are not part of the uploaded module
+ wrapped_fn = None
+ if not self.check_module_path(self.wrapped_module):
+ self.wrapped_module = None
+ raise ImportError("Module was not found in library")
+ if class_name is not None:
+ self.wrapped_class = getattr(
+ import_module(module_name), class_name)()
+ if self.wrapped_class is not None:
+ wrapped_fn = getattr(self.wrapped_class, fn_name)
+ else:
+ wrapped_fn = getattr(import_module(module_name), fn_name)
+ if wrapped_fn is None:
+ raise ImportError(
+ "Could not find class or function in specified module")
+ self.wrapped_fns[self.mid] = wrapped_fn
+
+ def next_tuple(self, *args, key=None):
+ return self.wrapped_fns[key](*args)
+
+ def check_module_path(self, module):
+ cwd = Path('.').resolve()
+ module_path = Path(module.__file__).resolve()
+ return cwd in module_path.parents
+ return True
+
+ def read_header(self, readbuf):
+ self.sz, self.mid, self.rmid, self.flag = unpack(
+ "!iqqb", readbuf[0:REAL_HEADER_SZ])
+ return True
+
+ def write_header(self, response_buf, dlen):
+ total_len = dlen + HEADER_SZ
+ header = pack("!iqqb", total_len, int(-1), int(self.rmid), self.flag)
+ self.response_buf.write(header)
+ return total_len + 4
+
+ def get_ver_hlen(self, hlen):
+ return hlen + (PROTO_VERSION << 4)
+
+ def get_hlen(self):
+ return self.ver_hlen - (PROTO_VERSION << 4)
+
+ def init_remote_ipc(self):
+ self.response_buf.seek(0)
+ self.flag = MessageFlags.INITIAL_REQ
+ dlen = len(self.unpacked_msg[1])
+ resp_len = self.write_header(self.response_buf, dlen)
+ self.response_buf.write(self.unpacked_msg[1])
+ self.resp = self.response_buf.getbuffer()[0:resp_len]
+ self.send_msg()
+ self.packer.reset()
+
+ def cd(self, basedir):
+ chdir(basedir + "/site-packages")
+ sys.path.insert(0,getcwd())
+
+ def helo(self):
+ # need to ack the connection back before sending actual HELO
+ # self.init_remote_ipc()
+ self.cd(self.unpacked_msg[1][1])
+ self.flag = MessageFlags.NORMAL
+ self.response_buf.seek(0)
+ self.packer.pack(int(MessageType.HELO))
+ self.packer.pack(int(getpid()))
+ dlen = len(self.packer.bytes()) # tag(1) + body(4)
+ resp_len = self.write_header(self.response_buf, dlen)
+ self.response_buf.write(self.packer.bytes())
+ self.resp = self.response_buf.getbuffer()[0:resp_len]
+ self.send_msg()
+ self.packer.reset()
+ return True
+
+ def handle_init(self):
+ self.flag = MessageFlags.NORMAL
+ self.response_buf.seek(0)
+ args = self.unpacked_msg[1]
+ module = args[0]
+ if len(args) == 3:
+ clazz = args[1]
+ fn = args[2]
+ else:
+ clazz = None
+ fn = args[1]
+ self.init(module, clazz, fn)
+ self.packer.pack(int(MessageType.INIT_RSP))
+ dlen = 1 # just the tag.
+ resp_len = self.write_header(self.response_buf, dlen)
+ self.response_buf.write(self.packer.bytes())
+ self.resp = self.response_buf.getbuffer()[0:resp_len]
+ self.send_msg()
+ self.packer.reset()
+ return True
+
+ def quit(self):
+ self.alive = False
+ return True
+
+ def handle_call(self):
+ self.flag = MessageFlags.NORMAL
+ result = ([], [])
+ if len(self.unpacked_msg) > 1:
+ args = self.unpacked_msg[1]
+ if args is not None:
+ for arg in args:
+ try:
+ result[0].append(self.next_tuple(*arg, key=self.mid))
+ except BaseException as e:
+ result[1].append(traceback.format_exc())
+ self.packer.reset()
+ self.response_buf.seek(0)
+ body = msgpack.packb(result)
+ dlen = len(body) + 1 # 1 for tag
+ resp_len = self.write_header(self.response_buf, dlen)
+ self.packer.pack(int(MessageType.CALL_RSP))
+ self.response_buf.write(self.packer.bytes())
+ self.response_buf.write(body)
+ self.resp = self.response_buf.getbuffer()[0:resp_len]
+ self.send_msg()
+ self.packer.reset()
+ return True
+
+ def handle_error(self, e):
+ self.flag = MessageFlags.NORMAL
+ self.packer.reset()
+ self.response_buf.seek(0)
+ body = msgpack.packb(str(e))
+ dlen = len(body) + 1 # 1 for tag
+ resp_len = self.write_header(self.response_buf, dlen)
+ self.packer.pack(int(MessageType.ERROR))
+ self.response_buf.write(self.packer.bytes())
+ self.response_buf.write(body)
+ self.resp = self.response_buf.getbuffer()[0:resp_len]
+ self.send_msg()
+ self.packer.reset()
+ self.alive = False
+ return True
+
+ type_handler = {
+ MessageType.HELO: helo,
+ MessageType.QUIT: quit,
+ MessageType.INIT: handle_init,
+ MessageType.CALL: handle_call
+ }
+
+ def connect_sock(self):
+ self.sock = socket.fromfd(listen_fds()[0], socket.AF_UNIX, socket.SOCK_STREAM)
+
+ def disconnect_sock(self, *args):
+ self.sock.shutdown(socket.SHUT_RDWR)
+ self.sock.close()
+
+ def recv_msg(self):
+ while self.alive:
+ pos = self.sock.recv_into(self.readbuf)
+ if pos <= 0:
+ self.alive = False
+ return
+ try:
+ while pos < REAL_HEADER_SZ:
+ read = self.sock.recv_into(self.readview[pos:])
+ if read <= 0:
+ self.alive = False
+ return
+ pos += read
+ self.read_header(self.readview)
+ while pos < self.sz and len(self.readbuf) - pos > 0:
+ read = self.sock.recv_into(self.readview[pos:])
+ if read <= 0:
+ self.alive = False
+ return
+ pos += read
+ while pos < self.sz:
+ vszchunk = self.sock.recv(4096)
+ if len(vszchunk) == 0:
+ self.alive = False
+ return
+ self.readview.release()
+ self.readbuf.extend(vszchunk)
+ self.readview = memoryview(self.readbuf)
+ pos += len(vszchunk)
+ self.unpacker.feed(self.readview[REAL_HEADER_SZ:self.sz])
+ self.unpacked_msg = list(self.unpacker)
+ self.msg_type = MessageType(self.unpacked_msg[0])
+ self.type_handler[self.msg_type](self)
+ except BaseException as e:
+ self.handle_error(''.join(traceback.format_exc()))
+
+ def send_msg(self):
+ self.sock.sendall(self.resp)
+ self.resp = None
+ return
+
+ def recv_loop(self):
+ while self.alive:
+ self.recv_msg()
+ self.disconnect_sock()
+
+
+wrap = Wrapper()
+wrap.connect_sock()
+signal.signal(signal.SIGTERM, wrap.disconnect_sock)
+wrap.recv_loop()
diff --git a/asterixdb/pom.xml b/asterixdb/pom.xml
index 9fa2fc1..dcd0978 100644
--- a/asterixdb/pom.xml
+++ b/asterixdb/pom.xml
@@ -931,7 +931,7 @@
<module>asterix-test-framework</module>
<module>asterix-maven-plugins</module>
<module>asterix-server</module>
- <module>asterix-docker</module>
+ <module>asterix-podman</module>
<module>asterix-doc</module>
<module>asterix-fuzzyjoin</module>
<module>asterix-replication</module>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
index 01cb9bf..bb40e2b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
@@ -99,6 +99,7 @@
PYTHON_USE_BUNDLED_MSGPACK(BOOLEAN, true),
PYTHON_ARGS(STRING_ARRAY, (String[]) null),
PYTHON_ENV(STRING_ARRAY, (String[]) null),
+ PYTHON_DS_PATH(STRING, (String) null),
CREDENTIAL_FILE(
OptionTypes.STRING,
(Function<IApplicationConfig, String>) appConfig -> FileUtil
@@ -248,6 +249,8 @@
return "Whether or not to attempt to automatically set PYTHON_CMD to a usable interpreter";
case PYTHON_ENV:
return "List of environment variables to set when invoking the Python interpreter for Python UDFs. E.g. FOO=1";
+ case PYTHON_DS_PATH:
+ return "Path to systemd socket for fenced Python UDFs. Requires JDK17+, *nix operating system, and ";
case CREDENTIAL_FILE:
return "Path to HTTP basic credentials";
default: