[ASTERIXDB-2896] Increase UDF argument buffer size
-user model changes: no
-storage format changes: no
-interface changes: yes
Details:
Bump buffer sizes in Python IPC to 1MB for individual
arguments, and in the case of batching, match the
buffer size of the Hyracks IPC layer for deserialization.
Change-Id: If847ac3b09406d1e9e6a976a7e0193b6e81bcc8b
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/11243
Reviewed-by: Ian Maxon <imaxon@uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Ian Maxon <imaxon@uci.edu>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarPythonFunctionEvaluator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarPythonFunctionEvaluator.java
index eb87399..7c860a2 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarPythonFunctionEvaluator.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarPythonFunctionEvaluator.java
@@ -29,6 +29,7 @@
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.external.library.msgpack.MessageUnpackerToADM;
+import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.om.functions.IExternalFunctionInfo;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.IAType;
@@ -77,9 +78,11 @@
for (int i = 0; i < argValues.length; i++) {
argValues[i] = VoidPointable.FACTORY.createPointable();
}
- //TODO: these should be dynamic
- this.argHolder = ByteBuffer.wrap(new byte[Short.MAX_VALUE * 2]);
- this.outputWrapper = ByteBuffer.wrap(new byte[Short.MAX_VALUE * 2]);
+ //TODO: these should be dynamic. this static size picking is a temporary bodge until this works like
+ // v-size frames do or these construction buffers are removed entirely
+ int maxArgSz = ExternalDataUtils.getArgBufferSize();
+ this.argHolder = ByteBuffer.wrap(new byte[maxArgSz]);
+ this.outputWrapper = ByteBuffer.wrap(new byte[maxArgSz]);
this.evaluatorContext = ctx;
this.sourceLocation = sourceLoc;
this.unpackerInput = new ArrayBufferInput(new byte[0]);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalAssignBatchRuntimeFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalAssignBatchRuntimeFactory.java
index 39e480a..593bac6 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalAssignBatchRuntimeFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalAssignBatchRuntimeFactory.java
@@ -36,6 +36,7 @@
import org.apache.asterix.external.library.PythonLibraryEvaluator;
import org.apache.asterix.external.library.PythonLibraryEvaluatorFactory;
import org.apache.asterix.external.library.msgpack.MessageUnpackerToADM;
+import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.om.functions.IExternalFunctionDescriptor;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -61,6 +62,8 @@
private final IExternalFunctionDescriptor[] fnDescs;
private final int[][] fnArgColumns;
+ private int rpcBufferSize;
+
public ExternalAssignBatchRuntimeFactory(int[] outColumns, IExternalFunctionDescriptor[] fnDescs,
int[][] fnArgColumns, int[] projectionList) {
super(projectionList);
@@ -73,6 +76,9 @@
public AbstractOneInputOneOutputPushRuntime createOneOutputPushRuntime(IHyracksTaskContext ctx) {
final int[] projectionToOutColumns = new int[projectionList.length];
+ //this is a temporary bodge. these buffers need to work like vsize frames, or be absent entirely
+ int maxArgSz = ExternalDataUtils.getArgBufferSize();
+ rpcBufferSize = ExternalDataUtils.roundUpToNearestFrameSize(maxArgSz, ctx.getInitialFrameSize());
for (int j = 0; j < projectionList.length; j++) {
projectionToOutColumns[j] = Arrays.binarySearch(outColumns, projectionList[j]);
}
@@ -110,14 +116,14 @@
}
argHolders = new ArrayList<>(fnArgColumns.length);
for (int i = 0; i < fnArgColumns.length; i++) {
- argHolders.add(ctx.allocateFrame());
+ argHolders.add(ctx.allocateFrame(rpcBufferSize));
}
outputWrapper = ctx.allocateFrame();
nullCalls = new ATypeTag[argHolders.size()][0];
numCalls = new int[fnArgColumns.length];
batchResults = new ArrayList<>(argHolders.size());
for (int i = 0; i < argHolders.size(); i++) {
- batchResults.add(new Pair<>(ctx.allocateFrame(), new Counter(-1)));
+ batchResults.add(new Pair<>(ctx.allocateFrame(rpcBufferSize), new Counter(-1)));
}
unpackerInput = new ArrayBufferInput(new byte[0]);
unpacker = MessagePack.newDefaultUnpacker(unpackerInput);
@@ -230,7 +236,8 @@
if (columnResult != null) {
Pair<ByteBuffer, Counter> resultholder = batchResults.get(argHolderIdx);
if (resultholder.getFirst().capacity() < columnResult.capacity()) {
- resultholder.setFirst(ctx.allocateFrame(columnResult.capacity()));
+ resultholder.setFirst(ctx.allocateFrame(ExternalDataUtils.roundUpToNearestFrameSize(
+ columnResult.capacity(), ctx.getInitialFrameSize())));
}
ByteBuffer resultBuf = resultholder.getFirst();
resultBuf.clear();
@@ -262,6 +269,12 @@
outputWrapper.clear();
outputWrapper.position(0);
Pair<ByteBuffer, Counter> result = batchResults.get(k);
+ if (result.getFirst() != null) {
+ if (result.getFirst().capacity() > outputWrapper.capacity()) {
+ outputWrapper = ctx.allocateFrame(ExternalDataUtils.roundUpToNearestFrameSize(
+ outputWrapper.capacity(), ctx.getInitialFrameSize()));
+ }
+ }
int start = outputWrapper.arrayOffset();
ATypeTag functionCalled = nullCalls[k][i];
if (functionCalled == ATypeTag.TYPE) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index f5c62a5..8e94263 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -80,6 +80,7 @@
import org.apache.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
import org.apache.hyracks.dataflow.common.data.parsers.LongParserFactory;
import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import org.apache.hyracks.util.StorageUtil;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobServiceClient;
@@ -106,6 +107,8 @@
public class ExternalDataUtils {
private static final Map<ATypeTag, IValueParserFactory> valueParserFactoryMap = new EnumMap<>(ATypeTag.class);
+ private static final int DEFAULT_MAX_ARGUMENT_SZ = 1024 * 1024;
+ private static final int HEADER_FUDGE = 64;
static {
valueParserFactoryMap.put(ATypeTag.INTEGER, IntegerParserFactory.INSTANCE);
@@ -997,4 +1000,20 @@
}
}
}
+
+ public static int roundUpToNearestFrameSize(int size, int framesize) {
+ return ((size / framesize) + 1) * framesize;
+ }
+
+ public static int getArgBufferSize() {
+ int maxArgSz = DEFAULT_MAX_ARGUMENT_SZ + HEADER_FUDGE;
+ String userArgSz = System.getProperty("udf.buf.size");
+ if (userArgSz != null) {
+ long parsedSize = StorageUtil.getByteValue(userArgSz) + HEADER_FUDGE;
+ if (parsedSize < Integer.MAX_VALUE && parsedSize > 0) {
+ maxArgSz = (int) parsedSize;
+ }
+ }
+ return maxArgSz;
+ }
}