Adding virtual pages to BufferCache for lsm-on-hdfs

Change-Id: Ifc69b80fc485f4b3057d717a314f0e203e557b3f
Reviewed-on: http://fulliautomatix.ics.uci.edu:8443/104
Reviewed-by: Sattam Alsubaiee <salsubaiee@gmail.com>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java
index 409cf96..8af857c 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java
@@ -115,4 +115,30 @@
     public IFileMapManager getFileMapProvider() {
         return vbc.getFileMapProvider();
     }
+
+    //These 4 methods are not applicable here
+    @Override
+    public int createMemFile() throws HyracksDataException {
+        // TODO Auto-generated method stub
+        return 0;
+    }
+
+    @Override
+    public void deleteMemFile(int fileId) throws HyracksDataException {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public ICachedPage pinVirtual(long vpid) throws HyracksDataException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public ICachedPage unpinVirtual(long vpid, long dpid) throws HyracksDataException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
 }
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java
index 7f804c1..18d5e8b 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/VirtualBufferCache.java
@@ -341,4 +341,28 @@
         }
 
     }
+    //These 4 methods aren't applicable here.
+    @Override
+    public int createMemFile() throws HyracksDataException {
+        // TODO Auto-generated method stub
+        return 0;
+    }
+
+    @Override
+    public void deleteMemFile(int fileId) throws HyracksDataException {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public ICachedPage pinVirtual(long vpid) throws HyracksDataException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public ICachedPage unpinVirtual(long vpid, long dpid) throws HyracksDataException {
+        // TODO Auto-generated method stub
+        return null;
+    }
 }
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
index 740c447..4248dc6 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
@@ -20,6 +20,8 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.HashSet;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
@@ -52,6 +54,7 @@
     private final IFileMapManager fileMapManager;
     private final CleanerThread cleanerThread;
     private final Map<Integer, BufferedFileHandle> fileInfoMap;
+    private final Set<Integer> virtualFiles;
 
     private List<ICachedPageInternal> cachedPages = new ArrayList<ICachedPageInternal>();
 
@@ -71,8 +74,10 @@
         this.pageReplacementStrategy = pageReplacementStrategy;
         this.pageCleanerPolicy = pageCleanerPolicy;
         this.fileMapManager = fileMapManager;
+
         Executor executor = Executors.newCachedThreadPool(threadFactory);
         fileInfoMap = new HashMap<Integer, BufferedFileHandle>();
+        virtualFiles = new HashSet<Integer>();
         cleanerThread = new CleanerThread();
         executor.execute(cleanerThread);
         closed = false;
@@ -99,9 +104,9 @@
         synchronized (fileInfoMap) {
             fInfo = fileInfoMap.get(fileId);
         }
-        if (fInfo == null) {
+        if (fInfo == null && !virtualFiles.contains(fileId)) {
             throw new HyracksDataException("pin called on a fileId " + fileId + " that has not been created.");
-        } else if (fInfo.getReferenceCount() <= 0) {
+        } else if (fInfo != null && fInfo.getReferenceCount() <= 0) {
             throw new HyracksDataException("pin called on a fileId " + fileId + " that has not been opened.");
         }
     }
