Added IPageCleanerPolicy

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_staging@1963 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/RuntimeContext.java b/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/RuntimeContext.java
index 3336d6d..56d517f 100644
--- a/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/RuntimeContext.java
+++ b/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/RuntimeContext.java
@@ -21,6 +21,7 @@
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexRegistry;
 import edu.uci.ics.hyracks.storage.common.buffercache.BufferCache;
 import edu.uci.ics.hyracks.storage.common.buffercache.ClockPageReplacementStrategy;
+import edu.uci.ics.hyracks.storage.common.buffercache.DelayPageCleanerPolicy;
 import edu.uci.ics.hyracks.storage.common.buffercache.HeapBufferAllocator;
 import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
 import edu.uci.ics.hyracks.storage.common.buffercache.ICacheMemoryAllocator;
@@ -38,7 +39,8 @@
         fileMapManager = new TransientFileMapManager();
         ICacheMemoryAllocator allocator = new HeapBufferAllocator();
         IPageReplacementStrategy prs = new ClockPageReplacementStrategy();
-        bufferCache = new BufferCache(appCtx.getRootContext().getIOManager(), allocator, prs, fileMapManager, 32768, 50, 100);
+        bufferCache = new BufferCache(appCtx.getRootContext().getIOManager(), allocator, prs,
+                new DelayPageCleanerPolicy(1000), fileMapManager, 32768, 50, 100);
         indexRegistry = new IndexRegistry<IIndex>();
     }
 
@@ -57,7 +59,7 @@
     public IndexRegistry<IIndex> getIndexRegistry() {
         return indexRegistry;
     }
-    
+
     public static RuntimeContext get(IHyracksTaskContext ctx) {
         return (RuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
     }
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
index e170a95..7f17f38 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
@@ -39,7 +39,7 @@
 
     private static final int MIN_CLEANED_COUNT_DIFF = 3;
     private static final int PIN_MAX_WAIT_TIME = 50;
-    
+
     private final int maxOpenFiles;
 
     private final IIOManager ioManager;
@@ -48,6 +48,7 @@
     private final CachedPage[] cachedPages;
     private final CacheBucket[] pageMap;
     private final IPageReplacementStrategy pageReplacementStrategy;
+    private final IPageCleanerPolicy pageCleanerPolicy;
     private final IFileMapManager fileMapManager;
     private final CleanerThread cleanerThread;
     private final Map<Integer, BufferedFileHandle> fileInfoMap;
@@ -55,8 +56,8 @@
     private boolean closed;
 
     public BufferCache(IIOManager ioManager, ICacheMemoryAllocator allocator,
-            IPageReplacementStrategy pageReplacementStrategy, IFileMapManager fileMapManager, int pageSize,
-            int numPages, int maxOpenFiles) {
+            IPageReplacementStrategy pageReplacementStrategy, IPageCleanerPolicy pageCleanerPolicy,
+            IFileMapManager fileMapManager, int pageSize, int numPages, int maxOpenFiles) {
         this.ioManager = ioManager;
         this.pageSize = pageSize;
         this.numPages = numPages;
@@ -72,6 +73,7 @@
             pageMap[i] = new CacheBucket();
         }
         this.pageReplacementStrategy = pageReplacementStrategy;
+        this.pageCleanerPolicy = pageCleanerPolicy;
         this.fileMapManager = fileMapManager;
         fileInfoMap = new HashMap<Integer, BufferedFileHandle>();
         cleanerThread = new CleanerThread();
@@ -157,7 +159,7 @@
         return cPage;
     }
 
