Reintegrated changes from hyracks_lsm_experiments.
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@2826 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/api/IPartitionedInvertedIndex.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/api/IPartitionedInvertedIndex.java
index 7db972c..89fd69d 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/api/IPartitionedInvertedIndex.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/api/IPartitionedInvertedIndex.java
@@ -15,13 +15,17 @@
package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api;
+import java.util.ArrayList;
+
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexOperationContext;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search.InvertedListPartitions;
public interface IPartitionedInvertedIndex {
- public void openInvertedListPartitionCursors(IInvertedIndexSearcher searcher, IIndexOperationContext ictx,
- short numTokensLowerBound, short numTokensUpperBound, InvertedListPartitions invListPartitions)
- throws HyracksDataException, IndexException;
+ /**
+ * Opens inverted-list cursors for one query token (presumably the searcher's current search key -- confirm
+ * against the implementations) and registers them, per partition, in invListPartitions.
+ *
+ * @param searcher searcher issuing the request
+ * @param ictx operation context of the concrete index implementation
+ * @param numTokensLowerBound lower bound of the token-count partitions of interest
+ * @param numTokensUpperBound upper bound of the token-count partitions of interest
+ * (NOTE(review): the in-memory implementation only applies it when it is >= 0 -- confirm the convention)
+ * @param invListPartitions receives the opened cursors, keyed by partition
+ * @param cursorsOrderedByTokens list to which an implementation may append the cursors it opens
+ * (NOTE(review): only the on-disk implementation appears to append to it -- confirm)
+ * @return false when the implementation determines that the token cannot contribute any inverted list
+ * (in-memory: the index is empty; on-disk: no matching entry was found), true otherwise
+ * @throws HyracksDataException declared by the implementations' underlying index operations
+ * @throws IndexException declared by the implementations' underlying index operations
+ */
+ public boolean openInvertedListPartitionCursors(IInvertedIndexSearcher searcher, IIndexOperationContext ictx,
+ short numTokensLowerBound, short numTokensUpperBound, InvertedListPartitions invListPartitions,
+ ArrayList<IInvertedListCursor> cursorsOrderedByTokens) throws HyracksDataException, IndexException;
+
+ /**
+ * Reports whether the index is known to hold no entries. An implementation may conservatively
+ * return false when it cannot tell.
+ *
+ * @return true if the index is known to be empty, false otherwise
+ */
+ public boolean isEmpty();
}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
index 0ad10a7..6f28f61 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
@@ -121,14 +121,13 @@
}
}
}
-
- if (appender.getTupleCount() > 0) {
- FrameUtils.flushFrame(writeBuffer, writer);
- }
}
+ /**
+ * Flushes any tuples still buffered in the output frame and then closes the downstream writer.
+ *
+ * @throws HyracksDataException if flushing the final frame or closing the writer fails
+ */
@Override
public void close() throws HyracksDataException {
- writer.close();
+ try {
+ // Tuples left in a partially filled frame are only sent here, when the operator is closed.
+ if (appender.getTupleCount() > 0) {
+ FrameUtils.flushFrame(writeBuffer, writer);
+ }
+ } finally {
+ // Close the writer even if the final flush fails, so it is never left open.
+ writer.close();
+ }
}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndex.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndex.java
index 13e9b5c..7c3f4e4 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndex.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndex.java
@@ -14,6 +14,7 @@
*/
package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.inmemory;
+import java.util.ArrayList;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -30,6 +31,7 @@
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexSearcher;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IPartitionedInvertedIndex;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search.InvertedListPartitions;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search.PartitionedTOccurrenceSearcher;
@@ -86,9 +88,9 @@
}
@Override
- public void openInvertedListPartitionCursors(IInvertedIndexSearcher searcher, IIndexOperationContext ictx,
- short numTokensLowerBound, short numTokensUpperBound, InvertedListPartitions invListPartitions)
- throws HyracksDataException, IndexException {
+ public boolean openInvertedListPartitionCursors(IInvertedIndexSearcher searcher, IIndexOperationContext ictx,
+ short numTokensLowerBound, short numTokensUpperBound, InvertedListPartitions invListPartitions,
+ ArrayList<IInvertedListCursor> cursorsOrderedByTokens) throws HyracksDataException, IndexException {
short minPartitionIndex;
short maxPartitionIndex;
partitionIndexLock.readLock().lock();
@@ -98,7 +100,7 @@
if (minPartitionIndex == Short.MAX_VALUE && maxPartitionIndex == Short.MIN_VALUE) {
// Index must be empty.
- return;
+ return false;
}
short partitionStartIndex = minPartitionIndex;
short partitionEndIndex = maxPartitionIndex;
@@ -108,7 +110,7 @@
if (numTokensUpperBound >= 0) {
partitionEndIndex = (short) Math.min(maxPartitionIndex, numTokensUpperBound);
}
-
+
PartitionedTOccurrenceSearcher partSearcher = (PartitionedTOccurrenceSearcher) searcher;
PartitionedInMemoryInvertedIndexOpContext ctx = (PartitionedInMemoryInvertedIndexOpContext) ictx;
ctx.setOperation(IndexOperation.SEARCH);
@@ -127,5 +129,18 @@
inMemListCursor.reset(searchKey);
invListPartitions.addInvertedListCursor(inMemListCursor, i);
}
+ return true;
+ }
+
+ /**
+ * Reports whether this in-memory index currently holds no partitions.
+ *
+ * @return true if both partition-index bounds still hold their sentinel values, false otherwise
+ */
+ @Override
+ public boolean isEmpty() {
+ partitionIndexLock.readLock().lock();
+ try {
+ // Same sentinel check the search path uses to conclude that the index must be empty.
+ return minPartitionIndex == Short.MAX_VALUE && maxPartitionIndex == Short.MIN_VALUE;
+ } finally {
+ // Release the read lock on every exit path.
+ partitionIndexLock.readLock().unlock();
+ }
}
}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/FixedSizeElementInvertedListCursor.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/FixedSizeElementInvertedListCursor.java
index ed8f600..f55a700 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/FixedSizeElementInvertedListCursor.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/FixedSizeElementInvertedListCursor.java
@@ -46,6 +46,8 @@
private final FixedSizeTupleReference tuple;
private ICachedPage[] pages = new ICachedPage[10];
private int[] elementIndexes = new int[10];
+
+ // True between a completed pinPages() and the next unpinPages(); lets pinPages() skip pages it already holds.
+ private boolean pinned = false;
public FixedSizeElementInvertedListCursor(IBufferCache bufferCache, int fileId, ITypeTraits[] invListFields) {
this.bufferCache = bufferCache;
@@ -84,12 +86,16 @@
+ /**
+ * Pins every page from startPageId through endPageId in the buffer cache and acquires a read latch on each,
+ * storing the pages in the pages array. Does nothing if the pages are already pinned.
+ *
+ * @throws HyracksDataException if the buffer cache fails to pin a page
+ */
@Override
public void pinPages() throws HyracksDataException {
+ // Already pinned by an earlier call: return so the pages are not pinned and latched a second time.
+ if (pinned) {
+ return;
+ }
int pix = 0;
+ // NOTE(review): assumes pages has room for the whole page range -- confirm where it is resized.
+ // NOTE(review): if pin() throws part-way through, earlier pages appear to stay pinned and latched
+ // while pinned remains false -- confirm whether callers clean up in that case.
for (int i = startPageId; i <= endPageId; i++) {
pages[pix] = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, i), false);
pages[pix].acquireReadLatch();
pix++;
}
+ pinned = true;
}
@Override
@@ -99,6 +105,7 @@
pages[i].releaseReadLatch();
bufferCache.unpin(pages[i]);
}
+ pinned = false;
}
private void positionCursor(int elementIx) {
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndex.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndex.java
index 5b2ec60..6e395e7 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndex.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndex.java
@@ -15,6 +15,8 @@
package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk;
+import java.util.ArrayList;
+
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -61,9 +63,9 @@
}
@Override
- public void openInvertedListPartitionCursors(IInvertedIndexSearcher searcher, IIndexOperationContext ictx,
- short numTokensLowerBound, short numTokensUpperBound, InvertedListPartitions invListPartitions)
- throws HyracksDataException, IndexException {
+ public boolean openInvertedListPartitionCursors(IInvertedIndexSearcher searcher, IIndexOperationContext ictx,
+ short numTokensLowerBound, short numTokensUpperBound, InvertedListPartitions invListPartitions,
+ ArrayList<IInvertedListCursor> cursorsOrderedByTokens) throws HyracksDataException, IndexException {
PartitionedTOccurrenceSearcher partSearcher = (PartitionedTOccurrenceSearcher) searcher;
OnDiskInvertedIndexOpContext ctx = (OnDiskInvertedIndexOpContext) ictx;
ITupleReference lowSearchKey = null;
@@ -86,6 +88,7 @@
ctx.btreePred.setLowKey(lowSearchKey, true);
ctx.btreePred.setHighKey(highSearchKey, true);
ctx.btreeAccessor.search(ctx.btreeCursor, ctx.btreePred);
+ boolean tokenExists = false;
try {
while (ctx.btreeCursor.hasNext()) {
ctx.btreeCursor.next();
@@ -95,11 +98,19 @@
btreeTuple.getFieldStart(PARTITIONING_NUM_TOKENS_FIELD));
IInvertedListCursor invListCursor = partSearcher.getCachedInvertedListCursor();
resetInvertedListCursor(btreeTuple, invListCursor);
+ cursorsOrderedByTokens.add(invListCursor);
invListPartitions.addInvertedListCursor(invListCursor, numTokens);
+ tokenExists = true;
}
} finally {
ctx.btreeCursor.close();
ctx.btreeCursor.reset();
}
+ return tokenExists;
+ }
+
+ /**
+ * Always reports this on-disk index as non-empty.
+ * NOTE(review): presumably a conservative answer that avoids probing the index just to test emptiness;
+ * an actually empty index then falls through to the regular search path -- confirm this is intended.
+ *
+ * @return false, unconditionally
+ */
+ @Override
+ public boolean isEmpty() {
+ return false;
}
}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/PartitionedTOccurrenceSearcher.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/PartitionedTOccurrenceSearcher.java
index fb8b9b0..3ce1f48 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/PartitionedTOccurrenceSearcher.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/PartitionedTOccurrenceSearcher.java
@@ -43,6 +43,10 @@
protected final ConcatenatingTupleReference fullLowSearchKey = new ConcatenatingTupleReference(2);
protected final ConcatenatingTupleReference fullHighSearchKey = new ConcatenatingTupleReference(2);
+ // Inverted-list cursors ordered by token. Used to read the relevant inverted-list partitions of one token one after
+ // the other for better I/O performance (because the partitions of one inverted list are stored contiguously in a file).
+ // This implies that we currently need to hold all inverted lists for a query in memory.
+ protected final ArrayList<IInvertedListCursor> cursorsOrderedByTokens = new ArrayList<IInvertedListCursor>();
protected final InvertedListPartitions partitions = new InvertedListPartitions();
public PartitionedTOccurrenceSearcher(IHyracksCommonContext ctx, IInvertedIndex invIndex) {
@@ -80,32 +84,69 @@
public void search(OnDiskInvertedIndexSearchCursor resultCursor, InvertedIndexSearchPredicate searchPred,
IIndexOperationContext ictx) throws HyracksDataException, IndexException {
+ IPartitionedInvertedIndex partInvIndex = (IPartitionedInvertedIndex) invIndex;
+ searchResult.reset();
+ if (partInvIndex.isEmpty()) {
+ return;
+ }
+
tokenizeQuery(searchPred);
short numQueryTokens = (short) queryTokenAccessor.getTupleCount();
IInvertedIndexSearchModifier searchModifier = searchPred.getSearchModifier();
short numTokensLowerBound = searchModifier.getNumTokensLowerBound(numQueryTokens);
short numTokensUpperBound = searchModifier.getNumTokensUpperBound(numQueryTokens);
-
- IPartitionedInvertedIndex partInvIndex = (IPartitionedInvertedIndex) invIndex;
- invListCursorCache.reset();
- partitions.reset(numTokensLowerBound, numTokensUpperBound);
- for (int i = 0; i < numQueryTokens; i++) {
- searchKey.reset(queryTokenAccessor, i);
- partInvIndex.openInvertedListPartitionCursors(this, ictx, numTokensLowerBound, numTokensUpperBound,
- partitions);
- }
-
+
occurrenceThreshold = searchModifier.getOccurrenceThreshold(numQueryTokens);
if (occurrenceThreshold <= 0) {
throw new OccurrenceThresholdPanicException("Merge Threshold is <= 0. Failing Search.");
}
-
- // Process the partitions one-by-one.
+
+ short maxCountPossible = numQueryTokens;
+ invListCursorCache.reset();
+ partitions.reset(numTokensLowerBound, numTokensUpperBound);
+ cursorsOrderedByTokens.clear();
+ for (int i = 0; i < numQueryTokens; i++) {
+ searchKey.reset(queryTokenAccessor, i);
+ if (!partInvIndex.openInvertedListPartitionCursors(this, ictx, numTokensLowerBound, numTokensUpperBound,
+ partitions, cursorsOrderedByTokens)) {
+ maxCountPossible--;
+ // No results possible.
+ if (maxCountPossible < occurrenceThreshold) {
+ return;
+ }
+ }
+ }
+
ArrayList<IInvertedListCursor>[] partitionCursors = partitions.getPartitions();
short start = partitions.getMinValidPartitionIndex();
short end = partitions.getMaxValidPartitionIndex();
- searchResult.reset();
+
+ // Typically, we only enter this case for disk-based inverted indexes.
+ // TODO: This behavior could potentially lead to a deadlock if we cannot pin
+ // all inverted lists in memory, and are forced to wait for a page to get evicted
+ // (other concurrent searchers may be in the same situation).
+ // We should detect such cases, then unpin all pages, and then keep retrying to pin until we succeed.
+ // This will require a different "tryPin()" mechanism in the BufferCache that will return false
+ // if we'd have to wait for a page to get evicted.
+ if (!cursorsOrderedByTokens.isEmpty()) {
+ for (int i = start; i <= end; i++) {
+ if (partitionCursors[i] == null) {
+ continue;
+ }
+ // Prune partition because no element in it can satisfy the occurrence threshold.
+ if (partitionCursors[i].size() < occurrenceThreshold) {
+ cursorsOrderedByTokens.removeAll(partitionCursors[i]);
+ }
+ }
+ // Pin all the cursors in the order of tokens.
+ int numCursors = cursorsOrderedByTokens.size();
+ for (int i = 0; i < numCursors; i++) {
+ cursorsOrderedByTokens.get(i).pinPages();
+ }
+ }
+
+ // Process the partitions one-by-one.
for (int i = start; i <= end; i++) {
if (partitionCursors[i] == null) {
continue;
@@ -119,7 +160,7 @@
invListMerger.reset();
invListMerger.merge(partitionCursors[i], occurrenceThreshold, numPrefixLists, searchResult);
}
-
+
resultCursor.open(null, searchPred);
}