Hopefully fixed a performance bug; still needs to be tested on the cluster.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_experiments@2675 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/api/IPartitionedInvertedIndex.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/api/IPartitionedInvertedIndex.java
index cec4691..89fd69d 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/api/IPartitionedInvertedIndex.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/api/IPartitionedInvertedIndex.java
@@ -15,6 +15,8 @@
 
 package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api;
 
+import java.util.ArrayList;
+
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexOperationContext;
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
@@ -22,8 +24,8 @@
 
 public interface IPartitionedInvertedIndex {
     public boolean openInvertedListPartitionCursors(IInvertedIndexSearcher searcher, IIndexOperationContext ictx,
-            short numTokensLowerBound, short numTokensUpperBound, InvertedListPartitions invListPartitions)
-            throws HyracksDataException, IndexException;
+            short numTokensLowerBound, short numTokensUpperBound, InvertedListPartitions invListPartitions,
+            ArrayList<IInvertedListCursor> cursorsOrderedByTokens) throws HyracksDataException, IndexException;
 
     public boolean isEmpty();
 }
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndex.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndex.java
index 5bc47fd..7c3f4e4 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndex.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndex.java
@@ -14,6 +14,7 @@
  */
 package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.inmemory;
 
+import java.util.ArrayList;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -30,6 +31,7 @@
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexSearcher;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IPartitionedInvertedIndex;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search.InvertedListPartitions;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search.PartitionedTOccurrenceSearcher;
@@ -87,8 +89,8 @@
 
     @Override
     public boolean openInvertedListPartitionCursors(IInvertedIndexSearcher searcher, IIndexOperationContext ictx,
-            short numTokensLowerBound, short numTokensUpperBound, InvertedListPartitions invListPartitions)
-            throws HyracksDataException, IndexException {
+            short numTokensLowerBound, short numTokensUpperBound, InvertedListPartitions invListPartitions,
+            ArrayList<IInvertedListCursor> cursorsOrderedByTokens) throws HyracksDataException, IndexException {
         short minPartitionIndex;
         short maxPartitionIndex;
         partitionIndexLock.readLock().lock();
@@ -108,7 +110,7 @@
         if (numTokensUpperBound >= 0) {
             partitionEndIndex = (short) Math.min(maxPartitionIndex, numTokensUpperBound);
         }
-        
+
         PartitionedTOccurrenceSearcher partSearcher = (PartitionedTOccurrenceSearcher) searcher;
         PartitionedInMemoryInvertedIndexOpContext ctx = (PartitionedInMemoryInvertedIndexOpContext) ictx;
         ctx.setOperation(IndexOperation.SEARCH);
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/FixedSizeElementInvertedListCursor.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/FixedSizeElementInvertedListCursor.java
index ed8f600..f55a700 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/FixedSizeElementInvertedListCursor.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/FixedSizeElementInvertedListCursor.java
@@ -46,6 +46,8 @@
     private final FixedSizeTupleReference tuple;
     private ICachedPage[] pages = new ICachedPage[10];
     private int[] elementIndexes = new int[10];
+    
+    private boolean pinned = false;
 
     public FixedSizeElementInvertedListCursor(IBufferCache bufferCache, int fileId, ITypeTraits[] invListFields) {
         this.bufferCache = bufferCache;
@@ -84,12 +86,16 @@
 
     @Override
     public void pinPages() throws HyracksDataException {
+        if (pinned) {
+            return;
+        }
         int pix = 0;
         for (int i = startPageId; i <= endPageId; i++) {
             pages[pix] = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, i), false);
             pages[pix].acquireReadLatch();
             pix++;
         }
+        pinned = true;
     }
 
     @Override
@@ -99,6 +105,7 @@
             pages[i].releaseReadLatch();
             bufferCache.unpin(pages[i]);
         }
+        pinned = false;
     }
 
     private void positionCursor(int elementIx) {
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndex.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndex.java
index 4c6cabc..6e395e7 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndex.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndex.java
@@ -15,6 +15,8 @@
 
 package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk;
 
+import java.util.ArrayList;
+
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -62,8 +64,8 @@
 
     @Override
     public boolean openInvertedListPartitionCursors(IInvertedIndexSearcher searcher, IIndexOperationContext ictx,
-            short numTokensLowerBound, short numTokensUpperBound, InvertedListPartitions invListPartitions)
-            throws HyracksDataException, IndexException {
+            short numTokensLowerBound, short numTokensUpperBound, InvertedListPartitions invListPartitions,
+            ArrayList<IInvertedListCursor> cursorsOrderedByTokens) throws HyracksDataException, IndexException {
         PartitionedTOccurrenceSearcher partSearcher = (PartitionedTOccurrenceSearcher) searcher;
         OnDiskInvertedIndexOpContext ctx = (OnDiskInvertedIndexOpContext) ictx;
         ITupleReference lowSearchKey = null;
@@ -96,6 +98,7 @@
                         btreeTuple.getFieldStart(PARTITIONING_NUM_TOKENS_FIELD));
                 IInvertedListCursor invListCursor = partSearcher.getCachedInvertedListCursor();
                 resetInvertedListCursor(btreeTuple, invListCursor);
+                cursorsOrderedByTokens.add(invListCursor);
                 invListPartitions.addInvertedListCursor(invListCursor, numTokens);
                 tokenExists = true;
             }
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/PartitionedTOccurrenceSearcher.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/PartitionedTOccurrenceSearcher.java
index 2884bbc..05aa6d0 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/PartitionedTOccurrenceSearcher.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/PartitionedTOccurrenceSearcher.java
@@ -43,6 +43,9 @@
     protected final ConcatenatingTupleReference fullLowSearchKey = new ConcatenatingTupleReference(2);
     protected final ConcatenatingTupleReference fullHighSearchKey = new ConcatenatingTupleReference(2);
 
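+    // HACK for better I/O performance: cursors in the order they were opened per query token, so pages can be pinned in that order.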
+    protected final ArrayList<IInvertedListCursor> cursorsOrderedByTokens = new ArrayList<IInvertedListCursor>();
+    
     protected final InvertedListPartitions partitions = new InvertedListPartitions();
 
     public PartitionedTOccurrenceSearcher(IHyracksCommonContext ctx, IInvertedIndex invIndex) {
@@ -101,10 +104,11 @@
         short maxCountPossible = numQueryTokens;
         invListCursorCache.reset();
         partitions.reset(numTokensLowerBound, numTokensUpperBound);
+        cursorsOrderedByTokens.clear();
         for (int i = 0; i < numQueryTokens; i++) {
             searchKey.reset(queryTokenAccessor, i);
             if (!partInvIndex.openInvertedListPartitionCursors(this, ictx, numTokensLowerBound, numTokensUpperBound,
-                    partitions)) {
+                    partitions, cursorsOrderedByTokens)) {
                 maxCountPossible--;
                 // No results possible.
                 if (maxCountPossible < occurrenceThreshold) {                    
@@ -112,11 +116,30 @@
                 }
             }
         }
-
-        // Process the partitions one-by-one.
+        
         ArrayList<IInvertedListCursor>[] partitionCursors = partitions.getPartitions();
         short start = partitions.getMinValidPartitionIndex();
         short end = partitions.getMaxValidPartitionIndex();
+        
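+        // HACK for better I/O performance: drop the cursors of pruned partitions, then pin the remaining cursors' pages up front.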
+        if (!cursorsOrderedByTokens.isEmpty()) {
+            for (int i = start; i <= end; i++) {
+                if (partitionCursors[i] == null) {
+                    continue;
+                }
+                // Prune partition because no element in it can satisfy the occurrence threshold.
+                if (partitionCursors[i].size() < occurrenceThreshold) {
+                    cursorsOrderedByTokens.removeAll(partitionCursors[i]);
+                }
+            }
+            // Pin all the cursors in the order of tokens.
+            int numCursors = cursorsOrderedByTokens.size();
+            for (int i = 0; i < numCursors; i++) {
+                cursorsOrderedByTokens.get(i).pinPages();
+            }
+        }
+        
+        // Process the partitions one-by-one.
         for (int i = start; i <= end; i++) {
             if (partitionCursors[i] == null) {
                 continue;
@@ -130,7 +153,7 @@
             invListMerger.reset();
             invListMerger.merge(partitionCursors[i], occurrenceThreshold, numPrefixLists, searchResult);
         }
-
+        
         resultCursor.open(null, searchPred);
     }