ASTERIXDB-1058: ensure memory availablity before allocation
This change includes the following:
- IResourceMemeoryManager responsible for resources memory budget calculation.
- ResourceHeapBufferAllocator which checks with IResourceManager before allocation.
Change-Id: I846ff45402410835f5ed0afd2f701509abc95222
Reviewed-on: https://asterix-gerrit.ics.uci.edu/480
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ian Maxon <imaxon@apache.org>
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IIndexLifecycleManager.java b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IIndexLifecycleManager.java
index 05395a8..6d16921 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IIndexLifecycleManager.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IIndexLifecycleManager.java
@@ -21,8 +21,9 @@
import java.util.List;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.common.IResourceMemoryManager;
-public interface IIndexLifecycleManager {
+public interface IIndexLifecycleManager extends IResourceMemoryManager {
public List<IIndex> getOpenIndexes();
public void register(String resourceName, IIndex index) throws HyracksDataException;
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
index c884cbb..4ba2b80 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
@@ -194,8 +194,8 @@
public void open(String resourceName) throws HyracksDataException {
IndexInfo info = indexInfos.get(resourceName);
if (info == null) {
- throw new HyracksDataException("Failed to open index with resource name " + resourceName
- + " since it does not exist.");
+ throw new HyracksDataException(
+ "Failed to open index with resource name " + resourceName + " since it does not exist.");
}
if (!info.isOpen) {
@@ -236,7 +236,8 @@
indexInfos.remove(resourceName);
}
- private void allocateMemory(String resourceName) throws HyracksDataException {
+ @Override
+ public void allocateMemory(String resourceName) throws HyracksDataException {
IndexInfo info = indexInfos.get(resourceName);
if (info == null) {
throw new HyracksDataException("Failed to allocate memory for index with resource ID " + resourceName
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java
index ff0c6a7..54c4d7c 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java
@@ -174,8 +174,7 @@
}
if (!newPage) {
- throw new HyracksDataException("Page " + BufferedFileHandle.getPageId(dpid)
- + " does not exist in file "
+ throw new HyracksDataException("Page " + BufferedFileHandle.getPageId(dpid) + " does not exist in file "
+ fileMapManager.lookupFileName(BufferedFileHandle.getFileId(dpid)));
}
@@ -238,7 +237,7 @@
}
pages.trimToSize();
pages.ensureCapacity(numPages + OVERFLOW_PADDING);
- ByteBuffer[] buffers = allocator.allocate(pageSize, numPages);
+ ByteBuffer[] buffers = allocator.ensureAvailabilityThenAllocate(pageSize, numPages);
for (int i = 0; i < numPages; i++) {
pages.add(new VirtualPage(buffers[i]));
buckets[i] = new CacheBucket();
diff --git a/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IResourceMemoryManager.java b/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IResourceMemoryManager.java
new file mode 100644
index 0000000..c8b530f
--- /dev/null
+++ b/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IResourceMemoryManager.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface IResourceMemoryManager {
+ void allocateMemory(String resourceName) throws HyracksDataException;
+}
diff --git a/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/DirectBufferAllocator.java b/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/DirectBufferAllocator.java
index c5bf1c1..bcaed9b 100644
--- a/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/DirectBufferAllocator.java
+++ b/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/DirectBufferAllocator.java
@@ -20,6 +20,8 @@
import java.nio.ByteBuffer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
public class DirectBufferAllocator implements ICacheMemoryAllocator {
@Override
public ByteBuffer[] allocate(int pageSize, int numPages) {
@@ -29,4 +31,9 @@
}
return buffers;
}
+
+ @Override
+ public ByteBuffer[] ensureAvailabilityThenAllocate(int pageSize, int numPages) throws HyracksDataException {
+ return allocate(pageSize, numPages);
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/HeapBufferAllocator.java b/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/HeapBufferAllocator.java
index d450176..92c22bb 100644
--- a/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/HeapBufferAllocator.java
+++ b/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/HeapBufferAllocator.java
@@ -20,6 +20,8 @@
import java.nio.ByteBuffer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
public class HeapBufferAllocator implements ICacheMemoryAllocator {
@Override
public ByteBuffer[] allocate(int pageSize, int numPages) {
@@ -29,4 +31,9 @@
}
return buffers;
}
+
+ @Override
+ public ByteBuffer[] ensureAvailabilityThenAllocate(int pageSize, int numPages) throws HyracksDataException {
+ return allocate(pageSize, numPages);
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICacheMemoryAllocator.java b/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICacheMemoryAllocator.java
index bb85bd1..a347cf8 100644
--- a/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICacheMemoryAllocator.java
+++ b/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICacheMemoryAllocator.java
@@ -20,6 +20,24 @@
import java.nio.ByteBuffer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
public interface ICacheMemoryAllocator {
+ /**
+ * @param pageSize
+ * @param numPages
+ * @return a ByteBuffer array of size numPages and each buffer of size pageSize.
+ */
public ByteBuffer[] allocate(int pageSize, int numPages);
+
+ /**
+ * Ensures the availability of the memory budget with the ResourceMemoryManager (if exists) before allocation. Otherwise, it acts as a call to {@link #allocate(int, int)}
+ *
+ * @param pageSize
+ * @param numPages
+ * @return
+ * @throws HyracksDataException
+ */
+ public ByteBuffer[] ensureAvailabilityThenAllocate(int pageSize, int numPages) throws HyracksDataException;
+
}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ResourceHeapBufferAllocator.java b/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ResourceHeapBufferAllocator.java
new file mode 100644
index 0000000..db6213c
--- /dev/null
+++ b/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ResourceHeapBufferAllocator.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.buffercache;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.common.IResourceMemoryManager;
+
+public class ResourceHeapBufferAllocator implements ICacheMemoryAllocator {
+
+ final IResourceMemoryManager memoryManager;
+ final String resourceName;
+
+ public ResourceHeapBufferAllocator(IResourceMemoryManager memoryManager, String resourceName) {
+ this.memoryManager = memoryManager;
+ this.resourceName = resourceName;
+ }
+
+ @Override
+ public ByteBuffer[] allocate(int pageSize, int numPages) {
+ ByteBuffer[] buffers = new ByteBuffer[numPages];
+ for (int i = 0; i < numPages; ++i) {
+ buffers[i] = ByteBuffer.allocate(pageSize);
+ }
+ return buffers;
+ }
+
+ @Override
+ public ByteBuffer[] ensureAvailabilityThenAllocate(int pageSize, int numPages) throws HyracksDataException {
+ memoryManager.allocateMemory(resourceName);
+ return allocate(pageSize, numPages);
+ }
+}
\ No newline at end of file