[ASTERIXDB-2838][RT][FUN] Batched PyUDF calls
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Implement the runtime for batched calls of Python
UDFs
- Fix and improve the stdin reading of the Python UDF
wrapper
- Check if the Python UDF process is still alive while
waiting on results
- Pull nullCall through function info to properly deal
with it
- Properly handle calls to multiple functions in one
library.
- Properly handle null/missing
- Fix calling top level functions in Python
- Fix recieve buffer growth
- Fix resource leaks
Change-Id: I5af4da999985afcc33cdfacea79576f1d6109173
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/10187
Reviewed-by: Ian Maxon <imaxon@uci.edu>
Reviewed-by: Dmitry Lychagin <dmitry.lychagin@couchbase.com>
Contrib: Ian Maxon <imaxon@uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-algebra/pom.xml b/asterixdb/asterix-algebra/pom.xml
index cf5802f..acc70e2 100644
--- a/asterixdb/asterix-algebra/pom.xml
+++ b/asterixdb/asterix-algebra/pom.xml
@@ -127,6 +127,11 @@
</dependency>
<dependency>
<groupId>org.apache.asterix</groupId>
+ <artifactId>asterix-om</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.asterix</groupId>
<artifactId>asterix-external-data</artifactId>
<version>${project.version}</version>
<exclusions>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
index 076783f..d466446 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
@@ -75,7 +75,7 @@
// Disable ASSIGN_BATCH physical operator if this option is set to 'false'
public static final String REWRITE_ATTEMPT_BATCH_ASSIGN = "rewrite_attempt_batch_assign";
- static final boolean REWRITE_ATTEMPT_BATCH_ASSIGN_DEFAULT = false;
+ static final boolean REWRITE_ATTEMPT_BATCH_ASSIGN_DEFAULT = true;
@Override
protected ILogicalOperatorVisitor<IPhysicalOperator, Boolean> createPhysicalOperatorFactoryVisitor(
diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml
index 1f22924..9131202 100644
--- a/asterixdb/asterix-app/pom.xml
+++ b/asterixdb/asterix-app/pom.xml
@@ -423,7 +423,7 @@
<id>asterix-gerrit-asterix-app-sql-execution</id>
<properties>
<test.excludes>**/*.java</test.excludes>
- <itest.includes>**/SqlppExecution*IT.java,**/ExternalPythonFunction*IT.java</itest.includes>
+ <itest.includes>**/SqlppExecution*IT.java,**/ExternalPythonFunctionIT.java</itest.includes>
<failIfNoTests>false</failIfNoTests>
</properties>
</profile>
diff --git a/asterixdb/asterix-app/src/main/resources/entrypoint.py b/asterixdb/asterix-app/src/main/resources/entrypoint.py
index 0917f49..aba4f29 100755
--- a/asterixdb/asterix-app/src/main/resources/entrypoint.py
+++ b/asterixdb/asterix-app/src/main/resources/entrypoint.py
@@ -26,14 +26,16 @@
import signal
import msgpack
import socket
+import traceback
from importlib import import_module
from pathlib import Path
from enum import IntEnum
from io import BytesIO
PROTO_VERSION = 1
-HEADER_SZ = 8+8+1
-REAL_HEADER_SZ = 4+8+8+1
+HEADER_SZ = 8 + 8 + 1
+REAL_HEADER_SZ = 4 + 8 + 8 + 1
+FRAMESZ = 32768
class MessageType(IntEnum):
@@ -57,19 +59,29 @@
wrapped_module = None
wrapped_class = None
wrapped_fn = None
+ sz = None
+ mid = None
+ rmid = None
+ flag = None
+ resp = None
+ unpacked_msg = None
+ msg_type = None
packer = msgpack.Packer(autoreset=False)
unpacker = msgpack.Unpacker()
response_buf = BytesIO()
stdin_buf = BytesIO()
wrapped_fns = {}
alive = True
+ readbuf = bytearray(FRAMESZ)
+ readview = memoryview(readbuf)
+
def init(self, module_name, class_name, fn_name):
self.wrapped_module = import_module(module_name)
# do not allow modules to be called that are not part of the uploaded module
wrapped_fn = None
if not self.check_module_path(self.wrapped_module):
- wrapped_module = None
+ self.wrapped_module = None
raise ImportError("Module was not found in library")
if class_name is not None:
self.wrapped_class = getattr(
@@ -77,12 +89,13 @@
if self.wrapped_class is not None:
wrapped_fn = getattr(self.wrapped_class, fn_name)
else:
- wrapped_fn = locals()[fn_name]
+ wrapped_fn = getattr(import_module(module_name), fn_name)
if wrapped_fn is None:
- raise ImportError("Could not find class or function in specified module")
- self.wrapped_fns[self.rmid] = wrapped_fn
+ raise ImportError(
+ "Could not find class or function in specified module")
+ self.wrapped_fns[self.mid] = wrapped_fn
- def nextTuple(self, *args, key=None):
+ def next_tuple(self, *args, key=None):
return self.wrapped_fns[key](*args)
def check_module_path(self, module):
@@ -92,14 +105,14 @@
def read_header(self, readbuf):
self.sz, self.mid, self.rmid, self.flag = unpack(
- "!iqqb", readbuf[0:21])
+ "!iqqb", readbuf[0:REAL_HEADER_SZ])
return True
def write_header(self, response_buf, dlen):
total_len = dlen + HEADER_SZ
header = pack("!iqqb", total_len, int(-1), int(self.rmid), self.flag)
self.response_buf.write(header)
- return total_len+4
+ return total_len + 4
def get_ver_hlen(self, hlen):
return hlen + (PROTO_VERSION << 4)
@@ -118,14 +131,13 @@
self.packer.reset()
def helo(self):
- #need to ack the connection back before sending actual HELO
+ # need to ack the connection back before sending actual HELO
self.init_remote_ipc()
-
self.flag = MessageFlags.NORMAL
self.response_buf.seek(0)
self.packer.pack(int(MessageType.HELO))
self.packer.pack("HELO")
- dlen = 5 #tag(1) + body(4)
+ dlen = 5 # tag(1) + body(4)
resp_len = self.write_header(self.response_buf, dlen)
self.response_buf.write(self.packer.bytes())
self.resp = self.response_buf.getbuffer()[0:resp_len]
@@ -160,16 +172,19 @@
def handle_call(self):
self.flag = MessageFlags.NORMAL
- args = self.unpacked_msg[1]
- result = None
- if args is None:
- result = self.nextTuple(key=self.rmid)
- else:
- result = self.nextTuple(args, key=self.rmid)
+ result = ([], [])
+ if len(self.unpacked_msg) > 1:
+ args = self.unpacked_msg[1]
+ if args is not None:
+ for arg in args:
+ try:
+ result[0].append(self.next_tuple(*arg, key=self.mid))
+ except BaseException as e:
+ result[1].append(traceback.format_exc())
self.packer.reset()
self.response_buf.seek(0)
body = msgpack.packb(result)
- dlen = len(body)+1 # 1 for tag
+ dlen = len(body) + 1 # 1 for tag
resp_len = self.write_header(self.response_buf, dlen)
self.packer.pack(int(MessageType.CALL_RSP))
self.response_buf.write(self.packer.bytes())
@@ -179,13 +194,12 @@
self.packer.reset()
return True
- def handle_error(self,e):
+ def handle_error(self, e):
self.flag = MessageFlags.NORMAL
- result = type(e).__name__ + ": " + str(e)
self.packer.reset()
self.response_buf.seek(0)
- body = msgpack.packb(result)
- dlen = len(body)+1 # 1 for tag
+ body = msgpack.packb(e)
+ dlen = len(body) + 1 # 1 for tag
resp_len = self.write_header(self.response_buf, dlen)
self.packer.pack(int(MessageType.ERROR))
self.response_buf.write(self.packer.bytes())
@@ -193,6 +207,7 @@
self.resp = self.response_buf.getbuffer()[0:resp_len]
self.send_msg()
self.packer.reset()
+ self.alive = False
return True
type_handler = {
@@ -204,33 +219,47 @@
def connect_sock(self, addr, port):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- try:
- self.sock.connect((addr, int(port)))
- except socket.error as msg:
- print(sys.stderr, msg)
+ self.sock.connect((addr, int(port)))
def disconnect_sock(self, *args):
self.sock.shutdown(socket.SHUT_RDWR)
self.sock.close()
def recv_msg(self):
- completed = False
- while not completed and self.alive:
- readbuf = sys.stdin.buffer.read1(4096)
+ while self.alive:
+ pos = sys.stdin.buffer.readinto1(self.readbuf)
+ if pos <= 0:
+ self.alive = False
+ return
try:
- if(len(readbuf) < REAL_HEADER_SZ):
- while(len(readbuf) < REAL_HEADER_SZ):
- readbuf += sys.stdin.buffer.read1(4096)
- self.read_header(readbuf)
- if(self.sz > len(readbuf)):
- while(len(readbuf) < self.sz):
- readbuf += sys.stdin.buffer.read1(4096)
- self.unpacker.feed(readbuf[21:])
+ while pos < REAL_HEADER_SZ:
+ read = sys.stdin.buffer.readinto1(self.readview[pos:])
+ if read <= 0:
+ self.alive = False
+ return
+ pos += read
+ self.read_header(self.readview)
+ while pos < self.sz and len(self.readbuf) - pos > 0:
+ read = sys.stdin.buffer.readinto1(self.readview[pos:])
+ if read <= 0:
+ self.alive = False
+ return
+ pos += read
+ while pos < self.sz:
+ vszchunk = sys.stdin.buffer.read1()
+ if len(vszchunk) == 0:
+ self.alive = False
+ return
+ self.readview = None
+ self.readbuf.extend(vszchunk)
+ self.readview = memoryview(self.readbuf)
+ pos += len(vszchunk)
+ self.unpacker.feed(self.readview[REAL_HEADER_SZ:self.sz])
self.unpacked_msg = list(self.unpacker)
- self.type = MessageType(self.unpacked_msg[0])
- completed = self.type_handler[self.type](self)
+ self.msg_type = MessageType(self.unpacked_msg[0])
+ self.type_handler[self.msg_type](self)
except BaseException as e:
- completed = self.handle_error(e)
+ self.handle_error(traceback.format_exc())
def send_msg(self):
self.sock.sendall(self.resp)
diff --git a/asterixdb/asterix-app/src/test/resources/TweetSent/crashy.py b/asterixdb/asterix-app/src/test/resources/TweetSent/crashy.py
new file mode 100644
index 0000000..c3d9a74
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/TweetSent/crashy.py
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pickle
+import sklearn
+import sys
+import os
+import ctypes
+class TweetSent(object):
+
+ def __init__(self):
+ pickle_path = os.path.join(os.path.dirname(__file__), 'sentiment_pipeline3')
+ f = open(pickle_path,'rb')
+ self.pipeline = pickle.load(f)
+ f.close()
+
+ def sentiment(self, args):
+ if args is None:
+ return 2
+ return self.pipeline.predict([args])[0].item()
+
+ def crash(self):
+ os._exit(1)
diff --git a/asterixdb/asterix-app/src/test/resources/TweetSent/roundtrip.py b/asterixdb/asterix-app/src/test/resources/TweetSent/roundtrip.py
index 37350be..8b8fced 100644
--- a/asterixdb/asterix-app/src/test/resources/TweetSent/roundtrip.py
+++ b/asterixdb/asterix-app/src/test/resources/TweetSent/roundtrip.py
@@ -14,6 +14,10 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+import math
+
+def sqrt(num):
+ return math.sqrt(num)
class Tests(object):
diff --git a/asterixdb/asterix-app/src/test/resources/TweetSent/sentiment.py b/asterixdb/asterix-app/src/test/resources/TweetSent/sentiment.py
index 29d371f..66545ae 100644
--- a/asterixdb/asterix-app/src/test/resources/TweetSent/sentiment.py
+++ b/asterixdb/asterix-app/src/test/resources/TweetSent/sentiment.py
@@ -28,5 +28,7 @@
self.pipeline = pickle.load(f)
f.close()
- def sentiment(self, *args):
- return self.pipeline.predict(args[0])[0].item()
+ def sentiment(self, args):
+ if args is None:
+ return 2
+ return self.pipeline.predict([args])[0].item()
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/crash/crash.0.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/crash/crash.0.ddl.sqlpp
new file mode 100644
index 0000000..76cc70d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/crash/crash.0.ddl.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+DROP DATAVERSE externallibtest if exists;
+CREATE DATAVERSE externallibtest;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/crash/crash.1.lib.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/crash/crash.1.lib.sqlpp
new file mode 100644
index 0000000..699e565
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/crash/crash.1.lib.sqlpp
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+install externallibtest testlib python admin admin target/TweetSent.pyz
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/crash/crash.2.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/crash/crash.2.ddl.sqlpp
new file mode 100644
index 0000000..8a2746c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/crash/crash.2.ddl.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ USE externallibtest;
+
+create function crash()
+ as "crashy", "TweetSent.crash" at testlib;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/crash/crash.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/crash/crash.3.query.sqlpp
new file mode 100644
index 0000000..f1858f3
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/crash/crash.3.query.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+// param max-warnings:json=2
+
+use externallibtest;
+
+crash();
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.6.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.6.query.sqlpp
new file mode 100644
index 0000000..1ec6766
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.6.query.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use externallibtest;
+
+select sentiment("great") as peachy, sentiment("okay") as phlegmatic,
+ sentiment("meh") as indifferent, sentiment("ugh") as choleric;
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.6.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.7.ddl.sqlpp
similarity index 100%
rename from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.6.ddl.sqlpp
rename to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.7.ddl.sqlpp
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.0.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.0.ddl.sqlpp
new file mode 100644
index 0000000..18ad48b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.0.ddl.sqlpp
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+DROP DATAVERSE externallibtest if exists;
+CREATE DATAVERSE externallibtest;
+USE externallibtest;
+create type typeTweet if not exists as open{
+ create_at : datetime,
+ id: bigint
+};
+
+create dataset Tweet(typeTweet) primary key id;
+
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.1.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.1.update.sqlpp
new file mode 100644
index 0000000..5a8dae2
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.1.update.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+USE externallibtest;
+
+load dataset Tweet using localfs(("path"="asterix_nc1://data/twitter/real.adm"),("format"="adm"));
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.10.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.10.query.sqlpp
new file mode 100644
index 0000000..f7a29fd
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.10.query.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use externallibtest;
+
+select value count(sentiment(t.text))
+from Tweet t;
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.11.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.11.query.sqlpp
new file mode 100644
index 0000000..f1ef8e9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.11.query.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use externallibtest;
+
+set `rewrite_attempt_batch_assign` "false";
+
+select value count(sentiment(t.text))
+from Tweet t;
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.12.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.12.query.sqlpp
new file mode 100644
index 0000000..4d4d179
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.12.query.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use externallibtest;
+
+select count(sentiment(t.text)), count(t.text)
+from Tweet t;
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.13.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.13.ddl.sqlpp
new file mode 100644
index 0000000..3438521
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.13.ddl.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ USE externallibtest;
+
+create function sentiment_nullcall(s)
+ as "sentiment", "TweetSent.sentiment" at testlib with {"null-call": "true"};
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.14.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.14.query.sqlpp
new file mode 100644
index 0000000..0a219ca
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.14.query.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use externallibtest;
+
+select value count(sentiment_nullcall(t.text))
+from Tweet t;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.15.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.15.query.sqlpp
new file mode 100644
index 0000000..a259660
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.15.query.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use externallibtest;
+
+set `rewrite_attempt_batch_assign` "false";
+
+select value count(sentiment_nullcall(t.text))
+from Tweet t;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.16.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.16.ddl.sqlpp
new file mode 100644
index 0000000..b3adc32
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.16.ddl.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ USE externallibtest;
+
+create function sentiment_nullcall_bool(s)
+ as "sentiment", "TweetSent.sentiment" at testlib with {"null-call": true};
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.17.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.17.query.sqlpp
new file mode 100644
index 0000000..a336979
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.17.query.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use externallibtest;
+
+select value count(sentiment_nullcall_bool(t.text))
+from Tweet t;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.18.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.18.query.sqlpp
new file mode 100644
index 0000000..14650a3
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.18.query.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use externallibtest;
+
+set `rewrite_attempt_batch_assign` "false";
+
+select value count(sentiment_nullcall_bool(t.text))
+from Tweet t;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.19.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.19.ddl.sqlpp
new file mode 100644
index 0000000..d05af3a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.19.ddl.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use externallibtest;
+
+create function roundtrip(s)
+ as "roundtrip", "Tests.roundtrip" at testlib;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.2.lib.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.2.lib.sqlpp
new file mode 100644
index 0000000..699e565
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.2.lib.sqlpp
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+install externallibtest testlib python admin admin target/TweetSent.pyz
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.20.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.20.query.sqlpp
new file mode 100644
index 0000000..7d6550c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.20.query.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+USE externallibtest;
+
+select t.id as id, length(roundtrip(t.text)[0]) as len, sentiment(t.text) as sent
+from Tweet t
+order by id DESC
+limit 100;
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.6.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.21.ddl.sqlpp
similarity index 100%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.6.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.21.ddl.sqlpp
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.3.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.3.ddl.sqlpp
new file mode 100644
index 0000000..00838de
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.3.ddl.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ USE externallibtest;
+
+create function sentiment(s)
+ as "sentiment", "TweetSent.sentiment" at testlib;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.4.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.4.query.sqlpp
new file mode 100644
index 0000000..f7a29fd
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.4.query.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use externallibtest;
+
+select value count(sentiment(t.text))
+from Tweet t;
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.5.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.5.query.sqlpp
new file mode 100644
index 0000000..f1ef8e9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.5.query.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use externallibtest;
+
+set `rewrite_attempt_batch_assign` "false";
+
+select value count(sentiment(t.text))
+from Tweet t;
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.6.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.6.query.sqlpp
new file mode 100644
index 0000000..4a85ab7
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.6.query.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use externallibtest;
+
+select sentiment(t.text) as sent, length(t.text) as text
+from Tweet t
+order by t.id
+limit 100;
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.7.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.7.query.sqlpp
new file mode 100644
index 0000000..69174bf
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.7.query.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use externallibtest;
+
+set `rewrite_attempt_batch_assign` "false";
+
+select sentiment(t.text) as sent, length(t.text) as text
+from Tweet t
+order by t.id
+limit 100;
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.8.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.8.update.sqlpp
new file mode 100644
index 0000000..891efc0
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.8.update.sqlpp
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use externallibtest;
+
+insert into Tweet (
+ select t.create_at, t.id+1000000 as id, t.in_reply_to_status, t.in_reply_to_user, t.favorite_count, t.coordinate, t.retweet_count, t.lang,
+ t.is_retweet, t.user, t.place
+ from Tweet t
+ limit 50
+);
+
+insert into Tweet (
+ select t.create_at, t.id+2000000 as id, t.in_reply_to_status, t.in_reply_to_user, t.favorite_count, t.coordinate, t.retweet_count, t.lang,
+ t.is_retweet, t.user, t.place, null as text
+ from Tweet t
+ limit 50
+);
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.9.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.9.query.sqlpp
new file mode 100644
index 0000000..e77b11a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment_twitter/mysentiment_twitter.9.query.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use externallibtest;
+
+select value count(t.text)
+from Tweet t;
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp
index 6bbeaa1..0ad9fb3 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp
@@ -21,3 +21,6 @@
create function warning()
as "roundtrip", "Tests.warning" at testlib;
+
+create function roundtrip(s)
+ as "roundtrip", "Tests.roundtrip" at testlib;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.4.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.4.query.sqlpp
new file mode 100644
index 0000000..1ef35a7
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.4.query.sqlpp
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description : Access a records nested records at each level.
+* Expected Res : Success
+* Date : 04 Jun 2015
+*/
+// param max-warnings:json=0
+
+use test;
+
+set `rewrite_attempt_batch_assign` "false";
+
+select warning(), roundtrip(d)
+from [ "a", "b" , "c", "d" ] d;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.10.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.10.query.sqlpp
index c48dda5..30d9426 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.10.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.10.query.sqlpp
@@ -28,6 +28,6 @@
select value [(
select element result
from Animals as test
-with result as roundtrip(test)[0][0].class.fullClassification.lower
+with result as roundtrip(test)[0].class.fullClassification.lower
order by result.id)][0]
;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.11.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.11.query.sqlpp
index 30ec1da..9c3e0cb 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.11.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.11.query.sqlpp
@@ -28,6 +28,6 @@
select value [(
select element result
from Animals as test
-with result as roundtrip(test)[0][0].class.fullClassification
+with result as roundtrip(test)[0].class.fullClassification
order by result.id)][0]
;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.12.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.12.query.sqlpp
index 1f7925f..2aed607 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.12.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.12.query.sqlpp
@@ -28,6 +28,6 @@
select value [(
select element result
from Animals as test
-with result as roundtrip(test)[0][0].class
+with result as roundtrip(test)[0].class
order by result.id)][0]
;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.13.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.13.query.sqlpp
index 13b4c28..52705db 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.13.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.13.query.sqlpp
@@ -28,6 +28,6 @@
select value [(
select element result
from Animals as test
-with result as roundtrip(test)[0][0]
+with result as roundtrip(test)[0]
order by result.id)][0]
;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.4.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.4.query.sqlpp
index 1e9a088..e17f03f 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.4.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.4.query.sqlpp
@@ -27,6 +27,5 @@
select element result
from Animals as test
-with result as roundtrip(test)[0][0].class.fullClassification.lower.lower.lower.lower.lower.lower.Species
-order by result
-;
+with result as roundtrip(test)[0].class.fullClassification.lower.lower.lower.lower.lower.lower.Species
+order by result;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.5.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.5.query.sqlpp
index eedf56b..621b4d6 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.5.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.5.query.sqlpp
@@ -23,9 +23,9 @@
*/
use test;
-select value [(
+select value (
select element result
from Animals as test
-with result as roundtrip(test)[0][0].class.fullClassification.lower.lower.lower.lower.lower.lower
-order by result.id )][0]
+with result as roundtrip(test)[0].class.fullClassification.lower.lower.lower.lower.lower.lower
+order by result.id )
;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.6.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.6.query.sqlpp
index 4912ad6..8e2ec95 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.6.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.6.query.sqlpp
@@ -28,6 +28,6 @@
select value [(
select element result
from Animals as test
-with result as roundtrip(test)[0][0].class.fullClassification.lower.lower.lower.lower.lower
+with result as roundtrip(test)[0].class.fullClassification.lower.lower.lower.lower.lower
order by result.id)][0]
;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.7.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.7.query.sqlpp
index 546c174..c39d933 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.7.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.7.query.sqlpp
@@ -28,6 +28,6 @@
select value [(
select element result
from Animals as test
-with result as roundtrip(test)[0][0].class.fullClassification.lower.lower.lower.lower
+with result as roundtrip(test)[0].class.fullClassification.lower.lower.lower.lower
order by result.id)][0]
;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.8.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.8.query.sqlpp
index 62af850..9a20dc2 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.8.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.8.query.sqlpp
@@ -28,6 +28,6 @@
select value [(
select element result
from Animals as test
-with result as roundtrip(test)[0][0].class.fullClassification.lower.lower.lower
+with result as roundtrip(test)[0].class.fullClassification.lower.lower.lower
order by result.id)][0]
;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.9.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.9.query.sqlpp
index 6c594f0..0314f22 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.9.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.9.query.sqlpp
@@ -28,6 +28,6 @@
select value [(
select element result
from Animals as test
-with result as roundtrip(test)[0][0].class.fullClassification.lower.lower
+with result as roundtrip(test)[0].class.fullClassification.lower.lower
order by result.id)][0]
;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.0.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.0.ddl.sqlpp
new file mode 100644
index 0000000..76cc70d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.0.ddl.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+DROP DATAVERSE externallibtest if exists;
+CREATE DATAVERSE externallibtest;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.1.lib.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.1.lib.sqlpp
new file mode 100644
index 0000000..3250a90
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.1.lib.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+install externallibtest testlib python admin admin target/TweetSent.pyz
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.2.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.2.ddl.sqlpp
new file mode 100644
index 0000000..a8ba8a1
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.2.ddl.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use externallibtest;
+
+create function typeValidation(a, b, c, d, e, f, g)
+ as "roundtrip", "Tests.roundtrip" at testlib;
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.3.query.sqlpp
new file mode 100644
index 0000000..0ae7d0c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.3.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use externallibtest;
+typeValidation(907, 9.07, "907", 9.07, true, unix_time_from_date_in_days(date("2013-01-01")),
+ unix_time_from_datetime_in_secs(datetime("1989-09-07T12:13:14.039Z")));
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.4.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.4.ddl.sqlpp
new file mode 100644
index 0000000..2b27030
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.4.ddl.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE externallibtest;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/toplevel_fn/mysentiment.0.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/toplevel_fn/mysentiment.0.ddl.sqlpp
new file mode 100644
index 0000000..76cc70d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/toplevel_fn/mysentiment.0.ddl.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+DROP DATAVERSE externallibtest if exists;
+CREATE DATAVERSE externallibtest;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/toplevel_fn/mysentiment.1.lib.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/toplevel_fn/mysentiment.1.lib.sqlpp
new file mode 100644
index 0000000..699e565
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/toplevel_fn/mysentiment.1.lib.sqlpp
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+install externallibtest testlib python admin admin target/TweetSent.pyz
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/toplevel_fn/mysentiment.2.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/toplevel_fn/mysentiment.2.ddl.sqlpp
new file mode 100644
index 0000000..d6a4ea7
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/toplevel_fn/mysentiment.2.ddl.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ USE externallibtest;
+
+create function sqrt(s)
+ as "roundtrip", "sqrt" at testlib;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/toplevel_fn/mysentiment.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/toplevel_fn/mysentiment.3.query.sqlpp
new file mode 100644
index 0000000..755d980
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/toplevel_fn/mysentiment.3.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use externallibtest;
+
+sqrt(4);
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.6.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/toplevel_fn/mysentiment.4.ddl.sqlpp
similarity index 100%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/mysentiment/mysentiment.6.ddl.sqlpp
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/toplevel_fn/mysentiment.4.ddl.sqlpp
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/crash/crash.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/crash/crash.1.adm
new file mode 100644
index 0000000..ec747fa
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/crash/crash.1.adm
@@ -0,0 +1 @@
+null
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/library_list_api_multipart/library_list_ap1.1.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/library_list_api_multipart/library_list_api.1.regexjson
similarity index 100%
rename from asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/library_list_api_multipart/library_list_ap1.1.regexjson
rename to asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/library_list_api_multipart/library_list_api.1.regexjson
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/library_list_api_multipart/library_list_ap1.2.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/library_list_api_multipart/library_list_api.2.regexjson
similarity index 100%
rename from asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/library_list_api_multipart/library_list_ap1.2.regexjson
rename to asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/library_list_api_multipart/library_list_api.2.regexjson
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/library_list_api_multipart/library_list_ap1.3.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/library_list_api_multipart/library_list_api.3.regexjson
similarity index 100%
rename from asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/library_list_api_multipart/library_list_ap1.3.regexjson
rename to asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/library_list_api_multipart/library_list_api.3.regexjson
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/library_list_api_multipart/library_list_ap1.4.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/library_list_api_multipart/library_list_api.4.regexjson
similarity index 100%
rename from asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/library_list_api_multipart/library_list_ap1.4.regexjson
rename to asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/library_list_api_multipart/library_list_api.4.regexjson
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/library_list_api_multipart/library_list_ap1.5.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/library_list_api_multipart/library_list_api.5.regexjson
similarity index 99%
rename from asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/library_list_api_multipart/library_list_ap1.5.regexjson
rename to asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/library_list_api_multipart/library_list_api.5.regexjson
index e5d039f..c896e0d 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/library_list_api_multipart/library_list_ap1.5.regexjson
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/library_list_api_multipart/library_list_api.5.regexjson
@@ -17,4 +17,4 @@
"dataverse": ["externallibtest", "foo", "bar"],
"hash_md5": "R{[a-zA-Z0-9-]+}",
"name": "testlib"
-}]
+}]
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment/mysentiment.4.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment/mysentiment.4.adm
new file mode 100644
index 0000000..d7f41ee
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment/mysentiment.4.adm
@@ -0,0 +1 @@
+{ "peachy": 1, "phlegmatic": 0, "indifferent": 0, "choleric": 0 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.1.adm
new file mode 100644
index 0000000..e9c02da
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.1.adm
@@ -0,0 +1 @@
+5000
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.10.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.10.adm
new file mode 100644
index 0000000..878726a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.10.adm
@@ -0,0 +1 @@
+5100
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.11.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.11.adm
new file mode 100644
index 0000000..0ead4c6
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.11.adm
@@ -0,0 +1 @@
+5100
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.12.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.12.adm
new file mode 100644
index 0000000..878726a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.12.adm
@@ -0,0 +1 @@
+5100
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.13.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.13.adm
new file mode 100644
index 0000000..65a7c81
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.13.adm
@@ -0,0 +1,100 @@
+{ "id": 670301227662491648, "len": 20, "sent": 1 }
+{ "id": 670301227553566720, "len": 139, "sent": 0 }
+{ "id": 670301227041857536, "len": 112, "sent": 0 }
+{ "id": 670301227037519876, "len": 33, "sent": 0 }
+{ "id": 670301226987159552, "len": 57, "sent": 0 }
+{ "id": 670301226513391616, "len": 28, "sent": 1 }
+{ "id": 670301226337202180, "len": 77, "sent": 1 }
+{ "id": 670301226190278656, "len": 25, "sent": 0 }
+{ "id": 670301225959579648, "len": 112, "sent": 1 }
+{ "id": 670301225838125056, "len": 107, "sent": 0 }
+{ "id": 670301225598906369, "len": 64, "sent": 0 }
+{ "id": 670301225489817600, "len": 49, "sent": 0 }
+{ "id": 670301225456308224, "len": 103, "sent": 0 }
+{ "id": 670301225326391296, "len": 66, "sent": 1 }
+{ "id": 670301225162661889, "len": 28, "sent": 1 }
+{ "id": 670301224885837824, "len": 63, "sent": 0 }
+{ "id": 670301224814698496, "len": 59, "sent": 0 }
+{ "id": 670301224709849090, "len": 33, "sent": 1 }
+{ "id": 670301224684556288, "len": 21, "sent": 0 }
+{ "id": 670301224680480768, "len": 39, "sent": 0 }
+{ "id": 670301224348946433, "len": 64, "sent": 1 }
+{ "id": 670301224261058560, "len": 61, "sent": 1 }
+{ "id": 670301224231690240, "len": 33, "sent": 0 }
+{ "id": 670301224214794240, "len": 33, "sent": 0 }
+{ "id": 670301223753351168, "len": 105, "sent": 1 }
+{ "id": 670301223426367488, "len": 23, "sent": 0 }
+{ "id": 670301223216545792, "len": 31, "sent": 0 }
+{ "id": 670301223182974976, "len": 34, "sent": 1 }
+{ "id": 670301223128535041, "len": 21, "sent": 0 }
+{ "id": 670301222759301121, "len": 132, "sent": 0 }
+{ "id": 670301222734307329, "len": 110, "sent": 1 }
+{ "id": 670301222717419520, "len": 81, "sent": 0 }
+{ "id": 670301222318936064, "len": 110, "sent": 1 }
+{ "id": 670301222302150657, "len": 131, "sent": 0 }
+{ "id": 670301222222602240, "len": 43, "sent": 1 }
+{ "id": 670301222113517568, "len": 27, "sent": 0 }
+{ "id": 670301221836615680, "len": 44, "sent": 1 }
+{ "id": 670301221719310336, "len": 28, "sent": 0 }
+{ "id": 670301221442486272, "len": 34, "sent": 0 }
+{ "id": 670301221266153472, "len": 86, "sent": 0 }
+{ "id": 670301220960096256, "len": 102, "sent": 0 }
+{ "id": 670301220855136256, "len": 129, "sent": 1 }
+{ "id": 670301220637044736, "len": 43, "sent": 0 }
+{ "id": 670301220305821696, "len": 140, "sent": 0 }
+{ "id": 670301220247072770, "len": 83, "sent": 1 }
+{ "id": 670301220196626432, "len": 36, "sent": 0 }
+{ "id": 670301220079312901, "len": 31, "sent": 1 }
+{ "id": 670301219949305857, "len": 70, "sent": 1 }
+{ "id": 670301219739574273, "len": 131, "sent": 1 }
+{ "id": 670301219206877184, "len": 27, "sent": 0 }
+{ "id": 670301219139620864, "len": 124, "sent": 0 }
+{ "id": 670301218737123328, "len": 124, "sent": 0 }
+{ "id": 670301218640531458, "len": 31, "sent": 1 }
+{ "id": 670301218598756352, "len": 47, "sent": 0 }
+{ "id": 670301218565156865, "len": 44, "sent": 0 }
+{ "id": 670301218414206976, "len": 71, "sent": 1 }
+{ "id": 670301218376413185, "len": 14, "sent": 0 }
+{ "id": 670301218078629888, "len": 9, "sent": 0 }
+{ "id": 670301217851990017, "len": 111, "sent": 0 }
+{ "id": 670301217793269760, "len": 113, "sent": 0 }
+{ "id": 670301217508036608, "len": 47, "sent": 0 }
+{ "id": 670301217369657344, "len": 137, "sent": 0 }
+{ "id": 670301217311088641, "len": 28, "sent": 0 }
+{ "id": 670301217231347712, "len": 123, "sent": 0 }
+{ "id": 670301216891473920, "len": 44, "sent": 0 }
+{ "id": 670301216874721280, "len": 68, "sent": 0 }
+{ "id": 670301216799232000, "len": 50, "sent": 1 }
+{ "id": 670301216669171713, "len": 54, "sent": 0 }
+{ "id": 670301216493060097, "len": 113, "sent": 1 }
+{ "id": 670301216400924676, "len": 35, "sent": 1 }
+{ "id": 670301216371552258, "len": 58, "sent": 0 }
+{ "id": 670301216367185920, "len": 48, "sent": 0 }
+{ "id": 670301216228831232, "len": 130, "sent": 1 }
+{ "id": 670301215901802496, "len": 71, "sent": 1 }
+{ "id": 670301215725649921, "len": 20, "sent": 0 }
+{ "id": 670301215306199040, "len": 35, "sent": 0 }
+{ "id": 670301215138250754, "len": 48, "sent": 0 }
+{ "id": 670301214958055424, "len": 58, "sent": 1 }
+{ "id": 670301214605733888, "len": 139, "sent": 1 }
+{ "id": 670301214509129728, "len": 114, "sent": 1 }
+{ "id": 670301214442041344, "len": 18, "sent": 1 }
+{ "id": 670301214295392256, "len": 47, "sent": 0 }
+{ "id": 670301213737529344, "len": 9, "sent": 0 }
+{ "id": 670301213544595457, "len": 63, "sent": 1 }
+{ "id": 670301213515235333, "len": 107, "sent": 0 }
+{ "id": 670301213464899584, "len": 105, "sent": 1 }
+{ "id": 670301213120942080, "len": 39, "sent": 0 }
+{ "id": 670301212961603585, "len": 63, "sent": 0 }
+{ "id": 670301212961603584, "len": 20, "sent": 0 }
+{ "id": 670301212856737792, "len": 51, "sent": 0 }
+{ "id": 670301212760117248, "len": 133, "sent": 1 }
+{ "id": 670301211808010240, "len": 103, "sent": 0 }
+{ "id": 670301211774468096, "len": 40, "sent": 0 }
+{ "id": 670301211703144450, "len": 138, "sent": 1 }
+{ "id": 670301211581685761, "len": 25, "sent": 1 }
+{ "id": 670301211560685568, "len": 12, "sent": 1 }
+{ "id": 670301211090751490, "len": 140, "sent": 0 }
+{ "id": 670301210654699520, "len": 13, "sent": 0 }
+{ "id": 670301210486919168, "len": 38, "sent": 0 }
+{ "id": 670301210470195200, "len": 67, "sent": 1 }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.2.adm
new file mode 100644
index 0000000..0b3e0a6
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.2.adm
@@ -0,0 +1 @@
+5000
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.3.adm
new file mode 100644
index 0000000..1995b70
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.3.adm
@@ -0,0 +1,100 @@
+{ "text": 65, "sent": 0 }
+{ "text": 47, "sent": 0 }
+{ "text": 68, "sent": 0 }
+{ "text": 60, "sent": 0 }
+{ "text": 26, "sent": 1 }
+{ "text": 90, "sent": 1 }
+{ "text": 89, "sent": 0 }
+{ "text": 36, "sent": 0 }
+{ "text": 16, "sent": 0 }
+{ "text": 67, "sent": 0 }
+{ "text": 26, "sent": 0 }
+{ "text": 103, "sent": 1 }
+{ "text": 38, "sent": 1 }
+{ "text": 23, "sent": 1 }
+{ "text": 134, "sent": 1 }
+{ "text": 18, "sent": 0 }
+{ "text": 13, "sent": 1 }
+{ "text": 140, "sent": 0 }
+{ "text": 70, "sent": 1 }
+{ "text": 122, "sent": 0 }
+{ "text": 64, "sent": 0 }
+{ "text": 135, "sent": 0 }
+{ "text": 42, "sent": 1 }
+{ "text": 59, "sent": 0 }
+{ "text": 23, "sent": 1 }
+{ "text": 15, "sent": 1 }
+{ "text": 10, "sent": 0 }
+{ "text": 39, "sent": 1 }
+{ "text": 56, "sent": 0 }
+{ "text": 35, "sent": 0 }
+{ "text": 98, "sent": 1 }
+{ "text": 9, "sent": 0 }
+{ "text": 21, "sent": 0 }
+{ "text": 52, "sent": 0 }
+{ "text": 44, "sent": 0 }
+{ "text": 135, "sent": 0 }
+{ "text": 50, "sent": 0 }
+{ "text": 32, "sent": 0 }
+{ "text": 45, "sent": 0 }
+{ "text": 47, "sent": 0 }
+{ "text": 105, "sent": 0 }
+{ "text": 77, "sent": 0 }
+{ "text": 33, "sent": 0 }
+{ "text": 64, "sent": 0 }
+{ "text": 12, "sent": 0 }
+{ "text": 27, "sent": 1 }
+{ "text": 30, "sent": 0 }
+{ "text": 140, "sent": 1 }
+{ "text": 107, "sent": 1 }
+{ "text": 47, "sent": 0 }
+{ "text": 31, "sent": 0 }
+{ "text": 32, "sent": 1 }
+{ "text": 24, "sent": 0 }
+{ "text": 132, "sent": 0 }
+{ "text": 88, "sent": 0 }
+{ "text": 16, "sent": 0 }
+{ "text": 69, "sent": 0 }
+{ "text": 80, "sent": 0 }
+{ "text": 28, "sent": 1 }
+{ "text": 23, "sent": 0 }
+{ "text": 42, "sent": 0 }
+{ "text": 101, "sent": 1 }
+{ "text": 30, "sent": 0 }
+{ "text": 138, "sent": 0 }
+{ "text": 66, "sent": 0 }
+{ "text": 61, "sent": 0 }
+{ "text": 51, "sent": 1 }
+{ "text": 107, "sent": 0 }
+{ "text": 136, "sent": 0 }
+{ "text": 17, "sent": 0 }
+{ "text": 36, "sent": 1 }
+{ "text": 23, "sent": 0 }
+{ "text": 20, "sent": 1 }
+{ "text": 103, "sent": 0 }
+{ "text": 8, "sent": 0 }
+{ "text": 139, "sent": 0 }
+{ "text": 114, "sent": 0 }
+{ "text": 57, "sent": 1 }
+{ "text": 30, "sent": 0 }
+{ "text": 72, "sent": 0 }
+{ "text": 32, "sent": 0 }
+{ "text": 140, "sent": 0 }
+{ "text": 90, "sent": 1 }
+{ "text": 25, "sent": 0 }
+{ "text": 56, "sent": 0 }
+{ "text": 43, "sent": 0 }
+{ "text": 58, "sent": 0 }
+{ "text": 23, "sent": 0 }
+{ "text": 15, "sent": 1 }
+{ "text": 53, "sent": 1 }
+{ "text": 58, "sent": 1 }
+{ "text": 14, "sent": 0 }
+{ "text": 21, "sent": 1 }
+{ "text": 37, "sent": 0 }
+{ "text": 118, "sent": 0 }
+{ "text": 59, "sent": 0 }
+{ "text": 43, "sent": 0 }
+{ "text": 55, "sent": 0 }
+{ "text": 35, "sent": 1 }
+{ "text": 127, "sent": 0 }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.4.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.4.adm
new file mode 100644
index 0000000..1995b70
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.4.adm
@@ -0,0 +1,100 @@
+{ "text": 65, "sent": 0 }
+{ "text": 47, "sent": 0 }
+{ "text": 68, "sent": 0 }
+{ "text": 60, "sent": 0 }
+{ "text": 26, "sent": 1 }
+{ "text": 90, "sent": 1 }
+{ "text": 89, "sent": 0 }
+{ "text": 36, "sent": 0 }
+{ "text": 16, "sent": 0 }
+{ "text": 67, "sent": 0 }
+{ "text": 26, "sent": 0 }
+{ "text": 103, "sent": 1 }
+{ "text": 38, "sent": 1 }
+{ "text": 23, "sent": 1 }
+{ "text": 134, "sent": 1 }
+{ "text": 18, "sent": 0 }
+{ "text": 13, "sent": 1 }
+{ "text": 140, "sent": 0 }
+{ "text": 70, "sent": 1 }
+{ "text": 122, "sent": 0 }
+{ "text": 64, "sent": 0 }
+{ "text": 135, "sent": 0 }
+{ "text": 42, "sent": 1 }
+{ "text": 59, "sent": 0 }
+{ "text": 23, "sent": 1 }
+{ "text": 15, "sent": 1 }
+{ "text": 10, "sent": 0 }
+{ "text": 39, "sent": 1 }
+{ "text": 56, "sent": 0 }
+{ "text": 35, "sent": 0 }
+{ "text": 98, "sent": 1 }
+{ "text": 9, "sent": 0 }
+{ "text": 21, "sent": 0 }
+{ "text": 52, "sent": 0 }
+{ "text": 44, "sent": 0 }
+{ "text": 135, "sent": 0 }
+{ "text": 50, "sent": 0 }
+{ "text": 32, "sent": 0 }
+{ "text": 45, "sent": 0 }
+{ "text": 47, "sent": 0 }
+{ "text": 105, "sent": 0 }
+{ "text": 77, "sent": 0 }
+{ "text": 33, "sent": 0 }
+{ "text": 64, "sent": 0 }
+{ "text": 12, "sent": 0 }
+{ "text": 27, "sent": 1 }
+{ "text": 30, "sent": 0 }
+{ "text": 140, "sent": 1 }
+{ "text": 107, "sent": 1 }
+{ "text": 47, "sent": 0 }
+{ "text": 31, "sent": 0 }
+{ "text": 32, "sent": 1 }
+{ "text": 24, "sent": 0 }
+{ "text": 132, "sent": 0 }
+{ "text": 88, "sent": 0 }
+{ "text": 16, "sent": 0 }
+{ "text": 69, "sent": 0 }
+{ "text": 80, "sent": 0 }
+{ "text": 28, "sent": 1 }
+{ "text": 23, "sent": 0 }
+{ "text": 42, "sent": 0 }
+{ "text": 101, "sent": 1 }
+{ "text": 30, "sent": 0 }
+{ "text": 138, "sent": 0 }
+{ "text": 66, "sent": 0 }
+{ "text": 61, "sent": 0 }
+{ "text": 51, "sent": 1 }
+{ "text": 107, "sent": 0 }
+{ "text": 136, "sent": 0 }
+{ "text": 17, "sent": 0 }
+{ "text": 36, "sent": 1 }
+{ "text": 23, "sent": 0 }
+{ "text": 20, "sent": 1 }
+{ "text": 103, "sent": 0 }
+{ "text": 8, "sent": 0 }
+{ "text": 139, "sent": 0 }
+{ "text": 114, "sent": 0 }
+{ "text": 57, "sent": 1 }
+{ "text": 30, "sent": 0 }
+{ "text": 72, "sent": 0 }
+{ "text": 32, "sent": 0 }
+{ "text": 140, "sent": 0 }
+{ "text": 90, "sent": 1 }
+{ "text": 25, "sent": 0 }
+{ "text": 56, "sent": 0 }
+{ "text": 43, "sent": 0 }
+{ "text": 58, "sent": 0 }
+{ "text": 23, "sent": 0 }
+{ "text": 15, "sent": 1 }
+{ "text": 53, "sent": 1 }
+{ "text": 58, "sent": 1 }
+{ "text": 14, "sent": 0 }
+{ "text": 21, "sent": 1 }
+{ "text": 37, "sent": 0 }
+{ "text": 118, "sent": 0 }
+{ "text": 59, "sent": 0 }
+{ "text": 43, "sent": 0 }
+{ "text": 55, "sent": 0 }
+{ "text": 35, "sent": 1 }
+{ "text": 127, "sent": 0 }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.5.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.5.adm
new file mode 100644
index 0000000..e9c02da
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.5.adm
@@ -0,0 +1 @@
+5000
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.6.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.6.adm
new file mode 100644
index 0000000..e9c02da
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.6.adm
@@ -0,0 +1 @@
+5000
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.7.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.7.adm
new file mode 100644
index 0000000..e9c02da
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.7.adm
@@ -0,0 +1 @@
+5000
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.8.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.8.adm
new file mode 100644
index 0000000..9fada79
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.8.adm
@@ -0,0 +1 @@
+{ "$1": 5000, "$2": 5000 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.9.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.9.adm
new file mode 100644
index 0000000..0ead4c6
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.9.adm
@@ -0,0 +1 @@
+5100
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_function_error/py_function_error.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_function_error/py_function_error.2.adm
new file mode 100644
index 0000000..f0405ed
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_function_error/py_function_error.2.adm
@@ -0,0 +1,4 @@
+{ "$1": null, "$2": [ "a" ] }
+{ "$1": null, "$2": [ "b" ] }
+{ "$1": null, "$2": [ "c" ] }
+{ "$1": null, "$2": [ "d" ] }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/python_open_type_validation/type_validation.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/python_open_type_validation/type_validation.1.adm
new file mode 100644
index 0000000..93f8aec
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/python_open_type_validation/type_validation.1.adm
@@ -0,0 +1 @@
+[ 907, 9.07, "907", 9.07, true, 15706, 621173594 ]
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/toplevel_fn/toplevel_fn.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/toplevel_fn/toplevel_fn.1.adm
new file mode 100644
index 0000000..cd5ac03
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/toplevel_fn/toplevel_fn.1.adm
@@ -0,0 +1 @@
+2.0
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_it_python.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_it_python.xml
index 59dec11..4e1d5b2 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_it_python.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_it_python.xml
@@ -44,10 +44,40 @@
<output-dir compare="Clean-JSON">py_nested_access</output-dir>
</compilation-unit>
</test-case>
+ <test-case FilePath="external-library">
+ <compilation-unit name="python_open_type_validation">
+ <output-dir compare="Clean-JSON">python_open_type_validation</output-dir>
+ </compilation-unit>
+ </test-case>
<test-case FilePath="external-library" check-warnings="true">
<compilation-unit name="py_function_error">
<output-dir compare="Clean-JSON">py_function_error</output-dir>
- <expected-warn>ASX0201: External UDF returned exception. Returned exception was: ArithmeticError: oof</expected-warn>
+ <expected-warn>ASX0201: External UDF returned exception. Returned exception was: Traceback (most recent call last):
+ File "entrypoint.py", line 181, in handle_call
+ result[0].append(self.next_tuple(*arg, key=self.mid))
+ File "entrypoint.py", line 99, in next_tuple
+ return self.wrapped_fns[key](*args)
+ File "site-packages/roundtrip.py", line 28, in warning
+ raise ArithmeticError("oof")
+ArithmeticError: oof
+ (in line 28, at column 1)</expected-warn>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-library">
+ <compilation-unit name="mysentiment_twitter">
+ <output-dir compare="Text">mysentiment_twitter</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-library">
+ <compilation-unit name="toplevel_fn">
+ <output-dir compare="Text">toplevel_fn</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-library" check-warnings="true">
+ <compilation-unit name="crash">
+ <output-dir compare="Text">crash</output-dir>
+ <expected-warn>ASX0201: External UDF returned exception. Returned exception was: Function externallibtest:crash#0 failed to execute (in line 23, at column 1)</expected-warn>
+ <expected-warn>ASX0201: External UDF returned exception. Returned exception was: java.io.IOException: Python process exited with code: 1 (in line 23, at column 1)</expected-warn>
</compilation-unit>
</test-case>
</test-group>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/external/ipc/ExternalFunctionResultRouter.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/external/ipc/ExternalFunctionResultRouter.java
index 8d56eb7..94baa64 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/external/ipc/ExternalFunctionResultRouter.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/external/ipc/ExternalFunctionResultRouter.java
@@ -22,7 +22,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.ipc.api.IIPCHandle;
@@ -33,9 +33,8 @@
public class ExternalFunctionResultRouter implements IIPCI {
- AtomicLong maxId = new AtomicLong(0);
- ConcurrentHashMap<Long, MutableObject<ByteBuffer>> activeClients = new ConcurrentHashMap<>();
- ConcurrentHashMap<Long, Exception> exceptionInbox = new ConcurrentHashMap<>();
+ private final AtomicLong maxId = new AtomicLong(0);
+ private final ConcurrentHashMap<Long, Pair<ByteBuffer, Exception>> activeClients = new ConcurrentHashMap<>();
private static int MAX_BUF_SIZE = 32 * 1024 * 1024; //32MB
@Override
@@ -44,7 +43,8 @@
ByteBuffer buf = (ByteBuffer) payload;
int end = buf.position();
buf.position(end - rewind);
- ByteBuffer copyTo = activeClients.get(rmid).getValue();
+ Pair<ByteBuffer, Exception> route = activeClients.get(rmid);
+ ByteBuffer copyTo = route.getFirst();
if (copyTo.capacity() < handle.getAttachmentLen()) {
int nextSize = closestPow2(handle.getAttachmentLen());
if (nextSize > MAX_BUF_SIZE) {
@@ -52,44 +52,43 @@
return;
}
copyTo = ByteBuffer.allocate(nextSize);
- activeClients.get(rmid).setValue(copyTo);
+ route.setFirst(copyTo);
}
copyTo.position(0);
System.arraycopy(buf.array(), buf.position() + buf.arrayOffset(), copyTo.array(), copyTo.arrayOffset(),
handle.getAttachmentLen());
- synchronized (copyTo) {
+ synchronized (route) {
copyTo.limit(handle.getAttachmentLen() + 1);
- copyTo.notify();
+ route.notifyAll();
}
buf.position(end);
}
@Override
public void onError(IIPCHandle handle, long mid, long rmid, Exception exception) {
- exceptionInbox.put(rmid, exception);
- ByteBuffer route = activeClients.get(rmid).getValue();
+ Pair<ByteBuffer, Exception> route = activeClients.get(rmid);
synchronized (route) {
- route.notify();
+ route.setSecond(exception);
+ route.notifyAll();
}
}
- public Long insertRoute(ByteBuffer buf) {
- Long id = maxId.incrementAndGet();
- activeClients.put(id, new MutableObject<>(buf));
- return id;
+ public Pair<Long, Pair<ByteBuffer, Exception>> insertRoute(ByteBuffer buf) {
+ Long id = maxId.getAndIncrement();
+ Pair<ByteBuffer, Exception> bufferHolder = new Pair<>(buf, null);
+ activeClients.put(id, bufferHolder);
+ return new Pair<>(id, bufferHolder);
}
- public Exception getException(Long id) {
- return exceptionInbox.remove(id);
- }
-
- public boolean hasException(long id) {
- return exceptionInbox.get(id) == null;
+ public Exception getAndRemoveException(Long id) {
+ Pair<ByteBuffer, Exception> route = activeClients.get(id);
+ Exception e = route.getSecond();
+ route.setSecond(null);
+ return e;
}
public void removeRoute(Long id) {
activeClients.remove(id);
- exceptionInbox.remove(id);
}
public static int closestPow2(int n) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonIPCProto.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonIPCProto.java
index feb52cf..cd7ec18 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonIPCProto.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonIPCProto.java
@@ -24,32 +24,41 @@
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.hyracks.ipc.impl.IPCSystem;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.ipc.impl.Message;
import org.msgpack.core.MessagePack;
+import org.msgpack.core.MessageUnpacker;
+import org.msgpack.core.buffer.ArrayBufferInput;
public class PythonIPCProto {
- public PythonMessageBuilder messageBuilder;
- OutputStream sockOut;
- ByteBuffer headerBuffer = ByteBuffer.allocate(21);
- ByteBuffer recvBuffer = ByteBuffer.allocate(4096);
- ExternalFunctionResultRouter router;
- IPCSystem ipcSys;
- Message outMsg;
- Long key;
+ private PythonMessageBuilder messageBuilder;
+ private OutputStream sockOut;
+ private ByteBuffer headerBuffer = ByteBuffer.allocate(21);
+ private ByteBuffer recvBuffer = ByteBuffer.allocate(32768);
+ private ExternalFunctionResultRouter router;
+ private long routeId;
+ private Pair<ByteBuffer, Exception> bufferBox;
+ private Process pythonProc;
+ private long maxFunctionId;
+ private ArrayBufferInput unpackerInput;
+ private MessageUnpacker unpacker;
- public PythonIPCProto(OutputStream sockOut, ExternalFunctionResultRouter router, IPCSystem ipcSys)
- throws IOException {
+ public PythonIPCProto(OutputStream sockOut, ExternalFunctionResultRouter router, Process pythonProc) {
this.sockOut = sockOut;
messageBuilder = new PythonMessageBuilder();
this.router = router;
- this.ipcSys = ipcSys;
- this.outMsg = new Message(null);
+ this.pythonProc = pythonProc;
+ this.maxFunctionId = 0l;
+ unpackerInput = new ArrayBufferInput(new byte[0]);
+ unpacker = MessagePack.newDefaultUnpacker(unpackerInput);
}
public void start() {
- this.key = router.insertRoute(recvBuffer);
+ Pair<Long, Pair<ByteBuffer, Exception>> keyAndBufferBox = router.insertRoute(recvBuffer);
+ this.routeId = keyAndBufferBox.getFirst();
+ this.bufferBox = keyAndBufferBox.getSecond();
}
public void helo() throws IOException, AsterixException {
@@ -59,78 +68,106 @@
messageBuilder.buf.clear();
messageBuilder.buf.position(0);
messageBuilder.hello();
- sendMsg();
+ sendMsg(routeId);
receiveMsg();
if (getResponseType() != MessageType.HELO) {
- throw new IllegalStateException("Illegal reply received, expected HELO");
+ throw HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.ILLEGAL_STATE,
+ "Expected HELO, recieved " + getResponseType().name());
}
}
- public void init(String module, String clazz, String fn) throws IOException, AsterixException {
+ public long init(String module, String clazz, String fn) throws IOException, AsterixException {
+ long functionId = maxFunctionId++;
recvBuffer.clear();
recvBuffer.position(0);
recvBuffer.limit(0);
messageBuilder.buf.clear();
messageBuilder.buf.position(0);
messageBuilder.init(module, clazz, fn);
- sendMsg();
+ sendMsg(functionId);
receiveMsg();
if (getResponseType() != MessageType.INIT_RSP) {
- throw new IllegalStateException("Illegal reply received, expected INIT_RSP");
+ throw HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.ILLEGAL_STATE,
+ "Expected INIT_RSP, recieved " + getResponseType().name());
}
+ return functionId;
}
- public ByteBuffer call(ByteBuffer args, int numArgs) throws Exception {
+ public ByteBuffer call(long functionId, ByteBuffer args, int numArgs) throws IOException, AsterixException {
recvBuffer.clear();
recvBuffer.position(0);
recvBuffer.limit(0);
messageBuilder.buf.clear();
messageBuilder.buf.position(0);
messageBuilder.call(args.array(), args.position(), numArgs);
- sendMsg();
+ sendMsg(functionId);
receiveMsg();
if (getResponseType() != MessageType.CALL_RSP) {
- throw new IllegalStateException("Illegal reply received, expected CALL_RSP, recvd: " + getResponseType());
+ throw HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.ILLEGAL_STATE,
+ "Expected CALL_RSP, recieved " + getResponseType().name());
}
return recvBuffer;
}
- public void quit() throws IOException {
+ public ByteBuffer callMulti(long key, ByteBuffer args, int numTuples) throws IOException, AsterixException {
+ recvBuffer.clear();
+ recvBuffer.position(0);
+ recvBuffer.limit(0);
+ messageBuilder.buf.clear();
+ messageBuilder.buf.position(0);
+ messageBuilder.callMulti(args.array(), args.position(), numTuples);
+ sendMsg(key);
+ receiveMsg();
+ if (getResponseType() != MessageType.CALL_RSP) {
+ throw HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.ILLEGAL_STATE,
+ "Expected CALL_RSP, recieved " + getResponseType().name());
+ }
+ return recvBuffer;
+ }
+
+ //For future use with interpreter reuse between jobs.
+ public void quit() throws HyracksDataException {
messageBuilder.quit();
- router.removeRoute(key);
+ router.removeRoute(routeId);
}
public void receiveMsg() throws IOException, AsterixException {
Exception except = null;
try {
- synchronized (recvBuffer) {
- while (recvBuffer.limit() == 0) {
- recvBuffer.wait(100);
+ synchronized (bufferBox) {
+ while ((bufferBox.getFirst().limit() == 0 || bufferBox.getSecond() != null) && pythonProc.isAlive()) {
+ bufferBox.wait(100);
}
}
- if (router.hasException(key)) {
- except = router.getException(key);
+ except = router.getAndRemoveException(routeId);
+ if (!pythonProc.isAlive()) {
+ except = new IOException("Python process exited with code: " + pythonProc.exitValue());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new AsterixException(ErrorCode.EXTERNAL_UDF_EXCEPTION, e);
}
if (except != null) {
- throw new AsterixException(ErrorCode.EXTERNAL_UDF_EXCEPTION, except);
+ throw new AsterixException(except);
+ }
+ if (bufferBox.getFirst() != recvBuffer) {
+ recvBuffer = bufferBox.getFirst();
}
messageBuilder.readHead(recvBuffer);
if (messageBuilder.type == MessageType.ERROR) {
- throw new AsterixException(ErrorCode.EXTERNAL_UDF_EXCEPTION,
- MessagePack.newDefaultUnpacker(recvBuffer).unpackString());
+ unpackerInput.reset(recvBuffer.array(), recvBuffer.position() + recvBuffer.arrayOffset(),
+ recvBuffer.remaining());
+ unpacker.reset(unpackerInput);
+ throw new AsterixException(unpacker.unpackString());
}
}
- public void sendMsg() throws IOException {
+ public void sendMsg(long key) throws IOException {
headerBuffer.clear();
headerBuffer.position(0);
- headerBuffer.putInt(HEADER_SIZE + messageBuilder.buf.position());
- headerBuffer.putLong(-1);
+ headerBuffer.putInt(HEADER_SIZE + Integer.BYTES + messageBuilder.buf.position());
headerBuffer.putLong(key);
+ headerBuffer.putLong(routeId);
headerBuffer.put(Message.NORMAL);
sockOut.write(headerBuffer.array(), 0, HEADER_SIZE + Integer.BYTES);
sockOut.write(messageBuilder.buf.array(), 0, messageBuilder.buf.position());
@@ -141,4 +178,8 @@
return messageBuilder.type;
}
+ public long getRouteId() {
+ return routeId;
+ }
+
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonMessageBuilder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonMessageBuilder.java
index 506e80d..5052eb4 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonMessageBuilder.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonMessageBuilder.java
@@ -25,19 +25,16 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
-import java.util.Arrays;
import org.apache.asterix.external.library.msgpack.MessagePackerFromADM;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
public class PythonMessageBuilder {
- private static final int MAX_BUF_SIZE = 21 * 1024 * 1024; //21MB.
- private static final Logger LOGGER = LogManager.getLogger();
+ private static final int MAX_BUF_SIZE = 64 * 1024 * 1024; //64MB.
MessageType type;
long dataLength;
ByteBuffer buf;
- String[] initAry = new String[3];
public PythonMessageBuilder() {
this.type = null;
@@ -49,12 +46,12 @@
this.type = type;
}
- public void packHeader() {
+ public void packHeader() throws HyracksDataException {
MessagePackerFromADM.packFixPos(buf, (byte) type.ordinal());
}
//TODO: this is wrong for any multibyte chars
- private int getStringLength(String s) {
+ private static int getStringLength(String s) {
return s.length();
}
@@ -66,7 +63,7 @@
public void hello() throws IOException {
this.type = MessageType.HELO;
byte[] serAddr = serialize(new InetSocketAddress(InetAddress.getLoopbackAddress(), 1));
- dataLength = serAddr.length + 5;
+ dataLength = serAddr.length + 1;
packHeader();
//TODO:make this cleaner
buf.put(BIN32);
@@ -74,32 +71,38 @@
buf.put(serAddr);
}
- public void quit() {
+ public void quit() throws HyracksDataException {
this.type = MessageType.QUIT;
dataLength = getStringLength("QUIT");
packHeader();
MessagePackerFromADM.packFixStr(buf, "QUIT");
}
- public void init(String module, String clazz, String fn) {
+ public void init(final String module, final String clazz, final String fn) throws HyracksDataException {
this.type = MessageType.INIT;
- initAry[0] = module;
- initAry[1] = clazz;
- initAry[2] = fn;
- dataLength = Arrays.stream(initAry).mapToInt(s -> getStringLength(s)).sum() + 2;
- packHeader();
- MessagePackerFromADM.packFixArrayHeader(buf, (byte) initAry.length);
- for (String s : initAry) {
- MessagePackerFromADM.packStr(buf, s);
+ // sum(string lengths) + 2 from fix array tag and message type
+ if (clazz != null) {
+ dataLength =
+ PythonMessageBuilder.getStringLength(module) + getStringLength(clazz) + getStringLength(fn) + 2;
+ } else {
+ dataLength = PythonMessageBuilder.getStringLength(module) + getStringLength(fn) + 2;
}
+ packHeader();
+ int numArgs = clazz == null ? 2 : 3;
+ MessagePackerFromADM.packFixArrayHeader(buf, (byte) numArgs);
+ MessagePackerFromADM.packStr(buf, module);
+ if (clazz != null) {
+ MessagePackerFromADM.packStr(buf, clazz);
+ }
+ MessagePackerFromADM.packStr(buf, fn);
}
- public void call(byte[] args, int lim, int numArgs) {
+ public void call(byte[] args, int lim, int numArgs) throws HyracksDataException {
if (args.length > buf.capacity()) {
int growTo = ExternalFunctionResultRouter.closestPow2(args.length);
if (growTo > MAX_BUF_SIZE) {
- //TODO: something more graceful
- throw new IllegalArgumentException("Reached maximum buffer size");
+ throw HyracksDataException.create(ErrorCode.ILLEGAL_STATE,
+ "Unable to allocate message buffer larger than:" + MAX_BUF_SIZE + " bytes");
}
buf = ByteBuffer.allocate(growTo);
}
@@ -109,11 +112,32 @@
dataLength = 5 + 1 + lim;
packHeader();
//TODO: make this switch between fixarray/array16/array32
- if (numArgs == 0) {
- buf.put(NIL);
- } else {
- buf.put(ARRAY32);
- buf.putInt(numArgs);
+ buf.put((byte) (FIXARRAY_PREFIX + 1));
+ buf.put(ARRAY32);
+ buf.putInt(numArgs);
+ if (numArgs > 0) {
+ buf.put(args, 0, lim);
+ }
+ }
+
+ public void callMulti(byte[] args, int lim, int numArgs) throws HyracksDataException {
+ if (args.length > buf.capacity()) {
+ int growTo = ExternalFunctionResultRouter.closestPow2(args.length);
+ if (growTo > MAX_BUF_SIZE) {
+ throw HyracksDataException.create(ErrorCode.ILLEGAL_STATE,
+ "Unable to allocate message buffer larger than:" + MAX_BUF_SIZE + " bytes");
+ }
+ buf = ByteBuffer.allocate(growTo);
+ }
+ buf.clear();
+ buf.position(0);
+ this.type = MessageType.CALL;
+ dataLength = 5 + 1 + lim;
+ packHeader();
+ //TODO: make this switch between fixarray/array16/array32
+ buf.put(ARRAY16);
+ buf.putShort((short) numArgs);
+ if (numArgs > 0) {
buf.put(args, 0, lim);
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarPythonFunctionEvaluator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarPythonFunctionEvaluator.java
index 1fa53ea..e664f47 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarPythonFunctionEvaluator.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarPythonFunctionEvaluator.java
@@ -19,50 +19,31 @@
package org.apache.asterix.external.library;
+import static org.msgpack.core.MessagePack.Code.FIXARRAY_PREFIX;
+
import java.io.DataOutput;
-import java.io.File;
import java.io.IOException;
-import java.net.InetAddress;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.TimeUnit;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.library.ILibraryManager;
-import org.apache.asterix.common.metadata.DataverseName;
-import org.apache.asterix.external.ipc.ExternalFunctionResultRouter;
-import org.apache.asterix.external.ipc.PythonIPCProto;
-import org.apache.asterix.external.library.msgpack.MessagePackerFromADM;
import org.apache.asterix.external.library.msgpack.MessageUnpackerToADM;
import org.apache.asterix.om.functions.IExternalFunctionInfo;
import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.EnumDeserializer;
import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.types.TypeTagUtil;
+import org.apache.asterix.runtime.evaluators.functions.PointableHelper;
import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import org.apache.hyracks.api.config.IApplicationConfig;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.TaskAttemptId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.exceptions.IWarningCollector;
import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.api.exceptions.Warning;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.resources.IDeallocatable;
-import org.apache.hyracks.control.common.controllers.NCConfig;
import org.apache.hyracks.data.std.api.IPointable;
-import org.apache.hyracks.data.std.api.IValueReference;
-import org.apache.hyracks.data.std.primitive.TaggedValuePointable;
import org.apache.hyracks.data.std.primitive.VoidPointable;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-import org.apache.hyracks.dataflow.std.base.AbstractStateObject;
-import org.apache.hyracks.ipc.impl.IPCSystem;
+import org.msgpack.core.MessagePack;
+import org.msgpack.core.MessageUnpacker;
+import org.msgpack.core.buffer.ArrayBufferInput;
class ExternalScalarPythonFunctionEvaluator extends ExternalScalarFunctionEvaluator {
@@ -72,53 +53,22 @@
private final ByteBuffer argHolder;
private final ByteBuffer outputWrapper;
private final IEvaluatorContext evaluatorContext;
- private static final String ENTRYPOINT = "entrypoint.py";
- private static final String SITE_PACKAGES = "site-packages";
private final IPointable[] argValues;
+ private final SourceLocation sourceLocation;
+
+ private MessageUnpacker unpacker;
+ private ArrayBufferInput unpackerInput;
+
+ private long fnId;
ExternalScalarPythonFunctionEvaluator(IExternalFunctionInfo finfo, IScalarEvaluatorFactory[] args,
IAType[] argTypes, IEvaluatorContext ctx, SourceLocation sourceLoc) throws HyracksDataException {
super(finfo, args, argTypes, ctx);
- IApplicationConfig cfg = ctx.getServiceContext().getAppConfig();
- String pythonPathCmd = cfg.getString(NCConfig.Option.PYTHON_CMD);
- boolean findPython = cfg.getBoolean(NCConfig.Option.PYTHON_CMD_AUTOLOCATE);
- List<String> pythonArgs = new ArrayList<>();
- if (pythonPathCmd == null) {
- //if absolute path to interpreter is not specified, try to use environmental python
- if (findPython) {
- pythonPathCmd = "/usr/bin/env";
- pythonArgs.add("python3");
- } else {
- throw HyracksDataException.create(AsterixException.create(ErrorCode.EXTERNAL_UDF_EXCEPTION,
- "Python interpreter not specified, and " + NCConfig.Option.PYTHON_CMD_AUTOLOCATE.ini()
- + " is false"));
- }
- }
- File pythonPath = new File(pythonPathCmd);
- List<String> sitePkgs = new ArrayList<>();
- sitePkgs.add(SITE_PACKAGES);
- String[] addlSitePackages =
- ctx.getServiceContext().getAppConfig().getStringArray((NCConfig.Option.PYTHON_ADDITIONAL_PACKAGES));
- sitePkgs.addAll(Arrays.asList(addlSitePackages));
- if (cfg.getBoolean(NCConfig.Option.PYTHON_USE_BUNDLED_MSGPACK)) {
- sitePkgs.add("ipc" + File.separator + SITE_PACKAGES + File.separator);
- }
- String[] pythonArgsRaw = ctx.getServiceContext().getAppConfig().getStringArray(NCConfig.Option.PYTHON_ARGS);
- if (pythonArgsRaw != null) {
- pythonArgs.addAll(Arrays.asList(pythonArgsRaw));
- }
- StringBuilder sitePackagesPathBuilder = new StringBuilder();
- for (int i = 0; i < sitePkgs.size() - 1; i++) {
- sitePackagesPathBuilder.append(sitePkgs.get(i));
- sitePackagesPathBuilder.append(File.pathSeparator);
- }
- sitePackagesPathBuilder.append(sitePkgs.get(sitePkgs.size() - 1));
-
try {
- libraryEvaluator = PythonLibraryEvaluator.getInstance(finfo, libraryManager, router, ipcSys, pythonPath,
- ctx.getTaskContext(), sitePackagesPathBuilder.toString(), pythonArgs, ctx.getWarningCollector(),
- sourceLoc);
+ PythonLibraryEvaluatorFactory evaluatorFactory = new PythonLibraryEvaluatorFactory(ctx.getTaskContext());
+ this.libraryEvaluator = evaluatorFactory.getEvaluator(finfo, sourceLoc);
+ this.fnId = libraryEvaluator.initialize(finfo);
} catch (IOException | AsterixException e) {
throw new HyracksDataException("Failed to initialize Python", e);
}
@@ -130,6 +80,9 @@
this.argHolder = ByteBuffer.wrap(new byte[Short.MAX_VALUE * 2]);
this.outputWrapper = ByteBuffer.wrap(new byte[Short.MAX_VALUE * 2]);
this.evaluatorContext = ctx;
+ this.sourceLocation = sourceLoc;
+ this.unpackerInput = new ArrayBufferInput(new byte[0]);
+ this.unpacker = MessagePack.newDefaultUnpacker(unpackerInput);
}
@Override
@@ -137,193 +90,57 @@
argHolder.clear();
for (int i = 0, ln = argEvals.length; i < ln; i++) {
argEvals[i].evaluate(tuple, argValues[i]);
+ if (!finfo.getNullCall() && PointableHelper.checkAndSetMissingOrNull(result, argValues[i])) {
+ return;
+ }
try {
- setArgument(i, argValues[i]);
+ PythonLibraryEvaluator.setArgument(argTypes[i], argValues[i], argHolder, finfo.getNullCall());
} catch (IOException e) {
throw new HyracksDataException("Error evaluating Python UDF", e);
}
}
try {
- ByteBuffer res = libraryEvaluator.callPython(argHolder, argTypes.length);
+ ByteBuffer res = libraryEvaluator.callPython(fnId, argHolder, argTypes.length);
resultBuffer.reset();
wrap(res, resultBuffer.getDataOutput());
} catch (Exception e) {
throw new HyracksDataException("Error evaluating Python UDF", e);
}
- result.set(resultBuffer.getByteArray(), resultBuffer.getStartOffset(), resultBuffer.getLength());
- }
-
- private static class PythonLibraryEvaluator extends AbstractStateObject implements IDeallocatable {
- Process p;
- IExternalFunctionInfo finfo;
- ILibraryManager libMgr;
- File pythonHome;
- PythonIPCProto proto;
- ExternalFunctionResultRouter router;
- IPCSystem ipcSys;
- String module;
- String clazz;
- String fn;
- String sitePkgs;
- List<String> pythonArgs;
- TaskAttemptId task;
- IWarningCollector warningCollector;
- SourceLocation sourceLoc;
-
- private PythonLibraryEvaluator(JobId jobId, PythonLibraryEvaluatorId evaluatorId, IExternalFunctionInfo finfo,
- ILibraryManager libMgr, File pythonHome, String sitePkgs, List<String> pythonArgs,
- ExternalFunctionResultRouter router, IPCSystem ipcSys, TaskAttemptId task,
- IWarningCollector warningCollector, SourceLocation sourceLoc) {
- super(jobId, evaluatorId);
- this.finfo = finfo;
- this.libMgr = libMgr;
- this.pythonHome = pythonHome;
- this.sitePkgs = sitePkgs;
- this.pythonArgs = pythonArgs;
- this.router = router;
- this.task = task;
- this.ipcSys = ipcSys;
- this.warningCollector = warningCollector;
- this.sourceLoc = sourceLoc;
-
- }
-
- public void initialize() throws IOException, AsterixException {
- PythonLibraryEvaluatorId fnId = (PythonLibraryEvaluatorId) id;
- List<String> externalIdents = finfo.getExternalIdentifier();
- PythonLibrary library = (PythonLibrary) libMgr.getLibrary(fnId.libraryDataverseName, fnId.libraryName);
- String wd = library.getFile().getAbsolutePath();
- String packageModule = externalIdents.get(0);
- String clazz;
- String fn;
- String externalIdent1 = externalIdents.get(1);
- int idx = externalIdent1.lastIndexOf('.');
- if (idx >= 0) {
- clazz = externalIdent1.substring(0, idx);
- fn = externalIdent1.substring(idx + 1);
- } else {
- clazz = "None";
- fn = externalIdent1;
- }
- this.fn = fn;
- this.clazz = clazz;
- this.module = packageModule;
- int port = ipcSys.getSocketAddress().getPort();
- List<String> args = new ArrayList<>();
- args.add(pythonHome.getAbsolutePath());
- args.addAll(pythonArgs);
- args.add(ENTRYPOINT);
- args.add(InetAddress.getLoopbackAddress().getHostAddress());
- args.add(Integer.toString(port));
- args.add(sitePkgs);
- ProcessBuilder pb = new ProcessBuilder(args.toArray(new String[0]));
- pb.directory(new File(wd));
- p = pb.start();
- proto = new PythonIPCProto(p.getOutputStream(), router, ipcSys);
- proto.start();
- proto.helo();
- proto.init(packageModule, clazz, fn);
- }
-
- ByteBuffer callPython(ByteBuffer arguments, int numArgs) throws Exception {
- ByteBuffer ret = null;
- try {
- ret = proto.call(arguments, numArgs);
- } catch (AsterixException e) {
- warningCollector.warn(Warning.of(sourceLoc, ErrorCode.EXTERNAL_UDF_EXCEPTION, e.getMessage()));
- }
- return ret;
- }
-
- @Override
- public void deallocate() {
- if (p != null) {
- boolean dead = false;
- try {
- p.destroy();
- dead = p.waitFor(100, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- //gonna kill it anyway
- }
- if (!dead) {
- p.destroyForcibly();
- }
- }
- }
-
- private static PythonLibraryEvaluator getInstance(IExternalFunctionInfo finfo, ILibraryManager libMgr,
- ExternalFunctionResultRouter router, IPCSystem ipcSys, File pythonHome, IHyracksTaskContext ctx,
- String sitePkgs, List<String> pythonArgs, IWarningCollector warningCollector, SourceLocation sourceLoc)
- throws IOException, AsterixException {
- PythonLibraryEvaluatorId evaluatorId =
- new PythonLibraryEvaluatorId(finfo.getLibraryDataverseName(), finfo.getLibraryName());
- PythonLibraryEvaluator evaluator = (PythonLibraryEvaluator) ctx.getStateObject(evaluatorId);
- if (evaluator == null) {
- evaluator = new PythonLibraryEvaluator(ctx.getJobletContext().getJobId(), evaluatorId, finfo, libMgr,
- pythonHome, sitePkgs, pythonArgs, router, ipcSys, ctx.getTaskAttemptId(), warningCollector,
- sourceLoc);
- ctx.registerDeallocatable(evaluator);
- evaluator.initialize();
- ctx.setStateObject(evaluator);
- }
- return evaluator;
- }
- }
-
- private static final class PythonLibraryEvaluatorId {
-
- private final DataverseName libraryDataverseName;
-
- private final String libraryName;
-
- private PythonLibraryEvaluatorId(DataverseName libraryDataverseName, String libraryName) {
- this.libraryDataverseName = Objects.requireNonNull(libraryDataverseName);
- this.libraryName = Objects.requireNonNull(libraryName);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o)
- return true;
- if (o == null || getClass() != o.getClass())
- return false;
- PythonLibraryEvaluatorId that = (PythonLibraryEvaluatorId) o;
- return libraryDataverseName.equals(that.libraryDataverseName) && libraryName.equals(that.libraryName);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(libraryDataverseName, libraryName);
- }
- }
-
- private void setArgument(int index, IValueReference valueReference) throws IOException {
- IAType type = argTypes[index];
- ATypeTag tag = type.getTypeTag();
- switch (tag) {
- case ANY:
- TaggedValuePointable pointy = TaggedValuePointable.FACTORY.createPointable();
- pointy.set(valueReference);
- ATypeTag rtTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(pointy.getTag());
- IAType rtType = TypeTagUtil.getBuiltinTypeByTag(rtTypeTag);
- MessagePackerFromADM.pack(valueReference, rtType, argHolder);
- break;
- default:
- MessagePackerFromADM.pack(valueReference, type, argHolder);
- break;
- }
+ result.set(resultBuffer);
}
private void wrap(ByteBuffer resultWrapper, DataOutput out) throws HyracksDataException {
//TODO: output wrapper needs to grow with result wrapper
outputWrapper.clear();
outputWrapper.position(0);
- MessageUnpackerToADM.unpack(resultWrapper, outputWrapper, true);
try {
+ if (resultWrapper == null) {
+ outputWrapper.put(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+ out.write(outputWrapper.array(), 0, outputWrapper.position() + outputWrapper.arrayOffset());
+ return;
+ }
+ if ((resultWrapper.get() ^ FIXARRAY_PREFIX) != (byte) 2) {
+ throw HyracksDataException.create(AsterixException.create(ErrorCode.EXTERNAL_UDF_EXCEPTION,
+ "Returned result missing outer wrapper"));
+ }
+ int numresults = resultWrapper.get() ^ FIXARRAY_PREFIX;
+ if (numresults > 0) {
+ MessageUnpackerToADM.unpack(resultWrapper, outputWrapper, true);
+ }
+ unpackerInput.reset(resultWrapper.array(), resultWrapper.position() + resultWrapper.arrayOffset(),
+ resultWrapper.remaining());
+ unpacker.reset(unpackerInput);
+ int numEntries = unpacker.unpackArrayHeader();
+ for (int j = 0; j < numEntries; j++) {
+ outputWrapper.put(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+ if (evaluatorContext.getWarningCollector().shouldWarn()) {
+ evaluatorContext.getWarningCollector().warn(
+ Warning.of(sourceLocation, ErrorCode.EXTERNAL_UDF_EXCEPTION, unpacker.unpackString()));
+ }
+ }
out.write(outputWrapper.array(), 0, outputWrapper.position() + outputWrapper.arrayOffset());
} catch (IOException e) {
- throw new HyracksDataException(e.getMessage());
+ throw HyracksDataException.create(e);
}
-
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryEvaluator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryEvaluator.java
new file mode 100644
index 0000000..e2229ee
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryEvaluator.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.library;
+
+import static org.apache.asterix.common.exceptions.ErrorCode.EXTERNAL_UDF_EXCEPTION;
+import static org.msgpack.core.MessagePack.Code.ARRAY16;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.library.ILibraryManager;
+import org.apache.asterix.external.ipc.ExternalFunctionResultRouter;
+import org.apache.asterix.external.ipc.PythonIPCProto;
+import org.apache.asterix.external.library.msgpack.MessagePackerFromADM;
+import org.apache.asterix.om.functions.IExternalFunctionInfo;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.EnumDeserializer;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.types.TypeTagUtil;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.TaskAttemptId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.api.exceptions.Warning;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.resources.IDeallocatable;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.TaggedValuePointable;
+import org.apache.hyracks.dataflow.std.base.AbstractStateObject;
+import org.apache.hyracks.ipc.impl.IPCSystem;
+
+public class PythonLibraryEvaluator extends AbstractStateObject implements IDeallocatable {
+
+ public static final String ENTRYPOINT = "entrypoint.py";
+ public static final String SITE_PACKAGES = "site-packages";
+
+ private Process p;
+ private ILibraryManager libMgr;
+ private File pythonHome;
+ private PythonIPCProto proto;
+ private ExternalFunctionResultRouter router;
+ private IPCSystem ipcSys;
+ private String sitePkgs;
+ private List<String> pythonArgs;
+ private TaskAttemptId task;
+ private IWarningCollector warningCollector;
+ private SourceLocation sourceLoc;
+
+ public PythonLibraryEvaluator(JobId jobId, PythonLibraryEvaluatorId evaluatorId, ILibraryManager libMgr,
+ File pythonHome, String sitePkgs, List<String> pythonArgs, ExternalFunctionResultRouter router,
+ IPCSystem ipcSys, TaskAttemptId task, IWarningCollector warningCollector, SourceLocation sourceLoc) {
+ super(jobId, evaluatorId);
+ this.libMgr = libMgr;
+ this.pythonHome = pythonHome;
+ this.sitePkgs = sitePkgs;
+ this.pythonArgs = pythonArgs;
+ this.router = router;
+ this.task = task;
+ this.ipcSys = ipcSys;
+ this.warningCollector = warningCollector;
+ this.sourceLoc = sourceLoc;
+
+ }
+
+ private void initialize() throws IOException, AsterixException {
+ PythonLibraryEvaluatorId fnId = (PythonLibraryEvaluatorId) id;
+ PythonLibrary library =
+ (PythonLibrary) libMgr.getLibrary(fnId.getLibraryDataverseName(), fnId.getLibraryName());
+ String wd = library.getFile().getAbsolutePath();
+ int port = ipcSys.getSocketAddress().getPort();
+ List<String> args = new ArrayList<>();
+ args.add(pythonHome.getAbsolutePath());
+ args.addAll(pythonArgs);
+ args.add(ENTRYPOINT);
+ args.add(InetAddress.getLoopbackAddress().getHostAddress());
+ args.add(Integer.toString(port));
+ args.add(sitePkgs);
+
+ ProcessBuilder pb = new ProcessBuilder(args.toArray(new String[0]));
+ pb.directory(new File(wd));
+ p = pb.start();
+ proto = new PythonIPCProto(p.getOutputStream(), router, p);
+ proto.start();
+ proto.helo();
+ }
+
+ public long initialize(IExternalFunctionInfo finfo) throws IOException, AsterixException {
+ List<String> externalIdents = finfo.getExternalIdentifier();
+ String packageModule = externalIdents.get(0);
+ String clazz;
+ String fn;
+ String externalIdent1 = externalIdents.get(1);
+ int idx = externalIdent1.lastIndexOf('.');
+ if (idx >= 0) {
+ clazz = externalIdent1.substring(0, idx);
+ fn = externalIdent1.substring(idx + 1);
+ } else {
+ clazz = null;
+ fn = externalIdent1;
+ }
+ return proto.init(packageModule, clazz, fn);
+ }
+
+ public ByteBuffer callPython(long id, ByteBuffer arguments, int numArgs) throws IOException {
+ ByteBuffer ret = null;
+ try {
+ ret = proto.call(id, arguments, numArgs);
+ } catch (AsterixException e) {
+ if (warningCollector.shouldWarn()) {
+ warningCollector.warn(Warning.of(sourceLoc, EXTERNAL_UDF_EXCEPTION, e.getMessage()));
+ }
+ }
+ return ret;
+ }
+
+ public ByteBuffer callPythonMulti(long id, ByteBuffer arguments, int numTuples) throws IOException {
+ ByteBuffer ret = null;
+ try {
+ ret = proto.callMulti(id, arguments, numTuples);
+ } catch (AsterixException e) {
+ if (warningCollector.shouldWarn()) {
+ warningCollector.warn(Warning.of(sourceLoc, EXTERNAL_UDF_EXCEPTION, e.getMessage()));
+ }
+ }
+ return ret;
+ }
+
+ @Override
+ public void deallocate() {
+ if (p != null) {
+ boolean dead = false;
+ try {
+ p.destroy();
+ dead = p.waitFor(100, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ //gonna kill it anyway
+ }
+ if (!dead) {
+ p.destroyForcibly();
+ }
+ }
+ router.removeRoute(proto.getRouteId());
+ }
+
+ public static ATypeTag setArgument(IAType type, IValueReference valueReference, ByteBuffer argHolder,
+ boolean nullCall) throws IOException {
+ ATypeTag tag = type.getTypeTag();
+ if (tag == ATypeTag.ANY) {
+ TaggedValuePointable pointy = TaggedValuePointable.FACTORY.createPointable();
+ pointy.set(valueReference);
+ ATypeTag rtTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(pointy.getTag());
+ IAType rtType = TypeTagUtil.getBuiltinTypeByTag(rtTypeTag);
+ return MessagePackerFromADM.pack(valueReference, rtType, argHolder, nullCall);
+ } else {
+ return MessagePackerFromADM.pack(valueReference, type, argHolder, nullCall);
+ }
+ }
+
+ public static ATypeTag peekArgument(IAType type, IValueReference valueReference) throws HyracksDataException {
+ ATypeTag tag = type.getTypeTag();
+ if (tag == ATypeTag.ANY) {
+ TaggedValuePointable pointy = TaggedValuePointable.FACTORY.createPointable();
+ pointy.set(valueReference);
+ ATypeTag rtTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(pointy.getTag());
+ IAType rtType = TypeTagUtil.getBuiltinTypeByTag(rtTypeTag);
+ return MessagePackerFromADM.peekUnknown(rtType);
+ } else {
+ return MessagePackerFromADM.peekUnknown(type);
+ }
+ }
+
+ public static void setVoidArgument(ByteBuffer argHolder) {
+ argHolder.put(ARRAY16);
+ argHolder.putShort((short) 0);
+ }
+
+ public static PythonLibraryEvaluator getInstance(IExternalFunctionInfo finfo, ILibraryManager libMgr,
+ ExternalFunctionResultRouter router, IPCSystem ipcSys, File pythonHome, IHyracksTaskContext ctx,
+ String sitePkgs, List<String> pythonArgs, IWarningCollector warningCollector, SourceLocation sourceLoc)
+ throws IOException, AsterixException {
+ PythonLibraryEvaluatorId evaluatorId = new PythonLibraryEvaluatorId(finfo.getLibraryDataverseName(),
+ finfo.getLibraryName(), Thread.currentThread());
+ PythonLibraryEvaluator evaluator = (PythonLibraryEvaluator) ctx.getStateObject(evaluatorId);
+ if (evaluator == null) {
+ evaluator = new PythonLibraryEvaluator(ctx.getJobletContext().getJobId(), evaluatorId, libMgr, pythonHome,
+ sitePkgs, pythonArgs, router, ipcSys, ctx.getTaskAttemptId(), warningCollector, sourceLoc);
+ ctx.getJobletContext().registerDeallocatable(evaluator);
+ evaluator.initialize();
+ ctx.setStateObject(evaluator);
+ }
+ return evaluator;
+ }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryEvaluatorFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryEvaluatorFactory.java
new file mode 100644
index 0000000..86d51de
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryEvaluatorFactory.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.library;
+
+import static org.apache.asterix.external.library.PythonLibraryEvaluator.SITE_PACKAGES;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.library.ILibraryManager;
+import org.apache.asterix.external.ipc.ExternalFunctionResultRouter;
+import org.apache.asterix.om.functions.IExternalFunctionInfo;
+import org.apache.hyracks.api.config.IApplicationConfig;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.control.common.controllers.NCConfig;
+import org.apache.hyracks.ipc.impl.IPCSystem;
+
+public class PythonLibraryEvaluatorFactory {
+ private final ILibraryManager libraryManager;
+ private final IPCSystem ipcSys;
+ private final File pythonPath;
+ private final IHyracksTaskContext ctx;
+ private final ExternalFunctionResultRouter router;
+ private final String sitePackagesPath;
+ private final List<String> pythonArgs;
+
+ public PythonLibraryEvaluatorFactory(IHyracksTaskContext ctx) throws AsterixException {
+ this.ctx = ctx;
+ libraryManager = ((INcApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext())
+ .getLibraryManager();
+ router = libraryManager.getRouter();
+ ipcSys = libraryManager.getIPCI();
+ IApplicationConfig appCfg = ctx.getJobletContext().getServiceContext().getAppConfig();
+ String pythonPathCmd = appCfg.getString(NCConfig.Option.PYTHON_CMD);
+ boolean findPython = appCfg.getBoolean(NCConfig.Option.PYTHON_CMD_AUTOLOCATE);
+ pythonArgs = new ArrayList<>();
+ if (pythonPathCmd == null) {
+ if (findPython) {
+ //if absolute path to interpreter is not specified, try to use environmental python
+ pythonPathCmd = "/usr/bin/env";
+ pythonArgs.add("python3");
+ } else {
+ throw AsterixException.create(ErrorCode.EXTERNAL_UDF_EXCEPTION, "Python interpreter not specified, and "
+ + NCConfig.Option.PYTHON_CMD_AUTOLOCATE.ini() + " is false");
+ }
+ }
+ pythonPath = new File(pythonPathCmd);
+ List<String> sitePkgs = new ArrayList<>();
+ sitePkgs.add(SITE_PACKAGES);
+ String[] addlSitePackages = appCfg.getStringArray((NCConfig.Option.PYTHON_ADDITIONAL_PACKAGES));
+ sitePkgs.addAll(Arrays.asList(addlSitePackages));
+ if (appCfg.getBoolean(NCConfig.Option.PYTHON_USE_BUNDLED_MSGPACK)) {
+ sitePkgs.add("ipc" + File.separator + SITE_PACKAGES + File.separator);
+ }
+ String[] pythonArgsRaw = appCfg.getStringArray(NCConfig.Option.PYTHON_ARGS);
+ if (pythonArgsRaw != null) {
+ pythonArgs.addAll(Arrays.asList(pythonArgsRaw));
+ }
+ StringBuilder sitePackagesPathBuilder = new StringBuilder();
+ for (int i = 0; i < sitePkgs.size() - 1; i++) {
+ sitePackagesPathBuilder.append(sitePkgs.get(i));
+ sitePackagesPathBuilder.append(File.pathSeparator);
+ }
+ sitePackagesPathBuilder.append(sitePkgs.get(sitePkgs.size() - 1));
+ sitePackagesPath = sitePackagesPathBuilder.toString();
+ }
+
+ public PythonLibraryEvaluator getEvaluator(IExternalFunctionInfo fnInfo, SourceLocation sourceLoc)
+ throws IOException, AsterixException {
+ return PythonLibraryEvaluator.getInstance(fnInfo, libraryManager, router, ipcSys, pythonPath, ctx,
+ sitePackagesPath, pythonArgs, ctx.getWarningCollector(), sourceLoc);
+ }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryEvaluatorId.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryEvaluatorId.java
new file mode 100644
index 0000000..c2f6f00
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryEvaluatorId.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.library;
+
+import java.util.Objects;
+
+import org.apache.asterix.common.metadata.DataverseName;
+
+final class PythonLibraryEvaluatorId {
+
+ private final DataverseName libraryDataverseName;
+
+ private final String libraryName;
+
+ private final Thread thread;
+
+ PythonLibraryEvaluatorId(DataverseName libraryDataverseName, String libraryName, Thread thread) {
+ this.libraryDataverseName = Objects.requireNonNull(libraryDataverseName);
+ this.libraryName = Objects.requireNonNull(libraryName);
+ this.thread = Objects.requireNonNull(thread);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+ PythonLibraryEvaluatorId that = (PythonLibraryEvaluatorId) o;
+ return libraryDataverseName.equals(that.libraryDataverseName) && libraryName.equals(that.libraryName)
+ && thread.equals(that.thread);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(libraryDataverseName, libraryName);
+ }
+
+ public DataverseName getLibraryDataverseName() {
+ return libraryDataverseName;
+ }
+
+ public String getLibraryName() {
+ return libraryName;
+ }
+
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MessagePackerFromADM.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MessagePackerFromADM.java
index 383b2f1..f0ac56e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MessagePackerFromADM.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MessagePackerFromADM.java
@@ -27,16 +27,15 @@
import static org.msgpack.core.MessagePack.Code.INT64;
import static org.msgpack.core.MessagePack.Code.INT8;
import static org.msgpack.core.MessagePack.Code.MAP32;
+import static org.msgpack.core.MessagePack.Code.NIL;
import static org.msgpack.core.MessagePack.Code.STR32;
import static org.msgpack.core.MessagePack.Code.TRUE;
-import static org.msgpack.core.MessagePack.Code.UINT16;
-import static org.msgpack.core.MessagePack.Code.UINT32;
-import static org.msgpack.core.MessagePack.Code.UINT64;
-import static org.msgpack.core.MessagePack.Code.UINT8;
import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.AUnionType;
@@ -64,11 +63,12 @@
private static final int ITEM_COUNT_SIZE = 4;
private static final int ITEM_OFFSET_SIZE = 4;
- public static void pack(IValueReference ptr, IAType type, ByteBuffer out) throws HyracksDataException {
- pack(ptr.getByteArray(), ptr.getStartOffset(), type, true, out);
+ public static ATypeTag pack(IValueReference ptr, IAType type, ByteBuffer out, boolean packUnknown)
+ throws HyracksDataException {
+ return pack(ptr.getByteArray(), ptr.getStartOffset(), type, true, packUnknown, out);
}
- public static void pack(byte[] ptr, int offs, IAType type, boolean tagged, ByteBuffer out)
+ public static ATypeTag pack(byte[] ptr, int offs, IAType type, boolean tagged, boolean packUnknown, ByteBuffer out)
throws HyracksDataException {
int relOffs = tagged ? offs + 1 : offs;
ATypeTag tag = type.getTypeTag();
@@ -108,32 +108,33 @@
case OBJECT:
packObject(ptr, offs, type, out);
break;
+ case MISSING:
+ case NULL:
+ if (packUnknown) {
+ packNull(out);
+ break;
+ } else {
+ return tag;
+ }
default:
- throw new IllegalArgumentException("NYI");
+ throw HyracksDataException.create(AsterixException.create(ErrorCode.PARSER_ADM_DATA_PARSER_CAST_ERROR,
+ tag.name(), "to a msgpack"));
+ }
+ return ATypeTag.TYPE;
+ }
+
+ public static ATypeTag peekUnknown(IAType type) {
+ switch (type.getTypeTag()) {
+ case MISSING:
+ case NULL:
+ return type.getTypeTag();
+ default:
+ return ATypeTag.TYPE;
}
}
- public static byte minPackPosLong(ByteBuffer out, long in) {
- if (in < 127) {
- packFixPos(out, (byte) in);
- return 1;
- } else if (in < Byte.MAX_VALUE) {
- out.put(UINT8);
- out.put((byte) in);
- return 2;
- } else if (in < Short.MAX_VALUE) {
- out.put(UINT16);
- out.putShort((short) in);
- return 3;
- } else if (in < Integer.MAX_VALUE) {
- out.put(UINT32);
- out.putInt((int) in);
- return 5;
- } else {
- out.put(UINT64);
- out.putLong(in);
- return 9;
- }
+ public static void packNull(ByteBuffer out) {
+ out.put(NIL);
}
public static void packByte(ByteBuffer out, byte in) {
@@ -167,18 +168,20 @@
out.putDouble(in);
}
- public static void packFixPos(ByteBuffer out, byte in) {
+ public static void packFixPos(ByteBuffer out, byte in) throws HyracksDataException {
byte mask = (byte) (1 << 7);
if ((in & mask) != 0) {
- throw new IllegalArgumentException("fixint7 must be positive");
+ throw HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.ILLEGAL_STATE,
+ "fixint7 must be positive");
}
out.put(in);
}
- public static void packFixStr(ByteBuffer buf, String in) {
- byte[] strBytes = in.getBytes(Charset.forName("UTF-8"));
+ public static void packFixStr(ByteBuffer buf, String in) throws HyracksDataException {
+ byte[] strBytes = in.getBytes(StandardCharsets.UTF_8);
if (strBytes.length > 31) {
- throw new IllegalArgumentException("fixstr cannot be longer than 31");
+ throw HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.ILLEGAL_STATE,
+ "fixint7 must be positive");
}
buf.put((byte) (FIXSTR_PREFIX + strBytes.length));
buf.put(strBytes);
@@ -186,7 +189,7 @@
public static void packStr(ByteBuffer out, String in) {
out.put(STR32);
- byte[] strBytes = in.getBytes(Charset.forName("UTF-8"));
+ byte[] strBytes = in.getBytes(StandardCharsets.UTF_8);
out.putInt(strBytes.length);
out.put(strBytes);
}
@@ -195,14 +198,14 @@
out.put(STR32);
//TODO: tagged/untagged. closed support is borked so always tagged rn
String str = UTF8StringUtil.toString(in, offs);
- byte[] strBytes = str.getBytes(Charset.forName("UTF-8"));
+ byte[] strBytes = str.getBytes(StandardCharsets.UTF_8);
out.putInt(strBytes.length);
out.put(strBytes);
}
public static void packStr(String str, ByteBuffer out) {
out.put(STR32);
- byte[] strBytes = str.getBytes(Charset.forName("UTF-8"));
+ byte[] strBytes = str.getBytes(StandardCharsets.UTF_8);
out.putInt(strBytes.length);
out.put(strBytes);
}
@@ -221,12 +224,12 @@
if (fixType) {
int itemOffs = itemCtOffs + ITEM_COUNT_SIZE + (i
* NonTaggedFormatUtil.getFieldValueLength(in, 0, collType.getItemType().getTypeTag(), false));
- pack(in, itemOffs, collType.getItemType(), false, out);
+ pack(in, itemOffs, collType.getItemType(), false, true, out);
} else {
int itemOffs =
offs + IntegerPointable.getInteger(in, itemCtOffs + ITEM_COUNT_SIZE + (i * ITEM_OFFSET_SIZE));
ATypeTag tag = ATypeTag.VALUE_TYPE_MAPPING[BytePointable.getByte(in, itemOffs)];
- pack(in, itemOffs, TypeTagUtil.getBuiltinTypeByTag(tag), true, out);
+ pack(in, itemOffs, TypeTagUtil.getBuiltinTypeByTag(tag), true, true, out);
}
}
}
@@ -240,14 +243,14 @@
String field = recType.getFieldNames()[i];
IAType fieldType = RecordUtils.getClosedFieldType(recType, i);
packStr(field, out);
- pack(in, RecordUtils.getClosedFieldOffset(in, offs, recType, i), fieldType, false, out);
+ pack(in, RecordUtils.getClosedFieldOffset(in, offs, recType, i), fieldType, false, true, out);
}
if (RecordUtils.isExpanded(in, offs, recType)) {
for (int i = 0; i < RecordUtils.getOpenFieldCount(in, offs, recType); i++) {
packStr(in, RecordUtils.getOpenFieldNameOffset(in, offs, recType, i), out);
ATypeTag tag = ATypeTag.VALUE_TYPE_MAPPING[RecordUtils.getOpenFieldTag(in, offs, recType, i)];
pack(in, RecordUtils.getOpenFieldValueOffset(in, offs, recType, i),
- TypeTagUtil.getBuiltinTypeByTag(tag), true, out);
+ TypeTagUtil.getBuiltinTypeByTag(tag), true, true, out);
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MessageUnpackerToADM.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MessageUnpackerToADM.java
index fedd1f6..4af1121 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MessageUnpackerToADM.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MessageUnpackerToADM.java
@@ -20,13 +20,16 @@
import java.nio.ByteBuffer;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.util.encoding.VarLenIntEncoderDecoder;
import org.apache.hyracks.util.string.UTF8StringUtil;
public class MessageUnpackerToADM {
- public static void unpack(ByteBuffer in, ByteBuffer out, boolean tagged) {
+ public static void unpack(ByteBuffer in, ByteBuffer out, boolean tagged) throws HyracksDataException {
byte tag = NIL;
if (in != null) {
tag = in.get();
@@ -68,6 +71,9 @@
case UINT32:
unpackUInt(in, out, tagged);
break;
+ case UINT64:
+ unpackULong(in, out, tagged);
+ break;
case INT8:
unpackByte(in, out, tagged);
break;
@@ -109,42 +115,12 @@
break;
default:
- throw new IllegalArgumentException("NYI");
+ throw HyracksDataException.create(AsterixException.create(
+ ErrorCode.PARSER_ADM_DATA_PARSER_CAST_ERROR, "msgpack tag " + tag + " ", "to an ADM type"));
}
}
}
- public static long unpackNextInt(ByteBuffer in) {
- byte tag = in.get();
- if (isFixInt(tag)) {
- if (isPosFixInt(tag)) {
- return tag;
- } else if (isNegFixInt(tag)) {
- return (tag ^ NEGFIXINT_PREFIX);
- }
- } else {
- switch (tag) {
- case INT8:
- return in.get();
- case UINT8:
- return Byte.toUnsignedInt(in.get());
- case INT16:
- return in.getShort();
- case UINT16:
- return Short.toUnsignedInt(in.getShort());
- case INT32:
- return in.getInt();
- case UINT32:
- return Integer.toUnsignedLong(in.getInt());
- case INT64:
- return in.getLong();
- default:
- throw new IllegalArgumentException("NYI");
- }
- }
- return -1;
- }
-
public static void unpackByte(ByteBuffer in, ByteBuffer out, boolean tagged) {
if (tagged) {
out.put(ATypeTag.SERIALIZED_INT8_TYPE_TAG);
@@ -194,6 +170,17 @@
out.putLong(in.getInt() & 0x00000000FFFFFFFFl);
}
+ public static void unpackULong(ByteBuffer in, ByteBuffer out, boolean tagged) {
+ if (tagged) {
+ out.put(ATypeTag.SERIALIZED_INT64_TYPE_TAG);
+ }
+ long val = in.getLong();
+ if (val < 0) {
+ throw new IllegalArgumentException("Integer overflow");
+ }
+ out.putLong(val);
+ }
+
public static void unpackFloat(ByteBuffer in, ByteBuffer out, boolean tagged) {
if (tagged) {
out.put(ATypeTag.SERIALIZED_FLOAT_TYPE_TAG);
@@ -209,9 +196,9 @@
out.putDouble(in.getDouble());
}
- public static void unpackArray(ByteBuffer in, ByteBuffer out, long uLen) {
+ public static void unpackArray(ByteBuffer in, ByteBuffer out, long uLen) throws HyracksDataException {
if (uLen > Integer.MAX_VALUE) {
- throw new UnsupportedOperationException("String is too long");
+ throw new UnsupportedOperationException("Array is too long");
}
int count = (int) uLen;
int offs = out.position();
@@ -233,7 +220,7 @@
out.putInt(asxLenPos, totalLen);
}
- public static void unpackMap(ByteBuffer in, ByteBuffer out, int count) {
+ public static void unpackMap(ByteBuffer in, ByteBuffer out, int count) throws HyracksDataException {
//TODO: need to handle typed records. this only produces a completely open record.
//hdr size = 6?
int startOffs = out.position();
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalAssignBatchRuntimeFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalAssignBatchRuntimeFactory.java
index 44a17a9..39e480a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalAssignBatchRuntimeFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalAssignBatchRuntimeFactory.java
@@ -19,18 +19,44 @@
package org.apache.asterix.external.operators;
+import static org.msgpack.core.MessagePack.Code.ARRAY16;
+import static org.msgpack.core.MessagePack.Code.ARRAY32;
+import static org.msgpack.core.MessagePack.Code.FIXARRAY_PREFIX;
+import static org.msgpack.core.MessagePack.Code.isFixedArray;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.external.library.PythonLibraryEvaluator;
+import org.apache.asterix.external.library.PythonLibraryEvaluatorFactory;
+import org.apache.asterix.external.library.msgpack.MessageUnpackerToADM;
import org.apache.asterix.om.functions.IExternalFunctionDescriptor;
-import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.Counter;
+import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputPushRuntime;
import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputRuntimeFactory;
import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.Warning;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import org.msgpack.core.MessagePack;
+import org.msgpack.core.MessagePackException;
+import org.msgpack.core.MessageUnpacker;
+import org.msgpack.core.buffer.ArrayBufferInput;
public final class ExternalAssignBatchRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
private static final long serialVersionUID = 1L;
-
private int[] outColumns;
private final IExternalFunctionDescriptor[] fnDescs;
private final int[][] fnArgColumns;
@@ -44,9 +70,242 @@
}
@Override
- public AbstractOneInputOneOutputPushRuntime createOneOutputPushRuntime(IHyracksTaskContext ctx)
- throws HyracksDataException {
- throw new HyracksDataException(ErrorCode.OPERATOR_NOT_IMPLEMENTED, sourceLoc,
- PhysicalOperatorTag.ASSIGN_BATCH.toString());
+ public AbstractOneInputOneOutputPushRuntime createOneOutputPushRuntime(IHyracksTaskContext ctx) {
+
+ final int[] projectionToOutColumns = new int[projectionList.length];
+ for (int j = 0; j < projectionList.length; j++) {
+ projectionToOutColumns[j] = Arrays.binarySearch(outColumns, projectionList[j]);
+ }
+
+ return new AbstractOneInputOneOutputOneFramePushRuntime() {
+
+ private ByteBuffer outputWrapper;
+ private List<ByteBuffer> argHolders;
+ ArrayTupleBuilder tupleBuilder;
+ private List<Pair<Long, PythonLibraryEvaluator>> libraryEvaluators;
+ private ATypeTag[][] nullCalls;
+ private int[] numCalls;
+ private VoidPointable ref;
+ private MessageUnpacker unpacker;
+ private ArrayBufferInput unpackerInput;
+ private List<Pair<ByteBuffer, Counter>> batchResults;
+
+ @Override
+ public void open() throws HyracksDataException {
+ super.open();
+ initAccessAppend(ctx);
+ tupleBuilder = new ArrayTupleBuilder(projectionList.length);
+ tRef = new FrameTupleReference();
+ ref = VoidPointable.FACTORY.createPointable();
+ libraryEvaluators = new ArrayList<>();
+ try {
+ PythonLibraryEvaluatorFactory evalFactory = new PythonLibraryEvaluatorFactory(ctx);
+ for (IExternalFunctionDescriptor fnDesc : fnDescs) {
+ PythonLibraryEvaluator eval = evalFactory.getEvaluator(fnDesc.getFunctionInfo(), sourceLoc);
+ long id = eval.initialize(fnDesc.getFunctionInfo());
+ libraryEvaluators.add(new Pair<>(id, eval));
+ }
+ } catch (IOException | AsterixException e) {
+ throw RuntimeDataException.create(ErrorCode.EXTERNAL_UDF_EXCEPTION, e, sourceLoc, e.getMessage());
+ }
+ argHolders = new ArrayList<>(fnArgColumns.length);
+ for (int i = 0; i < fnArgColumns.length; i++) {
+ argHolders.add(ctx.allocateFrame());
+ }
+ outputWrapper = ctx.allocateFrame();
+ nullCalls = new ATypeTag[argHolders.size()][0];
+ numCalls = new int[fnArgColumns.length];
+ batchResults = new ArrayList<>(argHolders.size());
+ for (int i = 0; i < argHolders.size(); i++) {
+ batchResults.add(new Pair<>(ctx.allocateFrame(), new Counter(-1)));
+ }
+ unpackerInput = new ArrayBufferInput(new byte[0]);
+ unpacker = MessagePack.newDefaultUnpacker(unpackerInput);
+ }
+
+ private void resetBuffers(int numTuples, int[] numCalls) {
+ for (int func = 0; func < fnArgColumns.length; func++) {
+ argHolders.get(func).clear();
+ argHolders.get(func).position(0);
+ if (nullCalls[func].length < numTuples) {
+ nullCalls[func] = new ATypeTag[numTuples];
+ }
+ numCalls[func] = numTuples;
+ Arrays.fill(nullCalls[func], ATypeTag.TYPE);
+ for (Pair<ByteBuffer, Counter> batch : batchResults) {
+ batch.getFirst().clear();
+ batch.getFirst().position(0);
+ batch.getSecond().set(-1);
+ }
+ }
+ }
+
+ private ATypeTag handleNullMatrix(int func, int t, ATypeTag argumentPresence, ATypeTag argumentStatus) {
+ //If any argument is unknown, skip call. If any argument is null, return null, first.
+ //However, if any argument is missing, return missing instead.
+ if (nullCalls[func][t] == ATypeTag.TYPE && argumentPresence != ATypeTag.TYPE) {
+ if (argumentPresence == ATypeTag.NULL && argumentStatus != ATypeTag.MISSING) {
+ nullCalls[func][t] = argumentPresence;
+ return ATypeTag.NULL;
+ } else {
+ nullCalls[func][t] = argumentPresence;
+ return ATypeTag.MISSING;
+ }
+ }
+ return argumentPresence;
+ }
+
+ private void collectFunctionWarnings(List<Pair<ByteBuffer, Counter>> batchResults) throws IOException {
+ for (Pair<ByteBuffer, Counter> result : batchResults) {
+ if (result.getSecond().get() > -1) {
+ ByteBuffer resBuf = result.getFirst();
+ unpackerInput.reset(resBuf.array(), resBuf.position() + resBuf.arrayOffset(),
+ resBuf.remaining());
+ unpacker.reset(unpackerInput);
+ try {
+ int numEntries = unpacker.unpackArrayHeader();
+ for (int j = 0; j < numEntries; j++) {
+ if (ctx.getWarningCollector().shouldWarn()) {
+ ctx.getWarningCollector().warn(Warning.of(sourceLoc,
+ ErrorCode.EXTERNAL_UDF_EXCEPTION, unpacker.unpackString()));
+ }
+ }
+ } catch (MessagePackException e) {
+ if (ctx.getWarningCollector().shouldWarn()) {
+ ctx.getWarningCollector().warn(Warning.of(sourceLoc, ErrorCode.EXTERNAL_UDF_EXCEPTION,
+ "Error retrieving returned warnings from Python UDF"));
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ tAccess.reset(buffer);
+ tupleBuilder.reset();
+ try {
+ int numTuples = tAccess.getTupleCount();
+ resetBuffers(numTuples, numCalls);
+ //build columns of arguments for each function
+ for (int t = 0; t < numTuples; t++) {
+ for (int func = 0; func < fnArgColumns.length; func++) {
+ tRef.reset(tAccess, t);
+ int[] cols = fnArgColumns[func];
+ //TODO: switch between fixarray/array16/array32 where appropriate
+ ATypeTag argumentStatus = ATypeTag.TYPE;
+ if (!fnDescs[func].getFunctionInfo().getNullCall()) {
+ for (int colIdx = 0; colIdx < cols.length; colIdx++) {
+ ref.set(buffer.array(), tRef.getFieldStart(cols[colIdx]),
+ tRef.getFieldLength(cols[colIdx]));
+ ATypeTag argumentPresence = PythonLibraryEvaluator
+ .peekArgument(fnDescs[func].getArgumentTypes()[colIdx], ref);
+ argumentStatus = handleNullMatrix(func, t, argumentPresence, argumentStatus);
+ }
+ }
+ if (argumentStatus == ATypeTag.TYPE) {
+ if (cols.length > 0) {
+ argHolders.get(func).put(ARRAY16);
+ argHolders.get(func).putShort((short) cols.length);
+ }
+ for (int colIdx = 0; colIdx < cols.length; colIdx++) {
+ ref.set(buffer.array(), tRef.getFieldStart(cols[colIdx]),
+ tRef.getFieldLength(cols[colIdx]));
+ PythonLibraryEvaluator.setArgument(fnDescs[func].getArgumentTypes()[colIdx], ref,
+ argHolders.get(func), fnDescs[func].getFunctionInfo().getNullCall());
+ }
+ } else {
+ numCalls[func]--;
+ }
+ if (cols.length == 0) {
+ PythonLibraryEvaluator.setVoidArgument(argHolders.get(func));
+ }
+ }
+ }
+ //TODO: maybe this could be done in parallel for each unique library evaluator?
+ for (int argHolderIdx = 0; argHolderIdx < argHolders.size(); argHolderIdx++) {
+ Pair<Long, PythonLibraryEvaluator> fnEval = libraryEvaluators.get(argHolderIdx);
+ ByteBuffer columnResult = fnEval.getSecond().callPythonMulti(fnEval.getFirst(),
+ argHolders.get(argHolderIdx), numCalls[argHolderIdx]);
+ if (columnResult != null) {
+ Pair<ByteBuffer, Counter> resultholder = batchResults.get(argHolderIdx);
+ if (resultholder.getFirst().capacity() < columnResult.capacity()) {
+ resultholder.setFirst(ctx.allocateFrame(columnResult.capacity()));
+ }
+ ByteBuffer resultBuf = resultholder.getFirst();
+ resultBuf.clear();
+ resultBuf.position(0);
+ //offset 1 to skip message type
+ System.arraycopy(columnResult.array(), columnResult.arrayOffset() + 1, resultBuf.array(),
+ resultBuf.arrayOffset(), columnResult.capacity() - 1);
+ //wrapper for results and warnings arrays. always length 2
+ consumeAndGetBatchLength(resultBuf);
+ int numResults = (int) consumeAndGetBatchLength(resultBuf);
+ resultholder.getSecond().set(numResults);
+ } else {
+ if (ctx.getWarningCollector().shouldWarn()) {
+ ctx.getWarningCollector()
+ .warn(Warning.of(sourceLoc, ErrorCode.EXTERNAL_UDF_EXCEPTION,
+ "Function "
+ + fnDescs[argHolderIdx].getFunctionInfo()
+ .getFunctionIdentifier().toString()
+ + " failed to execute"));
+ }
+ }
+ }
+ //decompose returned function columns into frame tuple format
+ for (int i = 0; i < numTuples; i++) {
+ tupleBuilder.reset();
+ for (int f = 0; f < projectionList.length; f++) {
+ int k = projectionToOutColumns[f];
+ if (k >= 0) {
+ outputWrapper.clear();
+ outputWrapper.position(0);
+ Pair<ByteBuffer, Counter> result = batchResults.get(k);
+ int start = outputWrapper.arrayOffset();
+ ATypeTag functionCalled = nullCalls[k][i];
+ if (functionCalled == ATypeTag.TYPE) {
+ if (result.getSecond().get() > 0) {
+ MessageUnpackerToADM.unpack(result.getFirst(), outputWrapper, true);
+ result.getSecond().set(result.getSecond().get() - 1);
+ } else {
+ //emit NULL for functions which failed with a warning
+ outputWrapper.put(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+ }
+ } else if (functionCalled == ATypeTag.NULL) {
+ outputWrapper.put(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+ } else {
+ outputWrapper.put(ATypeTag.SERIALIZED_MISSING_TYPE_TAG);
+ }
+ tupleBuilder.addField(outputWrapper.array(), start, start + outputWrapper.position());
+ } else {
+ tupleBuilder.addField(tAccess, i, projectionList[f]);
+ }
+ }
+ appendToFrameFromTupleBuilder(tupleBuilder);
+ }
+ collectFunctionWarnings(batchResults);
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ private long consumeAndGetBatchLength(ByteBuffer buf) {
+ byte tag = buf.get();
+ if (isFixedArray(tag)) {
+ return tag ^ FIXARRAY_PREFIX;
+ } else if (tag == ARRAY16) {
+ return Short.toUnsignedInt(buf.getShort());
+ } else if (tag == ARRAY32) {
+ return Integer.toUnsignedLong(buf.getInt());
+ }
+ return -1L;
+ }
+
+ @Override
+ public void flush() throws HyracksDataException {
+ appender.flush(writer);
+ }
+ };
}
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/functions/ExternalFunctionCompilerUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/functions/ExternalFunctionCompilerUtil.java
index f900c92..6751171 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/functions/ExternalFunctionCompilerUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/functions/ExternalFunctionCompilerUtil.java
@@ -81,7 +81,7 @@
return new ExternalScalarFunctionInfo(function.getSignature().createFunctionIdentifier(), paramTypes,
returnType, typeComputer, lang, function.getLibraryDataverseName(), function.getLibraryName(),
- function.getExternalIdentifier(), function.getResources(), deterministic);
+ function.getExternalIdentifier(), function.getResources(), deterministic, function.getNullCall());
}
private static IFunctionInfo getUnnestFunctionInfo(MetadataProvider metadataProvider, Function function) {
@@ -182,7 +182,7 @@
case JAVA:
return false;
case PYTHON:
- return false;
+ return true;
default:
throw new CompilationException(ErrorCode.METADATA_ERROR, language.name());
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/functions/ExternalScalarFunctionInfo.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/functions/ExternalScalarFunctionInfo.java
index 854320a..82f74d9 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/functions/ExternalScalarFunctionInfo.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/functions/ExternalScalarFunctionInfo.java
@@ -35,8 +35,9 @@
public ExternalScalarFunctionInfo(FunctionIdentifier fid, List<IAType> parameterTypes, IAType returnType,
IResultTypeComputer rtc, ExternalFunctionLanguage language, DataverseName libraryDataverseName,
- String libraryName, List<String> externalIdentifier, Map<String, String> resources, boolean deterministic) {
+ String libraryName, List<String> externalIdentifier, Map<String, String> resources, boolean deterministic,
+ boolean nullCall) {
super(fid, FunctionKind.SCALAR, parameterTypes, returnType, rtc, language, libraryDataverseName, libraryName,
- externalIdentifier, resources, deterministic);
+ externalIdentifier, resources, deterministic, nullCall);
}
}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/ExternalFunctionInfo.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/ExternalFunctionInfo.java
index 7477330..a3ec9c6 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/ExternalFunctionInfo.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/ExternalFunctionInfo.java
@@ -30,7 +30,7 @@
public class ExternalFunctionInfo extends FunctionInfo implements IExternalFunctionInfo {
- private static final long serialVersionUID = 4L;
+ private static final long serialVersionUID = 5L;
private final FunctionKind kind;
private final List<IAType> parameterTypes;
@@ -40,11 +40,12 @@
private final String libraryName;
private final List<String> externalIdentifier;
private final Map<String, String> resources;
+ private final boolean nullCall;
public ExternalFunctionInfo(FunctionIdentifier fid, FunctionKind kind, List<IAType> parameterTypes,
IAType returnType, IResultTypeComputer rtc, ExternalFunctionLanguage language,
DataverseName libraryDataverseName, String libraryName, List<String> externalIdentifier,
- Map<String, String> resources, boolean deterministic) {
+ Map<String, String> resources, boolean deterministic, boolean nullCall) {
super(fid, rtc, deterministic);
this.kind = kind;
this.parameterTypes = parameterTypes;
@@ -54,6 +55,7 @@
this.libraryName = libraryName;
this.externalIdentifier = externalIdentifier;
this.resources = resources;
+ this.nullCall = nullCall;
}
@Override
@@ -94,4 +96,9 @@
public Map<String, String> getResources() {
return resources;
}
+
+ @Override
+ public boolean getNullCall() {
+ return nullCall;
+ }
}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/IExternalFunctionInfo.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/IExternalFunctionInfo.java
index d87d6df..9eb9875 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/IExternalFunctionInfo.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/IExternalFunctionInfo.java
@@ -47,4 +47,6 @@
List<String> getExternalIdentifier();
Map<String, String> getResources();
+
+ boolean getNullCall();
}