Fixed LSM tests that I broke while cleaning up the inverted index. Minor improvements to buffercache.
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1246 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
index 8cdf891..a6f1a4b 100644
--- a/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks-storage-common/src/main/java/edu/uci/ics/hyracks/storage/common/buffercache/BufferCache.java
@@ -39,7 +39,8 @@
private static final int MAP_FACTOR = 2;
private static final int MAX_VICTIMIZATION_TRY_COUNT = 5;
- private static final int MAX_WAIT_FOR_CLEANER_TRY_COUNT = 5;
+ private static final int MAX_WAIT_FOR_CLEANER_THREAD_TIME = 1000;
+ private static final int MIN_CLEANED_COUNT_DIFF = 4;
private final int maxOpenFiles;
@@ -168,27 +169,11 @@
return cPage;
}
- private int incrementPinCount(CachedPage page) {
- int pinCount = page.pinCount.incrementAndGet();
- // If this was the first pin, we decrement the freepages count.
- if (pinCount == 1) {
- cleanerThread.freePages.decrementAndGet();
- }
- return pinCount;
- }
-
- private int decrementPinCount(CachedPage page) {
- int pinCount = ((CachedPage) page).pinCount.decrementAndGet();
- // If this was the last unpin, we increment the freepages count.
- if (pinCount == 0) {
- cleanerThread.freePages.incrementAndGet();
- }
- return pinCount;
- }
-
private CachedPage findPage(long dpid, boolean newPage) {
int victimizationTryCount = 0;
while (true) {
+ int startCleanedCount = cleanerThread.cleanedCount;
+
CachedPage cPage = null;
/*
* Hash dpid to get a bucket and then check if the page exists in the bucket.
@@ -336,17 +321,19 @@
synchronized (cleanerThread) {
cleanerThread.notifyAll();
}
- int waitTryCount = 0;
+ // Heuristic optimization. Check whether the cleaner thread has
+ // cleaned pages since we did our last pin attempt.
+ if (cleanerThread.cleanedCount - startCleanedCount > MIN_CLEANED_COUNT_DIFF) {
+ // Don't go to sleep and wait for notification from the cleaner,
+ // just try to pin again immediately.
+ continue;
+ }
synchronized (cleanerThread.cleanNotification) {
- // Sleep until a page becomes available.
- do {
- try {
- cleanerThread.cleanNotification.wait(100);
- } catch (InterruptedException e) {
- // Do nothing
- }
- waitTryCount++;
- } while (cleanerThread.freePages.get() == 0 && waitTryCount < MAX_WAIT_FOR_CLEANER_TRY_COUNT);
+ try {
+ cleanerThread.cleanNotification.wait(MAX_WAIT_FOR_CLEANER_THREAD_TIME);
+ } catch (InterruptedException e) {
+ // Do nothing
+ }
}
}
}
@@ -417,7 +404,7 @@
if (closed) {
throw new HyracksDataException("unpin called on a closed cache");
}
- decrementPinCount((CachedPage) page);
+ ((CachedPage) page).pinCount.decrementAndGet();
}
private int hash(long dpid) {
@@ -478,11 +465,7 @@
@Override
public boolean pinIfGoodVictim() {
- if (pinCount.compareAndSet(0, 1)) {
- cleanerThread.freePages.decrementAndGet();
- return true;
- }
- return false;
+ return pinCount.compareAndSet(0, 1);
}
@Override
@@ -529,11 +512,15 @@
private boolean shutdownStart = false;
private boolean shutdownComplete = false;
private final Object cleanNotification = new Object();
- private AtomicInteger freePages = new AtomicInteger();
+ // Simply keeps incrementing this counter when a page is cleaned.
+ // Used to implement wait-for-cleanerthread heuristic optimizations.
+ // A waiter can detect whether pages have been cleaned.
+ // No need to make this var volatile or synchronize it's access in any
+ // way because it is used for heuristics.
+ private int cleanedCount = 0;
public CleanerThread() {
setPriority(MAX_PRIORITY);
- freePages.set(numPages);
}
public void cleanPage(CachedPage cPage, boolean force) {
@@ -559,8 +546,9 @@
}
if (cleaned) {
cPage.dirty.set(false);
- decrementPinCount(cPage);
- synchronized (cleanNotification) {
+ cPage.pinCount.decrementAndGet();
+ cleanedCount++;
+ synchronized (cleanNotification) {
cleanNotification.notifyAll();
}
}
@@ -730,7 +718,7 @@
write(cPage);
}
cPage.dirty.set(false);
- pinCount = decrementPinCount(cPage);
+ pinCount = cPage.pinCount.decrementAndGet();
} else {
pinCount = cPage.pinCount.get();
}
diff --git a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/OrderedIndexExamplesTest.java b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/OrderedIndexExamplesTest.java
index 6230a22..a0af6e3 100644
--- a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/OrderedIndexExamplesTest.java
+++ b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/OrderedIndexExamplesTest.java
@@ -35,7 +35,9 @@
import edu.uci.ics.hyracks.dataflow.common.util.TupleUtils;
import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
import edu.uci.ics.hyracks.storage.am.btree.util.BTreeUtils;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoadContext;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
@@ -91,7 +93,7 @@
}
ArrayTupleBuilder tb = new ArrayTupleBuilder(fieldCount);
ArrayTupleReference tuple = new ArrayTupleReference();
- ITreeIndexAccessor indexAccessor = (ITreeIndexAccessor) treeIndex.createAccessor();
+ IIndexAccessor indexAccessor = (IIndexAccessor) treeIndex.createAccessor();
int numInserts = 10000;
for (int i = 0; i < numInserts; i++) {
int f0 = rnd.nextInt() % numInserts;
@@ -170,7 +172,7 @@
}
ArrayTupleBuilder tb = new ArrayTupleBuilder(fieldCount);
ArrayTupleReference tuple = new ArrayTupleReference();
- ITreeIndexAccessor indexAccessor = (ITreeIndexAccessor) treeIndex.createAccessor();
+ IIndexAccessor indexAccessor = (IIndexAccessor) treeIndex.createAccessor();
int numInserts = 10000;
for (int i = 0; i < 10000; i++) {
int f0 = rnd.nextInt() % 2000;
@@ -247,7 +249,7 @@
}
ArrayTupleBuilder tb = new ArrayTupleBuilder(fieldCount);
ArrayTupleReference tuple = new ArrayTupleReference();
- ITreeIndexAccessor indexAccessor = (ITreeIndexAccessor) treeIndex.createAccessor();
+ IIndexAccessor indexAccessor = (IIndexAccessor) treeIndex.createAccessor();
// Max string length to be generated.
int maxLength = 10;
int numInserts = 10000;
@@ -322,7 +324,7 @@
ArrayTupleBuilder tb = new ArrayTupleBuilder(fieldCount);
ArrayTupleReference tuple = new ArrayTupleReference();
- ITreeIndexAccessor indexAccessor = (ITreeIndexAccessor) treeIndex.createAccessor();
+ IIndexAccessor indexAccessor = (IIndexAccessor) treeIndex.createAccessor();
// Max string length to be generated.
int runs = 3;
for (int run = 0; run < runs; run++) {
@@ -424,7 +426,7 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Inserting into tree...");
}
- ITreeIndexAccessor indexAccessor = (ITreeIndexAccessor) treeIndex.createAccessor();
+ IIndexAccessor indexAccessor = (IIndexAccessor) treeIndex.createAccessor();
ArrayTupleBuilder tb = new ArrayTupleBuilder(fieldCount);
ArrayTupleReference tuple = new ArrayTupleReference();
int maxLength = 10;
@@ -527,7 +529,7 @@
LOGGER.info(ins + " tuples loaded in " + (end - start) + "ms");
}
- ITreeIndexAccessor indexAccessor = (ITreeIndexAccessor) treeIndex.createAccessor();
+ IIndexAccessor indexAccessor = (IIndexAccessor) treeIndex.createAccessor();
// Build low key.
ArrayTupleBuilder lowKeyTb = new ArrayTupleBuilder(1);
@@ -545,12 +547,12 @@
treeIndex.close();
}
- private void orderedScan(ITreeIndexAccessor indexAccessor, ISerializerDeserializer[] fieldSerdes)
+ private void orderedScan(IIndexAccessor indexAccessor, ISerializerDeserializer[] fieldSerdes)
throws Exception {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Ordered Scan:");
}
- ITreeIndexCursor scanCursor = (ITreeIndexCursor) indexAccessor.createSearchCursor();
+ IIndexCursor scanCursor = (IIndexCursor) indexAccessor.createSearchCursor();
RangePredicate nullPred = new RangePredicate(null, null, true, true, null, null);
indexAccessor.search(scanCursor, nullPred);
try {
@@ -567,15 +569,16 @@
}
}
- private void diskOrderScan(ITreeIndexAccessor indexAccessor,
+ private void diskOrderScan(IIndexAccessor indexAccessor,
ISerializerDeserializer[] fieldSerdes) throws Exception {
try {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Disk-Order Scan:");
}
- TreeDiskOrderScanCursor diskOrderCursor = (TreeDiskOrderScanCursor) indexAccessor
+ ITreeIndexAccessor treeIndexAccessor = (ITreeIndexAccessor) indexAccessor;
+ TreeDiskOrderScanCursor diskOrderCursor = (TreeDiskOrderScanCursor) treeIndexAccessor
.createDiskOrderScanCursor();
- indexAccessor.diskOrderScan(diskOrderCursor);
+ treeIndexAccessor.diskOrderScan(diskOrderCursor);
try {
while (diskOrderCursor.hasNext()) {
diskOrderCursor.next();
@@ -594,10 +597,16 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Ignoring disk-order scan since it's not supported.");
}
+ } catch (ClassCastException e) {
+ // Ignore exception because IIndexAccessor sometimes isn't
+ // an ITreeIndexAccessor, e.g., for the LSMBTree.
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Ignoring disk-order scan since it's not supported.");
+ }
}
}
- private void rangeSearch(IBinaryComparatorFactory[] cmpFactories, ITreeIndexAccessor indexAccessor, ISerializerDeserializer[] fieldSerdes,
+ private void rangeSearch(IBinaryComparatorFactory[] cmpFactories, IIndexAccessor indexAccessor, ISerializerDeserializer[] fieldSerdes,
ITupleReference lowKey, ITupleReference highKey) throws Exception {
if (LOGGER.isLoggable(Level.INFO)) {
String lowKeyString = TupleUtils.printTuple(lowKey, fieldSerdes);
diff --git a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/common/ITreeIndexTestContext.java b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/common/ITreeIndexTestContext.java
index e4a40c1..9384757 100644
--- a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/common/ITreeIndexTestContext.java
+++ b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/common/ITreeIndexTestContext.java
@@ -21,8 +21,8 @@
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
@SuppressWarnings("rawtypes")
public interface ITreeIndexTestContext<T extends CheckTuple> {
@@ -34,7 +34,7 @@
public IBinaryComparatorFactory[] getComparatorFactories();
- public ITreeIndexAccessor getIndexAccessor();
+ public IIndexAccessor getIndexAccessor();
public ITreeIndex getIndex();
diff --git a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/common/TreeIndexTestContext.java b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/common/TreeIndexTestContext.java
index 0fd61c3..bc5312c 100644
--- a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/common/TreeIndexTestContext.java
+++ b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/common/TreeIndexTestContext.java
@@ -20,8 +20,8 @@
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
@SuppressWarnings("rawtypes")
public abstract class TreeIndexTestContext<T extends CheckTuple> implements ITreeIndexTestContext<T> {
@@ -29,12 +29,12 @@
protected final ITreeIndex treeIndex;
protected final ArrayTupleBuilder tupleBuilder;
protected final ArrayTupleReference tuple = new ArrayTupleReference();
- protected final ITreeIndexAccessor indexAccessor;
+ protected final IIndexAccessor indexAccessor;
public TreeIndexTestContext(ISerializerDeserializer[] fieldSerdes, ITreeIndex treeIndex) {
this.fieldSerdes = fieldSerdes;
this.treeIndex = treeIndex;
- this.indexAccessor = (ITreeIndexAccessor) treeIndex.createAccessor();
+ this.indexAccessor = (IIndexAccessor) treeIndex.createAccessor();
this.tupleBuilder = new ArrayTupleBuilder(fieldSerdes.length);
}
@@ -44,7 +44,7 @@
}
@Override
- public ITreeIndexAccessor getIndexAccessor() {
+ public IIndexAccessor getIndexAccessor() {
return indexAccessor;
}
diff --git a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/common/TreeIndexTestUtils.java b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/common/TreeIndexTestUtils.java
index 447ce0d..b6a97ab 100644
--- a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/common/TreeIndexTestUtils.java
+++ b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/common/TreeIndexTestUtils.java
@@ -20,6 +20,7 @@
import edu.uci.ics.hyracks.dataflow.common.util.TupleUtils;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoadContext;
import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
@@ -94,8 +95,9 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Testing Disk-Order Scan.");
}
- ITreeIndexCursor diskOrderCursor = ctx.getIndexAccessor().createDiskOrderScanCursor();
- ctx.getIndexAccessor().diskOrderScan(diskOrderCursor);
+ ITreeIndexAccessor treeIndexAccessor = (ITreeIndexAccessor) ctx.getIndexAccessor();
+ ITreeIndexCursor diskOrderCursor = treeIndexAccessor.createDiskOrderScanCursor();
+ treeIndexAccessor.diskOrderScan(diskOrderCursor);
int actualCount = 0;
try {
while (diskOrderCursor.hasNext()) {
@@ -125,7 +127,13 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Ignoring disk-order scan since it's not supported.");
}
- }
+ } catch (ClassCastException e) {
+ // Ignore exception because IIndexAccessor sometimes isn't
+ // an ITreeIndexAccessor, e.g., for the LSMBTree.
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Ignoring disk-order scan since it's not supported.");
+ }
+ }
}
@SuppressWarnings("unchecked")
diff --git a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/AbstractRTreeExamplesTest.java b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/AbstractRTreeExamplesTest.java
index 020194e..7192c53 100644
--- a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/AbstractRTreeExamplesTest.java
+++ b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/AbstractRTreeExamplesTest.java
@@ -33,6 +33,7 @@
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.DoubleSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.common.util.TupleUtils;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoadContext;
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
@@ -116,7 +117,7 @@
}
ArrayTupleBuilder tb = new ArrayTupleBuilder(fieldCount);
ArrayTupleReference tuple = new ArrayTupleReference();
- ITreeIndexAccessor indexAccessor = (ITreeIndexAccessor) treeIndex.createAccessor();
+ IIndexAccessor indexAccessor = (IIndexAccessor) treeIndex.createAccessor();
int numInserts = 10000;
for (int i = 0; i < numInserts; i++) {
int p1x = rnd.nextInt();
@@ -223,7 +224,7 @@
}
ArrayTupleBuilder tb = new ArrayTupleBuilder(fieldCount);
ArrayTupleReference tuple = new ArrayTupleReference();
- ITreeIndexAccessor indexAccessor = (ITreeIndexAccessor) treeIndex.createAccessor();
+ IIndexAccessor indexAccessor = (IIndexAccessor) treeIndex.createAccessor();
int numInserts = 10000;
for (int i = 0; i < numInserts; i++) {
double p1x = rnd.nextDouble();
@@ -318,7 +319,7 @@
ArrayTupleBuilder tb = new ArrayTupleBuilder(fieldCount);
ArrayTupleReference tuple = new ArrayTupleReference();
- ITreeIndexAccessor indexAccessor = (ITreeIndexAccessor) treeIndex.createAccessor();
+ IIndexAccessor indexAccessor = (IIndexAccessor) treeIndex.createAccessor();
int runs = 3;
for (int run = 0; run < runs; run++) {
@@ -477,7 +478,7 @@
LOGGER.info(numInserts + " tuples loaded in " + (end - start) + "ms");
}
- ITreeIndexAccessor indexAccessor = (ITreeIndexAccessor) treeIndex.createAccessor();
+ IIndexAccessor indexAccessor = (IIndexAccessor) treeIndex.createAccessor();
// Build key.
ArrayTupleBuilder keyTb = new ArrayTupleBuilder(rtreeKeyFieldCount);
@@ -489,7 +490,7 @@
treeIndex.close();
}
- private void scan(ITreeIndexAccessor indexAccessor, ISerializerDeserializer[] fieldSerdes) throws Exception {
+ private void scan(IIndexAccessor indexAccessor, ISerializerDeserializer[] fieldSerdes) throws Exception {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Scan:");
}
@@ -510,15 +511,16 @@
}
}
- private void diskOrderScan(ITreeIndexAccessor indexAccessor, ISerializerDeserializer[] fieldSerdes)
+ private void diskOrderScan(IIndexAccessor indexAccessor, ISerializerDeserializer[] fieldSerdes)
throws Exception {
try {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Disk-Order Scan:");
}
- TreeDiskOrderScanCursor diskOrderCursor = (TreeDiskOrderScanCursor) indexAccessor
+ ITreeIndexAccessor treeIndexAccessor = (ITreeIndexAccessor) indexAccessor;
+ TreeDiskOrderScanCursor diskOrderCursor = (TreeDiskOrderScanCursor) treeIndexAccessor
.createDiskOrderScanCursor();
- indexAccessor.diskOrderScan(diskOrderCursor);
+ treeIndexAccessor.diskOrderScan(diskOrderCursor);
try {
while (diskOrderCursor.hasNext()) {
diskOrderCursor.next();
@@ -537,10 +539,16 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Ignoring disk-order scan since it's not supported.");
}
- }
+ } catch (ClassCastException e) {
+ // Ignore exception because IIndexAccessor sometimes isn't
+ // an ITreeIndexAccessor, e.g., for the LSMRTree.
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Ignoring disk-order scan since it's not supported.");
+ }
+ }
}
- private void rangeSearch(IBinaryComparatorFactory[] cmpFactories, ITreeIndexAccessor indexAccessor,
+ private void rangeSearch(IBinaryComparatorFactory[] cmpFactories, IIndexAccessor indexAccessor,
ISerializerDeserializer[] fieldSerdes, ITupleReference key) throws Exception {
if (LOGGER.isLoggable(Level.INFO)) {
String kString = TupleUtils.printTuple(key, fieldSerdes);