Adding virtual pages to BufferCache for lsm-on-hdfs
Change-Id: Ifc69b80fc485f4b3057d717a314f0e203e557b3f
Reviewed-on: http://fulliautomatix.ics.uci.edu:8443/104
Reviewed-by: Sattam Alsubaiee <salsubaiee@gmail.com>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java
index 409cf96..8af857c 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java
@@ -115,4 +115,30 @@
public IFileMapManager getFileMapProvider() {
return vbc.getFileMapProvider();
}
+
+ //These 4 methods are not applicable here
+ @Override
+ public int createMemFile() throws HyracksDataException {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public void deleteMemFile(int fileId) throws HyracksDataException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public ICachedPage pinVirtual(long vpid) throws HyracksDataException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public ICachedPage unpinVirtual(long vpid, long dpid) throws HyracksDataException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java
index 7f804c1..18d5e8b 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java
@@ -341,4 +341,28 @@
}
}
+ //These 4 methods aren't applicable here.
+ @Override
+ public int createMemFile() throws HyracksDataException {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public void deleteMemFile(int fileId) throws HyracksDataException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public ICachedPage pinVirtual(long vpid) throws HyracksDataException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public ICachedPage unpinVirtual(long vpid, long dpid) throws HyracksDataException {
+ // TODO Auto-generated method stub
+ return null;
+ }
}
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
index 740c447..4248dc6 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
@@ -20,6 +20,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.HashSet;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
@@ -52,6 +54,7 @@
private final IFileMapManager fileMapManager;
private final CleanerThread cleanerThread;
private final Map<Integer, BufferedFileHandle> fileInfoMap;
+ private final Set<Integer> virtualFiles;
private List<ICachedPageInternal> cachedPages = new ArrayList<ICachedPageInternal>();
@@ -71,8 +74,10 @@
this.pageReplacementStrategy = pageReplacementStrategy;
this.pageCleanerPolicy = pageCleanerPolicy;
this.fileMapManager = fileMapManager;
+
Executor executor = Executors.newCachedThreadPool(threadFactory);
fileInfoMap = new HashMap<Integer, BufferedFileHandle>();
+ virtualFiles = new HashSet<Integer>();
cleanerThread = new CleanerThread();
executor.execute(cleanerThread);
closed = false;
@@ -99,9 +104,9 @@
synchronized (fileInfoMap) {
fInfo = fileInfoMap.get(fileId);
}
- if (fInfo == null) {
+ if (fInfo == null && !virtualFiles.contains(fileId)) {
throw new HyracksDataException("pin called on a fileId " + fileId + " that has not been created.");
- } else if (fInfo.getReferenceCount() <= 0) {
+ } else if (fInfo != null && fInfo.getReferenceCount() <= 0) {
throw new HyracksDataException("pin called on a fileId " + fileId + " that has not been opened.");
}
}
@@ -134,7 +139,7 @@
public ICachedPage pin(long dpid, boolean newPage) throws HyracksDataException {
// Calling the pinSanityCheck should be used only for debugging, since the synchronized block over the fileInfoMap is a hot spot.
//pinSanityCheck(dpid);
- CachedPage cPage = findPage(dpid, newPage);
+ CachedPage cPage = findPage(dpid, false);
if (!newPage) {
// Resolve race of multiple threads trying to read the page from
// disk.
@@ -151,7 +156,40 @@
return cPage;
}
- private CachedPage findPage(long dpid, boolean newPage) throws HyracksDataException {
+ @Override
+ /**
+ * Allocate and pin a virtual page. This is just like a normal page, except that it will never be flushed.
+ */
+ public ICachedPage pinVirtual(long vpid) throws HyracksDataException {
+ //pinSanityCheck(vpid);
+ CachedPage cPage = findPage(vpid, true);
+ cPage.virtual = true;
+ return cPage;
+ }
+
+ @Override
+ /**
+ * Takes a virtual page, and copies it to a new page at the physical identifier.
+ */
+ //TODO: I should not have to copy the page. I should just append it to the end of the hash bucket, but this is
+ //safer/easier for now.
+ public ICachedPage unpinVirtual(long vpid, long dpid) throws HyracksDataException {
+ CachedPage virtPage = findPage(vpid, true); //should definitely succeed.
+ //pinSanityCheck(dpid); //debug
+ ICachedPage realPage = pin(dpid, false);
+ virtPage.acquireReadLatch();
+ realPage.acquireWriteLatch();
+ try {
+ System.arraycopy(virtPage.buffer.array(), 0, realPage.getBuffer().array(), 0, virtPage.buffer.capacity());
+ } finally {
+ realPage.releaseWriteLatch(true);
+ virtPage.releaseReadLatch();
+ }
+ virtPage.reset(-1); //now cause the virtual page to die
+ return realPage;
+ }
+
+ private CachedPage findPage(long dpid, boolean virtual) throws HyracksDataException {
while (true) {
int startCleanedCount = cleanerThread.cleanedCount;
@@ -186,7 +224,7 @@
* on the CachedPage may or may not be valid. 2. We have a pin
* on the CachedPage. We have to deal with three cases here.
* Case 1: The dpid on the CachedPage is invalid (-1). This
- * indicates that this buffer has never been used. So we are the
+ * indicates that this buffer has never been used or is a virtual page. So we are the
* only ones holding it. Get a lock on the required dpid's hash
* bucket, check if someone inserted the page we want into the
* table. If so, decrement the pincount on the victim and return
@@ -428,7 +466,7 @@
}
public void cleanPage(CachedPage cPage, boolean force) {
- if (cPage.dirty.get()) {
+ if (cPage.dirty.get() && !cPage.virtual) {
boolean proceed = false;
if (force) {
cPage.latch.writeLock().lock();
@@ -536,6 +574,22 @@
}
@Override
+ public int createMemFile() throws HyracksDataException {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Creating memory file in cache: " + this);
+ }
+ int fileId;
+ synchronized (fileInfoMap) {
+ fileId = fileMapManager.registerMemoryFile();
+ }
+ synchronized (virtualFiles) {
+ virtualFiles.add(fileId);
+ }
+ return fileId;
+
+ }
+
+ @Override
public void openFile(int fileId) throws HyracksDataException {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Opening file: " + fileId + " in cache: " + this);
@@ -707,6 +761,20 @@
}
@Override
+ public synchronized void deleteMemFile(int fileId) throws HyracksDataException {
+ //TODO: possible sanity chcecking here like in above?
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Deleting memory file: " + fileId + " in cache: " + this);
+ }
+ synchronized (virtualFiles) {
+ virtualFiles.remove(fileId);
+ }
+ synchronized(fileInfoMap){
+ fileMapManager.unregisterMemFile(fileId);
+ }
+ }
+
+ @Override
public void start() {
// no op
}
@@ -727,4 +795,5 @@
public void dumpState(OutputStream os) throws IOException {
os.write(dumpState().getBytes());
}
+
}
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/CachedPage.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/CachedPage.java
index d57a356..094c4f3 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/CachedPage.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/CachedPage.java
@@ -34,6 +34,7 @@
volatile long dpid;
CachedPage next;
volatile boolean valid;
+ volatile boolean virtual;
public CachedPage(int cpid, ByteBuffer buffer, IPageReplacementStrategy pageReplacementStrategy) {
this.cpid = cpid;
@@ -45,6 +46,7 @@
replacementStrategyObject = pageReplacementStrategy.createPerPageStrategyObject(cpid);
dpid = -1;
valid = false;
+ virtual = false;
}
public void reset(long dpid) {
@@ -70,7 +72,10 @@
@Override
public boolean pinIfGoodVictim() {
- return pinCount.compareAndSet(0, 1);
+ if (virtual)
+ return false; //i am not a good victim because i cant flush!
+ else
+ return pinCount.compareAndSet(0, 1);
}
@Override
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/DebugBufferCache.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/DebugBufferCache.java
index 4fac8ff..743de15 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/DebugBufferCache.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/DebugBufferCache.java
@@ -165,4 +165,27 @@
public void force(int fileId, boolean metadata) throws HyracksDataException {
bufferCache.force(fileId, metadata);
}
+
+ @Override
+ public int createMemFile() throws HyracksDataException {
+ return bufferCache.createMemFile();
+ }
+
+ @Override
+ public void deleteMemFile(int fileId) throws HyracksDataException {
+ bufferCache.deleteMemFile(fileId);
+ }
+
+ @Override
+ public ICachedPage pinVirtual(long vpid) throws HyracksDataException {
+ pinCount.addAndGet(1);
+ return bufferCache.pinVirtual(vpid);
+ }
+
+ @Override
+ public ICachedPage unpinVirtual(long vpid, long dpid) throws HyracksDataException {
+ unpinCount.addAndGet(1);
+ return bufferCache.unpinVirtual(vpid, dpid);
+ }
+
}
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IBufferCache.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IBufferCache.java
index cb892ff..a8e17a5 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IBufferCache.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IBufferCache.java
@@ -15,10 +15,13 @@
package edu.uci.ics.hyracks.storage.common.buffercache;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.IFileHandle;
import edu.uci.ics.hyracks.api.io.FileReference;
public interface IBufferCache {
public void createFile(FileReference fileRef) throws HyracksDataException;
+
+ public int createMemFile() throws HyracksDataException;
public void openFile(int fileId) throws HyracksDataException;
@@ -26,9 +29,15 @@
public void deleteFile(int fileId, boolean flushDirtyPages) throws HyracksDataException;
+ public void deleteMemFile(int fileId) throws HyracksDataException;
+
public ICachedPage tryPin(long dpid) throws HyracksDataException;
public ICachedPage pin(long dpid, boolean newPage) throws HyracksDataException;
+
+ public ICachedPage pinVirtual(long vpid) throws HyracksDataException;
+
+ public ICachedPage unpinVirtual(long vpid, long dpid) throws HyracksDataException;
public void unpin(ICachedPage page) throws HyracksDataException;
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/file/IFileMapManager.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/file/IFileMapManager.java
index f9ce887..cc6c397 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/file/IFileMapManager.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/file/IFileMapManager.java
@@ -42,4 +42,8 @@
* - If the fileid is not mapped currently in this manager.
*/
public void unregisterFile(int fileId) throws HyracksDataException;
+
+ public int registerMemoryFile();
+
+ public void unregisterMemFile(int fileId) throws HyracksDataException;
}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/file/IFileMapProvider.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/file/IFileMapProvider.java
index dcab11f..62e8b71 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/file/IFileMapProvider.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/file/IFileMapProvider.java
@@ -57,4 +57,5 @@
* - If the file id is not mapped currently in this manager.
*/
public FileReference lookupFileName(int fileId) throws HyracksDataException;
+
}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/file/TransientFileMapManager.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/file/TransientFileMapManager.java
index 20aec3f..4d1d675 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/file/TransientFileMapManager.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/file/TransientFileMapManager.java
@@ -67,4 +67,16 @@
id2nameMap.put(fileId, fileRef);
name2IdMap.put(fileRef, fileId);
}
+
+ @Override
+ public int registerMemoryFile() {
+ Integer fileId = idCounter++;
+ id2nameMap.put(fileId, null);
+ return fileId;
+ }
+
+ @Override
+ public void unregisterMemFile(int fileId) throws HyracksDataException {
+ id2nameMap.remove(fileId);
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/edu/uci/ics/hyracks/storage/common/BufferCacheTest.java b/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/edu/uci/ics/hyracks/storage/common/BufferCacheTest.java
index 118a27c..a0566e3 100644
--- a/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/edu/uci/ics/hyracks/storage/common/BufferCacheTest.java
+++ b/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/edu/uci/ics/hyracks/storage/common/BufferCacheTest.java
@@ -96,7 +96,7 @@
}
bufferCache.closeFile(fileId);
-
+
// This code is commented because the method pinSanityCheck in the BufferCache is commented.
/*boolean exceptionThrown = false;
@@ -315,6 +315,102 @@
bufferCache.close();
}
+ @Test
+ public void virtualPageTest() throws HyracksDataException {
+ TestStorageManagerComponentHolder.init(PAGE_SIZE, NUM_PAGES, MAX_OPEN_FILES);
+ IBufferCache bufferCache = TestStorageManagerComponentHolder.getBufferCache(ctx);
+ IFileMapProvider fmp = TestStorageManagerComponentHolder.getFileMapProvider(ctx);
+
+ List<Integer> fileIds = new ArrayList<Integer>();
+ Map<Integer, ArrayList<Integer>> pageContents = new HashMap<Integer, ArrayList<Integer>>();
+ ArrayList<Integer> memVals;
+ int num = 10;
+ int testPageId = 0;
+ int lastRealPage = 0;
+ String fileName = getFileName();
+ FileReference file = new FileReference(new File(fileName));
+ bufferCache.createFile(file);
+ int memFileId = bufferCache.createMemFile();
+ int fileId = fmp.lookupFileId(file);
+ bufferCache.openFile(fileId);
+ fileIds.add(fileId);
+
+ // try and write a few somethings into an on-disk paged file
+ ICachedPage page = null;
+ for (; lastRealPage < 10; lastRealPage++) {
+ page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, lastRealPage), true);
+ page.acquireWriteLatch();
+ try {
+ ArrayList<Integer> values = new ArrayList<Integer>();
+ for (int j = 0; j < num; j++) {
+ int x = Math.abs(rnd.nextInt());
+ page.getBuffer().putInt(j * 4, x);
+ values.add(x);
+ }
+ pageContents.put(lastRealPage, values);
+ } finally {
+ page.releaseWriteLatch(true);
+ bufferCache.unpin(page);
+ }
+ }
+ //now try the same thing, but for a virtual page
+ page = bufferCache.pinVirtual(BufferedFileHandle.getDiskPageId(memFileId, testPageId));
+ page.acquireWriteLatch();
+ try {
+ ArrayList<Integer> values = new ArrayList<Integer>();
+ for (int j = 0; j < num; j++) {
+ int x = Math.abs(rnd.nextInt());
+ page.getBuffer().putInt(j * 4, x);
+ values.add(x);
+ }
+ memVals = values;
+ } finally {
+ page.releaseWriteLatch(true);
+ //no unpin here.
+ }
+ //write some more stuff...
+ for (; lastRealPage < 20; lastRealPage++) {
+ page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, lastRealPage), true);
+ page.acquireWriteLatch();
+ try {
+ ArrayList<Integer> values = new ArrayList<Integer>();
+ for (int j = 0; j < num; j++) {
+ int x = Math.abs(rnd.nextInt());
+ page.getBuffer().putInt(j * 4, x);
+ values.add(x);
+ }
+ pageContents.put(lastRealPage, values);
+ } finally {
+ page.releaseWriteLatch(true);
+ bufferCache.unpin(page);
+ }
+ }
+ //now try putting the virtual page after the other pages
+ ICachedPage realPage = bufferCache.unpinVirtual(BufferedFileHandle.getDiskPageId(memFileId, testPageId),
+ BufferedFileHandle.getDiskPageId(fileId, lastRealPage));
+ bufferCache.unpin(realPage);
+ pageContents.put(lastRealPage, memVals);
+
+ bufferCache.closeFile(fileId);
+
+ //now try reading it back!
+ bufferCache.openFile(fileId);
+ for (int i : pageContents.keySet()) {
+ page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, i), false);
+ page.acquireReadLatch();
+ try {
+ ArrayList<Integer> values = pageContents.get(i);
+ for (int j = 0; j < values.size(); j++) {
+ Assert.assertEquals(values.get(j).intValue(), page.getBuffer().getInt(j * 4));
+ }
+ } finally {
+ page.releaseReadLatch();
+ bufferCache.unpin(page);
+ }
+ }
+ bufferCache.closeFile(fileId);
+ }
+
@AfterClass
public static void cleanup() throws Exception {
for (String s : openedFiles) {