Added IPageCleanerPolicy
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_staging@1963 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/RuntimeContext.java b/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/RuntimeContext.java
index 3336d6d..56d517f 100644
--- a/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/RuntimeContext.java
+++ b/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/edu/uci/ics/hyracks/examples/btree/helper/RuntimeContext.java
@@ -21,6 +21,7 @@
import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexRegistry;
import edu.uci.ics.hyracks.storage.common.buffercache.BufferCache;
import edu.uci.ics.hyracks.storage.common.buffercache.ClockPageReplacementStrategy;
+import edu.uci.ics.hyracks.storage.common.buffercache.DelayPageCleanerPolicy;
import edu.uci.ics.hyracks.storage.common.buffercache.HeapBufferAllocator;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
import edu.uci.ics.hyracks.storage.common.buffercache.ICacheMemoryAllocator;
@@ -38,7 +39,8 @@
fileMapManager = new TransientFileMapManager();
ICacheMemoryAllocator allocator = new HeapBufferAllocator();
IPageReplacementStrategy prs = new ClockPageReplacementStrategy();
- bufferCache = new BufferCache(appCtx.getRootContext().getIOManager(), allocator, prs, fileMapManager, 32768, 50, 100);
+ bufferCache = new BufferCache(appCtx.getRootContext().getIOManager(), allocator, prs,
+ new DelayPageCleanerPolicy(1000), fileMapManager, 32768, 50, 100);
indexRegistry = new IndexRegistry<IIndex>();
}
@@ -57,7 +59,7 @@
public IndexRegistry<IIndex> getIndexRegistry() {
return indexRegistry;
}
-
+
public static RuntimeContext get(IHyracksTaskContext ctx) {
return (RuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
}
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
index e170a95..7f17f38 100644
--- a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
@@ -39,7 +39,7 @@
private static final int MIN_CLEANED_COUNT_DIFF = 3;
private static final int PIN_MAX_WAIT_TIME = 50;
-
+
private final int maxOpenFiles;
private final IIOManager ioManager;
@@ -48,6 +48,7 @@
private final CachedPage[] cachedPages;
private final CacheBucket[] pageMap;
private final IPageReplacementStrategy pageReplacementStrategy;
+ private final IPageCleanerPolicy pageCleanerPolicy;
private final IFileMapManager fileMapManager;
private final CleanerThread cleanerThread;
private final Map<Integer, BufferedFileHandle> fileInfoMap;
@@ -55,8 +56,8 @@
private boolean closed;
public BufferCache(IIOManager ioManager, ICacheMemoryAllocator allocator,
- IPageReplacementStrategy pageReplacementStrategy, IFileMapManager fileMapManager, int pageSize,
- int numPages, int maxOpenFiles) {
+ IPageReplacementStrategy pageReplacementStrategy, IPageCleanerPolicy pageCleanerPolicy,
+ IFileMapManager fileMapManager, int pageSize, int numPages, int maxOpenFiles) {
this.ioManager = ioManager;
this.pageSize = pageSize;
this.numPages = numPages;
@@ -72,6 +73,7 @@
pageMap[i] = new CacheBucket();
}
this.pageReplacementStrategy = pageReplacementStrategy;
+ this.pageCleanerPolicy = pageCleanerPolicy;
this.fileMapManager = fileMapManager;
fileInfoMap = new HashMap<Integer, BufferedFileHandle>();
cleanerThread = new CleanerThread();
@@ -157,7 +159,7 @@
return cPage;
}
- private CachedPage findPage(long dpid, boolean newPage) {
+ private CachedPage findPage(long dpid, boolean newPage) throws HyracksDataException {
while (true) {
int startCleanedCount = cleanerThread.cleanedCount;
@@ -300,7 +302,7 @@
}
}
synchronized (cleanerThread) {
- cleanerThread.notifyAll();
+ pageCleanerPolicy.notifyVictimNotFound(cleanerThread);
}
// Heuristic optimization. Check whether the cleaner thread has
// cleaned pages since we did our last pin attempt.
@@ -502,6 +504,7 @@
public CleanerThread() {
setPriority(MAX_PRIORITY);
+ setDaemon(true);
}
public void cleanPage(CachedPage cPage, boolean force) {
@@ -551,6 +554,7 @@
public synchronized void run() {
try {
while (true) {
+ pageCleanerPolicy.notifyCleanCycleStart(this);
for (int i = 0; i < numPages; ++i) {
CachedPage cPage = cachedPages[i];
cleanPage(cPage, false);
@@ -558,12 +562,10 @@
if (shutdownStart) {
break;
}
- try {
- wait(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
+ pageCleanerPolicy.notifyCleanCycleFinish(this);
}
+ } catch (Exception e) {
+ e.printStackTrace();
} finally {
shutdownComplete = true;
notifyAll();
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/DelayPageCleanerPolicy.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/DelayPageCleanerPolicy.java
new file mode 100644
index 0000000..c48b235
--- /dev/null
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/DelayPageCleanerPolicy.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.common.buffercache;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class DelayPageCleanerPolicy implements IPageCleanerPolicy {
+ private long delay;
+
+ public DelayPageCleanerPolicy(long delay) {
+ this.delay = delay;
+ }
+
+ @Override
+ public void notifyCleanCycleStart(Object monitor) throws HyracksDataException {
+
+ }
+
+ @Override
+ public void notifyCleanCycleFinish(Object monitor) throws HyracksDataException {
+ try {
+ monitor.wait(delay);
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
+ @Override
+ public void notifyVictimNotFound(Object monitor) throws HyracksDataException {
+ monitor.notifyAll();
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IPageCleanerPolicy.java b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IPageCleanerPolicy.java
new file mode 100644
index 0000000..197dc28
--- /dev/null
+++ b/hyracks/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/IPageCleanerPolicy.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.common.buffercache;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Allows customization of the page cleaning strategy by the cleaner thread.
+ *
+ * @author vinayakb
+ */
+public interface IPageCleanerPolicy {
+ /**
+ * Callback from the cleaner just before the beginning of a cleaning cycle.
+ *
+ * @param monitor
+ * - The monitor on which a mutex is held while in this call
+ * @throws HyracksDataException
+ */
+ public void notifyCleanCycleStart(Object monitor) throws HyracksDataException;
+
+ /**
+ * Callback from the cleaner just after the finish of a cleaning cycle.
+ *
+ * @param monitor
+ * - The monitor on which a mutex is held while in this call.
+ * @throws HyracksDataException
+ */
+ public void notifyCleanCycleFinish(Object monitor) throws HyracksDataException;
+
+ /**
+ * Callback to indicate that no victim was found.
+ *
+ * @param monitor
+ * - The monitor on which a mutex is held while in this call.
+ * @throws HyracksDataException
+ */
+ public void notifyVictimNotFound(Object monitor) throws HyracksDataException;
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestStorageManagerComponentHolder.java b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestStorageManagerComponentHolder.java
index ccbe862..ce5e989 100644
--- a/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestStorageManagerComponentHolder.java
+++ b/hyracks/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestStorageManagerComponentHolder.java
@@ -27,6 +27,7 @@
import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexRegistry;
import edu.uci.ics.hyracks.storage.common.buffercache.BufferCache;
import edu.uci.ics.hyracks.storage.common.buffercache.ClockPageReplacementStrategy;
+import edu.uci.ics.hyracks.storage.common.buffercache.DelayPageCleanerPolicy;
import edu.uci.ics.hyracks.storage.common.buffercache.HeapBufferAllocator;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
import edu.uci.ics.hyracks.storage.common.buffercache.ICacheMemoryAllocator;
@@ -40,7 +41,7 @@
private static IFileMapProvider fileMapProvider;
private static IndexRegistry<IIndex> indexRegistry;
private static IOManager ioManager;
-
+
private static int pageSize;
private static int numPages;
private static int maxOpenFiles;
@@ -59,8 +60,8 @@
ICacheMemoryAllocator allocator = new HeapBufferAllocator();
IPageReplacementStrategy prs = new ClockPageReplacementStrategy();
IFileMapProvider fileMapProvider = getFileMapProvider(ctx);
- bufferCache = new BufferCache(ctx.getIOManager(), allocator, prs, (IFileMapManager) fileMapProvider,
- pageSize, numPages, maxOpenFiles);
+ bufferCache = new BufferCache(ctx.getIOManager(), allocator, prs, new DelayPageCleanerPolicy(1000),
+ (IFileMapManager) fileMapProvider, pageSize, numPages, maxOpenFiles);
}
return bufferCache;
}
@@ -78,13 +79,13 @@
}
return indexRegistry;
}
-
+
public synchronized static IOManager getIOManager() throws HyracksException {
- if (ioManager == null) {
- List<IODeviceHandle> devices = new ArrayList<IODeviceHandle>();
- devices.add(new IODeviceHandle(new File(System.getProperty("java.io.tmpdir")), "iodev_test_wa"));
- ioManager = new IOManager(devices, Executors.newCachedThreadPool());
- }
- return ioManager;
+ if (ioManager == null) {
+ List<IODeviceHandle> devices = new ArrayList<IODeviceHandle>();
+ devices.add(new IODeviceHandle(new File(System.getProperty("java.io.tmpdir")), "iodev_test_wa"));
+ ioManager = new IOManager(devices, Executors.newCachedThreadPool());
+ }
+ return ioManager;
}
}
\ No newline at end of file
diff --git a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
index f18be9e..cb76637 100644
--- a/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
+++ b/pregelix/pregelix-dataflow/src/main/java/edu/uci/ics/pregelix/dataflow/context/RuntimeContext.java
@@ -32,6 +32,7 @@
import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexRegistry;
import edu.uci.ics.hyracks.storage.common.buffercache.BufferCache;
import edu.uci.ics.hyracks.storage.common.buffercache.ClockPageReplacementStrategy;
+import edu.uci.ics.hyracks.storage.common.buffercache.DelayPageCleanerPolicy;
import edu.uci.ics.hyracks.storage.common.buffercache.HeapBufferAllocator;
import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
import edu.uci.ics.hyracks.storage.common.buffercache.ICacheMemoryAllocator;
@@ -61,8 +62,8 @@
long memSize = Runtime.getRuntime().maxMemory();
long bufferSize = memSize / 4;
int numPages = (int) (bufferSize / pageSize);
- bufferCache = new BufferCache(appCtx.getRootContext().getIOManager(), allocator, prs, fileMapManager, pageSize,
- numPages, 1000000);
+ bufferCache = new BufferCache(appCtx.getRootContext().getIOManager(), allocator, prs,
+ new DelayPageCleanerPolicy(100000), fileMapManager, pageSize, numPages, 1000000);
treeIndexRegistry = new IndexRegistry<IIndex>();
ioManager = (IOManager) appCtx.getRootContext().getIOManager();
}