[ASTERIXDB-2895][RT] Vsize buffers in PyUDF IPC
- user mode changes: no
- storage format changes: no
- interface changes: no
Details:
- Convert most uses of ByteBuffer to ArrayBackedValueStorage
so that the size of the buffer can grow arbitrarily with
the data
- Convert ADM-to-Msgpack serialiation to use IVisitablePointable
- Convert all serialization interfaces that used ByteBuffer
to use DataOutput instead
- Fix UTF8 encoding bugs by using StandardToModifiedUTF8DataOutput
- Adapt some of the UTF8 printing code to be used for
UTF8 output to msgpack
- Fix CSV output printer to not ignore surrogate pairs
- Fix ASTERIXDB-29773 (returned records from PyUDF aren't sorted)
Change-Id: Ic95e592b42139b4750af8bb20291f926b3c973e2
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/12643
Reviewed-by: Wael Alkowaileet <wael.y.k@gmail.com>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Ian Maxon <imaxon@uci.edu>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/big_object_roundtrip_20M/big_object_roundtrip_20M.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/big_object_roundtrip_20M/big_object_roundtrip_20M.1.ddl.sqlpp
new file mode 100644
index 0000000..d3bf31c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/big_object_roundtrip_20M/big_object_roundtrip_20M.1.ddl.sqlpp
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse testdv2 if exists;
+create dataverse testdv2;
+use testdv2;
+
+create type testtype as closed {
+ id: int64,
+ name: string,
+ hobbies: {{string}}
+};
+
+create dataset testds(testtype) primary key id;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/big_object_roundtrip_20M/big_object_roundtrip_20M.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/big_object_roundtrip_20M/big_object_roundtrip_20M.2.update.sqlpp
new file mode 100644
index 0000000..0353284
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/big_object_roundtrip_20M/big_object_roundtrip_20M.2.update.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use testdv2;
+
+set `compiler.sortmemory` "64MB";
+
+load dataset testds
+using localfs
+(("path"="asterix_nc1://target/data/big-object/big_object_20M.adm"),("format"="adm"));
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/big_object_roundtrip_20M/big_object_roundtrip_20M.3.lib.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/big_object_roundtrip_20M/big_object_roundtrip_20M.3.lib.sqlpp
new file mode 100644
index 0000000..923dcb2
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/big_object_roundtrip_20M/big_object_roundtrip_20M.3.lib.sqlpp
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+install testdv2 testlib python admin admin target/TweetSent.pyz
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/big_object_roundtrip_20M/big_object_roundtrip_20M.4.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/big_object_roundtrip_20M/big_object_roundtrip_20M.4.ddl.sqlpp
new file mode 100644
index 0000000..8b304fc
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/big_object_roundtrip_20M/big_object_roundtrip_20M.4.ddl.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ USE testdv2;
+
+create function roundtrip(s) as "roundtrip",
+ "Tests.roundstr" at testlib;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/big_object_roundtrip_20M/big_object_roundtrip_20M.5.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/big_object_roundtrip_20M/big_object_roundtrip_20M.5.query.sqlpp
new file mode 100644
index 0000000..fdb285e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/big_object_roundtrip_20M/big_object_roundtrip_20M.5.query.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use testdv2;
+
+select value roundtrip(d)
+from testds d
+where d.id=1;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.4.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.4.query.sqlpp
index e17f03f..72dc83c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.4.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.4.query.sqlpp
@@ -28,4 +28,5 @@
select element result
from Animals as test
with result as roundtrip(test)[0].class.fullClassification.lower.lower.lower.lower.lower.lower.Species
-order by result;
+order by result
+;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.2.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.2.ddl.sqlpp
index 8041b5e..ff61810 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.2.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.2.ddl.sqlpp
@@ -23,3 +23,6 @@
create function typeValidationNullCall(a, b, c, d, e, f, g, h)
as "roundtrip", "Tests.roundtrip" at testlib with {"null-call": true};
+
+create function stringTest(s) as "roundtrip",
+ "Tests.roundtrip" at testlib;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/big_object_roundtrip_20M/big_object_roundtrip_20M.1.adm.template b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/big_object_roundtrip_20M/big_object_roundtrip_20M.1.adm.template
new file mode 100644
index 0000000..a0cc7aa
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/big_object_roundtrip_20M/big_object_roundtrip_20M.1.adm.template
@@ -0,0 +1 @@
+{ "id": 1, "name": "Person One", "hobbies": [ "%lorembytes:20971520%" ] }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.13.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.13.adm
index 65a7c81..c960344 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.13.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.13.adm
@@ -1,9 +1,9 @@
{ "id": 670301227662491648, "len": 20, "sent": 1 }
{ "id": 670301227553566720, "len": 139, "sent": 0 }
{ "id": 670301227041857536, "len": 112, "sent": 0 }
-{ "id": 670301227037519876, "len": 33, "sent": 0 }
+{ "id": 670301227037519876, "len": 34, "sent": 0 }
{ "id": 670301226987159552, "len": 57, "sent": 0 }
-{ "id": 670301226513391616, "len": 28, "sent": 1 }
+{ "id": 670301226513391616, "len": 29, "sent": 1 }
{ "id": 670301226337202180, "len": 77, "sent": 1 }
{ "id": 670301226190278656, "len": 25, "sent": 0 }
{ "id": 670301225959579648, "len": 112, "sent": 1 }
@@ -15,25 +15,25 @@
{ "id": 670301225162661889, "len": 28, "sent": 1 }
{ "id": 670301224885837824, "len": 63, "sent": 0 }
{ "id": 670301224814698496, "len": 59, "sent": 0 }
-{ "id": 670301224709849090, "len": 33, "sent": 1 }
+{ "id": 670301224709849090, "len": 37, "sent": 1 }
{ "id": 670301224684556288, "len": 21, "sent": 0 }
{ "id": 670301224680480768, "len": 39, "sent": 0 }
{ "id": 670301224348946433, "len": 64, "sent": 1 }
{ "id": 670301224261058560, "len": 61, "sent": 1 }
-{ "id": 670301224231690240, "len": 33, "sent": 0 }
-{ "id": 670301224214794240, "len": 33, "sent": 0 }
+{ "id": 670301224231690240, "len": 34, "sent": 0 }
+{ "id": 670301224214794240, "len": 41, "sent": 0 }
{ "id": 670301223753351168, "len": 105, "sent": 1 }
{ "id": 670301223426367488, "len": 23, "sent": 0 }
-{ "id": 670301223216545792, "len": 31, "sent": 0 }
+{ "id": 670301223216545792, "len": 34, "sent": 0 }
{ "id": 670301223182974976, "len": 34, "sent": 1 }
{ "id": 670301223128535041, "len": 21, "sent": 0 }
{ "id": 670301222759301121, "len": 132, "sent": 0 }
{ "id": 670301222734307329, "len": 110, "sent": 1 }
{ "id": 670301222717419520, "len": 81, "sent": 0 }
{ "id": 670301222318936064, "len": 110, "sent": 1 }
-{ "id": 670301222302150657, "len": 131, "sent": 0 }
+{ "id": 670301222302150657, "len": 135, "sent": 0 }
{ "id": 670301222222602240, "len": 43, "sent": 1 }
-{ "id": 670301222113517568, "len": 27, "sent": 0 }
+{ "id": 670301222113517568, "len": 29, "sent": 0 }
{ "id": 670301221836615680, "len": 44, "sent": 1 }
{ "id": 670301221719310336, "len": 28, "sent": 0 }
{ "id": 670301221442486272, "len": 34, "sent": 0 }
@@ -44,13 +44,13 @@
{ "id": 670301220305821696, "len": 140, "sent": 0 }
{ "id": 670301220247072770, "len": 83, "sent": 1 }
{ "id": 670301220196626432, "len": 36, "sent": 0 }
-{ "id": 670301220079312901, "len": 31, "sent": 1 }
-{ "id": 670301219949305857, "len": 70, "sent": 1 }
+{ "id": 670301220079312901, "len": 32, "sent": 1 }
+{ "id": 670301219949305857, "len": 94, "sent": 1 }
{ "id": 670301219739574273, "len": 131, "sent": 1 }
{ "id": 670301219206877184, "len": 27, "sent": 0 }
{ "id": 670301219139620864, "len": 124, "sent": 0 }
-{ "id": 670301218737123328, "len": 124, "sent": 0 }
-{ "id": 670301218640531458, "len": 31, "sent": 1 }
+{ "id": 670301218737123328, "len": 126, "sent": 0 }
+{ "id": 670301218640531458, "len": 33, "sent": 1 }
{ "id": 670301218598756352, "len": 47, "sent": 0 }
{ "id": 670301218565156865, "len": 44, "sent": 0 }
{ "id": 670301218414206976, "len": 71, "sent": 1 }
@@ -58,14 +58,14 @@
{ "id": 670301218078629888, "len": 9, "sent": 0 }
{ "id": 670301217851990017, "len": 111, "sent": 0 }
{ "id": 670301217793269760, "len": 113, "sent": 0 }
-{ "id": 670301217508036608, "len": 47, "sent": 0 }
+{ "id": 670301217508036608, "len": 55, "sent": 0 }
{ "id": 670301217369657344, "len": 137, "sent": 0 }
-{ "id": 670301217311088641, "len": 28, "sent": 0 }
+{ "id": 670301217311088641, "len": 29, "sent": 0 }
{ "id": 670301217231347712, "len": 123, "sent": 0 }
{ "id": 670301216891473920, "len": 44, "sent": 0 }
{ "id": 670301216874721280, "len": 68, "sent": 0 }
{ "id": 670301216799232000, "len": 50, "sent": 1 }
-{ "id": 670301216669171713, "len": 54, "sent": 0 }
+{ "id": 670301216669171713, "len": 55, "sent": 0 }
{ "id": 670301216493060097, "len": 113, "sent": 1 }
{ "id": 670301216400924676, "len": 35, "sent": 1 }
{ "id": 670301216371552258, "len": 58, "sent": 0 }
@@ -78,21 +78,21 @@
{ "id": 670301214958055424, "len": 58, "sent": 1 }
{ "id": 670301214605733888, "len": 139, "sent": 1 }
{ "id": 670301214509129728, "len": 114, "sent": 1 }
-{ "id": 670301214442041344, "len": 18, "sent": 1 }
+{ "id": 670301214442041344, "len": 19, "sent": 1 }
{ "id": 670301214295392256, "len": 47, "sent": 0 }
-{ "id": 670301213737529344, "len": 9, "sent": 0 }
+{ "id": 670301213737529344, "len": 10, "sent": 0 }
{ "id": 670301213544595457, "len": 63, "sent": 1 }
{ "id": 670301213515235333, "len": 107, "sent": 0 }
{ "id": 670301213464899584, "len": 105, "sent": 1 }
{ "id": 670301213120942080, "len": 39, "sent": 0 }
{ "id": 670301212961603585, "len": 63, "sent": 0 }
-{ "id": 670301212961603584, "len": 20, "sent": 0 }
-{ "id": 670301212856737792, "len": 51, "sent": 0 }
+{ "id": 670301212961603584, "len": 25, "sent": 0 }
+{ "id": 670301212856737792, "len": 55, "sent": 0 }
{ "id": 670301212760117248, "len": 133, "sent": 1 }
{ "id": 670301211808010240, "len": 103, "sent": 0 }
-{ "id": 670301211774468096, "len": 40, "sent": 0 }
+{ "id": 670301211774468096, "len": 41, "sent": 0 }
{ "id": 670301211703144450, "len": 138, "sent": 1 }
-{ "id": 670301211581685761, "len": 25, "sent": 1 }
+{ "id": 670301211581685761, "len": 26, "sent": 1 }
{ "id": 670301211560685568, "len": 12, "sent": 1 }
{ "id": 670301211090751490, "len": 140, "sent": 0 }
{ "id": 670301210654699520, "len": 13, "sent": 0 }
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_it_python.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_it_python.xml
index 5fc7316..686ede2 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_it_python.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_it_python.xml
@@ -85,5 +85,10 @@
<output-dir compare="Text">big_object_pyudf</output-dir>
</compilation-unit>
</test-case>
+ <test-case FilePath="external-library">
+ <compilation-unit name="big_object_roundtrip_20M">
+ <output-dir compare="Text">big_object_roundtrip_20M</output-dir>
+ </compilation-unit>
+ </test-case>
</test-group>
</test-suite>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index d49dffc..84e5c22 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -85,6 +85,7 @@
EXTERNAL_UDF_RESULT_TYPE_ERROR(200),
EXTERNAL_UDF_EXCEPTION(201),
+ EXTERNAL_UDF_PROTO_RETURN_EXCEPTION(202),
// Compilation errors
PARSE_ERROR(1001),
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 136e169..be4f512 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -92,6 +92,7 @@
200 = External UDF cannot produce expected result. Please check the UDF configuration
201 = External UDF returned exception. Returned exception was: %1$s
+202 = External UDF protocol encountered an unexpected return result.
# Compile-time check errors
1001 = Syntax error: %1$s
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/builders/ListLikeNumericArrayFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/builders/ListLikeNumericArrayFactory.java
new file mode 100644
index 0000000..e53f157
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/builders/ListLikeNumericArrayFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream.builders;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.asterix.om.util.container.IObjectFactory;
+
+public class ListLikeNumericArrayFactory<T extends Number> implements IObjectFactory<List<T>, T> {
+
+ @Override
+ public List<T> create(T arg) {
+ List<T> list = new ArrayList<>(arg.intValue());
+ list.addAll(Collections.nCopies(arg.intValue(), arg));
+ return list;
+ }
+
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/builders/StandardToModifiedUTF8DataOutputFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/builders/StandardToModifiedUTF8DataOutputFactory.java
new file mode 100644
index 0000000..606aee2
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/builders/StandardToModifiedUTF8DataOutputFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream.builders;
+
+import org.apache.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer;
+import org.apache.asterix.external.input.stream.StandardUTF8ToModifiedUTF8DataOutput;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.util.container.IObjectFactory;
+import org.apache.hyracks.util.string.UTF8StringReader;
+import org.apache.hyracks.util.string.UTF8StringWriter;
+
+public class StandardToModifiedUTF8DataOutputFactory
+ implements IObjectFactory<StandardUTF8ToModifiedUTF8DataOutput, ATypeTag> {
+
+ @Override
+ public StandardUTF8ToModifiedUTF8DataOutput create(ATypeTag type) {
+ return new StandardUTF8ToModifiedUTF8DataOutput(
+ new AStringSerializerDeserializer(new UTF8StringWriter(), new UTF8StringReader()));
+ }
+
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonIPCProto.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonIPCProto.java
index cd7ec18..c803517 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonIPCProto.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonIPCProto.java
@@ -18,14 +18,28 @@
import static org.apache.hyracks.ipc.impl.Message.HEADER_SIZE;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.library.msgpack.MsgPackPointableVisitor;
+import org.apache.asterix.om.pointables.AFlatValuePointable;
+import org.apache.asterix.om.pointables.AListVisitablePointable;
+import org.apache.asterix.om.pointables.ARecordVisitablePointable;
+import org.apache.asterix.om.pointables.PointableAllocator;
+import org.apache.asterix.om.pointables.base.IVisitablePointable;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.EnumDeserializer;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.types.TypeTagUtil;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.ipc.impl.Message;
import org.msgpack.core.MessagePack;
import org.msgpack.core.MessageUnpacker;
@@ -33,26 +47,32 @@
public class PythonIPCProto {
- private PythonMessageBuilder messageBuilder;
- private OutputStream sockOut;
- private ByteBuffer headerBuffer = ByteBuffer.allocate(21);
+ private final PythonMessageBuilder messageBuilder;
+ private final DataOutputStream sockOut;
+ private final ByteBuffer headerBuffer = ByteBuffer.allocate(21);
private ByteBuffer recvBuffer = ByteBuffer.allocate(32768);
- private ExternalFunctionResultRouter router;
+ private final ExternalFunctionResultRouter router;
private long routeId;
private Pair<ByteBuffer, Exception> bufferBox;
- private Process pythonProc;
+ private final Process pythonProc;
private long maxFunctionId;
- private ArrayBufferInput unpackerInput;
- private MessageUnpacker unpacker;
+ private final ArrayBufferInput unpackerInput;
+ private final MessageUnpacker unpacker;
+ private final ArrayBackedValueStorage argsStorage;
+ private final PointableAllocator pointableAllocator;
+ private final MsgPackPointableVisitor pointableVisitor;
public PythonIPCProto(OutputStream sockOut, ExternalFunctionResultRouter router, Process pythonProc) {
- this.sockOut = sockOut;
+ this.sockOut = new DataOutputStream(sockOut);
messageBuilder = new PythonMessageBuilder();
this.router = router;
this.pythonProc = pythonProc;
- this.maxFunctionId = 0l;
+ this.maxFunctionId = 0L;
unpackerInput = new ArrayBufferInput(new byte[0]);
unpacker = MessagePack.newDefaultUnpacker(unpackerInput);
+ this.argsStorage = new ArrayBackedValueStorage();
+ this.pointableAllocator = new PointableAllocator();
+ this.pointableVisitor = new MsgPackPointableVisitor();
}
public void start() {
@@ -65,10 +85,10 @@
recvBuffer.clear();
recvBuffer.position(0);
recvBuffer.limit(0);
- messageBuilder.buf.clear();
- messageBuilder.buf.position(0);
+ messageBuilder.reset();
messageBuilder.hello();
- sendMsg(routeId);
+ sendHeader(routeId, messageBuilder.getLength());
+ sendMsg();
receiveMsg();
if (getResponseType() != MessageType.HELO) {
throw HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.ILLEGAL_STATE,
@@ -81,10 +101,10 @@
recvBuffer.clear();
recvBuffer.position(0);
recvBuffer.limit(0);
- messageBuilder.buf.clear();
- messageBuilder.buf.position(0);
+ messageBuilder.reset();
messageBuilder.init(module, clazz, fn);
- sendMsg(functionId);
+ sendHeader(functionId, messageBuilder.getLength());
+ sendMsg();
receiveMsg();
if (getResponseType() != MessageType.INIT_RSP) {
throw HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.ILLEGAL_STATE,
@@ -93,14 +113,21 @@
return functionId;
}
- public ByteBuffer call(long functionId, ByteBuffer args, int numArgs) throws IOException, AsterixException {
+ public ByteBuffer call(long functionId, IAType[] argTypes, IValueReference[] argValues, boolean nullCall)
+ throws IOException, AsterixException {
recvBuffer.clear();
recvBuffer.position(0);
recvBuffer.limit(0);
- messageBuilder.buf.clear();
- messageBuilder.buf.position(0);
- messageBuilder.call(args.array(), args.position(), numArgs);
- sendMsg(functionId);
+ messageBuilder.reset();
+ argsStorage.reset();
+ for (int i = 0; i < argTypes.length; i++) {
+ visitValueRef(argTypes[i], argsStorage.getDataOutput(), argValues[i], pointableAllocator, pointableVisitor,
+ nullCall);
+ }
+ int len = argsStorage.getLength() + 5;
+ sendHeader(functionId, len);
+ messageBuilder.call(argValues.length, len);
+ sendMsg(argsStorage);
receiveMsg();
if (getResponseType() != MessageType.CALL_RSP) {
throw HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.ILLEGAL_STATE,
@@ -109,14 +136,16 @@
return recvBuffer;
}
- public ByteBuffer callMulti(long key, ByteBuffer args, int numTuples) throws IOException, AsterixException {
+ public ByteBuffer callMulti(long key, ArrayBackedValueStorage args, int numTuples)
+ throws IOException, AsterixException {
recvBuffer.clear();
recvBuffer.position(0);
recvBuffer.limit(0);
- messageBuilder.buf.clear();
- messageBuilder.buf.position(0);
- messageBuilder.callMulti(args.array(), args.position(), numTuples);
- sendMsg(key);
+ messageBuilder.reset();
+ int len = args.getLength() + 4;
+ sendHeader(key, len);
+ messageBuilder.callMulti(0, numTuples);
+ sendMsg(args);
receiveMsg();
if (getResponseType() != MessageType.CALL_RSP) {
throw HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.ILLEGAL_STATE,
@@ -132,7 +161,7 @@
}
public void receiveMsg() throws IOException, AsterixException {
- Exception except = null;
+ Exception except;
try {
synchronized (bufferBox) {
while ((bufferBox.getFirst().limit() == 0 || bufferBox.getSecond() != null) && pythonProc.isAlive()) {
@@ -162,15 +191,27 @@
}
}
- public void sendMsg(long key) throws IOException {
+ public void sendHeader(long key, int msgLen) throws IOException {
headerBuffer.clear();
headerBuffer.position(0);
- headerBuffer.putInt(HEADER_SIZE + Integer.BYTES + messageBuilder.buf.position());
+ headerBuffer.putInt(HEADER_SIZE + Integer.BYTES + msgLen);
headerBuffer.putLong(key);
headerBuffer.putLong(routeId);
headerBuffer.put(Message.NORMAL);
sockOut.write(headerBuffer.array(), 0, HEADER_SIZE + Integer.BYTES);
- sockOut.write(messageBuilder.buf.array(), 0, messageBuilder.buf.position());
+ sockOut.flush();
+ }
+
+ public void sendMsg(ArrayBackedValueStorage content) throws IOException {
+ sockOut.write(messageBuilder.getBuf().array(), messageBuilder.getBuf().arrayOffset(),
+ messageBuilder.getBuf().position());
+ sockOut.write(content.getByteArray(), content.getStartOffset(), content.getLength());
+ sockOut.flush();
+ }
+
+ public void sendMsg() throws IOException {
+ sockOut.write(messageBuilder.getBuf().array(), messageBuilder.getBuf().arrayOffset(),
+ messageBuilder.getBuf().position());
sockOut.flush();
}
@@ -182,4 +223,68 @@
return routeId;
}
+ public DataOutputStream getSockOut() {
+ return sockOut;
+ }
+
+ public static void visitValueRef(IAType type, DataOutput out, IValueReference valueReference,
+ PointableAllocator pointableAllocator, MsgPackPointableVisitor pointableVisitor, boolean visitNull)
+ throws IOException {
+ IVisitablePointable pointable;
+ switch (type.getTypeTag()) {
+ case OBJECT:
+ pointable = pointableAllocator.allocateRecordValue(type);
+ pointable.set(valueReference);
+ pointableVisitor.visit((ARecordVisitablePointable) pointable, pointableVisitor.getTypeInfo(type, out));
+ break;
+ case ARRAY:
+ case MULTISET:
+ pointable = pointableAllocator.allocateListValue(type);
+ pointable.set(valueReference);
+ pointableVisitor.visit((AListVisitablePointable) pointable, pointableVisitor.getTypeInfo(type, out));
+ break;
+ case ANY:
+ ATypeTag rtTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER
+ .deserialize(valueReference.getByteArray()[valueReference.getStartOffset()]);
+ IAType rtType = TypeTagUtil.getBuiltinTypeByTag(rtTypeTag);
+ switch (rtTypeTag) {
+ case OBJECT:
+ pointable = pointableAllocator.allocateRecordValue(rtType);
+ pointable.set(valueReference);
+ pointableVisitor.visit((ARecordVisitablePointable) pointable,
+ pointableVisitor.getTypeInfo(rtType, out));
+ break;
+ case ARRAY:
+ case MULTISET:
+ pointable = pointableAllocator.allocateListValue(rtType);
+ pointable.set(valueReference);
+ pointableVisitor.visit((AListVisitablePointable) pointable,
+ pointableVisitor.getTypeInfo(rtType, out));
+ break;
+ case MISSING:
+ case NULL:
+ if (!visitNull) {
+ return;
+ }
+ default:
+ pointable = pointableAllocator.allocateFieldValue(rtType);
+ pointable.set(valueReference);
+ pointableVisitor.visit((AFlatValuePointable) pointable,
+ pointableVisitor.getTypeInfo(rtType, out));
+ break;
+ }
+ break;
+ case MISSING:
+ case NULL:
+ if (!visitNull) {
+ return;
+ }
+ default:
+ pointable = pointableAllocator.allocateFieldValue(type);
+ pointable.set(valueReference);
+ pointableVisitor.visit((AFlatValuePointable) pointable, pointableVisitor.getTypeInfo(type, out));
+ break;
+ }
+ }
+
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonMessageBuilder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonMessageBuilder.java
index 5052eb4..5429657 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonMessageBuilder.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonMessageBuilder.java
@@ -26,15 +26,13 @@
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
-import org.apache.asterix.external.library.msgpack.MessagePackerFromADM;
-import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.asterix.external.library.msgpack.MessagePackUtils;
import org.apache.hyracks.api.exceptions.HyracksDataException;
public class PythonMessageBuilder {
- private static final int MAX_BUF_SIZE = 64 * 1024 * 1024; //64MB.
MessageType type;
long dataLength;
- ByteBuffer buf;
+ private final ByteBuffer buf;
public PythonMessageBuilder() {
this.type = null;
@@ -42,12 +40,25 @@
this.buf = ByteBuffer.allocate(4096);
}
+ public void reset() {
+ //TODO: should be able to get away w/o clearing buf?
+ buf.clear();
+ }
+
+ public ByteBuffer getBuf() {
+ return buf;
+ }
+
+ public int getLength() {
+ return buf.position() - buf.arrayOffset();
+ }
+
public void setType(MessageType type) {
this.type = type;
}
public void packHeader() throws HyracksDataException {
- MessagePackerFromADM.packFixPos(buf, (byte) type.ordinal());
+ MessagePackUtils.packFixPos(buf, (byte) type.ordinal());
}
//TODO: this is wrong for any multibyte chars
@@ -75,7 +86,7 @@
this.type = MessageType.QUIT;
dataLength = getStringLength("QUIT");
packHeader();
- MessagePackerFromADM.packFixStr(buf, "QUIT");
+ MessagePackUtils.packFixStr(buf, "QUIT");
}
public void init(final String module, final String clazz, final String fn) throws HyracksDataException {
@@ -89,46 +100,27 @@
}
packHeader();
int numArgs = clazz == null ? 2 : 3;
- MessagePackerFromADM.packFixArrayHeader(buf, (byte) numArgs);
- MessagePackerFromADM.packStr(buf, module);
+ MessagePackUtils.packFixArrayHeader(buf, (byte) numArgs);
+ MessagePackUtils.packStr(buf, module);
if (clazz != null) {
- MessagePackerFromADM.packStr(buf, clazz);
+ MessagePackUtils.packStr(buf, clazz);
}
- MessagePackerFromADM.packStr(buf, fn);
+ MessagePackUtils.packStr(buf, fn);
}
- public void call(byte[] args, int lim, int numArgs) throws HyracksDataException {
- if (args.length > buf.capacity()) {
- int growTo = ExternalFunctionResultRouter.closestPow2(args.length);
- if (growTo > MAX_BUF_SIZE) {
- throw HyracksDataException.create(ErrorCode.ILLEGAL_STATE,
- "Unable to allocate message buffer larger than:" + MAX_BUF_SIZE + " bytes");
- }
- buf = ByteBuffer.allocate(growTo);
- }
+ public void call(int numArgs, int len) throws HyracksDataException {
buf.clear();
buf.position(0);
this.type = MessageType.CALL;
- dataLength = 5 + 1 + lim;
+ dataLength = 5 + 1 + len;
packHeader();
//TODO: make this switch between fixarray/array16/array32
buf.put((byte) (FIXARRAY_PREFIX + 1));
- buf.put(ARRAY32);
- buf.putInt(numArgs);
- if (numArgs > 0) {
- buf.put(args, 0, lim);
- }
+ buf.put(ARRAY16);
+ buf.putShort((short) numArgs);
}
- public void callMulti(byte[] args, int lim, int numArgs) throws HyracksDataException {
- if (args.length > buf.capacity()) {
- int growTo = ExternalFunctionResultRouter.closestPow2(args.length);
- if (growTo > MAX_BUF_SIZE) {
- throw HyracksDataException.create(ErrorCode.ILLEGAL_STATE,
- "Unable to allocate message buffer larger than:" + MAX_BUF_SIZE + " bytes");
- }
- buf = ByteBuffer.allocate(growTo);
- }
+ public void callMulti(int lim, int numArgs) throws HyracksDataException {
buf.clear();
buf.position(0);
this.type = MessageType.CALL;
@@ -137,9 +129,6 @@
//TODO: make this switch between fixarray/array16/array32
buf.put(ARRAY16);
buf.putShort((short) numArgs);
- if (numArgs > 0) {
- buf.put(args, 0, lim);
- }
}
//this is used to send a serialized java inetaddress to the entrypoint so it can send it back
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarPythonFunctionEvaluator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarPythonFunctionEvaluator.java
index 7c860a2..94a4dd2 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarPythonFunctionEvaluator.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarPythonFunctionEvaluator.java
@@ -61,6 +61,7 @@
private MessageUnpacker unpacker;
private ArrayBufferInput unpackerInput;
+ private MessageUnpackerToADM unpackerToADM;
private long fnId;
@@ -87,6 +88,7 @@
this.sourceLocation = sourceLoc;
this.unpackerInput = new ArrayBufferInput(new byte[0]);
this.unpacker = MessagePack.newDefaultUnpacker(unpackerInput);
+ this.unpackerToADM = new MessageUnpackerToADM();
}
@Override
@@ -107,18 +109,13 @@
hasNullArg = true;
}
}
- try {
- PythonLibraryEvaluator.setArgument(argTypes[i], argValues[i], argHolder, nullCall);
- } catch (IOException e) {
- throw new HyracksDataException("Error evaluating Python UDF", e);
- }
}
if (!nullCall && hasNullArg) {
PointableHelper.setNull(result);
return;
}
try {
- ByteBuffer res = libraryEvaluator.callPython(fnId, argHolder, argTypes.length);
+ ByteBuffer res = libraryEvaluator.callPython(fnId, argTypes, argValues, nullCall);
resultBuffer.reset();
wrap(res, resultBuffer.getDataOutput());
} catch (Exception e) {
@@ -133,30 +130,28 @@
outputWrapper.position(0);
try {
if (resultWrapper == null) {
- outputWrapper.put(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
- out.write(outputWrapper.array(), 0, outputWrapper.position() + outputWrapper.arrayOffset());
+ out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
return;
}
if ((resultWrapper.get() ^ FIXARRAY_PREFIX) != (byte) 2) {
- throw HyracksDataException.create(AsterixException.create(ErrorCode.EXTERNAL_UDF_EXCEPTION,
- "Returned result missing outer wrapper"));
+ throw HyracksDataException
+ .create(AsterixException.create(ErrorCode.EXTERNAL_UDF_PROTO_RETURN_EXCEPTION));
}
int numresults = resultWrapper.get() ^ FIXARRAY_PREFIX;
if (numresults > 0) {
- MessageUnpackerToADM.unpack(resultWrapper, outputWrapper, true);
+ unpackerToADM.unpack(resultWrapper, out, true);
}
unpackerInput.reset(resultWrapper.array(), resultWrapper.position() + resultWrapper.arrayOffset(),
resultWrapper.remaining());
unpacker.reset(unpackerInput);
- int numEntries = unpacker.unpackArrayHeader();
- for (int j = 0; j < numEntries; j++) {
- outputWrapper.put(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+ int numErrors = unpacker.unpackArrayHeader();
+ for (int j = 0; j < numErrors; j++) {
+ out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
if (evaluatorContext.getWarningCollector().shouldWarn()) {
evaluatorContext.getWarningCollector().warn(
Warning.of(sourceLocation, ErrorCode.EXTERNAL_UDF_EXCEPTION, unpacker.unpackString()));
}
}
- out.write(outputWrapper.array(), 0, outputWrapper.position() + outputWrapper.arrayOffset());
} catch (IOException e) {
throw HyracksDataException.create(e);
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/JavaFunctionHelper.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/JavaFunctionHelper.java
index d1b2685..98755fc 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/JavaFunctionHelper.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/JavaFunctionHelper.java
@@ -56,7 +56,7 @@
private final IObjectPool<IJObject, IAType> objectPool = new ListObjectPool<>(JTypeObjectFactory.INSTANCE);
private final JObjectPointableVisitor pointableVisitor;
private final PointableAllocator pointableAllocator;
- private final Map<Integer, TypeInfo> poolTypeInfo;
+ private final Map<Integer, JavaTypeInfo> poolTypeInfo;
private final Map<String, String> parameters;
private final IAType[] argTypes;
@@ -164,10 +164,10 @@
arguments[index] = jObject;
}
- private TypeInfo getTypeInfo(int index, IAType type) {
- TypeInfo typeInfo = poolTypeInfo.get(index);
+ private JavaTypeInfo getTypeInfo(int index, IAType type) {
+ JavaTypeInfo typeInfo = poolTypeInfo.get(index);
if (typeInfo == null) {
- typeInfo = new TypeInfo(objectPool, type, type.getTypeTag());
+ typeInfo = new JavaTypeInfo(objectPool, type, type.getTypeTag());
poolTypeInfo.put(index, typeInfo);
}
return typeInfo;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/TypeInfo.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/JavaTypeInfo.java
similarity index 93%
rename from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/TypeInfo.java
rename to asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/JavaTypeInfo.java
index 453cf39..e60cc5d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/TypeInfo.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/JavaTypeInfo.java
@@ -23,13 +23,13 @@
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.om.util.container.IObjectPool;
-public class TypeInfo {
+public class JavaTypeInfo {
private IObjectPool<IJObject, IAType> objectPool;
private IAType atype;
private ATypeTag typeTag;
- public TypeInfo(IObjectPool<IJObject, IAType> objectPool, IAType atype, ATypeTag typeTag) {
+ public JavaTypeInfo(IObjectPool<IJObject, IAType> objectPool, IAType atype, ATypeTag typeTag) {
this.objectPool = objectPool;
this.atype = atype;
this.typeTag = typeTag;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PyTypeInfo.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PyTypeInfo.java
new file mode 100644
index 0000000..e2b7ad4
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PyTypeInfo.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.library;
+
+import java.io.DataOutput;
+import java.util.Objects;
+
+import org.apache.asterix.om.types.IAType;
+
+public class PyTypeInfo {
+
+ private final IAType type;
+ private final DataOutput out;
+
+ public PyTypeInfo(IAType type, DataOutput out) {
+ this.type = type;
+ this.out = out;
+ }
+
+ public DataOutput getDataOutput() {
+ return out;
+ }
+
+ public IAType getType() {
+ return type;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(out, type);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return out.equals(out) && type.equals(type);
+ }
+
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryEvaluator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryEvaluator.java
index 457b86a..f82b30d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryEvaluator.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryEvaluator.java
@@ -34,7 +34,7 @@
import org.apache.asterix.common.library.ILibraryManager;
import org.apache.asterix.external.ipc.ExternalFunctionResultRouter;
import org.apache.asterix.external.ipc.PythonIPCProto;
-import org.apache.asterix.external.library.msgpack.MessagePackerFromADM;
+import org.apache.asterix.external.library.msgpack.MessagePackUtils;
import org.apache.asterix.om.functions.IExternalFunctionInfo;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.EnumDeserializer;
@@ -50,6 +50,7 @@
import org.apache.hyracks.api.resources.IDeallocatable;
import org.apache.hyracks.data.std.api.IValueReference;
import org.apache.hyracks.data.std.primitive.TaggedValuePointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.dataflow.std.base.AbstractStateObject;
import org.apache.hyracks.ipc.impl.IPCSystem;
@@ -86,7 +87,6 @@
this.ipcSys = ipcSys;
this.warningCollector = warningCollector;
this.sourceLoc = sourceLoc;
-
}
private void initialize() throws IOException, AsterixException {
@@ -128,10 +128,11 @@
return proto.init(packageModule, clazz, fn);
}
- public ByteBuffer callPython(long id, ByteBuffer arguments, int numArgs) throws IOException {
+ public ByteBuffer callPython(long id, IAType[] argTypes, IValueReference[] valueReferences, boolean nullCall)
+ throws IOException {
ByteBuffer ret = null;
try {
- ret = proto.call(id, arguments, numArgs);
+ ret = proto.call(id, argTypes, valueReferences, nullCall);
} catch (AsterixException e) {
if (warningCollector.shouldWarn()) {
warningCollector.warn(Warning.of(sourceLoc, EXTERNAL_UDF_EXCEPTION, e.getMessage()));
@@ -140,7 +141,7 @@
return ret;
}
- public ByteBuffer callPythonMulti(long id, ByteBuffer arguments, int numTuples) throws IOException {
+ public ByteBuffer callPythonMulti(long id, ArrayBackedValueStorage arguments, int numTuples) throws IOException {
ByteBuffer ret = null;
try {
ret = proto.callMulti(id, arguments, numTuples);
@@ -169,20 +170,6 @@
router.removeRoute(proto.getRouteId());
}
- public static ATypeTag setArgument(IAType type, IValueReference valueReference, ByteBuffer argHolder,
- boolean nullCall) throws IOException {
- ATypeTag tag = type.getTypeTag();
- if (tag == ATypeTag.ANY) {
- TaggedValuePointable pointy = TaggedValuePointable.FACTORY.createPointable();
- pointy.set(valueReference);
- ATypeTag rtTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(pointy.getTag());
- IAType rtType = TypeTagUtil.getBuiltinTypeByTag(rtTypeTag);
- return MessagePackerFromADM.pack(valueReference, rtType, argHolder, nullCall);
- } else {
- return MessagePackerFromADM.pack(valueReference, type, argHolder, nullCall);
- }
- }
-
public static ATypeTag peekArgument(IAType type, IValueReference valueReference) throws HyracksDataException {
ATypeTag tag = type.getTypeTag();
if (tag == ATypeTag.ANY) {
@@ -190,15 +177,15 @@
pointy.set(valueReference);
ATypeTag rtTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(pointy.getTag());
IAType rtType = TypeTagUtil.getBuiltinTypeByTag(rtTypeTag);
- return MessagePackerFromADM.peekUnknown(rtType);
+ return MessagePackUtils.peekUnknown(rtType);
} else {
- return MessagePackerFromADM.peekUnknown(type);
+ return MessagePackUtils.peekUnknown(type);
}
}
- public static void setVoidArgument(ByteBuffer argHolder) {
- argHolder.put(ARRAY16);
- argHolder.putShort((short) 0);
+ public static void setVoidArgument(ArrayBackedValueStorage argHolder) throws IOException {
+ argHolder.getDataOutput().writeByte(ARRAY16);
+ argHolder.getDataOutput().writeShort((short) 0);
}
public static PythonLibraryEvaluator getInstance(IExternalFunctionInfo finfo, ILibraryManager libMgr,
@@ -218,4 +205,5 @@
}
return evaluator;
}
+
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java
index 230627f..5479b56a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java
@@ -42,7 +42,7 @@
import org.apache.asterix.external.api.IJObject;
import org.apache.asterix.external.api.IJObjectAccessor;
import org.apache.asterix.external.api.IJRecordAccessor;
-import org.apache.asterix.external.library.TypeInfo;
+import org.apache.asterix.external.library.JavaTypeInfo;
import org.apache.asterix.external.library.java.base.JBoolean;
import org.apache.asterix.external.library.java.base.JByte;
import org.apache.asterix.external.library.java.base.JDate;
@@ -356,14 +356,14 @@
public static class JRecordAccessor implements IJRecordAccessor {
- private final TypeInfo typeInfo;
+ private final JavaTypeInfo typeInfo;
private final JRecord jRecord;
private final IJObject[] jObjects;
private final LinkedHashMap<String, IJObject> openFields;
private final UTF8StringReader reader = new UTF8StringReader();
public JRecordAccessor(ARecordType recordType, IObjectPool<IJObject, IAType> objectPool) {
- this.typeInfo = new TypeInfo(objectPool, null, null);
+ this.typeInfo = new JavaTypeInfo(objectPool, null, null);
this.jObjects = new IJObject[recordType.getFieldNames().length];
this.openFields = new LinkedHashMap<>();
this.jRecord = new JRecord(recordType, jObjects, openFields);
@@ -439,10 +439,10 @@
public static class JListAccessor implements IJListAccessor {
- private final TypeInfo typeInfo;
+ private final JavaTypeInfo typeInfo;
public JListAccessor(IObjectPool<IJObject, IAType> objectPool) {
- this.typeInfo = new TypeInfo(objectPool, null, null);
+ this.typeInfo = new JavaTypeInfo(objectPool, null, null);
}
@Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectPointableVisitor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectPointableVisitor.java
index 9edc569..bd64922 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectPointableVisitor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectPointableVisitor.java
@@ -25,7 +25,7 @@
import org.apache.asterix.external.api.IJObject;
import org.apache.asterix.external.api.IJObjectAccessor;
import org.apache.asterix.external.api.IJRecordAccessor;
-import org.apache.asterix.external.library.TypeInfo;
+import org.apache.asterix.external.library.JavaTypeInfo;
import org.apache.asterix.external.library.java.JObjectAccessors.JListAccessor;
import org.apache.asterix.external.library.java.JObjectAccessors.JRecordAccessor;
import org.apache.asterix.om.pointables.AFlatValuePointable;
@@ -37,7 +37,7 @@
import org.apache.asterix.om.types.ATypeTag;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-public class JObjectPointableVisitor implements IVisitablePointableVisitor<IJObject, TypeInfo> {
+public class JObjectPointableVisitor implements IVisitablePointableVisitor<IJObject, JavaTypeInfo> {
private final Map<ATypeTag, IJObjectAccessor> flatJObjectAccessors = new HashMap<ATypeTag, IJObjectAccessor>();
private final Map<IVisitablePointable, IJRecordAccessor> raccessorToJObject =
@@ -46,7 +46,7 @@
new HashMap<IVisitablePointable, IJListAccessor>();
@Override
- public IJObject visit(AListVisitablePointable accessor, TypeInfo arg) throws HyracksDataException {
+ public IJObject visit(AListVisitablePointable accessor, JavaTypeInfo arg) throws HyracksDataException {
IJObject result = null;
IJListAccessor jListAccessor = laccessorToPrinter.get(accessor);
if (jListAccessor == null) {
@@ -58,7 +58,7 @@
}
@Override
- public IJObject visit(ARecordVisitablePointable accessor, TypeInfo arg) throws HyracksDataException {
+ public IJObject visit(ARecordVisitablePointable accessor, JavaTypeInfo arg) throws HyracksDataException {
IJObject result = null;
IJRecordAccessor jRecordAccessor = raccessorToJObject.get(accessor);
if (jRecordAccessor == null) {
@@ -70,7 +70,7 @@
}
@Override
- public IJObject visit(AFlatValuePointable accessor, TypeInfo arg) throws HyracksDataException {
+ public IJObject visit(AFlatValuePointable accessor, JavaTypeInfo arg) throws HyracksDataException {
ATypeTag typeTag = arg.getTypeTag();
IJObject result = null;
IJObjectAccessor jObjectAccessor = flatJObjectAccessors.get(typeTag);
@@ -83,7 +83,8 @@
return result;
}
- public IJObject visit(AFlatValuePointable accessor, ATypeTag typeTag, TypeInfo arg) throws HyracksDataException {
+ public IJObject visit(AFlatValuePointable accessor, ATypeTag typeTag, JavaTypeInfo arg)
+ throws HyracksDataException {
IJObject result = null;
IJObjectAccessor jObjectAccessor = flatJObjectAccessors.get(typeTag);
if (jObjectAccessor == null) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/IMsgPackAccessor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/IMsgPackAccessor.java
new file mode 100644
index 0000000..6b2f0f8
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/IMsgPackAccessor.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.library.msgpack;
+
+import java.io.IOException;
+
+@FunctionalInterface
+public interface IMsgPackAccessor<T, U, R> {
+ R apply(T t, U u) throws IOException;
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/IMsgPackListAccessor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/IMsgPackListAccessor.java
new file mode 100644
index 0000000..a061a06
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/IMsgPackListAccessor.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.library.msgpack;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+@FunctionalInterface
+public interface IMsgPackListAccessor<P, D, T, V, R> {
+
+ R access(P p, D d, T t, V v) throws HyracksDataException;
+
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/IMsgPackRecordAccessor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/IMsgPackRecordAccessor.java
new file mode 100644
index 0000000..0ce417d
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/IMsgPackRecordAccessor.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.library.msgpack;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+@FunctionalInterface
+public interface IMsgPackRecordAccessor<P, D, T, V, R> {
+
+ R access(P p, D d, T t, V v) throws HyracksDataException;
+
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MessagePackUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MessagePackUtils.java
new file mode 100644
index 0000000..2377f9a
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MessagePackUtils.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.library.msgpack;
+
+import static org.msgpack.core.MessagePack.Code.FIXARRAY_PREFIX;
+import static org.msgpack.core.MessagePack.Code.FIXSTR_PREFIX;
+import static org.msgpack.core.MessagePack.Code.STR32;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class MessagePackUtils {
+
+ public static ATypeTag peekUnknown(IAType type) {
+ switch (type.getTypeTag()) {
+ case MISSING:
+ case NULL:
+ return type.getTypeTag();
+ default:
+ return ATypeTag.TYPE;
+ }
+ }
+
+ public static void packFixPos(ByteBuffer buf, byte in) throws HyracksDataException {
+ byte mask = (byte) (1 << 7);
+ if ((in & mask) != 0) {
+ throw HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.ILLEGAL_STATE,
+ "fixint7 must be positive");
+ }
+ buf.put(in);
+ }
+
+ public static void packFixStr(ByteBuffer buf, String in) throws HyracksDataException {
+ byte[] strBytes = in.getBytes(StandardCharsets.UTF_8);
+ if (strBytes.length > 31) {
+ throw HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.ILLEGAL_STATE,
+ "fixint7 must be positive");
+ }
+ buf.put((byte) (FIXSTR_PREFIX + strBytes.length));
+ buf.put(strBytes);
+ }
+
+ public static void packStr(ByteBuffer out, String in) {
+ out.put(STR32);
+ byte[] strBytes = in.getBytes(StandardCharsets.UTF_8);
+ out.putInt(strBytes.length);
+ out.put(strBytes);
+ }
+
+ public static void packFixArrayHeader(ByteBuffer buf, byte numObj) {
+ buf.put((byte) (FIXARRAY_PREFIX + (0x0F & numObj)));
+ }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MessagePackerFromADM.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MessagePackerFromADM.java
deleted file mode 100644
index f0ac56e..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MessagePackerFromADM.java
+++ /dev/null
@@ -1,362 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.library.msgpack;
-
-import static org.msgpack.core.MessagePack.Code.ARRAY32;
-import static org.msgpack.core.MessagePack.Code.FALSE;
-import static org.msgpack.core.MessagePack.Code.FIXARRAY_PREFIX;
-import static org.msgpack.core.MessagePack.Code.FIXSTR_PREFIX;
-import static org.msgpack.core.MessagePack.Code.FLOAT32;
-import static org.msgpack.core.MessagePack.Code.FLOAT64;
-import static org.msgpack.core.MessagePack.Code.INT16;
-import static org.msgpack.core.MessagePack.Code.INT32;
-import static org.msgpack.core.MessagePack.Code.INT64;
-import static org.msgpack.core.MessagePack.Code.INT8;
-import static org.msgpack.core.MessagePack.Code.MAP32;
-import static org.msgpack.core.MessagePack.Code.NIL;
-import static org.msgpack.core.MessagePack.Code.STR32;
-import static org.msgpack.core.MessagePack.Code.TRUE;
-
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.AUnionType;
-import org.apache.asterix.om.types.AbstractCollectionType;
-import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.types.TypeTagUtil;
-import org.apache.asterix.om.utils.NonTaggedFormatUtil;
-import org.apache.asterix.om.utils.RecordUtil;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.data.std.api.IValueReference;
-import org.apache.hyracks.data.std.primitive.BooleanPointable;
-import org.apache.hyracks.data.std.primitive.BytePointable;
-import org.apache.hyracks.data.std.primitive.DoublePointable;
-import org.apache.hyracks.data.std.primitive.FloatPointable;
-import org.apache.hyracks.data.std.primitive.IntegerPointable;
-import org.apache.hyracks.data.std.primitive.LongPointable;
-import org.apache.hyracks.data.std.primitive.ShortPointable;
-import org.apache.hyracks.util.string.UTF8StringUtil;
-
-public class MessagePackerFromADM {
-
- private static final int TYPE_TAG_SIZE = 1;
- private static final int TYPE_SIZE = 1;
- private static final int LENGTH_SIZE = 4;
- private static final int ITEM_COUNT_SIZE = 4;
- private static final int ITEM_OFFSET_SIZE = 4;
-
- public static ATypeTag pack(IValueReference ptr, IAType type, ByteBuffer out, boolean packUnknown)
- throws HyracksDataException {
- return pack(ptr.getByteArray(), ptr.getStartOffset(), type, true, packUnknown, out);
- }
-
- public static ATypeTag pack(byte[] ptr, int offs, IAType type, boolean tagged, boolean packUnknown, ByteBuffer out)
- throws HyracksDataException {
- int relOffs = tagged ? offs + 1 : offs;
- ATypeTag tag = type.getTypeTag();
- switch (tag) {
- case STRING:
- packStr(ptr, relOffs, out);
- break;
- case BOOLEAN:
- if (BooleanPointable.getBoolean(ptr, relOffs)) {
- out.put(TRUE);
- } else {
- out.put(FALSE);
- }
- break;
- case TINYINT:
- packByte(out, BytePointable.getByte(ptr, relOffs));
- break;
- case SMALLINT:
- packShort(out, ShortPointable.getShort(ptr, relOffs));
- break;
- case INTEGER:
- packInt(out, IntegerPointable.getInteger(ptr, relOffs));
- break;
- case BIGINT:
- packLong(out, LongPointable.getLong(ptr, relOffs));
- break;
- case FLOAT:
- packFloat(out, FloatPointable.getFloat(ptr, relOffs));
- break;
- case DOUBLE:
- packDouble(out, DoublePointable.getDouble(ptr, relOffs));
- break;
- case ARRAY:
- case MULTISET:
- packArray(ptr, offs, type, out);
- break;
- case OBJECT:
- packObject(ptr, offs, type, out);
- break;
- case MISSING:
- case NULL:
- if (packUnknown) {
- packNull(out);
- break;
- } else {
- return tag;
- }
- default:
- throw HyracksDataException.create(AsterixException.create(ErrorCode.PARSER_ADM_DATA_PARSER_CAST_ERROR,
- tag.name(), "to a msgpack"));
- }
- return ATypeTag.TYPE;
- }
-
- public static ATypeTag peekUnknown(IAType type) {
- switch (type.getTypeTag()) {
- case MISSING:
- case NULL:
- return type.getTypeTag();
- default:
- return ATypeTag.TYPE;
- }
- }
-
- public static void packNull(ByteBuffer out) {
- out.put(NIL);
- }
-
- public static void packByte(ByteBuffer out, byte in) {
- out.put(INT8);
- out.put(in);
- }
-
- public static void packShort(ByteBuffer out, short in) {
- out.put(INT16);
- out.putShort(in);
- }
-
- public static void packInt(ByteBuffer out, int in) {
- out.put(INT32);
- out.putInt(in);
-
- }
-
- public static void packLong(ByteBuffer out, long in) {
- out.put(INT64);
- out.putLong(in);
- }
-
- public static void packFloat(ByteBuffer out, float in) {
- out.put(FLOAT32);
- out.putFloat(in);
- }
-
- public static void packDouble(ByteBuffer out, double in) {
- out.put(FLOAT64);
- out.putDouble(in);
- }
-
- public static void packFixPos(ByteBuffer out, byte in) throws HyracksDataException {
- byte mask = (byte) (1 << 7);
- if ((in & mask) != 0) {
- throw HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.ILLEGAL_STATE,
- "fixint7 must be positive");
- }
- out.put(in);
- }
-
- public static void packFixStr(ByteBuffer buf, String in) throws HyracksDataException {
- byte[] strBytes = in.getBytes(StandardCharsets.UTF_8);
- if (strBytes.length > 31) {
- throw HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.ILLEGAL_STATE,
- "fixint7 must be positive");
- }
- buf.put((byte) (FIXSTR_PREFIX + strBytes.length));
- buf.put(strBytes);
- }
-
- public static void packStr(ByteBuffer out, String in) {
- out.put(STR32);
- byte[] strBytes = in.getBytes(StandardCharsets.UTF_8);
- out.putInt(strBytes.length);
- out.put(strBytes);
- }
-
- private static void packStr(byte[] in, int offs, ByteBuffer out) {
- out.put(STR32);
- //TODO: tagged/untagged. closed support is borked so always tagged rn
- String str = UTF8StringUtil.toString(in, offs);
- byte[] strBytes = str.getBytes(StandardCharsets.UTF_8);
- out.putInt(strBytes.length);
- out.put(strBytes);
- }
-
- public static void packStr(String str, ByteBuffer out) {
- out.put(STR32);
- byte[] strBytes = str.getBytes(StandardCharsets.UTF_8);
- out.putInt(strBytes.length);
- out.put(strBytes);
- }
-
- private static void packArray(byte[] in, int offs, IAType type, ByteBuffer out) throws HyracksDataException {
- //TODO: - could optimize to pack fixarray/array16 for small arrays
- // - this code is basically a static version of AListPointable, could be deduped
- AbstractCollectionType collType = (AbstractCollectionType) type;
- out.put(ARRAY32);
- int lenOffs = offs + TYPE_TAG_SIZE + TYPE_SIZE;
- int itemCtOffs = LENGTH_SIZE + lenOffs;
- int itemCt = IntegerPointable.getInteger(in, itemCtOffs);
- boolean fixType = NonTaggedFormatUtil.isFixedSizedCollection(type);
- out.putInt(itemCt);
- for (int i = 0; i < itemCt; i++) {
- if (fixType) {
- int itemOffs = itemCtOffs + ITEM_COUNT_SIZE + (i
- * NonTaggedFormatUtil.getFieldValueLength(in, 0, collType.getItemType().getTypeTag(), false));
- pack(in, itemOffs, collType.getItemType(), false, true, out);
- } else {
- int itemOffs =
- offs + IntegerPointable.getInteger(in, itemCtOffs + ITEM_COUNT_SIZE + (i * ITEM_OFFSET_SIZE));
- ATypeTag tag = ATypeTag.VALUE_TYPE_MAPPING[BytePointable.getByte(in, itemOffs)];
- pack(in, itemOffs, TypeTagUtil.getBuiltinTypeByTag(tag), true, true, out);
- }
- }
- }
-
- private static void packObject(byte[] in, int offs, IAType type, ByteBuffer out) throws HyracksDataException {
- ARecordType recType = (ARecordType) type;
- out.put(MAP32);
- int fieldCt = recType.getFieldNames().length + RecordUtils.getOpenFieldCount(in, offs, recType);
- out.putInt(fieldCt);
- for (int i = 0; i < recType.getFieldNames().length; i++) {
- String field = recType.getFieldNames()[i];
- IAType fieldType = RecordUtils.getClosedFieldType(recType, i);
- packStr(field, out);
- pack(in, RecordUtils.getClosedFieldOffset(in, offs, recType, i), fieldType, false, true, out);
- }
- if (RecordUtils.isExpanded(in, offs, recType)) {
- for (int i = 0; i < RecordUtils.getOpenFieldCount(in, offs, recType); i++) {
- packStr(in, RecordUtils.getOpenFieldNameOffset(in, offs, recType, i), out);
- ATypeTag tag = ATypeTag.VALUE_TYPE_MAPPING[RecordUtils.getOpenFieldTag(in, offs, recType, i)];
- pack(in, RecordUtils.getOpenFieldValueOffset(in, offs, recType, i),
- TypeTagUtil.getBuiltinTypeByTag(tag), true, true, out);
- }
- }
-
- }
-
- public static void packFixArrayHeader(ByteBuffer buf, byte numObj) {
- buf.put((byte) (FIXARRAY_PREFIX + (0x0F & numObj)));
- }
-
- private static class RecordUtils {
-
- static final int TAG_SIZE = 1;
- static final int RECORD_LENGTH_SIZE = 4;
- static final int EXPANDED_SIZE = 1;
- static final int OPEN_OFFSET_SIZE = 4;
- static final int CLOSED_COUNT_SIZE = 4;
- static final int FIELD_OFFSET_SIZE = 4;
- static final int OPEN_COUNT_SIZE = 4;
- private static final int OPEN_FIELD_HASH_SIZE = 4;
- private static final int OPEN_FIELD_OFFSET_SIZE = 4;
- private static final int OPEN_FIELD_HEADER = OPEN_FIELD_HASH_SIZE + OPEN_FIELD_OFFSET_SIZE;
-
- private static boolean isOpen(ARecordType recordType) {
- return recordType == null || recordType.isOpen();
- }
-
- public static int getLength(byte[] bytes, int start) {
- return IntegerPointable.getInteger(bytes, start + TAG_SIZE);
- }
-
- public static boolean isExpanded(byte[] bytes, int start, ARecordType recordType) {
- return isOpen(recordType) && BooleanPointable.getBoolean(bytes, start + TAG_SIZE + RECORD_LENGTH_SIZE);
- }
-
- public static int getOpenPartOffset(int start, ARecordType recordType) {
- return start + TAG_SIZE + RECORD_LENGTH_SIZE + (isOpen(recordType) ? EXPANDED_SIZE : 0);
- }
-
- public static int getNullBitmapOffset(byte[] bytes, int start, ARecordType recordType) {
- return getOpenPartOffset(start, recordType) + (isExpanded(bytes, start, recordType) ? OPEN_OFFSET_SIZE : 0)
- + CLOSED_COUNT_SIZE;
- }
-
- public static int getNullBitmapSize(ARecordType recordType) {
- return RecordUtil.computeNullBitmapSize(recordType);
- }
-
- public static final IAType getClosedFieldType(ARecordType recordType, int fieldId) {
- IAType aType = recordType.getFieldTypes()[fieldId];
- if (NonTaggedFormatUtil.isOptional(aType)) {
- // optional field: add the embedded non-null type tag
- aType = ((AUnionType) aType).getActualType();
- }
- return aType;
- }
-
- public static final int getClosedFieldOffset(byte[] bytes, int start, ARecordType recordType, int fieldId) {
- int offset = getNullBitmapOffset(bytes, start, recordType) + getNullBitmapSize(recordType)
- + fieldId * FIELD_OFFSET_SIZE;
- return start + IntegerPointable.getInteger(bytes, offset);
- }
-
- public static final int getOpenFieldCount(byte[] bytes, int start, ARecordType recordType) {
- return isExpanded(bytes, start, recordType)
- ? IntegerPointable.getInteger(bytes, getOpenFieldCountOffset(bytes, start, recordType)) : 0;
- }
-
- public static int getOpenFieldCountSize(byte[] bytes, int start, ARecordType recordType) {
- return isExpanded(bytes, start, recordType) ? OPEN_COUNT_SIZE : 0;
- }
-
- public static int getOpenFieldCountOffset(byte[] bytes, int start, ARecordType recordType) {
- return start + IntegerPointable.getInteger(bytes, getOpenPartOffset(start, recordType));
- }
-
- public static final int getOpenFieldValueOffset(byte[] bytes, int start, ARecordType recordType, int fieldId) {
- return getOpenFieldNameOffset(bytes, start, recordType, fieldId)
- + getOpenFieldNameSize(bytes, start, recordType, fieldId);
- }
-
- public static int getOpenFieldNameSize(byte[] bytes, int start, ARecordType recordType, int fieldId) {
- int utfleng = UTF8StringUtil.getUTFLength(bytes, getOpenFieldNameOffset(bytes, start, recordType, fieldId));
- return utfleng + UTF8StringUtil.getNumBytesToStoreLength(utfleng);
- }
-
- public static int getOpenFieldNameOffset(byte[] bytes, int start, ARecordType recordType, int fieldId) {
- return getOpenFieldOffset(bytes, start, recordType, fieldId);
- }
-
- public static final byte getOpenFieldTag(byte[] bytes, int start, ARecordType recordType, int fieldId) {
- return bytes[getOpenFieldValueOffset(bytes, start, recordType, fieldId)];
- }
-
- public static int getOpenFieldHashOffset(byte[] bytes, int start, ARecordType recordType, int fieldId) {
- return getOpenFieldCountOffset(bytes, start, recordType) + getOpenFieldCountSize(bytes, start, recordType)
- + fieldId * OPEN_FIELD_HEADER;
- }
-
- public static int getOpenFieldOffset(byte[] bytes, int start, ARecordType recordType, int fieldId) {
- return start
- + IntegerPointable.getInteger(bytes, getOpenFieldOffsetOffset(bytes, start, recordType, fieldId));
- }
-
- public static int getOpenFieldOffsetOffset(byte[] bytes, int start, ARecordType recordType, int fieldId) {
- return getOpenFieldHashOffset(bytes, start, recordType, fieldId) + OPEN_FIELD_HASH_SIZE;
- }
- }
-
-}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MessageUnpackerToADM.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MessageUnpackerToADM.java
index 4af1121..d6121e8 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MessageUnpackerToADM.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MessageUnpackerToADM.java
@@ -18,18 +18,39 @@
import static org.msgpack.core.MessagePack.Code.*;
+import java.io.DataOutput;
+import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import org.apache.asterix.builders.AbvsBuilderFactory;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.input.stream.StandardUTF8ToModifiedUTF8DataOutput;
+import org.apache.asterix.external.input.stream.builders.ListLikeNumericArrayFactory;
+import org.apache.asterix.external.input.stream.builders.StandardToModifiedUTF8DataOutputFactory;
import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.util.container.IObjectPool;
+import org.apache.asterix.om.util.container.ListObjectPool;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.util.encoding.VarLenIntEncoderDecoder;
+import org.apache.hyracks.data.std.api.IMutableValueStorage;
+import org.apache.hyracks.data.std.primitive.IntegerPointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.util.string.UTF8StringUtil;
public class MessageUnpackerToADM {
- public static void unpack(ByteBuffer in, ByteBuffer out, boolean tagged) throws HyracksDataException {
+ private final IObjectPool<IMutableValueStorage, ATypeTag> abvsBuilderPool =
+ new ListObjectPool<>(new AbvsBuilderFactory());
+ private final IObjectPool<StandardUTF8ToModifiedUTF8DataOutput, ATypeTag> utfPool =
+ new ListObjectPool<>(new StandardToModifiedUTF8DataOutputFactory());
+ private final IObjectPool<List<Long>, Long> listPool = new ListObjectPool<>(new ListLikeNumericArrayFactory<>());
+
+ public MessageUnpackerToADM() {
+ }
+
+ public void unpack(ByteBuffer in, DataOutput out, boolean tagged) throws IOException {
byte tag = NIL;
if (in != null) {
tag = in.get();
@@ -38,12 +59,12 @@
unpackStr(in, out, (tag ^ FIXSTR_PREFIX), tagged);
} else if (isFixInt(tag)) {
if (tagged) {
- out.put(ATypeTag.SERIALIZED_INT8_TYPE_TAG);
+ out.writeByte(ATypeTag.SERIALIZED_INT8_TYPE_TAG);
}
if (isPosFixInt(tag)) {
- out.put(tag);
+ out.writeByte(tag);
} else if (isNegFixInt(tag)) {
- out.put(tag);
+ out.writeByte(tag);
}
} else if (isFixedArray(tag)) {
unpackArray(in, out, (tag ^ FIXARRAY_PREFIX));
@@ -52,15 +73,15 @@
} else {
switch (tag) {
case TRUE:
- out.put(ATypeTag.SERIALIZED_BOOLEAN_TYPE_TAG);
- out.put((byte) 1);
+ out.writeByte(ATypeTag.SERIALIZED_BOOLEAN_TYPE_TAG);
+ out.writeByte((byte) 1);
break;
case FALSE:
- out.put(ATypeTag.SERIALIZED_BOOLEAN_TYPE_TAG);
- out.put((byte) 0);
+ out.writeByte(ATypeTag.SERIALIZED_BOOLEAN_TYPE_TAG);
+ out.writeByte((byte) 0);
break;
case NIL:
- out.put(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+ out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
break;
case UINT8:
unpackUByte(in, out, tagged);
@@ -113,7 +134,6 @@
case MAP32:
unpackMap(in, out, (int) Integer.toUnsignedLong(in.getInt()));
break;
-
default:
throw HyracksDataException.create(AsterixException.create(
ErrorCode.PARSER_ADM_DATA_PARSER_CAST_ERROR, "msgpack tag " + tag + " ", "to an ADM type"));
@@ -121,154 +141,159 @@
}
}
- public static void unpackByte(ByteBuffer in, ByteBuffer out, boolean tagged) {
+ public static void unpackByte(ByteBuffer in, DataOutput out, boolean tagged) throws IOException {
if (tagged) {
- out.put(ATypeTag.SERIALIZED_INT8_TYPE_TAG);
+ out.writeByte(ATypeTag.SERIALIZED_INT8_TYPE_TAG);
}
- out.put(in.get());
+ out.writeByte(in.get());
}
- public static void unpackShort(ByteBuffer in, ByteBuffer out, boolean tagged) {
+ public static void unpackShort(ByteBuffer in, DataOutput out, boolean tagged) throws IOException {
if (tagged) {
- out.put(ATypeTag.SERIALIZED_INT16_TYPE_TAG);
+ out.writeByte(ATypeTag.SERIALIZED_INT16_TYPE_TAG);
}
- out.putShort(in.getShort());
+ out.writeShort(in.getShort());
}
- public static void unpackInt(ByteBuffer in, ByteBuffer out, boolean tagged) {
+ public static void unpackInt(ByteBuffer in, DataOutput out, boolean tagged) throws IOException {
if (tagged) {
- out.put(ATypeTag.SERIALIZED_INT32_TYPE_TAG);
+ out.writeByte(ATypeTag.SERIALIZED_INT32_TYPE_TAG);
}
- out.putInt(in.getInt());
+ out.writeInt(in.getInt());
}
- public static void unpackLong(ByteBuffer in, ByteBuffer out, boolean tagged) {
+ public static void unpackLong(ByteBuffer in, DataOutput out, boolean tagged) throws IOException {
if (tagged) {
- out.put(ATypeTag.SERIALIZED_INT64_TYPE_TAG);
+ out.writeByte(ATypeTag.SERIALIZED_INT64_TYPE_TAG);
}
- out.putLong(in.getLong());
+ out.writeLong(in.getLong());
}
- public static void unpackUByte(ByteBuffer in, ByteBuffer out, boolean tagged) {
+ public static void unpackUByte(ByteBuffer in, DataOutput out, boolean tagged) throws IOException {
if (tagged) {
- out.put(ATypeTag.SERIALIZED_INT16_TYPE_TAG);
+ out.writeByte(ATypeTag.SERIALIZED_INT16_TYPE_TAG);
}
- out.putShort((short) (in.get() & ((short) 0x00FF)));
+ out.writeShort((short) (in.get() & ((short) 0x00FF)));
}
- public static void unpackUShort(ByteBuffer in, ByteBuffer out, boolean tagged) {
+ public static void unpackUShort(ByteBuffer in, DataOutput out, boolean tagged) throws IOException {
if (tagged) {
- out.put(ATypeTag.SERIALIZED_INT32_TYPE_TAG);
+ out.writeByte(ATypeTag.SERIALIZED_INT32_TYPE_TAG);
}
- out.putInt(in.getShort() & 0x0000FFFF);
+ out.writeInt(in.getShort() & 0x0000FFFF);
}
- public static void unpackUInt(ByteBuffer in, ByteBuffer out, boolean tagged) {
+ public static void unpackUInt(ByteBuffer in, DataOutput out, boolean tagged) throws IOException {
if (tagged) {
- out.put(ATypeTag.SERIALIZED_INT64_TYPE_TAG);
+ out.writeByte(ATypeTag.SERIALIZED_INT64_TYPE_TAG);
}
- out.putLong(in.getInt() & 0x00000000FFFFFFFFl);
+ out.writeLong(in.getInt() & 0x00000000FFFFFFFFl);
}
- public static void unpackULong(ByteBuffer in, ByteBuffer out, boolean tagged) {
+ public static void unpackULong(ByteBuffer in, DataOutput out, boolean tagged) throws IOException {
if (tagged) {
- out.put(ATypeTag.SERIALIZED_INT64_TYPE_TAG);
+ out.writeByte(ATypeTag.SERIALIZED_INT64_TYPE_TAG);
}
long val = in.getLong();
if (val < 0) {
throw new IllegalArgumentException("Integer overflow");
}
- out.putLong(val);
+ out.writeLong(val);
}
- public static void unpackFloat(ByteBuffer in, ByteBuffer out, boolean tagged) {
+ public static void unpackFloat(ByteBuffer in, DataOutput out, boolean tagged) throws IOException {
if (tagged) {
- out.put(ATypeTag.SERIALIZED_FLOAT_TYPE_TAG);
+ out.writeByte(ATypeTag.SERIALIZED_FLOAT_TYPE_TAG);
}
- out.putFloat(in.getFloat());
+ out.writeFloat(in.getFloat());
}
- public static void unpackDouble(ByteBuffer in, ByteBuffer out, boolean tagged) {
+ public static void unpackDouble(ByteBuffer in, DataOutput out, boolean tagged) throws IOException {
if (tagged) {
- out.put(ATypeTag.SERIALIZED_DOUBLE_TYPE_TAG);
+ out.writeByte(ATypeTag.SERIALIZED_DOUBLE_TYPE_TAG);
}
- out.putDouble(in.getDouble());
+ out.writeDouble(in.getDouble());
}
- public static void unpackArray(ByteBuffer in, ByteBuffer out, long uLen) throws HyracksDataException {
+ public void unpackArray(ByteBuffer in, DataOutput out, long uLen) throws IOException {
if (uLen > Integer.MAX_VALUE) {
throw new UnsupportedOperationException("Array is too long");
}
+ ArrayBackedValueStorage buildBuf = (ArrayBackedValueStorage) abvsBuilderPool.allocate(ATypeTag.ARRAY);
+ buildBuf.reset();
+ DataOutput bufOut = buildBuf.getDataOutput();
int count = (int) uLen;
- int offs = out.position();
- out.put(ATypeTag.SERIALIZED_ORDEREDLIST_TYPE_TAG);
- out.put(ATypeTag.ANY.serialize());
- int asxLenPos = out.position();
+ bufOut.writeByte(ATypeTag.SERIALIZED_ORDEREDLIST_TYPE_TAG);
+ bufOut.writeByte(ATypeTag.ANY.serialize());
+ int asxLenPos = buildBuf.getLength();
//reserve space
- out.putInt(-1);
- out.putInt(count);
- int slotStartOffs = out.position() + out.arrayOffset();
+ bufOut.writeInt(-1);
+ bufOut.writeInt(count);
+ int slotStartOffs = buildBuf.getLength();
for (int i = 0; i < count; i++) {
- out.putInt(0xFFFF);
+ bufOut.writeInt(0xDEADBEEF);
}
for (int i = 0; i < count; i++) {
- out.putInt(slotStartOffs + (i * 4), (out.position() - offs));
- unpack(in, out, true);
+ IntegerPointable.setInteger(buildBuf.getByteArray(), ((slotStartOffs) + (i * 4)), buildBuf.getLength());
+ //tagged b/c any
+ unpack(in, bufOut, true);
}
- int totalLen = out.position() - offs;
- out.putInt(asxLenPos, totalLen);
+ IntegerPointable.setInteger(buildBuf.getByteArray(), asxLenPos, buildBuf.getLength());
+ out.write(buildBuf.getByteArray(), buildBuf.getStartOffset(), buildBuf.getLength());
}
- public static void unpackMap(ByteBuffer in, ByteBuffer out, int count) throws HyracksDataException {
+ public void unpackMap(ByteBuffer in, DataOutput out, int count) throws IOException {
//TODO: need to handle typed records. this only produces a completely open record.
+ ArrayBackedValueStorage buildBuf = (ArrayBackedValueStorage) abvsBuilderPool.allocate(ATypeTag.OBJECT);
+ List<Long> offsets = listPool.allocate((long) count);
+ DataOutput bufOut = buildBuf.getDataOutput();
//hdr size = 6?
- int startOffs = out.position();
- out.put(ATypeTag.SERIALIZED_RECORD_TYPE_TAG);
- int totalSizeOffs = out.position();
- out.putInt(-1);
+ bufOut.writeByte(ATypeTag.SERIALIZED_RECORD_TYPE_TAG);
+ int totalSizeOffs = buildBuf.getLength();
+ bufOut.writeInt(-1);
//isExpanded
- out.put((byte) 1);
- int openPartOffs = out.position();
- out.putInt(-1);
+ bufOut.writeByte((byte) 1);
+ int openPartOffs = buildBuf.getLength();
+ bufOut.writeInt(-1);
//isExpanded, so num of open fields
- out.putInt(openPartOffs, out.position() - startOffs);
- out.putInt(count);
- int offsetAryPos = out.position();
+ IntegerPointable.setInteger(buildBuf.getByteArray(), openPartOffs, buildBuf.getLength());
+ bufOut.writeInt(count);
+ int offsetAryPos = buildBuf.getLength();
int offsetArySz = count * 2;
//allocate space for open field offsets
for (int i = 0; i < offsetArySz; i++) {
- out.putInt(0xDEADBEEF);
+ bufOut.writeInt(0xDEADBEEF);
}
for (int i = 0; i < count; i++) {
- int offs = out.position() + out.arrayOffset();
- int relOffs = offs - startOffs;
- unpack(in, out, false);
- int hash = UTF8StringUtil.hash(out.array(), offs);
- out.putInt(offsetAryPos, hash);
- offsetAryPos += 4;
- out.putInt(offsetAryPos, relOffs);
- offsetAryPos += 4;
- unpack(in, out, true);
+ int offs = buildBuf.getLength();
+ unpack(in, bufOut, false);
+ long hash = UTF8StringUtil.hash(buildBuf.getByteArray(), offs);
+ offsets.set(i, (hash << 32) + offs);
+ unpack(in, bufOut, true);
}
- out.putInt(totalSizeOffs, out.position() - startOffs);
+ Collections.sort(offsets);
+ for (Long l : offsets) {
+ IntegerPointable.setInteger(buildBuf.getByteArray(), offsetAryPos, (int) (l >> 32));
+ offsetAryPos += 4;
+ IntegerPointable.setInteger(buildBuf.getByteArray(), offsetAryPos, (int) ((l << 32) >> 32));
+ offsetAryPos += 4;
+ }
+ IntegerPointable.setInteger(buildBuf.getByteArray(), totalSizeOffs, buildBuf.getLength());
+ out.write(buildBuf.getByteArray(), buildBuf.getStartOffset(), buildBuf.getLength());
}
- public static void unpackStr(ByteBuffer in, ByteBuffer out, long uLen, boolean tag) {
- //TODO: this probably breaks for 3 and 4 byte UTF-8
+ public void unpackStr(ByteBuffer in, DataOutput out, long uLen, boolean tag) throws IOException {
if (tag) {
- out.put(ATypeTag.SERIALIZED_STRING_TYPE_TAG);
+ out.writeByte(ATypeTag.SERIALIZED_STRING_TYPE_TAG);
}
if (Long.compareUnsigned(uLen, Integer.MAX_VALUE) > 0) {
throw new UnsupportedOperationException("String is too long");
}
int len = (int) uLen;
- int strLen = UTF8StringUtil.getStringLength(in.array(), in.position() + in.arrayOffset(), len);
- int adv = VarLenIntEncoderDecoder.encode(strLen, out.array(), out.position() + out.arrayOffset());
- out.position(out.position() + adv);
- System.arraycopy(in.array(), in.arrayOffset() + in.position(), out.array(), out.arrayOffset() + out.position(),
- len);
- out.position(out.position() + len);
+ StandardUTF8ToModifiedUTF8DataOutput conv = utfPool.allocate(ATypeTag.STRING);
+ conv.setDataOutput(out);
+ conv.write(in.array(), in.arrayOffset() + in.position(), len);
in.position(in.position() + len);
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MsgPackAccessors.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MsgPackAccessors.java
new file mode 100644
index 0000000..a90a183
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MsgPackAccessors.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.library.msgpack;
+
+import static org.apache.hyracks.util.string.UTF8StringUtil.getUTFLength;
+import static org.msgpack.core.MessagePack.Code.ARRAY32;
+import static org.msgpack.core.MessagePack.Code.FALSE;
+import static org.msgpack.core.MessagePack.Code.FLOAT32;
+import static org.msgpack.core.MessagePack.Code.FLOAT64;
+import static org.msgpack.core.MessagePack.Code.INT16;
+import static org.msgpack.core.MessagePack.Code.INT32;
+import static org.msgpack.core.MessagePack.Code.INT64;
+import static org.msgpack.core.MessagePack.Code.INT8;
+import static org.msgpack.core.MessagePack.Code.MAP32;
+import static org.msgpack.core.MessagePack.Code.NIL;
+import static org.msgpack.core.MessagePack.Code.STR32;
+import static org.msgpack.core.MessagePack.Code.TRUE;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.dataflow.data.nontagged.printers.PrintTools;
+import org.apache.asterix.dataflow.data.nontagged.serde.ABooleanSerializerDeserializer;
+import org.apache.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
+import org.apache.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
+import org.apache.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
+import org.apache.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
+import org.apache.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import org.apache.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
+import org.apache.asterix.external.library.PyTypeInfo;
+import org.apache.asterix.om.pointables.AListVisitablePointable;
+import org.apache.asterix.om.pointables.ARecordVisitablePointable;
+import org.apache.asterix.om.pointables.base.IVisitablePointable;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.AbstractCollectionType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.EnumDeserializer;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.types.TypeTagUtil;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.util.encoding.VarLenIntEncoderDecoder;
+
+public class MsgPackAccessors {
+
+ private MsgPackAccessors() {
+ }
+
+ public static IMsgPackAccessor<IPointable, DataOutput, Void> createFlatMsgPackAccessor(ATypeTag aTypeTag)
+ throws HyracksDataException {
+ switch (aTypeTag) {
+ case BOOLEAN:
+ return MsgPackBooleanAccessor::apply;
+ case TINYINT:
+ return MsgPackInt8Accessor::apply;
+ case SMALLINT:
+ return MsgPackInt16Accessor::apply;
+ case INTEGER:
+ return MsgPackInt32Accessor::apply;
+ case BIGINT:
+ return MsgPackInt64Accessor::apply;
+ case FLOAT:
+ return MsgPackFloatAccessor::apply;
+ case DOUBLE:
+ return MsgPackDoubleAccessor::apply;
+ case STRING:
+ return MsgPackStringAccessor::apply;
+ case MISSING:
+ case NULL:
+ return MsgPackNullAccessor::apply;
+ default:
+ throw HyracksDataException
+ .create(AsterixException.create(ErrorCode.TYPE_UNSUPPORTED, "msgpack", aTypeTag.name()));
+ }
+ }
+
+ public static class MsgPackInt8Accessor {
+ public static Void apply(IPointable pointable, DataOutput out) throws IOException {
+ byte[] b = pointable.getByteArray();
+ int s = pointable.getStartOffset();
+ byte o = AInt8SerializerDeserializer.getByte(b, s + 1);
+ out.writeByte(INT8);
+ out.writeByte(o);
+ return null;
+ }
+ }
+
+ public static class MsgPackInt16Accessor {
+
+ public static Void apply(IPointable pointable, DataOutput out) throws IOException {
+ byte[] b = pointable.getByteArray();
+ int s = pointable.getStartOffset();
+ short i = AInt16SerializerDeserializer.getShort(b, s + 1);
+ out.writeByte(INT16);
+ out.writeShort(i);
+ return null;
+ }
+ }
+
+ public static class MsgPackInt32Accessor {
+
+ public static Void apply(IPointable pointable, DataOutput out) throws IOException {
+ byte[] b = pointable.getByteArray();
+ int s = pointable.getStartOffset();
+ int i = AInt32SerializerDeserializer.getInt(b, s + 1);
+ out.writeByte(INT32);
+ out.writeByte(i);
+ return null;
+ }
+ }
+
+ public static class MsgPackNullAccessor {
+ public static Void apply(IPointable pointable, DataOutput out) throws IOException {
+ out.writeByte(NIL);
+ return null;
+ }
+ }
+
+ public static class MsgPackInt64Accessor {
+
+ public static Void apply(IPointable pointable, DataOutput out) throws IOException {
+ byte[] b = pointable.getByteArray();
+ int s = pointable.getStartOffset();
+ long v = AInt64SerializerDeserializer.getLong(b, s + 1);
+ out.writeByte(INT64);
+ out.writeLong(v);
+ return null;
+ }
+ }
+
+ public static class MsgPackFloatAccessor {
+
+ public static Void apply(IPointable pointable, DataOutput out) throws IOException {
+ byte[] b = pointable.getByteArray();
+ int s = pointable.getStartOffset();
+ float v = AFloatSerializerDeserializer.getFloat(b, s + 1);
+ out.writeByte(FLOAT32);
+ out.writeFloat(v);
+ return null;
+ }
+ }
+
+ public static class MsgPackDoubleAccessor {
+ public static Void apply(IPointable pointable, DataOutput out) throws IOException {
+ byte[] b = pointable.getByteArray();
+ int s = pointable.getStartOffset();
+ double v = ADoubleSerializerDeserializer.getDouble(b, s + 1);
+ out.writeByte(FLOAT64);
+ out.writeDouble(v);
+ return null;
+ }
+ }
+
+ public static class MsgPackStringAccessor {
+ public static Void apply(IPointable pointable, DataOutput out) throws IOException {
+ byte[] b = pointable.getByteArray();
+ int s = pointable.getStartOffset();
+ out.writeByte(STR32);
+ final int calculatedLength = getUTFLength(b, s + 1);
+ out.writeInt(calculatedLength);
+ PrintTools.writeUTF8StringRaw(b, s + 1, calculatedLength, out);
+ return null;
+ }
+
+ }
+
+ public static class MsgPackBooleanAccessor {
+ public static Void apply(IPointable pointable, DataOutput out) throws IOException {
+ byte[] b = pointable.getByteArray();
+ int s = pointable.getStartOffset();
+ boolean v = ABooleanSerializerDeserializer.getBoolean(b, s + 1);
+ if (v) {
+ out.writeByte(TRUE);
+ } else {
+ out.writeByte(FALSE);
+ }
+ return null;
+ }
+ }
+
+ public static class MsgPackRecordAccessor {
+
+ public static int getUTFLength(byte[] b, int s) {
+ return VarLenIntEncoderDecoder.decode(b, s);
+ }
+
+ public static Void access(ARecordVisitablePointable pointable, PyTypeInfo arg,
+ MsgPackPointableVisitor pointableVisitor) throws HyracksDataException {
+ List<IVisitablePointable> fieldPointables = pointable.getFieldValues();
+ List<IVisitablePointable> fieldTypeTags = pointable.getFieldTypeTags();
+ List<IVisitablePointable> fieldNames = pointable.getFieldNames();
+ boolean closedPart;
+ int index = 0;
+ DataOutput out = arg.getDataOutput();
+ ARecordType recordType = ((ARecordType) arg.getType());
+ try {
+ out.writeByte(MAP32);
+ out.writeInt(fieldNames.size());
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ try {
+ for (IVisitablePointable fieldPointable : fieldPointables) {
+ closedPart = index < recordType.getFieldTypes().length;
+ IVisitablePointable tt = fieldTypeTags.get(index);
+ ATypeTag typeTag =
+ EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(tt.getByteArray()[tt.getStartOffset()]);
+ IAType fieldType;
+ fieldType =
+ closedPart ? recordType.getFieldTypes()[index] : TypeTagUtil.getBuiltinTypeByTag(typeTag);
+ IPointable fieldName = fieldNames.get(index);
+ MsgPackAccessors.createFlatMsgPackAccessor(BuiltinType.ASTRING.getTypeTag()).apply(fieldName,
+ arg.getDataOutput());
+ PyTypeInfo fieldTypeInfo = pointableVisitor.getTypeInfo(fieldType, out);
+ fieldPointable.accept(pointableVisitor, fieldTypeInfo);
+ index++;
+ }
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ return null;
+ }
+ }
+
+ public static class MsgPackListAccessor {
+
+ public static Void access(AListVisitablePointable pointable, PyTypeInfo arg,
+ MsgPackPointableVisitor pointableVisitor) throws HyracksDataException {
+ List<IVisitablePointable> items = pointable.getItems();
+ List<IVisitablePointable> itemTags = pointable.getItemTags();
+ DataOutput out = arg.getDataOutput();
+ try {
+ out.writeByte(ARRAY32);
+ out.writeInt(items.size());
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ for (int iter1 = 0; iter1 < items.size(); iter1++) {
+ IVisitablePointable itemPointable = items.get(iter1);
+ // First, try to get defined type.
+ IAType fieldType = ((AbstractCollectionType) arg.getType()).getItemType();
+ if (fieldType.getTypeTag() == ATypeTag.ANY) {
+ // Second, if defined type is not available, try to infer it from data
+ IVisitablePointable itemTagPointable = itemTags.get(iter1);
+ ATypeTag itemTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER
+ .deserialize(itemTagPointable.getByteArray()[itemTagPointable.getStartOffset()]);
+ fieldType = TypeTagUtil.getBuiltinTypeByTag(itemTypeTag);
+ }
+ PyTypeInfo fieldTypeInfo = pointableVisitor.getTypeInfo(fieldType, out);
+ itemPointable.accept(pointableVisitor, fieldTypeInfo);
+ }
+ return null;
+ }
+ }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MsgPackPointableVisitor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MsgPackPointableVisitor.java
new file mode 100644
index 0000000..08be6ac
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MsgPackPointableVisitor.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.external.library.msgpack;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.asterix.external.library.PyTypeInfo;
+import org.apache.asterix.om.pointables.AFlatValuePointable;
+import org.apache.asterix.om.pointables.AListVisitablePointable;
+import org.apache.asterix.om.pointables.ARecordVisitablePointable;
+import org.apache.asterix.om.pointables.visitor.IVisitablePointableVisitor;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class MsgPackPointableVisitor implements IVisitablePointableVisitor<Void, PyTypeInfo> {
+
+ Map<DataOutput, Map<IAType, PyTypeInfo>> typeInfoMap = new HashMap<>();
+
+ public Void visit(AListVisitablePointable accessor, PyTypeInfo arg) throws HyracksDataException {
+ MsgPackAccessors.MsgPackListAccessor.access(accessor, arg, this);
+ return null;
+ }
+
+ @Override
+ public Void visit(ARecordVisitablePointable accessor, PyTypeInfo arg) throws HyracksDataException {
+ MsgPackAccessors.MsgPackRecordAccessor.access(accessor, arg, this);
+ return null;
+ }
+
+ @Override
+ public Void visit(AFlatValuePointable accessor, PyTypeInfo arg) throws HyracksDataException {
+ try {
+ MsgPackAccessors.createFlatMsgPackAccessor(arg.getType().getTypeTag()).apply(accessor, arg.getDataOutput());
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ return null;
+ }
+
+ public PyTypeInfo getTypeInfo(IAType type, DataOutput out) {
+ PyTypeInfo tInfo = null;
+ Map<IAType, PyTypeInfo> type2TypeInfo = typeInfoMap.get(out);
+ if (type2TypeInfo == null) {
+ type2TypeInfo = new HashMap<>();
+ tInfo = new PyTypeInfo(type, out);
+ type2TypeInfo.put(type, tInfo);
+ typeInfoMap.put(out, type2TypeInfo);
+ }
+ tInfo = tInfo == null ? type2TypeInfo.get(type) : tInfo;
+ if (tInfo == null) {
+ tInfo = new PyTypeInfo(type, out);
+ type2TypeInfo.put(type, tInfo);
+ }
+ return tInfo;
+ }
+
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalAssignBatchRuntimeFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalAssignBatchRuntimeFactory.java
index 593bac6..741dad2 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalAssignBatchRuntimeFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalAssignBatchRuntimeFactory.java
@@ -33,11 +33,14 @@
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.external.ipc.PythonIPCProto;
import org.apache.asterix.external.library.PythonLibraryEvaluator;
import org.apache.asterix.external.library.PythonLibraryEvaluatorFactory;
import org.apache.asterix.external.library.msgpack.MessageUnpackerToADM;
-import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.external.library.msgpack.MsgPackPointableVisitor;
+import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.om.functions.IExternalFunctionDescriptor;
+import org.apache.asterix.om.pointables.PointableAllocator;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.core.algebra.base.Counter;
@@ -48,6 +51,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.Warning;
import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
import org.msgpack.core.MessagePack;
@@ -58,12 +62,10 @@
public final class ExternalAssignBatchRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
private static final long serialVersionUID = 1L;
- private int[] outColumns;
+ private final int[] outColumns;
private final IExternalFunctionDescriptor[] fnDescs;
private final int[][] fnArgColumns;
- private int rpcBufferSize;
-
public ExternalAssignBatchRuntimeFactory(int[] outColumns, IExternalFunctionDescriptor[] fnDescs,
int[][] fnArgColumns, int[] projectionList) {
super(projectionList);
@@ -76,17 +78,14 @@
public AbstractOneInputOneOutputPushRuntime createOneOutputPushRuntime(IHyracksTaskContext ctx) {
final int[] projectionToOutColumns = new int[projectionList.length];
- //this is a temporary bodge. these buffers need to work like vsize frames, or be absent entirely
- int maxArgSz = ExternalDataUtils.getArgBufferSize();
- rpcBufferSize = ExternalDataUtils.roundUpToNearestFrameSize(maxArgSz, ctx.getInitialFrameSize());
for (int j = 0; j < projectionList.length; j++) {
projectionToOutColumns[j] = Arrays.binarySearch(outColumns, projectionList[j]);
}
return new AbstractOneInputOneOutputOneFramePushRuntime() {
- private ByteBuffer outputWrapper;
- private List<ByteBuffer> argHolders;
+ private ArrayBackedValueStorage outputWrapper;
+ private List<ArrayBackedValueStorage> argHolders;
ArrayTupleBuilder tupleBuilder;
private List<Pair<Long, PythonLibraryEvaluator>> libraryEvaluators;
private ATypeTag[][] nullCalls;
@@ -95,6 +94,9 @@
private MessageUnpacker unpacker;
private ArrayBufferInput unpackerInput;
private List<Pair<ByteBuffer, Counter>> batchResults;
+ private MessageUnpackerToADM unpackerToADM;
+ private PointableAllocator pointableAllocator;
+ private MsgPackPointableVisitor pointableVisitor;
@Override
public void open() throws HyracksDataException {
@@ -116,23 +118,26 @@
}
argHolders = new ArrayList<>(fnArgColumns.length);
for (int i = 0; i < fnArgColumns.length; i++) {
- argHolders.add(ctx.allocateFrame(rpcBufferSize));
+ argHolders.add(new ArrayBackedValueStorage());
}
- outputWrapper = ctx.allocateFrame();
+ outputWrapper = new ArrayBackedValueStorage();
nullCalls = new ATypeTag[argHolders.size()][0];
numCalls = new int[fnArgColumns.length];
batchResults = new ArrayList<>(argHolders.size());
for (int i = 0; i < argHolders.size(); i++) {
- batchResults.add(new Pair<>(ctx.allocateFrame(rpcBufferSize), new Counter(-1)));
+ batchResults.add(new Pair<>(ByteBuffer.allocate(ExternalDataConstants.DEFAULT_BUFFER_SIZE),
+ new Counter(-1)));
}
unpackerInput = new ArrayBufferInput(new byte[0]);
unpacker = MessagePack.newDefaultUnpacker(unpackerInput);
+ unpackerToADM = new MessageUnpackerToADM();
+ pointableAllocator = new PointableAllocator();
+ pointableVisitor = new MsgPackPointableVisitor();
}
private void resetBuffers(int numTuples, int[] numCalls) {
for (int func = 0; func < fnArgColumns.length; func++) {
- argHolders.get(func).clear();
- argHolders.get(func).position(0);
+ argHolders.get(func).reset();
if (nullCalls[func].length < numTuples) {
nullCalls[func] = new ATypeTag[numTuples];
}
@@ -188,6 +193,8 @@
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ /*TODO: this whole transposition stuff is not necessary
+ the evaulator should accept a format that is a collection of rows, logically*/
tAccess.reset(buffer);
tupleBuilder.reset();
try {
@@ -211,14 +218,15 @@
}
if (argumentStatus == ATypeTag.TYPE) {
if (cols.length > 0) {
- argHolders.get(func).put(ARRAY16);
- argHolders.get(func).putShort((short) cols.length);
+ argHolders.get(func).getDataOutput().writeByte(ARRAY16);
+ argHolders.get(func).getDataOutput().writeShort((short) cols.length);
}
for (int colIdx = 0; colIdx < cols.length; colIdx++) {
ref.set(buffer.array(), tRef.getFieldStart(cols[colIdx]),
tRef.getFieldLength(cols[colIdx]));
- PythonLibraryEvaluator.setArgument(fnDescs[func].getArgumentTypes()[colIdx], ref,
- argHolders.get(func), fnDescs[func].getFunctionInfo().getNullCall());
+ PythonIPCProto.visitValueRef(fnDescs[func].getArgumentTypes()[colIdx],
+ argHolders.get(func).getDataOutput(), ref, pointableAllocator,
+ pointableVisitor, fnDescs[func].getFunctionInfo().getNullCall());
}
} else {
numCalls[func]--;
@@ -228,6 +236,7 @@
}
}
}
+
//TODO: maybe this could be done in parallel for each unique library evaluator?
for (int argHolderIdx = 0; argHolderIdx < argHolders.size(); argHolderIdx++) {
Pair<Long, PythonLibraryEvaluator> fnEval = libraryEvaluators.get(argHolderIdx);
@@ -236,15 +245,14 @@
if (columnResult != null) {
Pair<ByteBuffer, Counter> resultholder = batchResults.get(argHolderIdx);
if (resultholder.getFirst().capacity() < columnResult.capacity()) {
- resultholder.setFirst(ctx.allocateFrame(ExternalDataUtils.roundUpToNearestFrameSize(
- columnResult.capacity(), ctx.getInitialFrameSize())));
+ ByteBuffer realloc = ctx.reallocateFrame(resultholder.getFirst(),
+ columnResult.capacity() * 2, false);
+ resultholder.setFirst(realloc);
}
ByteBuffer resultBuf = resultholder.getFirst();
- resultBuf.clear();
- resultBuf.position(0);
//offset 1 to skip message type
- System.arraycopy(columnResult.array(), columnResult.arrayOffset() + 1, resultBuf.array(),
- resultBuf.arrayOffset(), columnResult.capacity() - 1);
+ System.arraycopy(columnResult.array(), 1, resultBuf.array(), 0,
+ columnResult.remaining() - 1);
//wrapper for results and warnings arrays. always length 2
consumeAndGetBatchLength(resultBuf);
int numResults = (int) consumeAndGetBatchLength(resultBuf);
@@ -260,37 +268,30 @@
}
}
}
+
//decompose returned function columns into frame tuple format
for (int i = 0; i < numTuples; i++) {
tupleBuilder.reset();
for (int f = 0; f < projectionList.length; f++) {
int k = projectionToOutColumns[f];
if (k >= 0) {
- outputWrapper.clear();
- outputWrapper.position(0);
+ outputWrapper.reset();
Pair<ByteBuffer, Counter> result = batchResults.get(k);
- if (result.getFirst() != null) {
- if (result.getFirst().capacity() > outputWrapper.capacity()) {
- outputWrapper = ctx.allocateFrame(ExternalDataUtils.roundUpToNearestFrameSize(
- outputWrapper.capacity(), ctx.getInitialFrameSize()));
- }
- }
- int start = outputWrapper.arrayOffset();
ATypeTag functionCalled = nullCalls[k][i];
if (functionCalled == ATypeTag.TYPE) {
if (result.getSecond().get() > 0) {
- MessageUnpackerToADM.unpack(result.getFirst(), outputWrapper, true);
+ unpackerToADM.unpack(result.getFirst(), outputWrapper.getDataOutput(), true);
result.getSecond().set(result.getSecond().get() - 1);
} else {
//emit NULL for functions which failed with a warning
- outputWrapper.put(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+ outputWrapper.getDataOutput().writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
}
} else if (functionCalled == ATypeTag.NULL) {
- outputWrapper.put(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+ outputWrapper.getDataOutput().writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
} else {
- outputWrapper.put(ATypeTag.SERIALIZED_MISSING_TYPE_TAG);
+ outputWrapper.getDataOutput().writeByte(ATypeTag.SERIALIZED_MISSING_TYPE_TAG);
}
- tupleBuilder.addField(outputWrapper.array(), start, start + outputWrapper.position());
+ tupleBuilder.addField(outputWrapper.getByteArray(), 0, outputWrapper.getLength());
} else {
tupleBuilder.addField(tAccess, i, projectionList[f]);
}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/PrintTools.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/PrintTools.java
index b337777..449ce13 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/PrintTools.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/PrintTools.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.dataflow.data.nontagged.printers;
+import java.io.DataOutput;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
@@ -285,12 +286,39 @@
if (c == '"') {
os.write('"');
}
- os.write(c);
- position += sz;
+ if (Character.isHighSurrogate(c)) {
+ position += writeSupplementaryChar(os, b, maxPosition, position, c, sz);
+ continue;
+ }
+ while (sz > 0) {
+ os.write(b[position]);
+ ++position;
+ --sz;
+ }
+ break;
}
os.write('"');
}
+ public static void writeUTF8StringRaw(byte[] b, int s, int l, DataOutput os) throws IOException {
+ int utfLength = UTF8StringUtil.getUTFLength(b, s);
+ int position = s + UTF8StringUtil.getNumBytesToStoreLength(utfLength); // skip 2 bytes containing string size
+ int maxPosition = position + utfLength;
+ while (position < maxPosition) {
+ char c = UTF8StringUtil.charAt(b, position);
+ int sz = UTF8StringUtil.charSize(b, position);
+ if (Character.isHighSurrogate(c)) {
+ position += writeSupplementaryChar(os, b, maxPosition, position, c, sz);
+ continue;
+ }
+ while (sz > 0) {
+ os.write(b[position]);
+ ++position;
+ --sz;
+ }
+ }
+ }
+
public static void writeUTF8StringAsJSON(byte[] b, int s, int l, OutputStream os) throws IOException {
int utfLength = UTF8StringUtil.getUTFLength(b, s);
os.write('"');
@@ -420,4 +448,17 @@
return highSurrogateSize + lowSurrogateSize;
}
+ //TODO: some way to dedupe this?
+ private static int writeSupplementaryChar(DataOutput os, byte[] src, int limit, int highSurrogatePos,
+ char highSurrogate, int highSurrogateSize) throws IOException {
+ final int lowSurrogatePos = highSurrogatePos + highSurrogateSize;
+ if (lowSurrogatePos >= limit) {
+ throw new IllegalStateException("malformed utf8 input");
+ }
+ final char lowSurrogate = UTF8StringUtil.charAt(src, lowSurrogatePos);
+ final int lowSurrogateSize = UTF8StringUtil.charSize(src, lowSurrogatePos);
+ os.write(new String(new char[] { highSurrogate, lowSurrogate }).getBytes(StandardCharsets.UTF_8));
+ return highSurrogateSize + lowSurrogateSize;
+ }
+
}