@@ -134,7 +139,7 @@
     public ICachedPage pin(long dpid, boolean newPage) throws HyracksDataException {
         // Calling the pinSanityCheck should be used only for debugging, since the synchronized block over the fileInfoMap is a hot spot.
         //pinSanityCheck(dpid);
-        CachedPage cPage = findPage(dpid, newPage);
+        CachedPage cPage = findPage(dpid, false);
         if (!newPage) {
             // Resolve race of multiple threads trying to read the page from
             // disk.
@@ -151,7 +156,40 @@
         return cPage;
     }
 
-    private CachedPage findPage(long dpid, boolean newPage) throws HyracksDataException {
+    @Override
+    /**
+     * Allocate and pin a virtual page. This is just like a normal page, except that it will never be flushed.
+     */
+    public ICachedPage pinVirtual(long vpid) throws HyracksDataException {
+        //pinSanityCheck(vpid);
+        CachedPage cPage = findPage(vpid, true);
+        cPage.virtual = true;
+        return cPage;
+    }
+
+    @Override
+    /**
+     * Takes a virtual page, and copies it to a new page at the physical identifier. 
+     */
+    //TODO: I should not have to copy the page. I should just append it to the end of the hash bucket, but this is 
+    //safer/easier for now. 
+    public ICachedPage unpinVirtual(long vpid, long dpid) throws HyracksDataException {
+        CachedPage virtPage = findPage(vpid, true); //should definitely succeed. 
+        //pinSanityCheck(dpid); //debug
+        ICachedPage realPage = pin(dpid, false);
+        virtPage.acquireReadLatch();
+        realPage.acquireWriteLatch();
+        try {
+            System.arraycopy(virtPage.buffer.array(), 0, realPage.getBuffer().array(), 0, virtPage.buffer.capacity());
+        } finally {
+            realPage.releaseWriteLatch(true);
+            virtPage.releaseReadLatch();
+        }
+        virtPage.reset(-1); //now cause the virtual page to die
+        return realPage;
+    }
+
+    private CachedPage findPage(long dpid, boolean virtual) throws HyracksDataException {
         while (true) {
             int startCleanedCount = cleanerThread.cleanedCount;
 
@@ -186,7 +224,7 @@
                  * on the CachedPage may or may not be valid. 2. We have a pin
                  * on the CachedPage. We have to deal with three cases here.
                  * Case 1: The dpid on the CachedPage is invalid (-1). This
-                 * indicates that this buffer has never been used. So we are the
+                 * indicates that this buffer has never been used or is a virtual page. So we are the
                  * only ones holding it. Get a lock on the required dpid's hash
                  * bucket, check if someone inserted the page we want into the
                  * table. If so, decrement the pincount on the victim and return
@@ -428,7 +466,7 @@
         }
 
         public void cleanPage(CachedPage cPage, boolean force) {
-            if (cPage.dirty.get()) {
+            if (cPage.dirty.get() && !cPage.virtual) {
                 boolean proceed = false;
                 if (force) {
                     cPage.latch.writeLock().lock();
@@ -536,6 +574,22 @@
     }
 
     @Override
+    public int createMemFile() throws HyracksDataException {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Creating memory file in cache: " + this);
+        }
+        int fileId;
+        synchronized (fileInfoMap) {
+            fileId = fileMapManager.registerMemoryFile();
+        }
+        synchronized (virtualFiles) {
+            virtualFiles.add(fileId);
+        }
+        return fileId;
+
+    }
+
+    @Override
     public void openFile(int fileId) throws HyracksDataException {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Opening file: " + fileId + " in cache: " + this);
@@ -707,6 +761,20 @@
     }
 
     @Override
+    public synchronized void deleteMemFile(int fileId) throws HyracksDataException {
+        //TODO: possible sanity chcecking here like in above?
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Deleting memory file: " + fileId + " in cache: " + this);
+        }
+        synchronized (virtualFiles) {
+            virtualFiles.remove(fileId);
+        }
+        synchronized(fileInfoMap){
+            fileMapManager.unregisterMemFile(fileId);
+        }
+    }
+
+    @Override
     public void start() {
         // no op
     }
@@ -727,4 +795,5 @@
     public void dumpState(OutputStream os) throws IOException {
         os.write(dumpState().getBytes());
     }
+
 }
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/CachedPage.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/CachedPage.java
index d57a356..094c4f3 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/CachedPage.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/CachedPage.java
@@ -34,6 +34,7 @@
     volatile long dpid;
     CachedPage next;
     volatile boolean valid;
+    volatile boolean virtual;
 
     public CachedPage(int cpid, ByteBuffer buffer, IPageReplacementStrategy pageReplacementStrategy) {
         this.cpid = cpid;
@@ -45,6 +46,7 @@
         replacementStrategyObject = pageReplacementStrategy.createPerPageStrategyObject(cpid);
         dpid = -1;
         valid = false;
+        virtual = false;
     }
 
     public void reset(long dpid) {
@@ -70,7 +72,10 @@
 
     @Override
     public boolean pinIfGoodVictim() {
-        return pinCount.compareAndSet(0, 1);
+        if (virtual)
+            return false; //i am not a good victim because i cant flush!
+        else
+            return pinCount.compareAndSet(0, 1);
     }
 
     @Override
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/DebugBufferCache.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/DebugBufferCache.java
index 4fac8ff..743de15 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/DebugBufferCache.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/DebugBufferCache.java
@@ -165,4 +165,27 @@
     public void force(int fileId, boolean metadata) throws HyracksDataException {
         bufferCache.force(fileId, metadata);
     }
+
+    @Override
+    public int createMemFile() throws HyracksDataException {
+        return bufferCache.createMemFile();
+    }
+
+    @Override
+    public void deleteMemFile(int fileId) throws HyracksDataException {
+        bufferCache.deleteMemFile(fileId); 
+    }
+
+    @Override
+    public ICachedPage pinVirtual(long vpid) throws HyracksDataException {
+        pinCount.addAndGet(1);
+        return bufferCache.pinVirtual(vpid);
+    }
+
+    @Override
+    public ICachedPage unpinVirtual(long vpid, long dpid) throws HyracksDataException {
+        unpinCount.addAndGet(1);
+        return bufferCache.unpinVirtual(vpid, dpid);
+    }
+
 }
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IBufferCache.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IBufferCache.java
index cb892ff..a8e17a5 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IBufferCache.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IBufferCache.java
@@ -15,10 +15,13 @@
 package edu.uci.ics.hyracks.storage.common.buffercache;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.IFileHandle;
 import edu.uci.ics.hyracks.api.io.FileReference;
 
 public interface IBufferCache {
     public void createFile(FileReference fileRef) throws HyracksDataException;
+    
+    public int createMemFile() throws HyracksDataException;
 
     public void openFile(int fileId) throws HyracksDataException;
 
@@ -26,9 +29,15 @@
 
     public void deleteFile(int fileId, boolean flushDirtyPages) throws HyracksDataException;
 
+    public void deleteMemFile(int fileId) throws HyracksDataException;
+
     public ICachedPage tryPin(long dpid) throws HyracksDataException;
 
     public ICachedPage pin(long dpid, boolean newPage) throws HyracksDataException;
+    
+    public ICachedPage pinVirtual(long vpid) throws HyracksDataException;
+    
+    public ICachedPage unpinVirtual(long vpid, long dpid) throws HyracksDataException;
 
     public void unpin(ICachedPage page) throws HyracksDataException;
 
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/file/IFileMapManager.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/file/IFileMapManager.java
index f9ce887..cc6c397 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/file/IFileMapManager.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/file/IFileMapManager.java
@@ -42,4 +42,8 @@
      *             - If the fileid is not mapped currently in this manager.
      */
     public void unregisterFile(int fileId) throws HyracksDataException;
+    
+    public int registerMemoryFile();
+    
+    public void unregisterMemFile(int fileId) throws HyracksDataException;
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/file/IFileMapProvider.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/file/IFileMapProvider.java
index dcab11f..62e8b71 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/file/IFileMapProvider.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/file/IFileMapProvider.java
@@ -57,4 +57,5 @@
      *             - If the file id is not mapped currently in this manager.
      */
     public FileReference lookupFileName(int fileId) throws HyracksDataException;
+    
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/file/TransientFileMapManager.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/file/TransientFileMapManager.java
index 20aec3f..4d1d675 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/file/TransientFileMapManager.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/file/TransientFileMapManager.java
@@ -67,4 +67,16 @@
         id2nameMap.put(fileId, fileRef);
         name2IdMap.put(fileRef, fileId);
     }
+
+    @Override
+    public int registerMemoryFile() {
+        Integer fileId = idCounter++;
+        id2nameMap.put(fileId, null);
+        return fileId;
+    }
+
+    @Override
+    public void unregisterMemFile(int fileId) throws HyracksDataException {
+        id2nameMap.remove(fileId);
+    }
 }
\ No newline at end of file
diff --git a/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/edu/uci/ics/hyracks/storage/common/BufferCacheTest.java b/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/edu/uci/ics/hyracks/storage/common/BufferCacheTest.java
index 118a27c..a0566e3 100644
--- a/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/edu/uci/ics/hyracks/storage/common/BufferCacheTest.java
+++ b/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/edu/uci/ics/hyracks/storage/common/BufferCacheTest.java
@@ -96,7 +96,7 @@
         }
 
         bufferCache.closeFile(fileId);
-        
+
         // This code is commented because the method pinSanityCheck in the BufferCache is commented.
         /*boolean exceptionThrown = false;
 
@@ -315,6 +315,102 @@
         bufferCache.close();
     }
 
+    @Test
+    public void virtualPageTest() throws HyracksDataException {
+        TestStorageManagerComponentHolder.init(PAGE_SIZE, NUM_PAGES, MAX_OPEN_FILES);
+        IBufferCache bufferCache = TestStorageManagerComponentHolder.getBufferCache(ctx);
+        IFileMapProvider fmp = TestStorageManagerComponentHolder.getFileMapProvider(ctx);
+
+        List<Integer> fileIds = new ArrayList<Integer>();
+        Map<Integer, ArrayList<Integer>> pageContents = new HashMap<Integer, ArrayList<Integer>>();
+        ArrayList<Integer> memVals;
+        int num = 10;
+        int testPageId = 0;
+        int lastRealPage = 0;
+        String fileName = getFileName();
+        FileReference file = new FileReference(new File(fileName));
+        bufferCache.createFile(file);
+        int memFileId = bufferCache.createMemFile();
+        int fileId = fmp.lookupFileId(file);
+        bufferCache.openFile(fileId);
+        fileIds.add(fileId);
+
+        // try and write a few somethings into an on-disk paged file
+        ICachedPage page = null;
+        for (; lastRealPage < 10; lastRealPage++) {
+            page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, lastRealPage), true);
+            page.acquireWriteLatch();
+            try {
+                ArrayList<Integer> values = new ArrayList<Integer>();
+                for (int j = 0; j < num; j++) {
+                    int x = Math.abs(rnd.nextInt());
+                    page.getBuffer().putInt(j * 4, x);
+                    values.add(x);
+                }
+                pageContents.put(lastRealPage, values);
+            } finally {
+                page.releaseWriteLatch(true);
+                bufferCache.unpin(page);
+            }
+        }
+        //now try the same thing, but for a virtual page
+        page = bufferCache.pinVirtual(BufferedFileHandle.getDiskPageId(memFileId, testPageId));
+        page.acquireWriteLatch();
+        try {
+            ArrayList<Integer> values = new ArrayList<Integer>();
+            for (int j = 0; j < num; j++) {
+                int x = Math.abs(rnd.nextInt());
+                page.getBuffer().putInt(j * 4, x);
+                values.add(x);
+            }
+            memVals = values;
+        } finally {
+            page.releaseWriteLatch(true);
+            //no unpin here.
+        }
+        //write some more stuff...
+        for (; lastRealPage < 20; lastRealPage++) {
+            page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, lastRealPage), true);
+            page.acquireWriteLatch();
+            try {
+                ArrayList<Integer> values = new ArrayList<Integer>();
+                for (int j = 0; j < num; j++) {
+                    int x = Math.abs(rnd.nextInt());
+                    page.getBuffer().putInt(j * 4, x);
+                    values.add(x);
+                }
+                pageContents.put(lastRealPage, values);
+            } finally {
+                page.releaseWriteLatch(true);
+                bufferCache.unpin(page);
+            }
+        }
+        //now try putting the virtual page after the other pages
+        ICachedPage realPage = bufferCache.unpinVirtual(BufferedFileHandle.getDiskPageId(memFileId, testPageId),
+                BufferedFileHandle.getDiskPageId(fileId, lastRealPage));
+        bufferCache.unpin(realPage);
+        pageContents.put(lastRealPage, memVals);
+
+        bufferCache.closeFile(fileId);
+
+        //now try reading it back!
+        bufferCache.openFile(fileId);
+        for (int i : pageContents.keySet()) {
+            page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, i), false);
+            page.acquireReadLatch();
+            try {
+                ArrayList<Integer> values = pageContents.get(i);
+                for (int j = 0; j < values.size(); j++) {
+                    Assert.assertEquals(values.get(j).intValue(), page.getBuffer().getInt(j * 4));
+                }
+            } finally {
+                page.releaseReadLatch();
+                bufferCache.unpin(page);
+            }
+        }
+        bufferCache.closeFile(fileId);
+    }
+
     @AfterClass
     public static void cleanup() throws Exception {
         for (String s : openedFiles) {