-    private CachedPage findPage(long dpid, boolean newPage) {
+    private CachedPage findPage(long dpid, boolean newPage) throws HyracksDataException {
         while (true) {
             int startCleanedCount = cleanerThread.cleanedCount;
 
@@ -300,7 +302,7 @@
                 }
             }
             synchronized (cleanerThread) {
-                cleanerThread.notifyAll();
+                pageCleanerPolicy.notifyVictimNotFound(cleanerThread);
             }
             // Heuristic optimization. Check whether the cleaner thread has
             // cleaned pages since we did our last pin attempt.
@@ -502,6 +504,7 @@
 
         public CleanerThread() {
             setPriority(MAX_PRIORITY);
+            setDaemon(true);
         }
 
         public void cleanPage(CachedPage cPage, boolean force) {
@@ -551,6 +554,7 @@
         public synchronized void run() {
             try {
                 while (true) {
+                    pageCleanerPolicy.notifyCleanCycleStart(this);
                     for (int i = 0; i < numPages; ++i) {
                         CachedPage cPage = cachedPages[i];
                         cleanPage(cPage, false);
@@ -558,12 +562,10 @@
                     if (shutdownStart) {
                         break;
                     }
-                    try {
-                        wait(1000);
-                    } catch (InterruptedException e) {
-                        e.printStackTrace();
-                    }
+                    pageCleanerPolicy.notifyCleanCycleFinish(this);
                 }
+            } catch (Exception e) {
+                e.printStackTrace();
             } finally {
                 shutdownComplete = true;
                 notifyAll();
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/DelayPageCleanerPolicy.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/DelayPageCleanerPolicy.java
new file mode 100644
index 0000000..c48b235
--- /dev/null
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/DelayPageCleanerPolicy.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.common.buffercache;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class DelayPageCleanerPolicy implements IPageCleanerPolicy {
+    private long delay;
+
+    public DelayPageCleanerPolicy(long delay) {
+        this.delay = delay;
+    }
+
+    @Override
+    public void notifyCleanCycleStart(Object monitor) throws HyracksDataException {
+
+    }
+
+    @Override
+    public void notifyCleanCycleFinish(Object monitor) throws HyracksDataException {
+        try {
+            monitor.wait(delay);
+        } catch (InterruptedException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    @Override
+    public void notifyVictimNotFound(Object monitor) throws HyracksDataException {
+        monitor.notifyAll();
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IPageCleanerPolicy.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IPageCleanerPolicy.java
new file mode 100644
index 0000000..197dc28
--- /dev/null
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IPageCleanerPolicy.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.common.buffercache;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Allows customization of the page cleaning strategy by the cleaner thread.
+ * 
+ * @author vinayakb
+ */
+public interface IPageCleanerPolicy {
+    /**
+     * Callback from the cleaner just before the beginning of a cleaning cycle.
+     * 
+     * @param monitor
+     *            - The monitor on which a mutex is held while in this call
+     * @throws HyracksDataException
+     */
+    public void notifyCleanCycleStart(Object monitor) throws HyracksDataException;
+
+    /**
+     * Callback from the cleaner just after the finish of a cleaning cycle.
+     * 
+     * @param monitor
+     *            - The monitor on which a mutex is held while in this call.
+     * @throws HyracksDataException
+     */
+    public void notifyCleanCycleFinish(Object monitor) throws HyracksDataException;
+
+    /**
+     * Callback to indicate that no victim was found.
+     * 
+     * @param monitor
+     *            - The monitor on which a mutex is held while in this call.
+     * @throws HyracksDataException
+     */
+    public void notifyVictimNotFound(Object monitor) throws HyracksDataException;
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestStorageManagerComponentHolder.java b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestStorageManagerComponentHolder.java
index ccbe862..ce5e989 100644
--- a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestStorageManagerComponentHolder.java
+++ b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestStorageManagerComponentHolder.java
@@ -27,6 +27,7 @@
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexRegistry;
 import edu.uci.ics.hyracks.storage.common.buffercache.BufferCache;
 import edu.uci.ics.hyracks.storage.common.buffercache.ClockPageReplacementStrategy;
+import edu.uci.ics.hyracks.storage.common.buffercache.DelayPageCleanerPolicy;
 import edu.uci.ics.hyracks.storage.common.buffercache.HeapBufferAllocator;
 import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
 import edu.uci.ics.hyracks.storage.common.buffercache.ICacheMemoryAllocator;
@@ -40,7 +41,7 @@
     private static IFileMapProvider fileMapProvider;
     private static IndexRegistry<IIndex> indexRegistry;
     private static IOManager ioManager;
-    
+
     private static int pageSize;
     private static int numPages;
     private static int maxOpenFiles;
@@ -59,8 +60,8 @@
             ICacheMemoryAllocator allocator = new HeapBufferAllocator();
             IPageReplacementStrategy prs = new ClockPageReplacementStrategy();
             IFileMapProvider fileMapProvider = getFileMapProvider(ctx);
-            bufferCache = new BufferCache(ctx.getIOManager(), allocator, prs, (IFileMapManager) fileMapProvider,
-                    pageSize, numPages, maxOpenFiles);
+            bufferCache = new BufferCache(ctx.getIOManager(), allocator, prs, new DelayPageCleanerPolicy(1000),
+                    (IFileMapManager) fileMapProvider, pageSize, numPages, maxOpenFiles);
         }
         return bufferCache;
     }
@@ -78,13 +79,13 @@
         }
         return indexRegistry;
     }
-    
+
     public synchronized static IOManager getIOManager() throws HyracksException {
-    	if (ioManager == null) {
-    		List<IODeviceHandle> devices = new ArrayList<IODeviceHandle>();
-    		devices.add(new IODeviceHandle(new File(System.getProperty("java.io.tmpdir")), "iodev_test_wa"));
-    		ioManager = new IOManager(devices, Executors.newCachedThreadPool());
-    	}
-    	return ioManager;
+        if (ioManager == null) {
+            List<IODeviceHandle> devices = new ArrayList<IODeviceHandle>();
+            devices.add(new IODeviceHandle(new File(System.getProperty("java.io.tmpdir")), "iodev_test_wa"));
+            ioManager = new IOManager(devices, Executors.newCachedThreadPool());
+        }
+        return ioManager;
     }
 }
\ No newline at end of file
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
index f18be9e..cb76637 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
@@ -32,6 +32,7 @@
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexRegistry;
 import edu.uci.ics.hyracks.storage.common.buffercache.BufferCache;
 import edu.uci.ics.hyracks.storage.common.buffercache.ClockPageReplacementStrategy;
+import edu.uci.ics.hyracks.storage.common.buffercache.DelayPageCleanerPolicy;
 import edu.uci.ics.hyracks.storage.common.buffercache.HeapBufferAllocator;
 import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
 import edu.uci.ics.hyracks.storage.common.buffercache.ICacheMemoryAllocator;
@@ -61,8 +62,8 @@
         long memSize = Runtime.getRuntime().maxMemory();
         long bufferSize = memSize / 4;
         int numPages = (int) (bufferSize / pageSize);
-        bufferCache = new BufferCache(appCtx.getRootContext().getIOManager(), allocator, prs, fileMapManager, pageSize,
-                numPages, 1000000);
+        bufferCache = new BufferCache(appCtx.getRootContext().getIOManager(), allocator, prs,
+                new DelayPageCleanerPolicy(100000), fileMapManager, pageSize, numPages, 1000000);
         treeIndexRegistry = new IndexRegistry<IIndex>();
         ioManager = (IOManager) appCtx.getRootContext().getIOManager();
     }