[NO ISSUE][*DB] Enable large parsing resources to be freed on memory pressure
Change-Id: Ieaca566f765a5d3531baff8128d6ff03c95956e2
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/11365
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/jackson/AbstractObjectPool.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/jackson/AbstractObjectPool.java
new file mode 100644
index 0000000..ef75688
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/jackson/AbstractObjectPool.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.parser.jackson;
+
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+import org.apache.asterix.om.util.container.IObjectFactory;
+
+/**
+ * Object pool for DFS traversal mode, which allows to recycle objects
+ * as soon as it is not needed.
+ */
+public abstract class AbstractObjectPool<E, T, Q> implements IObjectPool<E> {
+ private final IObjectFactory<E, T> objectFactory;
+ private final Queue<Q> recycledObjects;
+ private final T param;
+
+ protected AbstractObjectPool(IObjectFactory<E, T> objectFactory, T param) {
+ this.objectFactory = objectFactory;
+ recycledObjects = new ArrayDeque<>();
+ this.param = param;
+ }
+
+ public E getInstance() {
+ E instance = unwrap(recycledObjects.poll());
+ if (objectFactory != null && instance == null) {
+ instance = objectFactory.create(param);
+ }
+ return instance;
+ }
+
+ public void recycle(E object) {
+ if (object != null) {
+ recycledObjects.add(wrap(object));
+ }
+ }
+
+ protected abstract E unwrap(Q element);
+
+ protected abstract Q wrap(E element);
+
+ @Override
+ public String toString() {
+ return recycledObjects.toString();
+ }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/jackson/IObjectPool.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/jackson/IObjectPool.java
new file mode 100644
index 0000000..a2990ec
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/jackson/IObjectPool.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.parser.jackson;
+
+/**
+ * Object pool for DFS traversal mode, which allows to recycle objects
+ * as soon as it is not needed.
+ */
+public interface IObjectPool<E> {
+ E getInstance();
+
+ void recycle(E object);
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/jackson/ObjectPool.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/jackson/ObjectPool.java
index 8945e71..864d9a9 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/jackson/ObjectPool.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/jackson/ObjectPool.java
@@ -18,20 +18,13 @@
*/
package org.apache.asterix.external.parser.jackson;
-import java.util.ArrayDeque;
-import java.util.Queue;
-
import org.apache.asterix.om.util.container.IObjectFactory;
/**
* Object pool for DFS traversal mode, which allows to recycle objects
* as soon as it is not needed.
*/
-public class ObjectPool<E, T> {
- private final IObjectFactory<E, T> objectFactory;
- private final Queue<E> recycledObjects;
- private final T element;
-
+public class ObjectPool<E, T> extends AbstractObjectPool<E, T, E> {
public ObjectPool() {
this(null, null);
}
@@ -40,28 +33,17 @@
this(objectFactory, null);
}
- public ObjectPool(IObjectFactory<E, T> objectFactory, T element) {
- this.objectFactory = objectFactory;
- recycledObjects = new ArrayDeque<>();
- this.element = element;
- }
-
- public E getInstance() {
- E instance = recycledObjects.poll();
- if (objectFactory != null && instance == null) {
- instance = objectFactory.create(element);
- }
- return instance;
- }
-
- public void recycle(E object) {
- if (object != null) {
- recycledObjects.add(object);
- }
+ public ObjectPool(IObjectFactory<E, T> objectFactory, T param) {
+ super(objectFactory, param);
}
@Override
- public String toString() {
- return recycledObjects.toString();
+ protected E unwrap(E wrapped) {
+ return wrapped;
+ }
+
+ @Override
+ protected E wrap(E element) {
+ return element;
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/jackson/ParserContext.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/jackson/ParserContext.java
index d0b79b1..ef9ff08 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/jackson/ParserContext.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/jackson/ParserContext.java
@@ -51,7 +51,7 @@
public class ParserContext {
private static final int SERIALIZED_FIELDNAME_MAP_MAX_SIZE = 128;
- private final ObjectPool<IARecordBuilder, ATypeTag> objectBuilderPool;
+ private final IObjectPool<IARecordBuilder> objectBuilderPool;
private final ObjectPool<IAsterixListBuilder, ATypeTag> arrayBuilderPool;
/**
@@ -61,7 +61,7 @@
* <p>
* Scalar value 5 is written 4 times in tempBuffer("d") then tempBuffer("c") ... tempBuffer("a")
*/
- private final ObjectPool<IMutableValueStorage, ATypeTag> tempBufferPool;
+ private final IObjectPool<IMutableValueStorage> tempBufferPool;
private final ObjectPool<BitSet, Void> nullBitmapPool;
private final Map<String, IMutableValueStorage> serializedFieldNames;
private final ISerializerDeserializer<AString> stringSerDe;
@@ -76,9 +76,9 @@
@SuppressWarnings("unchecked")
public ParserContext(boolean allocateModfiedUTF8Writer) {
- objectBuilderPool = new ObjectPool<>(new RecordBuilderFactory());
+ objectBuilderPool = new SoftObjectPool<>(new RecordBuilderFactory());
arrayBuilderPool = new ObjectPool<>(new ListBuilderFactory(), ATypeTag.ARRAY);
- tempBufferPool = new ObjectPool<>(new AbvsBuilderFactory());
+ tempBufferPool = new SoftObjectPool<>(new AbvsBuilderFactory());
nullBitmapPool = new ObjectPool<>();
serializedFieldNames = new LRUMap<>(SERIALIZED_FIELDNAME_MAP_MAX_SIZE);
stringSerDe = SerializerDeserializerProvider.INSTANCE.getAStringSerializerDeserializer();
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/jackson/SoftObjectPool.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/jackson/SoftObjectPool.java
new file mode 100644
index 0000000..91655db
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/jackson/SoftObjectPool.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.parser.jackson;
+
+import java.lang.ref.SoftReference;
+
+import org.apache.asterix.om.util.container.IObjectFactory;
+
+/**
+ * Object pool for DFS traversal mode, which allows to recycle objects
+ * as soon as it is not needed.
+ */
+public class SoftObjectPool<E, T> extends AbstractObjectPool<E, T, SoftReference<E>> {
+ public SoftObjectPool() {
+ this(null, null);
+ }
+
+ public SoftObjectPool(IObjectFactory<E, T> objectFactory) {
+ this(objectFactory, null);
+ }
+
+ public SoftObjectPool(IObjectFactory<E, T> objectFactory, T element) {
+ super(objectFactory, element);
+ }
+
+ @Override
+ protected E unwrap(SoftReference<E> wrapped) {
+ return wrapped == null ? null : wrapped.get();
+ }
+
+ @Override
+ protected SoftReference<E> wrap(E element) {
+ return new SoftReference<>(element);
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8StringUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8StringUtil.java
index f96ed72..38de4ac 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8StringUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8StringUtil.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -25,6 +25,7 @@
import java.io.IOException;
import java.io.OutputStream;
import java.io.UTFDataFormatException;
+import java.lang.ref.SoftReference;
import org.apache.hyracks.util.encoding.VarLenIntEncoderDecoder;
@@ -690,10 +691,12 @@
if (writer == null) {
tempBytes = new byte[utflen + 5];
} else {
- if (writer.tempBytes == null || writer.tempBytes.length < utflen + 5) {
- writer.tempBytes = new byte[utflen + 5];
+ byte[] writerTempBytes = writer.tempBytesRef != null ? writer.tempBytesRef.get() : null;
+ if (writerTempBytes == null || writerTempBytes.length < utflen + 5) {
+ writerTempBytes = new byte[utflen + 5];
+ writer.tempBytesRef = new SoftReference<>(writerTempBytes);
}
- tempBytes = writer.tempBytes;
+ tempBytes = writerTempBytes;
}
return tempBytes;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8StringWriter.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8StringWriter.java
index 563f865..a0cc7d0 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8StringWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8StringWriter.java
@@ -21,11 +21,12 @@
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
+import java.lang.ref.SoftReference;
public class UTF8StringWriter implements Serializable {
private static final long serialVersionUID = 1L;
- transient byte[] tempBytes;
+ transient SoftReference<byte[]> tempBytesRef;
public final void writeUTF8(CharSequence str, DataOutput out) throws IOException {
UTF8StringUtil.writeUTF8(str, out, this);