[ASTERIXDB-2176][RT] Improved Python IPC
- use msgpack serialization over tcp loopback
- convert directly from ADM binary format to msg
Change-Id: I5cbbc367944b489aee651ea050e74990dcf65521
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/6883
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Dmitry Lychagin <dmitry.lychagin@couchbase.com>
diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml
index eecdc73..27c0215 100644
--- a/asterixdb/asterix-app/pom.xml
+++ b/asterixdb/asterix-app/pom.xml
@@ -208,7 +208,7 @@
</configuration>
</execution>
<execution>
- <id>shiv-pyro-shim</id>
+ <id>shiv-msgpack-shim</id>
<phase>${pyro-shim.stage}</phase>
<goals>
<goal>exec</goal>
@@ -218,8 +218,8 @@
<workingDirectory>${project.build.directory}</workingDirectory>
<arguments>
<argument>-o </argument>
- <argument>${project.build.directory}${file.separator}classes${file.separator}pyro4.pyz</argument>
- <argument>pyro4</argument>
+ <argument>${project.build.directory}${file.separator}classes${file.separator}msgpack.pyz</argument>
+ <argument>msgpack</argument>
</arguments>
<environmentVariables>
<VIRTUALENV>${project.build.directory}</VIRTUALENV>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 5c15e68..e804d60 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -257,7 +257,7 @@
FileReference appDir =
ioManager.resolveAbsolutePath(getServiceContext().getServerCtx().getAppDir().getAbsolutePath());
libraryManager = new ExternalLibraryManager(ncs, persistedResourceRegistry, appDir);
- libraryManager.initStorage(resetStorageData);
+ libraryManager.initialize(resetStorageData);
/*
* The order of registration is important. The buffer cache must registered before recovery and transaction
diff --git a/asterixdb/asterix-app/src/main/resources/entrypoint.py b/asterixdb/asterix-app/src/main/resources/entrypoint.py
old mode 100644
new mode 100755
index cd3298e..bdb68e2
--- a/asterixdb/asterix-app/src/main/resources/entrypoint.py
+++ b/asterixdb/asterix-app/src/main/resources/entrypoint.py
@@ -15,46 +15,233 @@
# specific language governing permissions and limitations
# under the License.
-import math,sys
-sys.path.insert(0,'./site-packages/')
-import Pyro4
+import sys
+sys.path.insert(0, './site-packages/')
+sys.path.insert(len(sys.path)-1, './ipc/site-packages')
+from struct import *
+import signal
+import msgpack
+import socket
from importlib import import_module
from pathlib import Path
+from enum import IntEnum
+from io import BytesIO
-@Pyro4.expose
+PROTO_VERSION = 1
+HEADER_SZ = 8+8+1
+REAL_HEADER_SZ = 4+8+8+1
+
+
+class MessageType(IntEnum):
+ HELO = 0
+ QUIT = 1
+ INIT = 2
+ INIT_RSP = 3
+ CALL = 4
+ CALL_RSP = 5
+ ERROR = 6
+
+
+class MessageFlags(IntEnum):
+ NORMAL = 0
+ INITIAL_REQ = 1
+ INITIAL_ACK = 2
+ ERROR = 3
+
+
class Wrapper(object):
wrapped_module = None
wrapped_class = None
wrapped_fn = None
+ packer = msgpack.Packer(autoreset=False)
+ unpacker = msgpack.Unpacker()
+ response_buf = BytesIO()
+ stdin_buf = BytesIO()
+ wrapped_fns = {}
+ alive = True
- def __init__(self, module_name, class_name, fn_name):
+ def init(self, module_name, class_name, fn_name):
self.wrapped_module = import_module(module_name)
# do not allow modules to be called that are not part of the uploaded module
+ wrapped_fn = None
if not self.check_module_path(self.wrapped_module):
wrapped_module = None
- return None
+ raise ImportError("Module was not found in library")
if class_name is not None:
- self.wrapped_class = getattr(import_module(module_name),class_name)()
+ self.wrapped_class = getattr(
+ import_module(module_name), class_name)()
if self.wrapped_class is not None:
- self.wrapped_fn = getattr(self.wrapped_class,fn_name)
+ wrapped_fn = getattr(self.wrapped_class, fn_name)
else:
- self.wrapped_fn = locals()[fn_name]
+ wrapped_fn = locals()[fn_name]
+ if wrapped_fn is None:
+ raise ImportError("Could not find class or function in specified module")
+ self.wrapped_fns[self.rmid] = wrapped_fn
- def nextTuple(self, *args):
- return self.wrapped_fn(args)
+ def nextTuple(self, *args, key=None):
+ return self.wrapped_fns[key](*args)
- def ping(self):
- return "pong"
-
- def check_module_path(self,module):
+ def check_module_path(self, module):
cwd = Path('.').resolve()
module_path = Path(module.__file__).resolve()
return cwd in module_path.parents
+ def read_header(self, readbuf):
+ self.sz, self.mid, self.rmid, self.flag = unpack(
+ "!iqqb", readbuf[0:21])
+ return True
-port = int(sys.argv[1])
-wrap = Wrapper(sys.argv[2],sys.argv[3],sys.argv[4])
-d = Pyro4.Daemon(host="127.0.0.1",port=port)
-d.register(wrap,"nextTuple")
-print(Pyro4.config.dump())
-d.requestLoop()
+ def write_header(self, response_buf, dlen):
+ total_len = dlen + HEADER_SZ
+ header = pack("!iqqb", total_len, int(-1), int(self.rmid), self.flag)
+ self.response_buf.write(header)
+ return total_len+4
+
+ def get_ver_hlen(self, hlen):
+ return hlen + (PROTO_VERSION << 4)
+
+ def get_hlen(self):
+ return self.ver_hlen - (PROTO_VERSION << 4)
+
+ def init_remote_ipc(self):
+ self.response_buf.seek(0)
+ self.flag = MessageFlags.INITIAL_REQ
+ dlen = len(self.unpacked_msg[1])
+ resp_len = self.write_header(self.response_buf, dlen)
+ self.response_buf.write(self.unpacked_msg[1])
+ self.resp = self.response_buf.getbuffer()[0:resp_len]
+ self.send_msg()
+ self.packer.reset()
+
+ def helo(self):
+ #need to ack the connection back before sending actual HELO
+ self.init_remote_ipc()
+
+ self.flag = MessageFlags.NORMAL
+ self.response_buf.seek(0)
+ self.packer.pack(int(MessageType.HELO))
+ self.packer.pack("HELO")
+ dlen = 5 #tag(1) + body(4)
+ resp_len = self.write_header(self.response_buf, dlen)
+ self.response_buf.write(self.packer.bytes())
+ self.resp = self.response_buf.getbuffer()[0:resp_len]
+ self.send_msg()
+ self.packer.reset()
+ return True
+
+ def handle_init(self):
+ self.flag = MessageFlags.NORMAL
+ self.response_buf.seek(0)
+ args = self.unpacked_msg[1]
+ module = args[0]
+ if len(args) == 3:
+ clazz = args[1]
+ fn = args[2]
+ else:
+ clazz = None
+ fn = args[1]
+ self.init(module, clazz, fn)
+ self.packer.pack(int(MessageType.INIT_RSP))
+ dlen = 1 # just the tag.
+ resp_len = self.write_header(self.response_buf, dlen)
+ self.response_buf.write(self.packer.bytes())
+ self.resp = self.response_buf.getbuffer()[0:resp_len]
+ self.send_msg()
+ self.packer.reset()
+ return True
+
+ def quit(self):
+ self.alive = False
+ return True
+
+ def handle_call(self):
+ self.flag = MessageFlags.NORMAL
+ args = self.unpacked_msg[1]
+ result = None
+ if args is None:
+ result = self.nextTuple(key=self.rmid)
+ else:
+ result = self.nextTuple(args, key=self.rmid)
+ self.packer.reset()
+ self.response_buf.seek(0)
+ body = msgpack.packb(result)
+ dlen = len(body)+1 # 1 for tag
+ resp_len = self.write_header(self.response_buf, dlen)
+ self.packer.pack(int(MessageType.CALL_RSP))
+ self.response_buf.write(self.packer.bytes())
+ self.response_buf.write(body)
+ self.resp = self.response_buf.getbuffer()[0:resp_len]
+ self.send_msg()
+ self.packer.reset()
+ return True
+
+ def handle_error(self,e):
+ self.flag = MessageFlags.NORMAL
+ result = type(e).__name__ + ": " + str(e)
+ self.packer.reset()
+ self.response_buf.seek(0)
+ body = msgpack.packb(result)
+ dlen = len(body)+1 # 1 for tag
+ resp_len = self.write_header(self.response_buf, dlen)
+ self.packer.pack(int(MessageType.ERROR))
+ self.response_buf.write(self.packer.bytes())
+ self.response_buf.write(body)
+ self.resp = self.response_buf.getbuffer()[0:resp_len]
+ self.send_msg()
+ self.packer.reset()
+ return True
+
+ type_handler = {
+ MessageType.HELO: helo,
+ MessageType.QUIT: quit,
+ MessageType.INIT: handle_init,
+ MessageType.CALL: handle_call
+ }
+
+ def connect_sock(self, addr, port):
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ try:
+ self.sock.connect((addr, int(port)))
+ except socket.error as msg:
+ print(sys.stderr, msg)
+
+ def disconnect_sock(self, *args):
+ self.sock.shutdown(socket.SHUT_RDWR)
+ self.sock.close()
+
+ def recv_msg(self):
+ completed = False
+ while not completed and self.alive:
+ readbuf = sys.stdin.buffer.read1(4096)
+ try:
+ if(len(readbuf) < REAL_HEADER_SZ):
+ while(len(readbuf) < REAL_HEADER_SZ):
+ readbuf += sys.stdin.buffer.read1(4096)
+ self.read_header(readbuf)
+ if(self.sz > len(readbuf)):
+ while(len(readbuf) < self.sz):
+ readbuf += sys.stdin.buffer.read1(4096)
+ self.unpacker.feed(readbuf[21:])
+ self.unpacked_msg = list(self.unpacker)
+ self.type = MessageType(self.unpacked_msg[0])
+ completed = self.type_handler[self.type](self)
+ except BaseException as e:
+ completed = self.handle_error(e)
+
+ def send_msg(self):
+ self.sock.sendall(self.resp)
+ self.resp = None
+ return
+
+ def recv_loop(self):
+ while self.alive:
+ self.recv_msg()
+ self.disconnect_sock()
+
+
+addr = str(sys.argv[1])
+port = str(sys.argv[2])
+wrap = Wrapper()
+wrap.connect_sock(addr, port)
+signal.signal(signal.SIGTERM, wrap.disconnect_sock)
+wrap.recv_loop()
diff --git a/asterixdb/asterix-app/src/test/resources/TweetSent/roundtrip.py b/asterixdb/asterix-app/src/test/resources/TweetSent/roundtrip.py
new file mode 100644
index 0000000..37350be
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/TweetSent/roundtrip.py
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+class Tests(object):
+
+ def roundtrip(self, *args):
+ return args
+
+ def warning(self):
+ raise ArithmeticError("oof")
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.0.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.0.ddl.sqlpp
new file mode 100644
index 0000000..2076054
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.0.ddl.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description : Access a records nested records at each level.
+* Expected Res : Success
+* Date : 04 Jun 2015
+*/
+
+drop dataverse test if exists;
+create dataverse test;
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.1.lib.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.1.lib.sqlpp
new file mode 100644
index 0000000..0f0a05b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.1.lib.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+install test testlib admin admin target/TweetSent.pyz
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp
new file mode 100644
index 0000000..5b8f8bd
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.2.ddl.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+create function warning() language python as "testlib","roundtrip:Tests";
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.3.query.sqlpp
new file mode 100644
index 0000000..1fe7c59
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_function_error/py_function_error.3.query.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description : Access a records nested records at each level.
+* Expected Res : Success
+* Date : 04 Jun 2015
+*/
+// param max-warnings:json=1
+
+use test;
+
+warning();
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.0.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.0.ddl.sqlpp
new file mode 100644
index 0000000..c074534
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.0.ddl.sqlpp
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description : Access a records nested records at each level.
+* Expected Res : Success
+* Date : 04 Jun 2015
+*/
+
+drop dataverse test if exists;
+create dataverse test;
+
+use test;
+
+
+create type test.S as
+{
+ id : bigint
+};
+
+create type test.GS as
+ closed {
+ id : bigint,
+ Genus : string,
+ lower : S
+};
+
+create type test.FGS as
+{
+ id : bigint,
+ Family : string
+};
+
+create type test.OFGS as
+ closed {
+ id : bigint,
+ `Order` : string,
+ lower : FGS
+};
+
+create type test.COFGS as
+ closed {
+ id : bigint,
+ Class : string,
+ lower : OFGS
+};
+
+create type test.PCOFGS as
+ closed {
+ id : bigint,
+ Phylum : string,
+ lower : COFGS
+};
+
+create type test.KPCOFGS as
+{
+ id : bigint,
+ Kingdom : string
+};
+
+create type test.Classification as
+ closed {
+ id : bigint,
+ fullClassification : KPCOFGS
+};
+
+create type test.Animal as
+{
+ id : bigint
+};
+
+create dataset Animals(Animal) primary key id;
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.1.lib.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.1.lib.sqlpp
new file mode 100644
index 0000000..0f0a05b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.1.lib.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+install test testlib admin admin target/TweetSent.pyz
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.10.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.10.query.sqlpp
new file mode 100644
index 0000000..c48dda5
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.10.query.sqlpp
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description : Access a records nested records at each level.
+* Expected Res : Success
+* Date : 04 Jun 2015
+*/
+
+use test;
+
+
+select value [(
+select element result
+from Animals as test
+with result as roundtrip(test)[0][0].class.fullClassification.lower
+order by result.id)][0]
+;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.11.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.11.query.sqlpp
new file mode 100644
index 0000000..30ec1da
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.11.query.sqlpp
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description : Access a records nested records at each level.
+* Expected Res : Success
+* Date : 04 Jun 2015
+*/
+
+use test;
+
+
+select value [(
+select element result
+from Animals as test
+with result as roundtrip(test)[0][0].class.fullClassification
+order by result.id)][0]
+;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.12.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.12.query.sqlpp
new file mode 100644
index 0000000..1f7925f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.12.query.sqlpp
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description : Access a records nested records at each level.
+* Expected Res : Success
+* Date : 04 Jun 2015
+*/
+
+use test;
+
+
+select value [(
+select element result
+from Animals as test
+with result as roundtrip(test)[0][0].class
+order by result.id)][0]
+;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.13.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.13.query.sqlpp
new file mode 100644
index 0000000..13b4c28
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.13.query.sqlpp
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description : Access a records nested records at each level.
+* Expected Res : Success
+* Date : 04 Jun 2015
+*/
+
+use test;
+
+
+select value [(
+select element result
+from Animals as test
+with result as roundtrip(test)[0][0]
+order by result.id)][0]
+;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.2.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.2.ddl.sqlpp
new file mode 100644
index 0000000..655036c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.2.ddl.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+create function roundtrip(s) language python as "testlib","roundtrip:Tests";
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.3.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.3.update.sqlpp
new file mode 100644
index 0000000..3ca7f76
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.3.update.sqlpp
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description : Access a records nested records at each level.
+* Expected Res : Success
+* Date : 04 Jun 2015
+*/
+
+use test;
+
+
+load dataset Animals using localfs ((`path`=`asterix_nc1://data/classifications/animals.adm`),(`format`=`adm`));
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.4.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.4.query.sqlpp
new file mode 100644
index 0000000..1e9a088
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.4.query.sqlpp
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description : Access a records nested records at each level.
+* Expected Res : Success
+* Date : 04 Jun 2015
+*/
+
+use test;
+
+
+select element result
+from Animals as test
+with result as roundtrip(test)[0][0].class.fullClassification.lower.lower.lower.lower.lower.lower.Species
+order by result
+;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.5.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.5.query.sqlpp
new file mode 100644
index 0000000..eedf56b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.5.query.sqlpp
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description : Access a records nested records at each level.
+* Expected Res : Success
+* Date : 04 Jun 2015
+*/
+use test;
+
+select value [(
+select element result
+from Animals as test
+with result as roundtrip(test)[0][0].class.fullClassification.lower.lower.lower.lower.lower.lower
+order by result.id )][0]
+;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.6.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.6.query.sqlpp
new file mode 100644
index 0000000..4912ad6
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.6.query.sqlpp
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description : Access a records nested records at each level.
+* Expected Res : Success
+* Date : 04 Jun 2015
+*/
+
+use test;
+
+
+select value [(
+select element result
+from Animals as test
+with result as roundtrip(test)[0][0].class.fullClassification.lower.lower.lower.lower.lower
+order by result.id)][0]
+;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.7.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.7.query.sqlpp
new file mode 100644
index 0000000..546c174
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.7.query.sqlpp
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description : Access a records nested records at each level.
+* Expected Res : Success
+* Date : 04 Jun 2015
+*/
+
+use test;
+
+
+select value [(
+select element result
+from Animals as test
+with result as roundtrip(test)[0][0].class.fullClassification.lower.lower.lower.lower
+order by result.id)][0]
+;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.8.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.8.query.sqlpp
new file mode 100644
index 0000000..62af850
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.8.query.sqlpp
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description : Access a records nested records at each level.
+* Expected Res : Success
+* Date : 04 Jun 2015
+*/
+
+use test;
+
+
+select value [(
+select element result
+from Animals as test
+with result as roundtrip(test)[0][0].class.fullClassification.lower.lower.lower
+order by result.id)][0]
+;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.9.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.9.query.sqlpp
new file mode 100644
index 0000000..6c594f0
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.9.query.sqlpp
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description : Access a records nested records at each level.
+* Expected Res : Success
+* Date : 04 Jun 2015
+*/
+
+use test;
+
+
+select value [(
+select element result
+from Animals as test
+with result as roundtrip(test)[0][0].class.fullClassification.lower.lower
+order by result.id)][0]
+;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_bigobj_roundtrip/py_bigobj_roundtrip.1.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_bigobj_roundtrip/py_bigobj_roundtrip.1.regexjson
new file mode 120000
index 0000000..7f8c8ee
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_bigobj_roundtrip/py_bigobj_roundtrip.1.regexjson
@@ -0,0 +1 @@
+../../big-object/big_object_insert/big_object_insert.1.adm
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_function_error/py_function_error.1.json b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_function_error/py_function_error.1.json
new file mode 100644
index 0000000..ec747fa
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_function_error/py_function_error.1.json
@@ -0,0 +1 @@
+null
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.10.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.10.regexjson
new file mode 100644
index 0000000..2cddb0f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.10.regexjson
@@ -0,0 +1,4 @@
+[{ "id": 1, "Kingdom": "Animalia", "lower": { "id": 1, "Phylum": "Chordata", "lower": { "id": 1, "Class": "Mammalia", "lower": { "id": 1, "Order": "Carnivora", "lower": { "id": 1, "Family": "Mustelinae", "lower": { "id": 1, "Genus": "Gulo", "lower": { "id": 1, "Species": "Gulo" } } } } } } },
+{ "id": 2, "Kingdom": "Animalia", "lower": { "id": 2, "Phylum": "Chordata", "lower": { "id": 2, "Class": "Mammalia", "lower": { "id": 2, "Order": "Artiodactyla", "lower": { "id": 2, "Family": "Giraffidae", "lower": { "id": 2, "Genus": "Okapia", "lower": { "id": 2, "Species": "Johnstoni" } } } } } } },
+{ "id": 3, "Kingdom": "Animalia", "lower": { "id": 3, "Phylum": "Chordata", "lower": { "id": 3, "Class": "Mammalia", "lower": { "id": 3, "Order": "Atlantogenata", "lower": { "id": 3, "Family": "Afrotheria", "lower": { "id": 3, "Genus": "Paenungulata", "lower": { "id": 3, "Species": "Hyracoidea" } } } } } } },
+{ "id": 4, "Kingdom": "Animalia", "lower": { "id": 4, "Phylum": "Chordata", "lower": { "id": 4, "Class": "Aves", "lower": { "id": 4, "Order": "Accipitriformes", "lower": { "id": 4, "Family": "Accipitridae", "lower": { "id": 4, "Genus": "Buteo", "lower": { "id": 4, "Species": "Jamaicensis" } } } } } } }]
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.11.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.11.regexjson
new file mode 100644
index 0000000..a7ce00a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.11.regexjson
@@ -0,0 +1,4 @@
+[{ "id": 1, "fullClassification": { "id": 1, "Kingdom": "Animalia", "lower": { "id": 1, "Phylum": "Chordata", "lower": { "id": 1, "Class": "Mammalia", "lower": { "id": 1, "Order": "Carnivora", "lower": { "id": 1, "Family": "Mustelinae", "lower": { "id": 1, "Genus": "Gulo", "lower": { "id": 1, "Species": "Gulo" } } } } } } } },
+{ "id": 2, "fullClassification": { "id": 2, "Kingdom": "Animalia", "lower": { "id": 2, "Phylum": "Chordata", "lower": { "id": 2, "Class": "Mammalia", "lower": { "id": 2, "Order": "Artiodactyla", "lower": { "id": 2, "Family": "Giraffidae", "lower": { "id": 2, "Genus": "Okapia", "lower": { "id": 2, "Species": "Johnstoni" } } } } } } } },
+{ "id": 3, "fullClassification": { "id": 3, "Kingdom": "Animalia", "lower": { "id": 3, "Phylum": "Chordata", "lower": { "id": 3, "Class": "Mammalia", "lower": { "id": 3, "Order": "Atlantogenata", "lower": { "id": 3, "Family": "Afrotheria", "lower": { "id": 3, "Genus": "Paenungulata", "lower": { "id": 3, "Species": "Hyracoidea" } } } } } } } },
+{ "id": 4, "fullClassification": { "id": 4, "Kingdom": "Animalia", "lower": { "id": 4, "Phylum": "Chordata", "lower": { "id": 4, "Class": "Aves", "lower": { "id": 4, "Order": "Accipitriformes", "lower": { "id": 4, "Family": "Accipitridae", "lower": { "id": 4, "Genus": "Buteo", "lower": { "id": 4, "Species": "Jamaicensis" } } } } } } } }]
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.12.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.12.regexjson
new file mode 100644
index 0000000..d09c537
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.12.regexjson
@@ -0,0 +1,4 @@
+[{ "id": 1, "class": { "id": 1, "fullClassification": { "id": 1, "Kingdom": "Animalia", "lower": { "id": 1, "Phylum": "Chordata", "lower": { "id": 1, "Class": "Mammalia", "lower": { "id": 1, "Order": "Carnivora", "lower": { "id": 1, "Family": "Mustelinae", "lower": { "id": 1, "Genus": "Gulo", "lower": { "id": 1, "Species": "Gulo" } } } } } } } } },
+{ "id": 2, "class": { "id": 2, "fullClassification": { "id": 2, "Kingdom": "Animalia", "lower": { "id": 2, "Phylum": "Chordata", "lower": { "id": 2, "Class": "Mammalia", "lower": { "id": 2, "Order": "Artiodactyla", "lower": { "id": 2, "Family": "Giraffidae", "lower": { "id": 2, "Genus": "Okapia", "lower": { "id": 2, "Species": "Johnstoni" } } } } } } } } },
+{ "id": 3, "class": { "id": 3, "fullClassification": { "id": 3, "Kingdom": "Animalia", "lower": { "id": 3, "Phylum": "Chordata", "lower": { "id": 3, "Class": "Mammalia", "lower": { "id": 3, "Order": "Atlantogenata", "lower": { "id": 3, "Family": "Afrotheria", "lower": { "id": 3, "Genus": "Paenungulata", "lower": { "id": 3, "Species": "Hyracoidea" } } } } } } } } },
+{ "id": 4, "class": { "id": 4, "fullClassification": { "id": 4, "Kingdom": "Animalia", "lower": { "id": 4, "Phylum": "Chordata", "lower": { "id": 4, "Class": "Aves", "lower": { "id": 4, "Order": "Accipitriformes", "lower": { "id": 4, "Family": "Accipitridae", "lower": { "id": 4, "Genus": "Buteo", "lower": { "id": 4, "Species": "Jamaicensis" } } } } } } } } }]
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.3.adm
new file mode 100644
index 0000000..e34004a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.3.adm
@@ -0,0 +1,4 @@
+"Gulo"
+"Hyracoidea"
+"Jamaicensis"
+"Johnstoni"
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.4.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.4.regexjson
new file mode 100644
index 0000000..5d8ddf4
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.4.regexjson
@@ -0,0 +1,4 @@
+[{ "id": 1, "Species": "Gulo" },
+{ "id": 2, "Species": "Johnstoni" },
+{ "id": 3, "Species": "Hyracoidea" },
+{ "id": 4, "Species": "Jamaicensis" }]
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.5.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.5.regexjson
new file mode 100644
index 0000000..f10de1f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.5.regexjson
@@ -0,0 +1,4 @@
+[{ "id": 1, "Genus": "Gulo", "lower": { "id": 1, "Species": "Gulo" } },
+{ "id": 2, "Genus": "Okapia", "lower": { "id": 2, "Species": "Johnstoni" } },
+{ "id": 3, "Genus": "Paenungulata", "lower": { "id": 3, "Species": "Hyracoidea" } },
+{ "id": 4, "Genus": "Buteo", "lower": { "id": 4, "Species": "Jamaicensis" } }]
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.6.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.6.regexjson
new file mode 100644
index 0000000..4e6d8ea
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.6.regexjson
@@ -0,0 +1,4 @@
+[{ "id": 1, "Family": "Mustelinae", "lower": { "id": 1, "Genus": "Gulo", "lower": { "id": 1, "Species": "Gulo" } } },
+{ "id": 2, "Family": "Giraffidae", "lower": { "id": 2, "Genus": "Okapia", "lower": { "id": 2, "Species": "Johnstoni" } } },
+{ "id": 3, "Family": "Afrotheria", "lower": { "id": 3, "Genus": "Paenungulata", "lower": { "id": 3, "Species": "Hyracoidea" } } },
+{ "id": 4, "Family": "Accipitridae", "lower": { "id": 4, "Genus": "Buteo", "lower": { "id": 4, "Species": "Jamaicensis" } } }]
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.7.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.7.regexjson
new file mode 100644
index 0000000..60d21a9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.7.regexjson
@@ -0,0 +1,4 @@
+[{ "id": 1, "Order": "Carnivora", "lower": { "id": 1, "Family": "Mustelinae", "lower": { "id": 1, "Genus": "Gulo", "lower": { "id": 1, "Species": "Gulo" } } } },
+{ "id": 2, "Order": "Artiodactyla", "lower": { "id": 2, "Family": "Giraffidae", "lower": { "id": 2, "Genus": "Okapia", "lower": { "id": 2, "Species": "Johnstoni" } } } },
+{ "id": 3, "Order": "Atlantogenata", "lower": { "id": 3, "Family": "Afrotheria", "lower": { "id": 3, "Genus": "Paenungulata", "lower": { "id": 3, "Species": "Hyracoidea" } } } },
+{ "id": 4, "Order": "Accipitriformes", "lower": { "id": 4, "Family": "Accipitridae", "lower": { "id": 4, "Genus": "Buteo", "lower": { "id": 4, "Species": "Jamaicensis" } } } }]
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.8.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.8.regexjson
new file mode 100644
index 0000000..183e94c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.8.regexjson
@@ -0,0 +1,4 @@
+[{ "id": 1, "Class": "Mammalia", "lower": { "id": 1, "Order": "Carnivora", "lower": { "id": 1, "Family": "Mustelinae", "lower": { "id": 1, "Genus": "Gulo", "lower": { "id": 1, "Species": "Gulo" } } } } },
+{ "id": 2, "Class": "Mammalia", "lower": { "id": 2, "Order": "Artiodactyla", "lower": { "id": 2, "Family": "Giraffidae", "lower": { "id": 2, "Genus": "Okapia", "lower": { "id": 2, "Species": "Johnstoni" } } } } },
+{ "id": 3, "Class": "Mammalia", "lower": { "id": 3, "Order": "Atlantogenata", "lower": { "id": 3, "Family": "Afrotheria", "lower": { "id": 3, "Genus": "Paenungulata", "lower": { "id": 3, "Species": "Hyracoidea" } } } } },
+{ "id": 4, "Class": "Aves", "lower": { "id": 4, "Order": "Accipitriformes", "lower": { "id": 4, "Family": "Accipitridae", "lower": { "id": 4, "Genus": "Buteo", "lower": { "id": 4, "Species": "Jamaicensis" } } } } }]
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.9.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.9.regexjson
new file mode 100644
index 0000000..347b5b6
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/py_nested_access/access-nested-fields.9.regexjson
@@ -0,0 +1,4 @@
+[{ "id": 1, "Phylum": "Chordata", "lower": { "id": 1, "Class": "Mammalia", "lower": { "id": 1, "Order": "Carnivora", "lower": { "id": 1, "Family": "Mustelinae", "lower": { "id": 1, "Genus": "Gulo", "lower": { "id": 1, "Species": "Gulo" } } } } } },
+{ "id": 2, "Phylum": "Chordata", "lower": { "id": 2, "Class": "Mammalia", "lower": { "id": 2, "Order": "Artiodactyla", "lower": { "id": 2, "Family": "Giraffidae", "lower": { "id": 2, "Genus": "Okapia", "lower": { "id": 2, "Species": "Johnstoni" } } } } } },
+{ "id": 3, "Phylum": "Chordata", "lower": { "id": 3, "Class": "Mammalia", "lower": { "id": 3, "Order": "Atlantogenata", "lower": { "id": 3, "Family": "Afrotheria", "lower": { "id": 3, "Genus": "Paenungulata", "lower": { "id": 3, "Species": "Hyracoidea" } } } } } },
+{ "id": 4, "Phylum": "Chordata", "lower": { "id": 4, "Class": "Aves", "lower": { "id": 4, "Order": "Accipitriformes", "lower": { "id": 4, "Family": "Accipitridae", "lower": { "id": 4, "Genus": "Buteo", "lower": { "id": 4, "Species": "Jamaicensis" } } } } } }]
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_it_python.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_it_python.xml
index 6d03162..e4669da 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_it_python.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_it_python.xml
@@ -22,7 +22,6 @@
ResultOffsetPath="results"
QueryOffsetPath="queries_sqlpp"
QueryFileExtension=".sqlpp">
-
<test-group name="external-library-python">
<test-case FilePath="external-library">
<compilation-unit name="mysentiment">
@@ -32,7 +31,18 @@
<test-case FilePath="external-library">
<compilation-unit name="python-fn-escape">
<output-dir compare="Text">python-fn-escape</output-dir>
- <expected-error>'NoneType' object is not callable</expected-error>
+ <expected-error>ImportError: Module was not found in library</expected-error>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-library">
+ <compilation-unit name="py_nested_access">
+ <output-dir compare="Clean-JSON">py_nested_access</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-library" check-warnings="true">
+ <compilation-unit name="py_function_error">
+ <output-dir compare="Clean-JSON">py_function_error</output-dir>
+ <expected-warn>ASX0201: External UDF returned exception. Returned exception was: ArithmeticError: oof</expected-warn>
</compilation-unit>
</test-case>
</test-group>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 73dfbd4..87a11cc 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -88,6 +88,7 @@
public static final int UNSUPPORTED_JRE = 100;
public static final int EXTERNAL_UDF_RESULT_TYPE_ERROR = 200;
+ public static final int EXTERNAL_UDF_EXCEPTION = 201;
// Compilation errors
public static final int PARSE_ERROR = 1001;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibraryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibraryManager.java
index 83f1d71..047018b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibraryManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/library/ILibraryManager.java
@@ -20,8 +20,10 @@
package org.apache.asterix.common.library;
import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.external.ipc.ExternalFunctionResultRouter;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.ipc.impl.IPCSystem;
public interface ILibraryManager {
@@ -36,4 +38,8 @@
void dropLibraryPath(FileReference fileRef) throws HyracksDataException;
byte[] serializeLibraryDescriptor(LibraryDescriptor libraryDescriptor) throws HyracksDataException;
+
+ ExternalFunctionResultRouter getRouter();
+
+ IPCSystem getIPCI();
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/external/ipc/ExternalFunctionResultRouter.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/external/ipc/ExternalFunctionResultRouter.java
new file mode 100644
index 0000000..8d56eb7
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/external/ipc/ExternalFunctionResultRouter.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.ipc;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.ipc.api.IIPCHandle;
+import org.apache.hyracks.ipc.api.IIPCI;
+import org.apache.hyracks.ipc.api.IPayloadSerializerDeserializer;
+import org.apache.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
+import org.apache.hyracks.ipc.impl.Message;
+
+public class ExternalFunctionResultRouter implements IIPCI {
+
+ AtomicLong maxId = new AtomicLong(0);
+ ConcurrentHashMap<Long, MutableObject<ByteBuffer>> activeClients = new ConcurrentHashMap<>();
+ ConcurrentHashMap<Long, Exception> exceptionInbox = new ConcurrentHashMap<>();
+ private static int MAX_BUF_SIZE = 32 * 1024 * 1024; //32MB
+
+ @Override
+ public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload) {
+ int rewind = handle.getAttachmentLen();
+ ByteBuffer buf = (ByteBuffer) payload;
+ int end = buf.position();
+ buf.position(end - rewind);
+ ByteBuffer copyTo = activeClients.get(rmid).getValue();
+ if (copyTo.capacity() < handle.getAttachmentLen()) {
+ int nextSize = closestPow2(handle.getAttachmentLen());
+ if (nextSize > MAX_BUF_SIZE) {
+ onError(handle, mid, rmid, HyracksException.create(ErrorCode.RECORD_IS_TOO_LARGE));
+ return;
+ }
+ copyTo = ByteBuffer.allocate(nextSize);
+ activeClients.get(rmid).setValue(copyTo);
+ }
+ copyTo.position(0);
+ System.arraycopy(buf.array(), buf.position() + buf.arrayOffset(), copyTo.array(), copyTo.arrayOffset(),
+ handle.getAttachmentLen());
+ synchronized (copyTo) {
+ copyTo.limit(handle.getAttachmentLen() + 1);
+ copyTo.notify();
+ }
+ buf.position(end);
+ }
+
+ @Override
+ public void onError(IIPCHandle handle, long mid, long rmid, Exception exception) {
+ exceptionInbox.put(rmid, exception);
+ ByteBuffer route = activeClients.get(rmid).getValue();
+ synchronized (route) {
+ route.notify();
+ }
+ }
+
+ public Long insertRoute(ByteBuffer buf) {
+ Long id = maxId.incrementAndGet();
+ activeClients.put(id, new MutableObject<>(buf));
+ return id;
+ }
+
+ public Exception getException(Long id) {
+ return exceptionInbox.remove(id);
+ }
+
+ public boolean hasException(long id) {
+ return exceptionInbox.get(id) == null;
+ }
+
+ public void removeRoute(Long id) {
+ activeClients.remove(id);
+ exceptionInbox.remove(id);
+ }
+
+ public static int closestPow2(int n) {
+ return (int) Math.pow(2, Math.ceil(Math.log(n) / Math.log(2)));
+ }
+
+ public static class NoOpNoSerJustDe implements IPayloadSerializerDeserializer {
+
+ private static byte[] noop = new byte[] { (byte) 0 };
+
+ @Override
+ public Object deserializeObject(ByteBuffer buffer, int length, byte flag) throws Exception {
+ if (flag == Message.INITIAL_REQ) {
+ return new JavaSerializationBasedPayloadSerializerDeserializer().deserializeObject(buffer, length,
+ flag);
+ }
+ return buffer;
+ }
+
+ @Override
+ public Exception deserializeException(ByteBuffer buffer, int length) throws Exception {
+ return null;
+ }
+
+ @Override
+ public byte[] serializeObject(Object object) throws Exception {
+ return noop;
+ }
+
+ @Override
+ public byte[] serializeException(Exception object) throws Exception {
+ return noop;
+ }
+ }
+}
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index a7d49d9..438d719 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -90,6 +90,7 @@
100 = Unsupported JRE: %1$s
200 = External UDF cannot produce expected result. Please check the UDF configuration
+201 = External UDF returned exception. Returned exception was: %1$s
# Compile-time check errors
1001 = Syntax error: %1$s
diff --git a/asterixdb/asterix-external-data/pom.xml b/asterixdb/asterix-external-data/pom.xml
index 7760101..3847576 100644
--- a/asterixdb/asterix-external-data/pom.xml
+++ b/asterixdb/asterix-external-data/pom.xml
@@ -445,14 +445,6 @@
<artifactId>guava</artifactId>
</dependency>
<dependency>
- <groupId>net.razorvine</groupId>
- <artifactId>pyrolite</artifactId>
- </dependency>
- <dependency>
- <groupId>net.razorvine</groupId>
- <artifactId>serpent</artifactId>
- </dependency>
- <dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>http-client-spi</artifactId>
</dependency>
@@ -472,5 +464,9 @@
<groupId>software.amazon.awssdk</groupId>
<artifactId>auth</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.msgpack</groupId>
+ <artifactId>msgpack-core</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/MessageType.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/MessageType.java
new file mode 100644
index 0000000..506d9d3
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/MessageType.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.external.ipc;
+
+public enum MessageType {
+ HELO,
+ QUIT,
+ INIT,
+ INIT_RSP,
+ CALL,
+ CALL_RSP,
+ ERROR;
+
+ static MessageType[] messageTypes;
+ static {
+ messageTypes = values();
+ }
+
+ public static MessageType fromByte(byte b) {
+ if (b > messageTypes.length - 1) {
+ return null;
+ }
+ return messageTypes[b];
+ }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonIPCProto.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonIPCProto.java
new file mode 100644
index 0000000..feb52cf
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonIPCProto.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.ipc;
+
+import static org.apache.hyracks.ipc.impl.Message.HEADER_SIZE;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.hyracks.ipc.impl.IPCSystem;
+import org.apache.hyracks.ipc.impl.Message;
+import org.msgpack.core.MessagePack;
+
+public class PythonIPCProto {
+
+ public PythonMessageBuilder messageBuilder;
+ OutputStream sockOut;
+ ByteBuffer headerBuffer = ByteBuffer.allocate(21);
+ ByteBuffer recvBuffer = ByteBuffer.allocate(4096);
+ ExternalFunctionResultRouter router;
+ IPCSystem ipcSys;
+ Message outMsg;
+ Long key;
+
+ public PythonIPCProto(OutputStream sockOut, ExternalFunctionResultRouter router, IPCSystem ipcSys)
+ throws IOException {
+ this.sockOut = sockOut;
+ messageBuilder = new PythonMessageBuilder();
+ this.router = router;
+ this.ipcSys = ipcSys;
+ this.outMsg = new Message(null);
+ }
+
+ public void start() {
+ this.key = router.insertRoute(recvBuffer);
+ }
+
+ public void helo() throws IOException, AsterixException {
+ recvBuffer.clear();
+ recvBuffer.position(0);
+ recvBuffer.limit(0);
+ messageBuilder.buf.clear();
+ messageBuilder.buf.position(0);
+ messageBuilder.hello();
+ sendMsg();
+ receiveMsg();
+ if (getResponseType() != MessageType.HELO) {
+ throw new IllegalStateException("Illegal reply received, expected HELO");
+ }
+ }
+
+ public void init(String module, String clazz, String fn) throws IOException, AsterixException {
+ recvBuffer.clear();
+ recvBuffer.position(0);
+ recvBuffer.limit(0);
+ messageBuilder.buf.clear();
+ messageBuilder.buf.position(0);
+ messageBuilder.init(module, clazz, fn);
+ sendMsg();
+ receiveMsg();
+ if (getResponseType() != MessageType.INIT_RSP) {
+ throw new IllegalStateException("Illegal reply received, expected INIT_RSP");
+ }
+ }
+
+ public ByteBuffer call(ByteBuffer args, int numArgs) throws Exception {
+ recvBuffer.clear();
+ recvBuffer.position(0);
+ recvBuffer.limit(0);
+ messageBuilder.buf.clear();
+ messageBuilder.buf.position(0);
+ messageBuilder.call(args.array(), args.position(), numArgs);
+ sendMsg();
+ receiveMsg();
+ if (getResponseType() != MessageType.CALL_RSP) {
+ throw new IllegalStateException("Illegal reply received, expected CALL_RSP, recvd: " + getResponseType());
+ }
+ return recvBuffer;
+ }
+
+ public void quit() throws IOException {
+ messageBuilder.quit();
+ router.removeRoute(key);
+ }
+
+ public void receiveMsg() throws IOException, AsterixException {
+ Exception except = null;
+ try {
+ synchronized (recvBuffer) {
+ while (recvBuffer.limit() == 0) {
+ recvBuffer.wait(100);
+ }
+ }
+ if (router.hasException(key)) {
+ except = router.getException(key);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new AsterixException(ErrorCode.EXTERNAL_UDF_EXCEPTION, e);
+ }
+ if (except != null) {
+ throw new AsterixException(ErrorCode.EXTERNAL_UDF_EXCEPTION, except);
+ }
+ messageBuilder.readHead(recvBuffer);
+ if (messageBuilder.type == MessageType.ERROR) {
+ throw new AsterixException(ErrorCode.EXTERNAL_UDF_EXCEPTION,
+ MessagePack.newDefaultUnpacker(recvBuffer).unpackString());
+ }
+ }
+
+ public void sendMsg() throws IOException {
+ headerBuffer.clear();
+ headerBuffer.position(0);
+ headerBuffer.putInt(HEADER_SIZE + messageBuilder.buf.position());
+ headerBuffer.putLong(-1);
+ headerBuffer.putLong(key);
+ headerBuffer.put(Message.NORMAL);
+ sockOut.write(headerBuffer.array(), 0, HEADER_SIZE + Integer.BYTES);
+ sockOut.write(messageBuilder.buf.array(), 0, messageBuilder.buf.position());
+ sockOut.flush();
+ }
+
+ public MessageType getResponseType() {
+ return messageBuilder.type;
+ }
+
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonMessageBuilder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonMessageBuilder.java
new file mode 100644
index 0000000..506e80d
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonMessageBuilder.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.ipc;
+
+import static org.apache.hyracks.api.util.JavaSerializationUtils.getSerializationProvider;
+import static org.msgpack.core.MessagePack.Code.*;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.apache.asterix.external.library.msgpack.MessagePackerFromADM;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class PythonMessageBuilder {
+ private static final int MAX_BUF_SIZE = 21 * 1024 * 1024; //21MB.
+ private static final Logger LOGGER = LogManager.getLogger();
+ MessageType type;
+ long dataLength;
+ ByteBuffer buf;
+ String[] initAry = new String[3];
+
+ public PythonMessageBuilder() {
+ this.type = null;
+ dataLength = -1;
+ this.buf = ByteBuffer.allocate(4096);
+ }
+
+ public void setType(MessageType type) {
+ this.type = type;
+ }
+
+ public void packHeader() {
+ MessagePackerFromADM.packFixPos(buf, (byte) type.ordinal());
+ }
+
+ //TODO: this is wrong for any multibyte chars
+ private int getStringLength(String s) {
+ return s.length();
+ }
+
+ public void readHead(ByteBuffer buf) {
+ byte typ = buf.get();
+ type = MessageType.fromByte(typ);
+ }
+
+ public void hello() throws IOException {
+ this.type = MessageType.HELO;
+ byte[] serAddr = serialize(new InetSocketAddress(InetAddress.getLoopbackAddress(), 1));
+ dataLength = serAddr.length + 5;
+ packHeader();
+ //TODO:make this cleaner
+ buf.put(BIN32);
+ buf.putInt(serAddr.length);
+ buf.put(serAddr);
+ }
+
+ public void quit() {
+ this.type = MessageType.QUIT;
+ dataLength = getStringLength("QUIT");
+ packHeader();
+ MessagePackerFromADM.packFixStr(buf, "QUIT");
+ }
+
+ public void init(String module, String clazz, String fn) {
+ this.type = MessageType.INIT;
+ initAry[0] = module;
+ initAry[1] = clazz;
+ initAry[2] = fn;
+ dataLength = Arrays.stream(initAry).mapToInt(s -> getStringLength(s)).sum() + 2;
+ packHeader();
+ MessagePackerFromADM.packFixArrayHeader(buf, (byte) initAry.length);
+ for (String s : initAry) {
+ MessagePackerFromADM.packStr(buf, s);
+ }
+ }
+
+ public void call(byte[] args, int lim, int numArgs) {
+ if (args.length > buf.capacity()) {
+ int growTo = ExternalFunctionResultRouter.closestPow2(args.length);
+ if (growTo > MAX_BUF_SIZE) {
+ //TODO: something more graceful
+ throw new IllegalArgumentException("Reached maximum buffer size");
+ }
+ buf = ByteBuffer.allocate(growTo);
+ }
+ buf.clear();
+ buf.position(0);
+ this.type = MessageType.CALL;
+ dataLength = 5 + 1 + lim;
+ packHeader();
+ //TODO: make this switch between fixarray/array16/array32
+ if (numArgs == 0) {
+ buf.put(NIL);
+ } else {
+ buf.put(ARRAY32);
+ buf.putInt(numArgs);
+ buf.put(args, 0, lim);
+ }
+ }
+
+ //this is used to send a serialized java inetaddress to the entrypoint so it can send it back
+ //to the IPC subsystem, which needs it. don't use this for anything else.
+ private byte[] serialize(Object object) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try (ObjectOutputStream oos = getSerializationProvider().newObjectOutputStream(baos)) {
+ oos.writeObject(object);
+ oos.flush();
+ baos.close();
+ }
+ return baos.toByteArray();
+ }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java
index b0a4dfdc..d7a446b 100755
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalLibraryManager.java
@@ -23,6 +23,8 @@
import java.io.IOException;
import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
@@ -34,6 +36,7 @@
import org.apache.asterix.common.library.ILibraryManager;
import org.apache.asterix.common.library.LibraryDescriptor;
import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.external.ipc.ExternalFunctionResultRouter;
import org.apache.commons.io.FileUtils;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -43,6 +46,8 @@
import org.apache.hyracks.api.util.IoUtil;
import org.apache.hyracks.control.common.work.AbstractWork;
import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.ipc.impl.IPCSystem;
+import org.apache.hyracks.ipc.sockets.PlainSocketChannelFactory;
import org.apache.hyracks.util.file.FileUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -81,6 +86,8 @@
private final FileReference trashDir;
private final Path trashDirPath;
private final Map<Pair<DataverseName, String>, ILibrary> libraries = new HashMap<>();
+ private IPCSystem pythonIPC;
+ private final ExternalFunctionResultRouter router;
public ExternalLibraryManager(NodeControllerService ncs, IPersistedResourceRegistry reg, FileReference appDir) {
this.ncs = ncs;
@@ -91,10 +98,14 @@
trashDir = baseDir.getChild(TRASH_DIR_NAME);
trashDirPath = trashDir.getFile().toPath().normalize();
objectMapper = createObjectMapper();
+ router = new ExternalFunctionResultRouter();
}
- public void initStorage(boolean resetStorageData) throws HyracksDataException {
+ public void initialize(boolean resetStorageData) throws HyracksDataException {
try {
+ pythonIPC = new IPCSystem(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0),
+ PlainSocketChannelFactory.INSTANCE, router, new ExternalFunctionResultRouter.NoOpNoSerJustDe());
+ pythonIPC.start();
Path baseDirPath = baseDir.getFile().toPath();
if (Files.isDirectory(baseDirPath)) {
if (resetStorageData) {
@@ -291,6 +302,16 @@
return om;
}
+ @Override
+ public ExternalFunctionResultRouter getRouter() {
+ return router;
+ }
+
+ @Override
+ public IPCSystem getIPCI() {
+ return pythonIPC;
+ }
+
private static final class DeleteDirectoryWork extends AbstractWork {
private final Path path;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarFunctionDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarFunctionDescriptor.java
index c033cea..61ab3ea 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarFunctionDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarFunctionDescriptor.java
@@ -47,7 +47,7 @@
@Override
public IScalarEvaluatorFactory createEvaluatorFactory(IScalarEvaluatorFactory[] args) {
- return new ExternalScalarFunctionEvaluatorFactory(finfo, args, argTypes);
+ return new ExternalScalarFunctionEvaluatorFactory(finfo, args, argTypes, sourceLoc);
}
@Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarFunctionEvaluator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarFunctionEvaluator.java
index 3ff706d..5ae1995 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarFunctionEvaluator.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarFunctionEvaluator.java
@@ -21,12 +21,14 @@
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.library.ILibraryManager;
+import org.apache.asterix.external.ipc.ExternalFunctionResultRouter;
import org.apache.asterix.om.functions.IExternalFunctionInfo;
import org.apache.asterix.om.types.IAType;
import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.ipc.impl.IPCSystem;
public abstract class ExternalScalarFunctionEvaluator implements IScalarEvaluator {
@@ -34,6 +36,8 @@
protected final IScalarEvaluator[] argEvals;
protected final IAType[] argTypes;
protected final ILibraryManager libraryManager;
+ protected final ExternalFunctionResultRouter router;
+ protected final IPCSystem ipcSys;
public ExternalScalarFunctionEvaluator(IExternalFunctionInfo finfo, IScalarEvaluatorFactory[] args,
IAType[] argTypes, IEvaluatorContext context) throws HyracksDataException {
@@ -45,5 +49,7 @@
}
libraryManager =
((INcApplicationContext) context.getServiceContext().getApplicationContext()).getLibraryManager();
+ router = libraryManager.getRouter();
+ ipcSys = libraryManager.getIPCI();
}
-}
\ No newline at end of file
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarFunctionEvaluatorFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarFunctionEvaluatorFactory.java
index ec757a1..de75f7a 100755
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarFunctionEvaluatorFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarFunctionEvaluatorFactory.java
@@ -25,6 +25,7 @@
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
public class ExternalScalarFunctionEvaluatorFactory implements IScalarEvaluatorFactory {
@@ -32,12 +33,14 @@
private final IExternalFunctionInfo finfo;
private final IScalarEvaluatorFactory[] args;
private final IAType[] argTypes;
+ private final SourceLocation sourceLoc;
public ExternalScalarFunctionEvaluatorFactory(IExternalFunctionInfo finfo, IScalarEvaluatorFactory[] args,
- IAType[] argTypes) {
+ IAType[] argTypes, SourceLocation sourceLoc) {
this.finfo = finfo;
this.args = args;
this.argTypes = argTypes;
+ this.sourceLoc = sourceLoc;
}
@Override
@@ -46,7 +49,7 @@
case JAVA:
return new ExternalScalarJavaFunctionEvaluator(finfo, args, argTypes, ctx);
case PYTHON:
- return new ExternalScalarPythonFunctionEvaluator(finfo, args, argTypes, ctx);
+ return new ExternalScalarPythonFunctionEvaluator(finfo, args, argTypes, ctx, sourceLoc);
default:
throw new HyracksDataException(ErrorCode.ASTERIX, ErrorCode.LIBRARY_EXTERNAL_FUNCTION_UNSUPPORTED_KIND,
finfo.getLanguage());
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarPythonFunctionEvaluator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarPythonFunctionEvaluator.java
index e49c97e..31f96cf 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarPythonFunctionEvaluator.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarPythonFunctionEvaluator.java
@@ -22,36 +22,34 @@
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
-import java.net.ConnectException;
-import java.net.ServerSocket;
-import java.util.HashMap;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
import java.util.List;
-import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.WarningUtil;
import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.common.library.ILibraryManager;
import org.apache.asterix.common.metadata.DataverseName;
-import org.apache.asterix.external.api.IJObject;
-import org.apache.asterix.external.library.java.JObjectPointableVisitor;
-import org.apache.asterix.external.library.java.base.JComplexObject;
-import org.apache.asterix.external.library.java.base.JObject;
+import org.apache.asterix.external.ipc.ExternalFunctionResultRouter;
+import org.apache.asterix.external.ipc.PythonIPCProto;
+import org.apache.asterix.external.library.msgpack.MessagePackerFromADM;
+import org.apache.asterix.external.library.msgpack.MessageUnpackerToADM;
import org.apache.asterix.om.functions.IExternalFunctionInfo;
-import org.apache.asterix.om.pointables.AFlatValuePointable;
-import org.apache.asterix.om.pointables.AListVisitablePointable;
-import org.apache.asterix.om.pointables.ARecordVisitablePointable;
-import org.apache.asterix.om.pointables.PointableAllocator;
-import org.apache.asterix.om.pointables.base.IVisitablePointable;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.EnumDeserializer;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.om.types.TypeTagUtil;
-import org.apache.asterix.om.util.container.IObjectPool;
-import org.apache.asterix.om.util.container.ListObjectPool;
import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.TaskAttemptId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.resources.IDeallocatable;
import org.apache.hyracks.control.common.controllers.NCConfig;
@@ -62,49 +60,45 @@
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
import org.apache.hyracks.dataflow.std.base.AbstractStateObject;
-
-import net.razorvine.pyro.PyroProxy;
+import org.apache.hyracks.ipc.impl.IPCSystem;
class ExternalScalarPythonFunctionEvaluator extends ExternalScalarFunctionEvaluator {
private final PythonLibraryEvaluator libraryEvaluator;
private final ArrayBackedValueStorage resultBuffer = new ArrayBackedValueStorage();
- private final PointableAllocator pointableAllocator;
- private final JObjectPointableVisitor pointableVisitor;
- private final Object[] argHolder;
- private final IObjectPool<IJObject, IAType> reflectingPool = new ListObjectPool<>(JTypeObjectFactory.INSTANCE);
- private final Map<IAType, TypeInfo> infoPool = new HashMap<>();
+ private final ByteBuffer argHolder;
+ private final ByteBuffer outputWrapper;
+ private final IEvaluatorContext evaluatorContext;
private static final String ENTRYPOINT = "entrypoint.py";
- private static final String PY_NO_SITE_PKGS_OPT = "-S";
- private static final String PY_NO_USER_PKGS_OPT = "-s";
private final IPointable[] argValues;
ExternalScalarPythonFunctionEvaluator(IExternalFunctionInfo finfo, IScalarEvaluatorFactory[] args,
- IAType[] argTypes, IEvaluatorContext ctx) throws HyracksDataException {
+ IAType[] argTypes, IEvaluatorContext ctx, SourceLocation sourceLoc) throws HyracksDataException {
super(finfo, args, argTypes, ctx);
File pythonPath = new File(ctx.getServiceContext().getAppConfig().getString(NCConfig.Option.PYTHON_HOME));
- this.pointableAllocator = new PointableAllocator();
- this.pointableVisitor = new JObjectPointableVisitor();
-
DataverseName dataverseName = FunctionSignature.getDataverseName(finfo.getFunctionIdentifier());
try {
- libraryEvaluator = PythonLibraryEvaluator.getInstance(dataverseName, finfo, libraryManager, pythonPath,
- ctx.getTaskContext());
- } catch (IOException | InterruptedException e) {
+ libraryEvaluator = PythonLibraryEvaluator.getInstance(dataverseName, finfo, libraryManager, router, ipcSys,
+ pythonPath, ctx.getTaskContext(), ctx.getWarningCollector(), sourceLoc);
+ } catch (IOException | AsterixException e) {
throw new HyracksDataException("Failed to initialize Python", e);
}
argValues = new IPointable[args.length];
for (int i = 0; i < argValues.length; i++) {
argValues[i] = VoidPointable.FACTORY.createPointable();
}
- this.argHolder = new Object[args.length];
+ //TODO: these should be dynamic
+ this.argHolder = ByteBuffer.wrap(new byte[Short.MAX_VALUE * 2]);
+ this.outputWrapper = ByteBuffer.wrap(new byte[Short.MAX_VALUE * 2]);
+ this.evaluatorContext = ctx;
}
@Override
public void evaluate(IFrameTupleReference tuple, IPointable result) throws HyracksDataException {
+ argHolder.clear();
for (int i = 0, ln = argEvals.length; i < ln; i++) {
argEvals[i].evaluate(tuple, argValues[i]);
try {
@@ -114,10 +108,10 @@
}
}
try {
- Object res = libraryEvaluator.callPython(argHolder);
+ ByteBuffer res = libraryEvaluator.callPython(argHolder, argTypes.length);
resultBuffer.reset();
wrap(res, resultBuffer.getDataOutput());
- } catch (IOException e) {
+ } catch (Exception e) {
throw new HyracksDataException("Error evaluating Python UDF", e);
}
result.set(resultBuffer.getByteArray(), resultBuffer.getStartOffset(), resultBuffer.getLength());
@@ -125,26 +119,39 @@
private static class PythonLibraryEvaluator extends AbstractStateObject implements IDeallocatable {
Process p;
- PyroProxy remoteObj;
IExternalFunctionInfo finfo;
ILibraryManager libMgr;
File pythonHome;
+ PythonIPCProto proto;
+ ExternalFunctionResultRouter router;
+ IPCSystem ipcSys;
+ String module;
+ String clazz;
+ String fn;
+ TaskAttemptId task;
+ IWarningCollector warningCollector;
+ SourceLocation sourceLoc;
private PythonLibraryEvaluator(JobId jobId, PythonLibraryEvaluatorId evaluatorId, IExternalFunctionInfo finfo,
- ILibraryManager libMgr, File pythonHome) {
+ ILibraryManager libMgr, File pythonHome, ExternalFunctionResultRouter router, IPCSystem ipcSys,
+ TaskAttemptId task, IWarningCollector warningCollector, SourceLocation sourceLoc) {
super(jobId, evaluatorId);
this.finfo = finfo;
this.libMgr = libMgr;
this.pythonHome = pythonHome;
+ this.router = router;
+ this.task = task;
+ this.ipcSys = ipcSys;
+ this.warningCollector = warningCollector;
+ this.sourceLoc = sourceLoc;
}
- public void initialize() throws IOException, InterruptedException {
+ public void initialize() throws IOException, AsterixException {
PythonLibraryEvaluatorId fnId = (PythonLibraryEvaluatorId) id;
List<String> externalIdents = finfo.getExternalIdentifier();
PythonLibrary library = (PythonLibrary) libMgr.getLibrary(fnId.dataverseName, fnId.libraryName);
String wd = library.getFile().getAbsolutePath();
- int port = getFreeHighPort();
String packageModule = externalIdents.get(0);
String clazz = "None";
String fn;
@@ -154,59 +161,60 @@
} else {
fn = externalIdents.get(1);
}
- ProcessBuilder pb = new ProcessBuilder(pythonHome.getAbsolutePath(), PY_NO_SITE_PKGS_OPT,
- PY_NO_USER_PKGS_OPT, ENTRYPOINT, Integer.toString(port), packageModule, clazz, fn);
+ this.fn = fn;
+ this.clazz = clazz;
+ this.module = packageModule;
+ int port = ipcSys.getSocketAddress().getPort();
+ ProcessBuilder pb = new ProcessBuilder(pythonHome.getAbsolutePath(), ENTRYPOINT,
+ InetAddress.getLoopbackAddress().getHostAddress(), Integer.toString(port));
pb.directory(new File(wd));
- pb.environment().clear();
- pb.inheritIO();
p = pb.start();
- remoteObj = new PyroProxy("127.0.0.1", port, "nextTuple");
- waitForPython();
+ proto = new PythonIPCProto(p.getOutputStream(), router, ipcSys);
+ proto.start();
+ proto.helo();
+ proto.init(packageModule, clazz, fn);
}
- Object callPython(Object[] arguments) throws IOException {
- return remoteObj.call("nextTuple", arguments);
+ ByteBuffer callPython(ByteBuffer arguments, int numArgs) throws Exception {
+ ByteBuffer ret = null;
+ try {
+ ret = proto.call(arguments, numArgs);
+ } catch (AsterixException e) {
+ warningCollector
+ .warn(WarningUtil.forAsterix(sourceLoc, ErrorCode.EXTERNAL_UDF_EXCEPTION, e.getMessage()));
+ }
+ return ret;
}
@Override
public void deallocate() {
- p.destroyForcibly();
+ boolean dead = false;
+ try {
+ p.destroy();
+ dead = p.waitFor(100, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ //gonna kill it anyway
+ }
+ if (!dead) {
+ p.destroyForcibly();
+ }
}
private static PythonLibraryEvaluator getInstance(DataverseName dataverseName, IExternalFunctionInfo finfo,
- ILibraryManager libMgr, File pythonHome, IHyracksTaskContext ctx)
- throws IOException, InterruptedException {
+ ILibraryManager libMgr, ExternalFunctionResultRouter router, IPCSystem ipcSys, File pythonHome,
+ IHyracksTaskContext ctx, IWarningCollector warningCollector, SourceLocation sourceLoc)
+ throws IOException, AsterixException {
PythonLibraryEvaluatorId evaluatorId = new PythonLibraryEvaluatorId(dataverseName, finfo.getLibrary());
PythonLibraryEvaluator evaluator = (PythonLibraryEvaluator) ctx.getStateObject(evaluatorId);
if (evaluator == null) {
evaluator = new PythonLibraryEvaluator(ctx.getJobletContext().getJobId(), evaluatorId, finfo, libMgr,
- pythonHome);
- evaluator.initialize();
+ pythonHome, router, ipcSys, ctx.getTaskAttemptId(), warningCollector, sourceLoc);
ctx.registerDeallocatable(evaluator);
+ evaluator.initialize();
ctx.setStateObject(evaluator);
}
return evaluator;
}
-
- private int getFreeHighPort() throws IOException {
- int port;
- try (ServerSocket socket = new ServerSocket(0)) {
- socket.setReuseAddress(true);
- port = socket.getLocalPort();
- }
- return port;
- }
-
- private void waitForPython() throws IOException, InterruptedException {
- for (int i = 0; i < 100; i++) {
- try {
- remoteObj.call("ping");
- break;
- } catch (ConnectException e) {
- Thread.sleep(100);
- }
- }
- }
}
private static final class PythonLibraryEvaluatorId {
@@ -237,76 +245,32 @@
}
private void setArgument(int index, IValueReference valueReference) throws IOException {
- IVisitablePointable pointable;
- IJObject jobj;
IAType type = argTypes[index];
- TypeInfo info;
- switch (type.getTypeTag()) {
- case OBJECT:
- pointable = pointableAllocator.allocateRecordValue(type);
- pointable.set(valueReference);
- info = getTypeInfo(type);
- jobj = pointableVisitor.visit((ARecordVisitablePointable) pointable, info);
- break;
- case ARRAY:
- case MULTISET:
- pointable = pointableAllocator.allocateListValue(type);
- pointable.set(valueReference);
- info = getTypeInfo(type);
- jobj = pointableVisitor.visit((AListVisitablePointable) pointable, info);
- break;
+ ATypeTag tag = type.getTypeTag();
+ switch (tag) {
case ANY:
TaggedValuePointable pointy = TaggedValuePointable.FACTORY.createPointable();
pointy.set(valueReference);
ATypeTag rtTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(pointy.getTag());
IAType rtType = TypeTagUtil.getBuiltinTypeByTag(rtTypeTag);
- info = getTypeInfo(rtType);
- switch (rtTypeTag) {
- case OBJECT:
- pointable = pointableAllocator.allocateRecordValue(rtType);
- pointable.set(valueReference);
- jobj = pointableVisitor.visit((ARecordVisitablePointable) pointable, info);
- break;
- case ARRAY:
- case MULTISET:
- pointable = pointableAllocator.allocateListValue(rtType);
- pointable.set(valueReference);
- jobj = pointableVisitor.visit((AListVisitablePointable) pointable, info);
- break;
- default:
- pointable = pointableAllocator.allocateFieldValue(rtType);
- pointable.set(valueReference);
- jobj = pointableVisitor.visit((AFlatValuePointable) pointable, info);
- break;
- }
+ MessagePackerFromADM.pack(valueReference, rtType, argHolder);
break;
default:
- pointable = pointableAllocator.allocateFieldValue(type);
- pointable.set(valueReference);
- info = getTypeInfo(type);
- jobj = pointableVisitor.visit((AFlatValuePointable) pointable, info);
+ MessagePackerFromADM.pack(valueReference, type, argHolder);
break;
}
- argHolder[index] = jobj.getValueGeneric();
}
- private TypeInfo getTypeInfo(IAType type) {
- TypeInfo typeInfo = infoPool.get(type);
- if (typeInfo == null) {
- typeInfo = new TypeInfo(reflectingPool, type, type.getTypeTag());
- infoPool.put(type, typeInfo);
+ private void wrap(ByteBuffer resultWrapper, DataOutput out) throws HyracksDataException {
+ //TODO: output wrapper needs to grow with result wrapper
+ outputWrapper.clear();
+ outputWrapper.position(0);
+ MessageUnpackerToADM.unpack(resultWrapper, outputWrapper, true);
+ try {
+ out.write(outputWrapper.array(), 0, outputWrapper.position() + outputWrapper.arrayOffset());
+ } catch (IOException e) {
+ throw new HyracksDataException(e.getMessage());
}
- return typeInfo;
- }
- private void wrap(Object o, DataOutput out) throws HyracksDataException {
- Class concrete = o.getClass();
- IAType asxConv = JObject.convertType(concrete);
- IJObject res = reflectingPool.allocate(asxConv);
- if (res instanceof JComplexObject) {
- ((JComplexObject) res).setPool(reflectingPool);
- }
- res.setValueGeneric(o);
- res.serialize(out, true);
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MessagePackerFromADM.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MessagePackerFromADM.java
new file mode 100644
index 0000000..383b2f1
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MessagePackerFromADM.java
@@ -0,0 +1,359 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.library.msgpack;
+
+import static org.msgpack.core.MessagePack.Code.ARRAY32;
+import static org.msgpack.core.MessagePack.Code.FALSE;
+import static org.msgpack.core.MessagePack.Code.FIXARRAY_PREFIX;
+import static org.msgpack.core.MessagePack.Code.FIXSTR_PREFIX;
+import static org.msgpack.core.MessagePack.Code.FLOAT32;
+import static org.msgpack.core.MessagePack.Code.FLOAT64;
+import static org.msgpack.core.MessagePack.Code.INT16;
+import static org.msgpack.core.MessagePack.Code.INT32;
+import static org.msgpack.core.MessagePack.Code.INT64;
+import static org.msgpack.core.MessagePack.Code.INT8;
+import static org.msgpack.core.MessagePack.Code.MAP32;
+import static org.msgpack.core.MessagePack.Code.STR32;
+import static org.msgpack.core.MessagePack.Code.TRUE;
+import static org.msgpack.core.MessagePack.Code.UINT16;
+import static org.msgpack.core.MessagePack.Code.UINT32;
+import static org.msgpack.core.MessagePack.Code.UINT64;
+import static org.msgpack.core.MessagePack.Code.UINT8;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.AUnionType;
+import org.apache.asterix.om.types.AbstractCollectionType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.types.TypeTagUtil;
+import org.apache.asterix.om.utils.NonTaggedFormatUtil;
+import org.apache.asterix.om.utils.RecordUtil;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.BooleanPointable;
+import org.apache.hyracks.data.std.primitive.BytePointable;
+import org.apache.hyracks.data.std.primitive.DoublePointable;
+import org.apache.hyracks.data.std.primitive.FloatPointable;
+import org.apache.hyracks.data.std.primitive.IntegerPointable;
+import org.apache.hyracks.data.std.primitive.LongPointable;
+import org.apache.hyracks.data.std.primitive.ShortPointable;
+import org.apache.hyracks.util.string.UTF8StringUtil;
+
+public class MessagePackerFromADM {
+
+ private static final int TYPE_TAG_SIZE = 1;
+ private static final int TYPE_SIZE = 1;
+ private static final int LENGTH_SIZE = 4;
+ private static final int ITEM_COUNT_SIZE = 4;
+ private static final int ITEM_OFFSET_SIZE = 4;
+
+ public static void pack(IValueReference ptr, IAType type, ByteBuffer out) throws HyracksDataException {
+ pack(ptr.getByteArray(), ptr.getStartOffset(), type, true, out);
+ }
+
+ public static void pack(byte[] ptr, int offs, IAType type, boolean tagged, ByteBuffer out)
+ throws HyracksDataException {
+ int relOffs = tagged ? offs + 1 : offs;
+ ATypeTag tag = type.getTypeTag();
+ switch (tag) {
+ case STRING:
+ packStr(ptr, relOffs, out);
+ break;
+ case BOOLEAN:
+ if (BooleanPointable.getBoolean(ptr, relOffs)) {
+ out.put(TRUE);
+ } else {
+ out.put(FALSE);
+ }
+ break;
+ case TINYINT:
+ packByte(out, BytePointable.getByte(ptr, relOffs));
+ break;
+ case SMALLINT:
+ packShort(out, ShortPointable.getShort(ptr, relOffs));
+ break;
+ case INTEGER:
+ packInt(out, IntegerPointable.getInteger(ptr, relOffs));
+ break;
+ case BIGINT:
+ packLong(out, LongPointable.getLong(ptr, relOffs));
+ break;
+ case FLOAT:
+ packFloat(out, FloatPointable.getFloat(ptr, relOffs));
+ break;
+ case DOUBLE:
+ packDouble(out, DoublePointable.getDouble(ptr, relOffs));
+ break;
+ case ARRAY:
+ case MULTISET:
+ packArray(ptr, offs, type, out);
+ break;
+ case OBJECT:
+ packObject(ptr, offs, type, out);
+ break;
+ default:
+ throw new IllegalArgumentException("NYI");
+ }
+ }
+
+ public static byte minPackPosLong(ByteBuffer out, long in) {
+ if (in < 127) {
+ packFixPos(out, (byte) in);
+ return 1;
+ } else if (in < Byte.MAX_VALUE) {
+ out.put(UINT8);
+ out.put((byte) in);
+ return 2;
+ } else if (in < Short.MAX_VALUE) {
+ out.put(UINT16);
+ out.putShort((short) in);
+ return 3;
+ } else if (in < Integer.MAX_VALUE) {
+ out.put(UINT32);
+ out.putInt((int) in);
+ return 5;
+ } else {
+ out.put(UINT64);
+ out.putLong(in);
+ return 9;
+ }
+ }
+
+ public static void packByte(ByteBuffer out, byte in) {
+ out.put(INT8);
+ out.put(in);
+ }
+
+ public static void packShort(ByteBuffer out, short in) {
+ out.put(INT16);
+ out.putShort(in);
+ }
+
+ public static void packInt(ByteBuffer out, int in) {
+ out.put(INT32);
+ out.putInt(in);
+
+ }
+
+ public static void packLong(ByteBuffer out, long in) {
+ out.put(INT64);
+ out.putLong(in);
+ }
+
+ public static void packFloat(ByteBuffer out, float in) {
+ out.put(FLOAT32);
+ out.putFloat(in);
+ }
+
+ public static void packDouble(ByteBuffer out, double in) {
+ out.put(FLOAT64);
+ out.putDouble(in);
+ }
+
+ public static void packFixPos(ByteBuffer out, byte in) {
+ byte mask = (byte) (1 << 7);
+ if ((in & mask) != 0) {
+ throw new IllegalArgumentException("fixint7 must be positive");
+ }
+ out.put(in);
+ }
+
+ public static void packFixStr(ByteBuffer buf, String in) {
+ byte[] strBytes = in.getBytes(Charset.forName("UTF-8"));
+ if (strBytes.length > 31) {
+ throw new IllegalArgumentException("fixstr cannot be longer than 31");
+ }
+ buf.put((byte) (FIXSTR_PREFIX + strBytes.length));
+ buf.put(strBytes);
+ }
+
+ public static void packStr(ByteBuffer out, String in) {
+ out.put(STR32);
+ byte[] strBytes = in.getBytes(Charset.forName("UTF-8"));
+ out.putInt(strBytes.length);
+ out.put(strBytes);
+ }
+
+ private static void packStr(byte[] in, int offs, ByteBuffer out) {
+ out.put(STR32);
+ //TODO: tagged/untagged. closed support is borked so always tagged rn
+ String str = UTF8StringUtil.toString(in, offs);
+ byte[] strBytes = str.getBytes(Charset.forName("UTF-8"));
+ out.putInt(strBytes.length);
+ out.put(strBytes);
+ }
+
+ public static void packStr(String str, ByteBuffer out) {
+ out.put(STR32);
+ byte[] strBytes = str.getBytes(Charset.forName("UTF-8"));
+ out.putInt(strBytes.length);
+ out.put(strBytes);
+ }
+
+ private static void packArray(byte[] in, int offs, IAType type, ByteBuffer out) throws HyracksDataException {
+ //TODO: - could optimize to pack fixarray/array16 for small arrays
+ // - this code is basically a static version of AListPointable, could be deduped
+ AbstractCollectionType collType = (AbstractCollectionType) type;
+ out.put(ARRAY32);
+ int lenOffs = offs + TYPE_TAG_SIZE + TYPE_SIZE;
+ int itemCtOffs = LENGTH_SIZE + lenOffs;
+ int itemCt = IntegerPointable.getInteger(in, itemCtOffs);
+ boolean fixType = NonTaggedFormatUtil.isFixedSizedCollection(type);
+ out.putInt(itemCt);
+ for (int i = 0; i < itemCt; i++) {
+ if (fixType) {
+ int itemOffs = itemCtOffs + ITEM_COUNT_SIZE + (i
+ * NonTaggedFormatUtil.getFieldValueLength(in, 0, collType.getItemType().getTypeTag(), false));
+ pack(in, itemOffs, collType.getItemType(), false, out);
+ } else {
+ int itemOffs =
+ offs + IntegerPointable.getInteger(in, itemCtOffs + ITEM_COUNT_SIZE + (i * ITEM_OFFSET_SIZE));
+ ATypeTag tag = ATypeTag.VALUE_TYPE_MAPPING[BytePointable.getByte(in, itemOffs)];
+ pack(in, itemOffs, TypeTagUtil.getBuiltinTypeByTag(tag), true, out);
+ }
+ }
+ }
+
+ private static void packObject(byte[] in, int offs, IAType type, ByteBuffer out) throws HyracksDataException {
+ ARecordType recType = (ARecordType) type;
+ out.put(MAP32);
+ int fieldCt = recType.getFieldNames().length + RecordUtils.getOpenFieldCount(in, offs, recType);
+ out.putInt(fieldCt);
+ for (int i = 0; i < recType.getFieldNames().length; i++) {
+ String field = recType.getFieldNames()[i];
+ IAType fieldType = RecordUtils.getClosedFieldType(recType, i);
+ packStr(field, out);
+ pack(in, RecordUtils.getClosedFieldOffset(in, offs, recType, i), fieldType, false, out);
+ }
+ if (RecordUtils.isExpanded(in, offs, recType)) {
+ for (int i = 0; i < RecordUtils.getOpenFieldCount(in, offs, recType); i++) {
+ packStr(in, RecordUtils.getOpenFieldNameOffset(in, offs, recType, i), out);
+ ATypeTag tag = ATypeTag.VALUE_TYPE_MAPPING[RecordUtils.getOpenFieldTag(in, offs, recType, i)];
+ pack(in, RecordUtils.getOpenFieldValueOffset(in, offs, recType, i),
+ TypeTagUtil.getBuiltinTypeByTag(tag), true, out);
+ }
+ }
+
+ }
+
+ public static void packFixArrayHeader(ByteBuffer buf, byte numObj) {
+ buf.put((byte) (FIXARRAY_PREFIX + (0x0F & numObj)));
+ }
+
+ private static class RecordUtils {
+
+ static final int TAG_SIZE = 1;
+ static final int RECORD_LENGTH_SIZE = 4;
+ static final int EXPANDED_SIZE = 1;
+ static final int OPEN_OFFSET_SIZE = 4;
+ static final int CLOSED_COUNT_SIZE = 4;
+ static final int FIELD_OFFSET_SIZE = 4;
+ static final int OPEN_COUNT_SIZE = 4;
+ private static final int OPEN_FIELD_HASH_SIZE = 4;
+ private static final int OPEN_FIELD_OFFSET_SIZE = 4;
+ private static final int OPEN_FIELD_HEADER = OPEN_FIELD_HASH_SIZE + OPEN_FIELD_OFFSET_SIZE;
+
+ private static boolean isOpen(ARecordType recordType) {
+ return recordType == null || recordType.isOpen();
+ }
+
+ public static int getLength(byte[] bytes, int start) {
+ return IntegerPointable.getInteger(bytes, start + TAG_SIZE);
+ }
+
+ public static boolean isExpanded(byte[] bytes, int start, ARecordType recordType) {
+ return isOpen(recordType) && BooleanPointable.getBoolean(bytes, start + TAG_SIZE + RECORD_LENGTH_SIZE);
+ }
+
+ public static int getOpenPartOffset(int start, ARecordType recordType) {
+ return start + TAG_SIZE + RECORD_LENGTH_SIZE + (isOpen(recordType) ? EXPANDED_SIZE : 0);
+ }
+
+ public static int getNullBitmapOffset(byte[] bytes, int start, ARecordType recordType) {
+ return getOpenPartOffset(start, recordType) + (isExpanded(bytes, start, recordType) ? OPEN_OFFSET_SIZE : 0)
+ + CLOSED_COUNT_SIZE;
+ }
+
+ public static int getNullBitmapSize(ARecordType recordType) {
+ return RecordUtil.computeNullBitmapSize(recordType);
+ }
+
+ public static final IAType getClosedFieldType(ARecordType recordType, int fieldId) {
+ IAType aType = recordType.getFieldTypes()[fieldId];
+ if (NonTaggedFormatUtil.isOptional(aType)) {
+ // optional field: add the embedded non-null type tag
+ aType = ((AUnionType) aType).getActualType();
+ }
+ return aType;
+ }
+
+ public static final int getClosedFieldOffset(byte[] bytes, int start, ARecordType recordType, int fieldId) {
+ int offset = getNullBitmapOffset(bytes, start, recordType) + getNullBitmapSize(recordType)
+ + fieldId * FIELD_OFFSET_SIZE;
+ return start + IntegerPointable.getInteger(bytes, offset);
+ }
+
+ public static final int getOpenFieldCount(byte[] bytes, int start, ARecordType recordType) {
+ return isExpanded(bytes, start, recordType)
+ ? IntegerPointable.getInteger(bytes, getOpenFieldCountOffset(bytes, start, recordType)) : 0;
+ }
+
+ public static int getOpenFieldCountSize(byte[] bytes, int start, ARecordType recordType) {
+ return isExpanded(bytes, start, recordType) ? OPEN_COUNT_SIZE : 0;
+ }
+
+ public static int getOpenFieldCountOffset(byte[] bytes, int start, ARecordType recordType) {
+ return start + IntegerPointable.getInteger(bytes, getOpenPartOffset(start, recordType));
+ }
+
+ public static final int getOpenFieldValueOffset(byte[] bytes, int start, ARecordType recordType, int fieldId) {
+ return getOpenFieldNameOffset(bytes, start, recordType, fieldId)
+ + getOpenFieldNameSize(bytes, start, recordType, fieldId);
+ }
+
+ public static int getOpenFieldNameSize(byte[] bytes, int start, ARecordType recordType, int fieldId) {
+ int utfleng = UTF8StringUtil.getUTFLength(bytes, getOpenFieldNameOffset(bytes, start, recordType, fieldId));
+ return utfleng + UTF8StringUtil.getNumBytesToStoreLength(utfleng);
+ }
+
+ public static int getOpenFieldNameOffset(byte[] bytes, int start, ARecordType recordType, int fieldId) {
+ return getOpenFieldOffset(bytes, start, recordType, fieldId);
+ }
+
+ public static final byte getOpenFieldTag(byte[] bytes, int start, ARecordType recordType, int fieldId) {
+ return bytes[getOpenFieldValueOffset(bytes, start, recordType, fieldId)];
+ }
+
+ public static int getOpenFieldHashOffset(byte[] bytes, int start, ARecordType recordType, int fieldId) {
+ return getOpenFieldCountOffset(bytes, start, recordType) + getOpenFieldCountSize(bytes, start, recordType)
+ + fieldId * OPEN_FIELD_HEADER;
+ }
+
+ public static int getOpenFieldOffset(byte[] bytes, int start, ARecordType recordType, int fieldId) {
+ return start
+ + IntegerPointable.getInteger(bytes, getOpenFieldOffsetOffset(bytes, start, recordType, fieldId));
+ }
+
+ public static int getOpenFieldOffsetOffset(byte[] bytes, int start, ARecordType recordType, int fieldId) {
+ return getOpenFieldHashOffset(bytes, start, recordType, fieldId) + OPEN_FIELD_HASH_SIZE;
+ }
+ }
+
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MessageUnpackerToADM.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MessageUnpackerToADM.java
new file mode 100644
index 0000000..fedd1f6
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MessageUnpackerToADM.java
@@ -0,0 +1,288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.library.msgpack;
+
+import static org.msgpack.core.MessagePack.Code.*;
+
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.util.encoding.VarLenIntEncoderDecoder;
+import org.apache.hyracks.util.string.UTF8StringUtil;
+
+public class MessageUnpackerToADM {
+
+ public static void unpack(ByteBuffer in, ByteBuffer out, boolean tagged) {
+ byte tag = NIL;
+ if (in != null) {
+ tag = in.get();
+ }
+ if (isFixStr(tag)) {
+ unpackStr(in, out, (tag ^ FIXSTR_PREFIX), tagged);
+ } else if (isFixInt(tag)) {
+ if (tagged) {
+ out.put(ATypeTag.SERIALIZED_INT8_TYPE_TAG);
+ }
+ if (isPosFixInt(tag)) {
+ out.put(tag);
+ } else if (isNegFixInt(tag)) {
+ out.put(tag);
+ }
+ } else if (isFixedArray(tag)) {
+ unpackArray(in, out, (tag ^ FIXARRAY_PREFIX));
+ } else if (isFixedMap(tag)) {
+ unpackMap(in, out, (tag ^ FIXMAP_PREFIX));
+ } else {
+ switch (tag) {
+ case TRUE:
+ out.put(ATypeTag.SERIALIZED_BOOLEAN_TYPE_TAG);
+ out.put((byte) 1);
+ break;
+ case FALSE:
+ out.put(ATypeTag.SERIALIZED_BOOLEAN_TYPE_TAG);
+ out.put((byte) 0);
+ break;
+ case NIL:
+ out.put(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+ break;
+ case UINT8:
+ unpackUByte(in, out, tagged);
+ break;
+ case UINT16:
+ unpackUShort(in, out, tagged);
+ break;
+ case UINT32:
+ unpackUInt(in, out, tagged);
+ break;
+ case INT8:
+ unpackByte(in, out, tagged);
+ break;
+ case INT16:
+ unpackShort(in, out, tagged);
+ break;
+ case INT32:
+ unpackInt(in, out, tagged);
+ break;
+ case INT64:
+ unpackLong(in, out, tagged);
+ break;
+ case FLOAT32:
+ unpackFloat(in, out, tagged);
+ break;
+ case FLOAT64:
+ unpackDouble(in, out, tagged);
+ break;
+ case STR8:
+ unpackStr(in, out, Byte.toUnsignedInt(in.get()), tagged);
+ break;
+ case STR16:
+ unpackStr(in, out, Short.toUnsignedInt(in.getShort()), tagged);
+ break;
+ case STR32:
+ unpackStr(in, out, Integer.toUnsignedLong(in.getInt()), tagged);
+ break;
+ case ARRAY16:
+ unpackArray(in, out, Short.toUnsignedInt(in.getShort()));
+ break;
+ case ARRAY32:
+ unpackArray(in, out, Integer.toUnsignedLong(in.getInt()));
+ break;
+ case MAP16:
+ unpackMap(in, out, Short.toUnsignedInt(in.getShort()));
+ break;
+ case MAP32:
+ unpackMap(in, out, (int) Integer.toUnsignedLong(in.getInt()));
+ break;
+
+ default:
+ throw new IllegalArgumentException("NYI");
+ }
+ }
+ }
+
+ public static long unpackNextInt(ByteBuffer in) {
+ byte tag = in.get();
+ if (isFixInt(tag)) {
+ if (isPosFixInt(tag)) {
+ return tag;
+ } else if (isNegFixInt(tag)) {
+ return (tag ^ NEGFIXINT_PREFIX);
+ }
+ } else {
+ switch (tag) {
+ case INT8:
+ return in.get();
+ case UINT8:
+ return Byte.toUnsignedInt(in.get());
+ case INT16:
+ return in.getShort();
+ case UINT16:
+ return Short.toUnsignedInt(in.getShort());
+ case INT32:
+ return in.getInt();
+ case UINT32:
+ return Integer.toUnsignedLong(in.getInt());
+ case INT64:
+ return in.getLong();
+ default:
+ throw new IllegalArgumentException("NYI");
+ }
+ }
+ return -1;
+ }
+
+ public static void unpackByte(ByteBuffer in, ByteBuffer out, boolean tagged) {
+ if (tagged) {
+ out.put(ATypeTag.SERIALIZED_INT8_TYPE_TAG);
+ }
+ out.put(in.get());
+ }
+
+ public static void unpackShort(ByteBuffer in, ByteBuffer out, boolean tagged) {
+ if (tagged) {
+ out.put(ATypeTag.SERIALIZED_INT16_TYPE_TAG);
+ }
+ out.putShort(in.getShort());
+ }
+
+ public static void unpackInt(ByteBuffer in, ByteBuffer out, boolean tagged) {
+ if (tagged) {
+ out.put(ATypeTag.SERIALIZED_INT32_TYPE_TAG);
+ }
+ out.putInt(in.getInt());
+ }
+
+ public static void unpackLong(ByteBuffer in, ByteBuffer out, boolean tagged) {
+ if (tagged) {
+ out.put(ATypeTag.SERIALIZED_INT64_TYPE_TAG);
+ }
+ out.putLong(in.getLong());
+ }
+
+ public static void unpackUByte(ByteBuffer in, ByteBuffer out, boolean tagged) {
+ if (tagged) {
+ out.put(ATypeTag.SERIALIZED_INT16_TYPE_TAG);
+ }
+ out.putShort((short) (in.get() & ((short) 0x00FF)));
+ }
+
+ public static void unpackUShort(ByteBuffer in, ByteBuffer out, boolean tagged) {
+ if (tagged) {
+ out.put(ATypeTag.SERIALIZED_INT32_TYPE_TAG);
+ }
+ out.putInt(in.getShort() & 0x0000FFFF);
+ }
+
+ public static void unpackUInt(ByteBuffer in, ByteBuffer out, boolean tagged) {
+ if (tagged) {
+ out.put(ATypeTag.SERIALIZED_INT64_TYPE_TAG);
+ }
+ out.putLong(in.getInt() & 0x00000000FFFFFFFFl);
+ }
+
+ public static void unpackFloat(ByteBuffer in, ByteBuffer out, boolean tagged) {
+ if (tagged) {
+ out.put(ATypeTag.SERIALIZED_FLOAT_TYPE_TAG);
+ }
+ out.putFloat(in.getFloat());
+
+ }
+
+ public static void unpackDouble(ByteBuffer in, ByteBuffer out, boolean tagged) {
+ if (tagged) {
+ out.put(ATypeTag.SERIALIZED_DOUBLE_TYPE_TAG);
+ }
+ out.putDouble(in.getDouble());
+ }
+
+ public static void unpackArray(ByteBuffer in, ByteBuffer out, long uLen) {
+ if (uLen > Integer.MAX_VALUE) {
+ throw new UnsupportedOperationException("String is too long");
+ }
+ int count = (int) uLen;
+ int offs = out.position();
+ out.put(ATypeTag.SERIALIZED_ORDEREDLIST_TYPE_TAG);
+ out.put(ATypeTag.ANY.serialize());
+ int asxLenPos = out.position();
+ //reserve space
+ out.putInt(-1);
+ out.putInt(count);
+ int slotStartOffs = out.position() + out.arrayOffset();
+ for (int i = 0; i < count; i++) {
+ out.putInt(0xFFFF);
+ }
+ for (int i = 0; i < count; i++) {
+ out.putInt(slotStartOffs + (i * 4), (out.position() - offs));
+ unpack(in, out, true);
+ }
+ int totalLen = out.position() - offs;
+ out.putInt(asxLenPos, totalLen);
+ }
+
+ public static void unpackMap(ByteBuffer in, ByteBuffer out, int count) {
+ //TODO: need to handle typed records. this only produces a completely open record.
+ //hdr size = 6?
+ int startOffs = out.position();
+ out.put(ATypeTag.SERIALIZED_RECORD_TYPE_TAG);
+ int totalSizeOffs = out.position();
+ out.putInt(-1);
+ //isExpanded
+ out.put((byte) 1);
+ int openPartOffs = out.position();
+ out.putInt(-1);
+ //isExpanded, so num of open fields
+ out.putInt(openPartOffs, out.position() - startOffs);
+ out.putInt(count);
+ int offsetAryPos = out.position();
+ int offsetArySz = count * 2;
+ //allocate space for open field offsets
+ for (int i = 0; i < offsetArySz; i++) {
+ out.putInt(0xDEADBEEF);
+ }
+ for (int i = 0; i < count; i++) {
+ int offs = out.position() + out.arrayOffset();
+ int relOffs = offs - startOffs;
+ unpack(in, out, false);
+ int hash = UTF8StringUtil.hash(out.array(), offs);
+ out.putInt(offsetAryPos, hash);
+ offsetAryPos += 4;
+ out.putInt(offsetAryPos, relOffs);
+ offsetAryPos += 4;
+ unpack(in, out, true);
+ }
+ out.putInt(totalSizeOffs, out.position() - startOffs);
+ }
+
+ public static void unpackStr(ByteBuffer in, ByteBuffer out, long uLen, boolean tag) {
+ //TODO: this probably breaks for 3 and 4 byte UTF-8
+ if (tag) {
+ out.put(ATypeTag.SERIALIZED_STRING_TYPE_TAG);
+ }
+ if (Long.compareUnsigned(uLen, Integer.MAX_VALUE) > 0) {
+ throw new UnsupportedOperationException("String is too long");
+ }
+ int len = (int) uLen;
+ int strLen = UTF8StringUtil.getStringLength(in.array(), in.position() + in.arrayOffset(), len);
+ int adv = VarLenIntEncoderDecoder.encode(strLen, out.array(), out.position() + out.arrayOffset());
+ out.position(out.position() + adv);
+ System.arraycopy(in.array(), in.arrayOffset() + in.position(), out.array(), out.arrayOffset() + out.position(),
+ len);
+ out.position(out.position() + len);
+ in.position(in.position() + len);
+ }
+
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryDeployPrepareOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryDeployPrepareOperatorDescriptor.java
index a576c34..8929a60 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryDeployPrepareOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/LibraryDeployPrepareOperatorDescriptor.java
@@ -22,6 +22,7 @@
import static org.apache.asterix.external.library.ExternalLibraryManager.DESCRIPTOR_FILE_NAME;
import java.io.ByteArrayInputStream;
+import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -286,12 +287,15 @@
private void shiv(FileReference sourceFile, FileReference stageDir, FileReference contentsDir)
throws IOException {
- FileReference pyro4 = stageDir.getChild("pyro4.pyz");
- writeShim(pyro4);
+ FileReference msgpack = stageDir.getChild("msgpack.pyz");
+ writeShim(msgpack);
unzip(sourceFile, contentsDir);
- unzip(pyro4, contentsDir);
+ File msgPackFolder = new File(contentsDir.getRelativePath(), "ipc");
+ FileReference msgPackFolderRef =
+ new FileReference(contentsDir.getDeviceHandle(), msgPackFolder.getPath());
+ unzip(msgpack, msgPackFolderRef);
writeShim(contentsDir.getChild("entrypoint.py"));
- Files.delete(pyro4.getFile().toPath());
+ Files.delete(msgpack.getFile().toPath());
}
private void writeShim(FileReference outputFile) throws IOException {
diff --git a/asterixdb/pom.xml b/asterixdb/pom.xml
index 70e23c1..28f1139 100644
--- a/asterixdb/pom.xml
+++ b/asterixdb/pom.xml
@@ -1488,14 +1488,9 @@
<version>4.5.11</version>
</dependency>
<dependency>
- <groupId>net.razorvine</groupId>
- <artifactId>pyrolite</artifactId>
- <version>4.30</version>
- </dependency>
- <dependency>
- <groupId>net.razorvine</groupId>
- <artifactId>serpent</artifactId>
- <version>1.23</version>
+ <groupId>org.msgpack</groupId>
+ <artifactId>msgpack-core</artifactId>
+ <version>0.8.20</version>
</dependency>
</dependencies>
</dependencyManagement>
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
index 2ba4768..6b5b5db 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java
@@ -70,6 +70,7 @@
import org.apache.hyracks.control.common.job.profiling.om.TaskProfile;
import org.apache.hyracks.ipc.api.IPayloadSerializerDeserializer;
import org.apache.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
+import org.apache.hyracks.ipc.impl.Message;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -1431,12 +1432,12 @@
}
@Override
- public Object deserializeObject(ByteBuffer buffer, int length) throws Exception {
+ public Object deserializeObject(ByteBuffer buffer, int length, byte flag) throws Exception {
if (length < FID_CODE_SIZE) {
throw new IllegalStateException("Message size too small: " + length);
}
byte fid = buffer.get();
- return deserialize(fid, buffer, length - FID_CODE_SIZE);
+ return deserialize(fid, buffer, length - FID_CODE_SIZE, flag);
}
@Override
@@ -1448,7 +1449,7 @@
if (fid != FunctionId.OTHER.ordinal()) {
throw new IllegalStateException("Expected FID for OTHER, found: " + fid);
}
- return (Exception) deserialize(fid, buffer, length - FID_CODE_SIZE);
+ return (Exception) deserialize(fid, buffer, length - FID_CODE_SIZE, Message.ERROR);
}
@Override
@@ -1515,7 +1516,7 @@
JavaSerializationBasedPayloadSerializerDeserializer.serialize(out, object);
}
- private Object deserialize(byte fid, ByteBuffer buffer, int length) throws Exception {
+ private Object deserialize(byte fid, ByteBuffer buffer, int length, byte flag) throws Exception {
switch (FunctionId.values()[fid]) {
case REGISTER_PARTITION_PROVIDER:
return RegisterPartitionProviderFunction.deserialize(buffer, length);
@@ -1542,7 +1543,7 @@
return CleanupJobletFunction.deserialize(buffer, length);
}
- return javaSerde.deserializeObject(buffer, length);
+ return javaSerde.deserializeObject(buffer, length, flag);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java
index d66f233..6920cfb 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IIPCHandle.java
@@ -31,5 +31,7 @@
public Object getAttachment();
+ public int getAttachmentLen();
+
public boolean isConnected();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IPayloadSerializerDeserializer.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IPayloadSerializerDeserializer.java
index 1d2c754..2b69513 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IPayloadSerializerDeserializer.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/IPayloadSerializerDeserializer.java
@@ -21,7 +21,7 @@
import java.nio.ByteBuffer;
public interface IPayloadSerializerDeserializer {
- public Object deserializeObject(ByteBuffer buffer, int length) throws Exception;
+ public Object deserializeObject(ByteBuffer buffer, int length, byte flag) throws Exception;
public Exception deserializeException(ByteBuffer buffer, int length) throws Exception;
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java
index ddcc677..cc0a852 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCHandle.java
@@ -39,6 +39,8 @@
private Object attachment;
+ private int attachmentLen;
+
private ByteBuffer inBuffer;
private ByteBuffer outBuffer;
@@ -95,6 +97,11 @@
return attachment;
}
+ @Override
+ public int getAttachmentLen() {
+ return attachmentLen;
+ }
+
SelectionKey getKey() {
return key;
}
@@ -178,6 +185,7 @@
throw new IllegalStateException();
}
} else {
+ attachmentLen = message.getPayloadLen();
system.deliverIncomingMessage(message);
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/JavaSerializationBasedPayloadSerializerDeserializer.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/JavaSerializationBasedPayloadSerializerDeserializer.java
index 439f230..64befde 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/JavaSerializationBasedPayloadSerializerDeserializer.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/JavaSerializationBasedPayloadSerializerDeserializer.java
@@ -33,7 +33,7 @@
public class JavaSerializationBasedPayloadSerializerDeserializer implements IPayloadSerializerDeserializer {
@Override
- public Object deserializeObject(ByteBuffer buffer, int length) throws Exception {
+ public Object deserializeObject(ByteBuffer buffer, int length, byte flag) throws Exception {
return deserialize(buffer, length);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/Message.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/Message.java
index 550ce45..5f73890 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/Message.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/Message.java
@@ -22,18 +22,18 @@
import org.apache.hyracks.ipc.api.IPayloadSerializerDeserializer;
-class Message {
+public class Message {
private static final int MSG_SIZE_SIZE = 4;
- private static final int HEADER_SIZE = 17;
+ public static final int HEADER_SIZE = 17;
- static final byte INITIAL_REQ = 1;
+ public static final byte INITIAL_REQ = 1;
- static final byte INITIAL_ACK = 2;
+ public static final byte INITIAL_ACK = 2;
- static final byte ERROR = 3;
+ public static final byte ERROR = 3;
- static final byte NORMAL = 0;
+ public static final byte NORMAL = 0;
private IPCHandle ipcHandle;
@@ -45,7 +45,9 @@
private Object payload;
- Message(IPCHandle ipcHandle) {
+ private int payloadLen;
+
+ public Message(IPCHandle ipcHandle) {
this.ipcHandle = ipcHandle;
}
@@ -53,31 +55,31 @@
return ipcHandle;
}
- void setMessageId(long messageId) {
+ public void setMessageId(long messageId) {
this.messageId = messageId;
}
- long getMessageId() {
+ public long getMessageId() {
return messageId;
}
- void setRequestMessageId(long requestMessageId) {
+ public void setRequestMessageId(long requestMessageId) {
this.requestMessageId = requestMessageId;
}
- long getRequestMessageId() {
+ public long getRequestMessageId() {
return requestMessageId;
}
- void setFlag(byte flag) {
+ public void setFlag(byte flag) {
this.flag = flag;
}
- byte getFlag() {
+ public byte getFlag() {
return flag;
}
- void setPayload(Object payload) {
+ public void setPayload(Object payload) {
this.payload = payload;
}
@@ -85,15 +87,19 @@
return payload;
}
- static boolean hasMessage(ByteBuffer buffer) {
+ int getPayloadLen() {
+ return payloadLen;
+ }
+
+ public static boolean hasMessage(ByteBuffer buffer) {
if (buffer.remaining() < MSG_SIZE_SIZE) {
return false;
}
int msgSize = buffer.getInt(buffer.position());
- return buffer.remaining() >= msgSize + MSG_SIZE_SIZE;
+ return msgSize > 0 && buffer.remaining() >= msgSize + MSG_SIZE_SIZE;
}
- void read(ByteBuffer buffer) throws Exception {
+ public void read(ByteBuffer buffer) throws Exception {
assert hasMessage(buffer);
int msgSize = buffer.getInt();
messageId = buffer.getLong();
@@ -101,29 +107,49 @@
flag = buffer.get();
int finalPosition = buffer.position() + msgSize - HEADER_SIZE;
int length = msgSize - HEADER_SIZE;
+ payloadLen = length;
try {
IPayloadSerializerDeserializer serde = ipcHandle.getIPCSystem().getSerializerDeserializer();
- payload = flag == ERROR ? serde.deserializeException(buffer, length)
- : serde.deserializeObject(buffer, length);
+ switch (flag) {
+ case NORMAL:
+ case INITIAL_ACK:
+ case INITIAL_REQ:
+ payload = serde.deserializeObject(buffer, length, flag);
+ break;
+ case ERROR:
+ payload = serde.deserializeException(buffer, length);
+ break;
+ default:
+ throw new UnsupportedOperationException("Unknown message flag");
+ }
+
} finally {
buffer.position(finalPosition);
}
}
- boolean write(ByteBuffer buffer) throws Exception {
+ public boolean write(ByteBuffer buffer) throws Exception {
IPayloadSerializerDeserializer serde = ipcHandle.getIPCSystem().getSerializerDeserializer();
+ return write(buffer, serde);
+ }
+
+ public boolean write(ByteBuffer buffer, IPayloadSerializerDeserializer serde) throws Exception {
byte[] bytes = flag == ERROR ? serde.serializeException((Exception) payload) : serde.serializeObject(payload);
if (buffer.remaining() >= MSG_SIZE_SIZE + HEADER_SIZE + bytes.length) {
- buffer.putInt(HEADER_SIZE + bytes.length);
- buffer.putLong(messageId);
- buffer.putLong(requestMessageId);
- buffer.put(flag);
+ writeHeader(buffer, bytes.length, messageId, requestMessageId, flag);
buffer.put(bytes);
return true;
}
return false;
}
+ public static void writeHeader(ByteBuffer buffer, int dlen, long messageId, long requestMessageId, byte flag) {
+ buffer.putInt(HEADER_SIZE + dlen);
+ buffer.putLong(messageId);
+ buffer.putLong(requestMessageId);
+ buffer.put(flag);
+ }
+
@Override
public String toString() {
return "MSG[" + messageId + ":" + requestMessageId + ":" + flag + ":" + payload + "]";
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/ReconnectingIPCHandle.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/ReconnectingIPCHandle.java
index a3578ad..ff048a4 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/ReconnectingIPCHandle.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/ReconnectingIPCHandle.java
@@ -65,6 +65,11 @@
}
@Override
+ public int getAttachmentLen() {
+ return delegate.getAttachmentLen();
+ }
+
+ @Override
public boolean isConnected() {
return delegate.isConnected();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8StringUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8StringUtil.java
index ebc1301..e1a7cac 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8StringUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8StringUtil.java
@@ -185,6 +185,11 @@
public static int getStringLength(byte[] b, int s) {
int len = getUTFLength(b, s);
int pos = s + getNumBytesToStoreLength(len);
+ return getStringLength(b, pos, len);
+ }
+
+ public static int getStringLength(byte[] b, int offs, int len) {
+ int pos = offs;
int end = pos + len;
int charCount = 0;
while (pos < end) {