Add virtual buffer cache with unit test
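
This patch adds an in-memory ("virtual") buffer cache for the LSM storage
layer. Pages are hashed by dpid into bucket chains, are never read from or
written to disk, and the page pool can grow past the configured budget on
demand. The accompanying unit test pins pages across several files and checks
that the per-file sets of pinned pages stay disjoint, both under and over the
soft page cap.

A minimal usage sketch (not part of this patch; the allocator and file map
manager are the same ones the unit test uses):

    ICacheMemoryAllocator allocator = new HeapBufferAllocator();
    IFileMapManager fmm = new TransientFileMapManager();
    IVirtualBufferCache vbc = new VirtualBufferCache(allocator, fmm, 256, 1000);
    vbc.open();
    FileReference ref = new FileReference(new File("f0"));
    vbc.createFile(ref);
    int fileId = fmm.lookupFileId(ref);
    ICachedPage page = vbc.pin(BufferedFileHandle.getDiskPageId(fileId, 0), true);
    page.acquireWriteLatch();
    try {
        page.getBuffer().putInt(0, 42);
    } finally {
        page.releaseWriteLatch();
    }
    vbc.unpin(page);
    vbc.close();
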
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/IVirtualBufferCache.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/IVirtualBufferCache.java
new file mode 100644
index 0000000..ba273e2
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/IVirtualBufferCache.java
@@ -0,0 +1,7 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.api;
+
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+
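+/**
+ * A buffer cache whose pages live entirely in memory. open() allocates the
+ * in-memory page budget and must be called before the cache is used.
+ */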
+public interface IVirtualBufferCache extends IBufferCache {
+ public void open();
+}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java
new file mode 100644
index 0000000..db310d6
--- /dev/null
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java
@@ -0,0 +1,303 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICacheMemoryAllocator;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
+import edu.uci.ics.hyracks.storage.common.file.BufferedFileHandle;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapManager;
+
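+/**
+ * An entirely memory-resident buffer cache. Pages are hashed by dpid into
+ * bucket chains and are never read from or written to disk; once the
+ * configured page budget is exhausted, additional pages are allocated on
+ * demand.
+ */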
+public class VirtualBufferCache implements IVirtualBufferCache {
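+ // Extra slack reserved in the page list for pages allocated past the numPages
+ // soft budget.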
+ private static final int OVERFLOW_PADDING = 8;
+
+ private final ICacheMemoryAllocator allocator;
+ private final IFileMapManager fileMapManager;
+ private final int pageSize;
+ private final int numPages;
+
+ private final CacheBucket[] buckets;
+ private final ArrayList<VirtualPage> pages;
+
+ private int nextFree;
+
+ private boolean open;
+
+ public VirtualBufferCache(ICacheMemoryAllocator allocator, IFileMapManager fileMapManager, int pageSize,
+ int numPages) {
+ this.allocator = allocator;
+ this.fileMapManager = fileMapManager;
+ this.pageSize = pageSize;
+ this.numPages = numPages;
+
+ buckets = new CacheBucket[numPages];
+ pages = new ArrayList<VirtualPage>();
+ nextFree = 0;
+ open = false;
+ }
+
+ @Override
+ public void createFile(FileReference fileRef) throws HyracksDataException {
+ synchronized (fileMapManager) {
+ if (fileMapManager.isMapped(fileRef)) {
+ throw new HyracksDataException("File " + fileRef + " is already mapped");
+ }
+ fileMapManager.registerFile(fileRef);
+ }
+ }
+
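+ // Virtual files have no backing file handle, so openFile and closeFile are
+ // no-ops.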
+ @Override
+ public void openFile(int fileId) throws HyracksDataException {
+ }
+
+ @Override
+ public void closeFile(int fileId) throws HyracksDataException {
+ }
+
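+ // Unmaps the file, unlinks all of its pages from the hash buckets, and
+ // compacts the page list so the freed pages can be reused.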
+ @Override
+ public void deleteFile(int fileId, boolean flushDirtyPages) throws HyracksDataException {
+ synchronized (fileMapManager) {
+ if (!fileMapManager.isMapped(fileId)) {
+ throw new HyracksDataException("File with id " + fileId + " is not mapped");
+ }
+ fileMapManager.unregisterFile(fileId);
+ }
+
+ for (int i = 0; i < buckets.length; i++) {
+ CacheBucket bucket = buckets[i];
+ bucket.bucketLock.lock();
+ try {
+ VirtualPage prev = null;
+ VirtualPage curr = bucket.cachedPage;
+ while (curr != null) {
+ if (BufferedFileHandle.getFileId(curr.dpid) == fileId) {
+ if (prev == null) {
+ bucket.cachedPage = curr.next;
+ curr.reset();
+ curr = bucket.cachedPage;
+ } else {
+ prev.next = curr.next;
+ curr.reset();
+ curr = prev.next;
+ }
+ } else {
+ prev = curr;
+ curr = curr.next;
+ }
+ }
+ } finally {
+ bucket.bucketLock.unlock();
+ }
+ }
+ defragPageList();
+ }
+
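+ // Moves pages freed by a file delete toward the tail of the page list so
+ // that getOrAllocPage() can hand them out again.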
+ private void defragPageList() {
+ int start = 0;
+ int end = nextFree - 1;
+ synchronized (pages) {
+ while (start < end) {
+ VirtualPage lastUsed = pages.get(end);
+ while (end > 0 && lastUsed.dpid == -1) {
+ --end;
+ lastUsed = pages.get(end);
+ }
+
+ if (end <= 0) {
+ // Only page 0 remains to examine; it may still be in use, so do not
+ // hand it out again.
+ nextFree = lastUsed.dpid == -1 ? 0 : 1;
+ break;
+ }
+
+ VirtualPage firstUnused = pages.get(start);
+ while (start < end && firstUnused.dpid != -1) {
+ ++start;
+ firstUnused = pages.get(start);
+ }
+
+ if (start >= end) {
+ break;
+ }
+
+ Collections.swap(pages, start, end);
+ nextFree = end;
+ --end;
+ ++start;
+ }
+ }
+ }
+
+ @Override
+ public ICachedPage tryPin(long dpid) throws HyracksDataException {
+ // Unlike pin(), a miss returns null instead of failing; there is no disk
+ // copy to fall back on.
+ CacheBucket bucket = buckets[hash(dpid)];
+ bucket.bucketLock.lock();
+ try {
+ for (VirtualPage page = bucket.cachedPage; page != null; page = page.next) {
+ if (page.dpid == dpid) {
+ return page;
+ }
+ }
+ return null;
+ } finally {
+ bucket.bucketLock.unlock();
+ }
+ }
+
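+ // Looks the page up in its hash bucket. A miss can only be satisfied when
+ // newPage is true, since there is no on-disk copy to read the page from.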
+ @Override
+ public ICachedPage pin(long dpid, boolean newPage) throws HyracksDataException {
+ VirtualPage page = null;
+ int hash = hash(dpid);
+ CacheBucket bucket = buckets[hash];
+
+ bucket.bucketLock.lock();
+ try {
+ page = bucket.cachedPage;
+ while (page != null) {
+ if (page.dpid == dpid) {
+ return page;
+ }
+ page = page.next;
+ }
+
+ if (!newPage) {
+ throw new HyracksDataException("Page " + BufferedFileHandle.getPageId(dpid)
+ + " does not exist in file "
+ + fileMapManager.lookupFileName(BufferedFileHandle.getFileId(dpid)));
+ }
+ page = getOrAllocPage(dpid);
+ page.next = bucket.cachedPage;
+ bucket.cachedPage = page;
+ } finally {
+ bucket.bucketLock.unlock();
+ }
+
+ return page;
+ }
+
+ private int hash(long dpid) {
+ return (int) (dpid % buckets.length);
+ }
+
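+ // Hands out the next free page from the pool, growing the pool past the
+ // numPages budget when no free page is left.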
+ private VirtualPage getOrAllocPage(long dpid) {
+ VirtualPage page = null;
+ synchronized (pages) {
+ if (nextFree >= pages.size()) {
+ page = new VirtualPage(allocator.allocate(pageSize, 1)[0]);
+ pages.add(page);
+ } else {
+ page = pages.get(nextFree);
+ }
+ ++nextFree;
+ page.dpid = dpid;
+ }
+ return page;
+ }
+
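+ // Pages are never evicted or persisted, so unpin, flushDirtyPage, and force
+ // are no-ops.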
+ @Override
+ public void unpin(ICachedPage page) throws HyracksDataException {
+ }
+
+ @Override
+ public void flushDirtyPage(ICachedPage page) throws HyracksDataException {
+ }
+
+ @Override
+ public void force(int fileId, boolean metadata) throws HyracksDataException {
+ }
+
+ @Override
+ public int getPageSize() {
+ return pageSize;
+ }
+
+ @Override
+ public int getNumPages() {
+ return numPages;
+ }
+
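+ // Eagerly allocates the numPages budget and the hash buckets; pages beyond
+ // the budget are allocated lazily by getOrAllocPage().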
+ @Override
+ public void open() {
+ if (open) {
+ return;
+ }
+ pages.trimToSize();
+ pages.ensureCapacity(numPages + OVERFLOW_PADDING);
+ ByteBuffer[] buffers = allocator.allocate(pageSize, numPages);
+ for (int i = 0; i < numPages; i++) {
+ pages.add(new VirtualPage(buffers[i]));
+ buckets[i] = new CacheBucket();
+ }
+ nextFree = 0;
+ open = true;
+ }
+
+ @Override
+ public void close() {
+ if (!open) {
+ return;
+ }
+
+ pages.clear();
+ for (int i = 0; i < numPages; i++) {
+ buckets[i].cachedPage = null;
+ }
+ open = false;
+ }
+
+ public String dumpState() {
+ StringBuilder sb = new StringBuilder();
+ sb.append(String.format("Page size = %d\n", pageSize));
+ sb.append(String.format("Capacity = %d\n", numPages));
+ sb.append(String.format("Allocated pages = %d\n", pages.size()));
+ sb.append(String.format("Next free page = %d\n", nextFree));
+ return sb.toString();
+ }
+
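+ // A bucket of the page hash table: a lock protecting the head of a singly
+ // linked chain of pages.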
+ private static class CacheBucket {
+ private final ReentrantLock bucketLock;
+ private VirtualPage cachedPage;
+
+ public CacheBucket() {
+ this.bucketLock = new ReentrantLock();
+ }
+ }
+
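+ // A heap-resident page. dpid == -1 marks the page as free.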
+ private class VirtualPage implements ICachedPage {
+ final ByteBuffer buffer;
+ final ReadWriteLock latch;
+ volatile long dpid;
+ VirtualPage next;
+
+ public VirtualPage(ByteBuffer buffer) {
+ this.buffer = buffer;
+ latch = new ReentrantReadWriteLock(true);
+ dpid = -1;
+ next = null;
+ }
+
+ public void reset() {
+ dpid = -1;
+ next = null;
+ }
+
+ @Override
+ public ByteBuffer getBuffer() {
+ return buffer;
+ }
+
+ @Override
+ public void acquireReadLatch() {
+ latch.readLock().lock();
+ }
+
+ @Override
+ public void releaseReadLatch() {
+ latch.readLock().unlock();
+ }
+
+ @Override
+ public void acquireWriteLatch() {
+ latch.writeLock().lock();
+ }
+
+ @Override
+ public void releaseWriteLatch() {
+ latch.writeLock().unlock();
+ }
+
+ }
+}
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/common/VirtualBufferCacheTest.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/common/VirtualBufferCacheTest.java
new file mode 100644
index 0000000..d6ec1c5
--- /dev/null
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/common/VirtualBufferCacheTest.java
@@ -0,0 +1,124 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.VirtualBufferCache;
+import edu.uci.ics.hyracks.storage.common.buffercache.HeapBufferAllocator;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICacheMemoryAllocator;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
+import edu.uci.ics.hyracks.storage.common.file.BufferedFileHandle;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapManager;
+import edu.uci.ics.hyracks.storage.common.file.TransientFileMapManager;
+
+public class VirtualBufferCacheTest {
+ private static final long SEED = 123456789L;
+ private static final int NUM_OVERPIN = 128;
+ private static final int PAGE_SIZE = 256;
+ private static final int NUM_FILES = 10;
+ private static final int NUM_PAGES = 1000;
+
+ private final Random random;
+ private final FileState[] fileStates;
+
+ private IFileMapManager fileMapManager;
+ private VirtualBufferCache vbc;
+
+ public VirtualBufferCacheTest() {
+ fileStates = new FileState[NUM_FILES];
+ for (int i = 0; i < NUM_FILES; i++) {
+ fileStates[i] = new FileState();
+ }
+ random = new Random(SEED);
+ vbc = null;
+ fileMapManager = null;
+ }
+
+ private static class FileState {
+ private int fileId;
+ private FileReference fileRef;
+ private int pinCount;
+ private Set<ICachedPage> pinnedPages;
+
+ public FileState() {
+ fileId = -1;
+ fileRef = null;
+ pinCount = 0;
+ pinnedPages = new HashSet<ICachedPage>();
+ }
+ }
+
+ /**
+ * Pins NUM_PAGES pages randomly distributed across NUM_FILES files and checks
+ * that the set of cached pages pinned on behalf of each file is disjoint from
+ * the sets pinned on behalf of all other files.
+ *
+ * The test then pins NUM_OVERPIN additional pages, going over the soft cap
+ * (NUM_PAGES), and repeats the same check.
+ */
+ @Test
+ public void test01() throws Exception {
+ ICacheMemoryAllocator allocator = new HeapBufferAllocator();
+ fileMapManager = new TransientFileMapManager();
+ vbc = new VirtualBufferCache(allocator, fileMapManager, PAGE_SIZE, NUM_PAGES);
+ vbc.open();
+ createFiles();
+
+ kPins(NUM_PAGES);
+ assertTrue(pagesDisjointed());
+
+ kPins(NUM_OVERPIN);
+ assertTrue(pagesDisjointed());
+
+ deleteFiles();
+ vbc.close();
+ }
+
+ private boolean pagesDisjointed() {
+ boolean disjoint = true;
+ for (int i = 0; i < NUM_FILES; i++) {
+ FileState fi = fileStates[i];
+ for (int j = i + 1; j < NUM_FILES; j++) {
+ FileState fj = fileStates[j];
+ disjoint = disjoint && Collections.disjoint(fi.pinnedPages, fj.pinnedPages);
+ }
+ }
+ return disjoint;
+ }
+
+ private void createFiles() throws Exception {
+ for (int i = 0; i < NUM_FILES; i++) {
+ FileState f = fileStates[i];
+ String fName = String.format("f%d", i);
+ f.fileRef = new FileReference(new File(fName));
+ vbc.createFile(f.fileRef);
+ f.fileId = fileMapManager.lookupFileId(f.fileRef);
+ }
+ }
+
+ private void deleteFiles() throws Exception {
+ for (int i = 0; i < NUM_FILES; i++) {
+ vbc.deleteFile(fileStates[i].fileId, false);
+ }
+ }
+
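+ // Pins k new pages spread randomly across the files, recording each pinned
+ // page against its file.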
+ private void kPins(int k) throws Exception {
+ int numPinned = 0;
+ while (numPinned < k) {
+ int fsIdx = random.nextInt(NUM_FILES);
+ FileState f = fileStates[fsIdx];
+ ICachedPage p = vbc.pin(BufferedFileHandle.getDiskPageId(f.fileId, f.pinCount), true);
+ f.pinnedPages.add(p);
+ ++f.pinCount;
+ ++numPinned;
+ }
+ }
+}