[ASTERIXDB-3034][RT] Fenced UDFs
- user model changes: yes
- storage format changes: no
- interface changes: yes
Details:
- Allow UDFs to be run via domain-socket-activated
systemd services. This makes the UDF run
as a different user than the NC process itself.
Change-Id: Ibeb6228f2dc8edbf642e61cd5633c71913e18972
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/16364
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-server/src/deb/control/control b/asterixdb/asterix-server/src/deb/control/control
index 1f6c213..77bbd1d 100644
--- a/asterixdb/asterix-server/src/deb/control/control
+++ b/asterixdb/asterix-server/src/deb/control/control
@@ -17,8 +17,7 @@
Section: databases
Priority: extra
Architecture: all
-Depends: jdk (>= 1.8)
+Depends: java17-runtime-headless
Maintainer: Ian Maxon <ian@maxons.email>
Description: Apache AsterixDB - a scalable, open source Big Data Management System (BDMS)
-Distribution: development
-Depends: default-jre | java8-runtime
+Distribution: development
\ No newline at end of file
diff --git a/asterixdb/asterix-server/src/deb/control/postinst b/asterixdb/asterix-server/src/deb/control/postinst
index 896ca28..fe5c912 100644
--- a/asterixdb/asterix-server/src/deb/control/postinst
+++ b/asterixdb/asterix-server/src/deb/control/postinst
@@ -13,5 +13,4 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-adduser --system --group --quiet --home /opt/apache-asterixdb/ \
---no-create-home --disabled-login --force-badname asterixdb
+chmod -R 755 /opt/apache-asterixdb/
\ No newline at end of file
diff --git a/asterixdb/asterix-server/src/deb/control/preinst b/asterixdb/asterix-server/src/deb/control/preinst
index 4509c90..8d14847 100644
--- a/asterixdb/asterix-server/src/deb/control/preinst
+++ b/asterixdb/asterix-server/src/deb/control/preinst
@@ -13,3 +13,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+adduser --system --group --quiet --home /opt/apache-asterixdb/ \
+--no-create-home --disabled-login --force-badname asterixdb
+adduser --system --group --quiet --home /opt/apache-asterixdb/ \
+--no-create-home --disabled-login --force-badname asterixdb-udf
\ No newline at end of file
diff --git a/asterixdb/asterix-server/src/deb/systemd/asterix-cc.service b/asterixdb/asterix-server/src/deb/systemd/asterix-cc.service
index 9711fba..2a52e2d 100644
--- a/asterixdb/asterix-server/src/deb/systemd/asterix-cc.service
+++ b/asterixdb/asterix-server/src/deb/systemd/asterix-cc.service
@@ -19,8 +19,9 @@
[Service]
Type=simple
User=asterixdb
-ExecStart=/opt/apache-asterixdb/bin/asterixcc --config-file /opt/apache-asterixdb/cc.conf
+ExecStart=/opt/apache-asterixdb/bin/asterixcc -config-file "/opt/apache-asterixdb/cc.conf"
Restart=on-abort
+WorkingDirectory=/opt/apache-asterixdb
[Install]
WantedBy=multi-user.target
diff --git a/asterixdb/asterix-server/src/deb/systemd/asterix-nc.service b/asterixdb/asterix-server/src/deb/systemd/asterix-nc.service
index bfe6296..e09d8e8 100644
--- a/asterixdb/asterix-server/src/deb/systemd/asterix-nc.service
+++ b/asterixdb/asterix-server/src/deb/systemd/asterix-nc.service
@@ -21,6 +21,7 @@
User=asterixdb
ExecStart=/opt/apache-asterixdb/bin/asterixncservice
Restart=on-abort
+WorkingDirectory=/opt/apache-asterixdb
[Install]
WantedBy=multi-user.target
diff --git a/asterixdb/asterix-server/src/deb/systemd/cc.conf b/asterixdb/asterix-server/src/deb/systemd/cc.conf
new file mode 100644
index 0000000..0af967a
--- /dev/null
+++ b/asterixdb/asterix-server/src/deb/systemd/cc.conf
@@ -0,0 +1,33 @@
+; Licensed to the Apache Software Foundation (ASF) under one
+; or more contributor license agreements. See the NOTICE file
+; distributed with this work for additional information
+; regarding copyright ownership. The ASF licenses this file
+; to you under the Apache License, Version 2.0 (the
+; "License"); you may not use this file except in compliance
+; with the License. You may obtain a copy of the License at
+;
+; http://www.apache.org/licenses/LICENSE-2.0
+;
+; Unless required by applicable law or agreed to in writing,
+; software distributed under the License is distributed on an
+; "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+; KIND, either express or implied. See the License for the
+; specific language governing permissions and limitations
+; under the License.
+
+[nc/asterix_nc1]
+txn.log.dir=/opt/apache-asterixdb/data/txnlog
+core.dump.dir=/opt/apache-asterixdb/logs/coredump
+iodevices=/opt/apache-asterixdb/data/
+nc.api.port=19004
+
+[nc]
+address=127.0.0.1
+command=asterixnc
+
+[cc]
+address = 127.0.0.1
+
+[common]
+log.level = INFO
+log.dir = /opt/apache-asterixdb/logs/
diff --git a/asterixdb/asterix-server/src/deb/systemd/pyudf.socket b/asterixdb/asterix-server/src/deb/systemd/pyudf.socket
new file mode 100644
index 0000000..4e731db
--- /dev/null
+++ b/asterixdb/asterix-server/src/deb/systemd/pyudf.socket
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+[Unit]
+Description=AsterixDB UDF Domain Socket
+PartOf=asterixdb_udf.service
+
+[Socket]
+ListenStream=/tmp/pyudf.socket
+SocketMode=0660
+SocketUser=asterixdb-udf
+SocketGroup=asterixdb
+Accept=true
+DeferAcceptSec=1
+
+[Install]
+WantedBy=sockets.target
\ No newline at end of file
diff --git a/asterixdb/asterix-server/src/deb/systemd/pyudf@.service b/asterixdb/asterix-server/src/deb/systemd/pyudf@.service
new file mode 100644
index 0000000..9856142
--- /dev/null
+++ b/asterixdb/asterix-server/src/deb/systemd/pyudf@.service
@@ -0,0 +1,30 @@
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+[Unit]
+Description=AsterixDB UDF Executor Service
+After=network.target pyudf.socket
+Requires=pyudf.socket
+
+[Service]
+User=asterixdb-udf
+Type=simple
+ExecStart=/usr/bin/python3 /opt/apache-asterixdb/bin/udf_listener.py
+TimeoutStopSec=5
+StandardOutput=journal
+StandardError=journal
+
+[Install]
+WantedBy=default.target
\ No newline at end of file
diff --git a/asterixdb/asterix-server/src/deb/udf_listener.py b/asterixdb/asterix-server/src/deb/udf_listener.py
new file mode 100644
index 0000000..03874b2
--- /dev/null
+++ b/asterixdb/asterix-server/src/deb/udf_listener.py
@@ -0,0 +1,283 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import sys
+from systemd.daemon import listen_fds
+from os import chdir
+from os import getcwd
+from os import getpid
+from struct import *
+import signal
+import msgpack
+import socket
+import traceback
+from importlib import import_module
+from pathlib import Path
+from enum import IntEnum
+from io import BytesIO
+
+
+PROTO_VERSION = 1
+HEADER_SZ = 8 + 8 + 1
+REAL_HEADER_SZ = 4 + 8 + 8 + 1
+FRAMESZ = 32768
+
+
+class MessageType(IntEnum):
+ HELO = 0
+ QUIT = 1
+ INIT = 2
+ INIT_RSP = 3
+ CALL = 4
+ CALL_RSP = 5
+ ERROR = 6
+
+
+class MessageFlags(IntEnum):
+ NORMAL = 0
+ INITIAL_REQ = 1
+ INITIAL_ACK = 2
+ ERROR = 3
+
+
+class Wrapper(object):
+ wrapped_module = None
+ wrapped_class = None
+ wrapped_fn = None
+ sz = None
+ mid = None
+ rmid = None
+ flag = None
+ resp = None
+ unpacked_msg = None
+ msg_type = None
+ packer = msgpack.Packer(autoreset=False, use_bin_type=False)
+ unpacker = msgpack.Unpacker(raw=False)
+ response_buf = BytesIO()
+ stdin_buf = BytesIO()
+ wrapped_fns = {}
+ alive = True
+ readbuf = bytearray(FRAMESZ)
+ readview = memoryview(readbuf)
+
+
+ def init(self, module_name, class_name, fn_name):
+ self.wrapped_module = import_module(module_name)
+ # do not allow modules to be called that are not part of the uploaded module
+ wrapped_fn = None
+ if not self.check_module_path(self.wrapped_module):
+ self.wrapped_module = None
+ raise ImportError("Module was not found in library")
+ if class_name is not None:
+ self.wrapped_class = getattr(
+ import_module(module_name), class_name)()
+ if self.wrapped_class is not None:
+ wrapped_fn = getattr(self.wrapped_class, fn_name)
+ else:
+ wrapped_fn = getattr(import_module(module_name), fn_name)
+ if wrapped_fn is None:
+ raise ImportError(
+ "Could not find class or function in specified module")
+ self.wrapped_fns[self.mid] = wrapped_fn
+
+ def next_tuple(self, *args, key=None):
+ return self.wrapped_fns[key](*args)
+
+ def check_module_path(self, module):
+ cwd = Path('.').resolve()
+ module_path = Path(module.__file__).resolve()
+ return cwd in module_path.parents
+ return True
+
+ def read_header(self, readbuf):
+ self.sz, self.mid, self.rmid, self.flag = unpack(
+ "!iqqb", readbuf[0:REAL_HEADER_SZ])
+ return True
+
+ def write_header(self, response_buf, dlen):
+ total_len = dlen + HEADER_SZ
+ header = pack("!iqqb", total_len, int(-1), int(self.rmid), self.flag)
+ self.response_buf.write(header)
+ return total_len + 4
+
+ def get_ver_hlen(self, hlen):
+ return hlen + (PROTO_VERSION << 4)
+
+ def get_hlen(self):
+ return self.ver_hlen - (PROTO_VERSION << 4)
+
+ def init_remote_ipc(self):
+ self.response_buf.seek(0)
+ self.flag = MessageFlags.INITIAL_REQ
+ dlen = len(self.unpacked_msg[1])
+ resp_len = self.write_header(self.response_buf, dlen)
+ self.response_buf.write(self.unpacked_msg[1])
+ self.resp = self.response_buf.getbuffer()[0:resp_len]
+ self.send_msg()
+ self.packer.reset()
+
+ def cd(self, basedir):
+ chdir(basedir + "/site-packages")
+ sys.path.insert(0,getcwd())
+
+ def helo(self):
+ # need to ack the connection back before sending actual HELO
+ # self.init_remote_ipc()
+ self.cd(self.unpacked_msg[1][1])
+ self.flag = MessageFlags.NORMAL
+ self.response_buf.seek(0)
+ self.packer.pack(int(MessageType.HELO))
+ self.packer.pack(int(getpid()))
+ dlen = len(self.packer.bytes()) # tag(1) + body(4)
+ resp_len = self.write_header(self.response_buf, dlen)
+ self.response_buf.write(self.packer.bytes())
+ self.resp = self.response_buf.getbuffer()[0:resp_len]
+ self.send_msg()
+ self.packer.reset()
+ return True
+
+ def handle_init(self):
+ self.flag = MessageFlags.NORMAL
+ self.response_buf.seek(0)
+ args = self.unpacked_msg[1]
+ module = args[0]
+ if len(args) == 3:
+ clazz = args[1]
+ fn = args[2]
+ else:
+ clazz = None
+ fn = args[1]
+ self.init(module, clazz, fn)
+ self.packer.pack(int(MessageType.INIT_RSP))
+ dlen = 1 # just the tag.
+ resp_len = self.write_header(self.response_buf, dlen)
+ self.response_buf.write(self.packer.bytes())
+ self.resp = self.response_buf.getbuffer()[0:resp_len]
+ self.send_msg()
+ self.packer.reset()
+ return True
+
+ def quit(self):
+ self.alive = False
+ return True
+
+ def handle_call(self):
+ self.flag = MessageFlags.NORMAL
+ result = ([], [])
+ if len(self.unpacked_msg) > 1:
+ args = self.unpacked_msg[1]
+ if args is not None:
+ for arg in args:
+ try:
+ result[0].append(self.next_tuple(*arg, key=self.mid))
+ except BaseException as e:
+ result[1].append(traceback.format_exc())
+ self.packer.reset()
+ self.response_buf.seek(0)
+ body = msgpack.packb(result)
+ dlen = len(body) + 1 # 1 for tag
+ resp_len = self.write_header(self.response_buf, dlen)
+ self.packer.pack(int(MessageType.CALL_RSP))
+ self.response_buf.write(self.packer.bytes())
+ self.response_buf.write(body)
+ self.resp = self.response_buf.getbuffer()[0:resp_len]
+ self.send_msg()
+ self.packer.reset()
+ return True
+
+ def handle_error(self, e):
+ self.flag = MessageFlags.NORMAL
+ self.packer.reset()
+ self.response_buf.seek(0)
+ body = msgpack.packb(str(e))
+ dlen = len(body) + 1 # 1 for tag
+ resp_len = self.write_header(self.response_buf, dlen)
+ self.packer.pack(int(MessageType.ERROR))
+ self.response_buf.write(self.packer.bytes())
+ self.response_buf.write(body)
+ self.resp = self.response_buf.getbuffer()[0:resp_len]
+ self.send_msg()
+ self.packer.reset()
+ self.alive = False
+ return True
+
+ type_handler = {
+ MessageType.HELO: helo,
+ MessageType.QUIT: quit,
+ MessageType.INIT: handle_init,
+ MessageType.CALL: handle_call
+ }
+
+ def connect_sock(self):
+ self.sock = socket.fromfd(listen_fds()[0], socket.AF_UNIX, socket.SOCK_STREAM)
+
+ def disconnect_sock(self, *args):
+ self.sock.shutdown(socket.SHUT_RDWR)
+ self.sock.close()
+
+ def recv_msg(self):
+ while self.alive:
+ pos = self.sock.recv_into(self.readbuf)
+ if pos <= 0:
+ self.alive = False
+ return
+ try:
+ while pos < REAL_HEADER_SZ:
+ read = self.sock.recv_into(self.readview[pos:])
+ if read <= 0:
+ self.alive = False
+ return
+ pos += read
+ self.read_header(self.readview)
+ while pos < self.sz and len(self.readbuf) - pos > 0:
+ read = self.sock.recv_into(self.readview[pos:])
+ if read <= 0:
+ self.alive = False
+ return
+ pos += read
+ while pos < self.sz:
+ vszchunk = self.sock.recv(4096)
+ if len(vszchunk) == 0:
+ self.alive = False
+ return
+ self.readview.release()
+ self.readbuf.extend(vszchunk)
+ self.readview = memoryview(self.readbuf)
+ pos += len(vszchunk)
+ self.unpacker.feed(self.readview[REAL_HEADER_SZ:self.sz])
+ self.unpacked_msg = list(self.unpacker)
+ self.msg_type = MessageType(self.unpacked_msg[0])
+ self.type_handler[self.msg_type](self)
+ except BaseException as e:
+ self.handle_error(''.join(traceback.format_exc()))
+
+ def send_msg(self):
+ self.sock.sendall(self.resp)
+ self.resp = None
+ return
+
+ def recv_loop(self):
+ while self.alive:
+ self.recv_msg()
+ self.disconnect_sock()
+
+
+wrap = Wrapper()
+wrap.connect_sock()
+signal.signal(signal.SIGTERM, wrap.disconnect_sock)
+wrap.recv_loop()