Merged hyracks_lsm_tree r2465:r2483.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_length_filter@2484 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTreeCountingSearchCursor.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTreeCountingSearchCursor.java
index 4f667b2..c1cab0a 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTreeCountingSearchCursor.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTreeCountingSearchCursor.java
@@ -204,6 +204,7 @@
             }
             bufferCache.unpin(page);
         }
+        tupleBuilder.reset();
         tupleIndex = 0;
         page = null;
         pred = null;
@@ -216,7 +217,7 @@
             close();
         } catch (Exception e) {
             e.printStackTrace();
-        }
+        }        
     }
 
     @Override
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/MultiComparator.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/MultiComparator.java
index 01c6cc8..280bb41 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/MultiComparator.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/MultiComparator.java
@@ -96,6 +96,14 @@
         return new MultiComparator(cmps);
     }
     
+    public static MultiComparator create(IBinaryComparatorFactory[] cmpFactories, int startIndex, int numCmps) {
+        IBinaryComparator[] cmps = new IBinaryComparator[numCmps];
+        for (int i = startIndex; i < startIndex + numCmps; i++) {
+            cmps[i] = cmpFactories[i].createBinaryComparator();
+        }
+        return new MultiComparator(cmps);
+    }
+    
     public static MultiComparator create(IBinaryComparatorFactory[]... cmpFactories) {
         int size = 0;
         for (int i = 0; i < cmpFactories.length; i++) {
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/api/IInvertedIndexSearchModifier.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/api/IInvertedIndexSearchModifier.java
index 619937f..afe082d 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/api/IInvertedIndexSearchModifier.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/api/IInvertedIndexSearchModifier.java
@@ -19,5 +19,9 @@
 public interface IInvertedIndexSearchModifier {
     public int getOccurrenceThreshold(int numQueryTokens);
 
-    public int getNumPrefixLists(int numQueryTokens);
+    public int getNumPrefixLists(int occurrenceThreshold, int numInvLists);
+    
+    public int getNumTokensLowerBound(int numQueryTokens);
+    
+    public int getNumTokensUpperBound(int numQueryTokens);
 }
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/api/IInvertedListCursor.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/api/IInvertedListCursor.java
index c62cc57..de703ac 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/api/IInvertedListCursor.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/api/IInvertedListCursor.java
@@ -44,7 +44,7 @@
     public int getStartOff();
 
     public boolean containsKey(ITupleReference searchTuple, MultiComparator invListCmp) throws HyracksDataException, IndexException;
-
+    
     // for debugging
     @SuppressWarnings("rawtypes")
     public String printInvList(ISerializerDeserializer[] serdes) throws HyracksDataException, IndexException;
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/api/IObjectFactory.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/api/IObjectFactory.java
new file mode 100644
index 0000000..9068a2b
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/api/IObjectFactory.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api;
+
+public interface IObjectFactory<T> {
+    public T create();
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/api/IPartitionedInvertedIndex.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/api/IPartitionedInvertedIndex.java
new file mode 100644
index 0000000..013cff4
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/api/IPartitionedInvertedIndex.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api;
+
+import java.util.ArrayList;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexOperationContext;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search.InvertedListPartitions;
+
+public interface IPartitionedInvertedIndex {
+    public void openInvertedListPartitionCursors(IInvertedIndexSearcher searcher, IIndexOperationContext ictx,
+            int numTokensLowerBound, int numTokensUpperBound, InvertedListPartitions invListPartitions)
+            throws HyracksDataException, IndexException;
+
+    public void cleanupPartitionState(ArrayList<IInvertedListCursor> invListCursors) throws HyracksDataException;
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
index 851451e..1e79d5c 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
@@ -96,33 +96,33 @@
         }
     }
 
-    private final ILSMHarness lsmHarness;
+    protected final ILSMHarness lsmHarness;
 
     // In-memory components.
-    private final LSMInvertedIndexComponent memComponent;
-    private final IInMemoryFreePageManager memFreePageManager;
-    private final IBinaryTokenizerFactory tokenizerFactory;
+    protected final LSMInvertedIndexComponent memComponent;
+    protected final IInMemoryFreePageManager memFreePageManager;
+    protected final IBinaryTokenizerFactory tokenizerFactory;
 
     // On-disk components.
-    private final ILSMIndexFileManager fileManager;
+    protected final ILSMIndexFileManager fileManager;
     // For creating inverted indexes in flush and merge.
-    private final OnDiskInvertedIndexFactory diskInvIndexFactory;
+    protected final OnDiskInvertedIndexFactory diskInvIndexFactory;
     // For creating deleted-keys BTrees in flush and merge.
-    private final BTreeFactory deletedKeysBTreeFactory;
-    private final IBufferCache diskBufferCache;
+    protected final BTreeFactory deletedKeysBTreeFactory;
+    protected final IBufferCache diskBufferCache;
     // List of LSMInvertedIndexComponent instances. Using Object for better sharing via
     // ILSMIndex + LSMHarness.
-    private final LinkedList<Object> diskComponents;
+    protected final LinkedList<Object> diskComponents;
     // Helps to guarantees physical consistency of LSM components.
-    private final ILSMComponentFinalizer componentFinalizer;
+    protected final ILSMComponentFinalizer componentFinalizer;
 
     // Type traits and comparators for tokens and inverted-list elements.
-    private final ITypeTraits[] invListTypeTraits;
-    private final IBinaryComparatorFactory[] invListCmpFactories;
-    private final ITypeTraits[] tokenTypeTraits;
-    private final IBinaryComparatorFactory[] tokenCmpFactories;
+    protected final ITypeTraits[] invListTypeTraits;
+    protected final IBinaryComparatorFactory[] invListCmpFactories;
+    protected final ITypeTraits[] tokenTypeTraits;
+    protected final IBinaryComparatorFactory[] tokenCmpFactories;
 
-    private boolean isActivated;
+    protected boolean isActivated;
 
     public LSMInvertedIndex(IInMemoryBufferCache memBufferCache, IInMemoryFreePageManager memFreePageManager,
             OnDiskInvertedIndexFactory diskInvIndexFactory, BTreeFactory deletedKeysBTreeFactory,
@@ -131,13 +131,6 @@
             IBinaryComparatorFactory[] tokenCmpFactories, IBinaryTokenizerFactory tokenizerFactory,
             ILSMFlushController flushController, ILSMMergePolicy mergePolicy, ILSMOperationTrackerFactory opTrackerFactory,
             ILSMIOOperationScheduler ioScheduler) throws IndexException {
-        InMemoryInvertedIndex memInvIndex = InvertedIndexUtils.createInMemoryBTreeInvertedindex(memBufferCache,
-                memFreePageManager, invListTypeTraits, invListCmpFactories, tokenTypeTraits, tokenCmpFactories,
-                tokenizerFactory);
-        BTree deleteKeysBTree = BTreeUtils.createBTree(memBufferCache, memFreePageManager,
-                ((InMemoryBufferCache) memBufferCache).getFileMapProvider(), invListTypeTraits, invListCmpFactories,
-                BTreeLeafFrameType.REGULAR_NSM, new FileReference(new File("membtree")));
-        memComponent = new LSMInvertedIndexComponent(memInvIndex, deleteKeysBTree);
         this.memFreePageManager = memFreePageManager;
         this.tokenizerFactory = tokenizerFactory;
         this.fileManager = fileManager;
@@ -153,6 +146,18 @@
         this.componentFinalizer = new LSMInvertedIndexComponentFinalizer(diskFileMapProvider);
         diskComponents = new LinkedList<Object>();
         isActivated = false;
+        // Create in-memory component.
+        InMemoryInvertedIndex memInvIndex = createInMemoryInvertedIndex(memBufferCache);
+        BTree deleteKeysBTree = BTreeUtils.createBTree(memBufferCache, memFreePageManager,
+                ((InMemoryBufferCache) memBufferCache).getFileMapProvider(), invListTypeTraits, invListCmpFactories,
+                BTreeLeafFrameType.REGULAR_NSM, new FileReference(new File("membtree")));
+        memComponent = new LSMInvertedIndexComponent(memInvIndex, deleteKeysBTree);
+    }
+    
+    protected InMemoryInvertedIndex createInMemoryInvertedIndex(IInMemoryBufferCache memBufferCache)
+            throws IndexException {
+        return InvertedIndexUtils.createInMemoryBTreeInvertedindex(memBufferCache, memFreePageManager,
+                invListTypeTraits, invListCmpFactories, tokenTypeTraits, tokenCmpFactories, tokenizerFactory);
     }
 
     @Override
@@ -312,7 +317,7 @@
     /**
      * The keys in the in-memory deleted-keys BTree only refer to on-disk components.
      * We delete documents from the in-memory inverted index by deleting its entries directly,
-     * and do NOT add the deleted key to the deleted-keys BTree.
+     * while still adding the deleted key to the deleted-keys BTree.
      * Otherwise, inserts would have to remove keys from the in-memory deleted-keys BTree which 
      * may cause incorrect behavior (lost deletes) in the following pathological case:
      * Insert doc 1, flush, delete doc 1, insert doc 1
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java
index b3dee89..625ecd4 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java
@@ -28,6 +28,8 @@
 
 public class LSMInvertedIndexOpContext implements ILSMIndexOperationContext {
 
+    private static final int NUM_DOCUMENT_FIELDS = 1;
+    
     private IndexOperation op;
     private final IInvertedIndex memInvIndex;
     private final IIndex memDeletedKeysBTree;
@@ -68,11 +70,10 @@
                     deletedKeysBTreeAccessor = memDeletedKeysBTree.createAccessor(NoOpOperationCallback.INSTANCE,
                             NoOpOperationCallback.INSTANCE);
                     // Project away the document fields, leaving only the key fields.
-                    int numTokenFields = memInvIndex.getTokenTypeTraits().length;
                     int numKeyFields = memInvIndex.getInvListTypeTraits().length;
-                    int[] keyFieldPermutation = new int[memInvIndex.getInvListTypeTraits().length];
+                    int[] keyFieldPermutation = new int[numKeyFields];
                     for (int i = 0; i < numKeyFields; i++) {
-                        keyFieldPermutation[i] = numTokenFields + i;
+                        keyFieldPermutation[i] = NUM_DOCUMENT_FIELDS + i;
                     }
                     keysOnlyTuple = new PermutingTupleReference(keyFieldPermutation);
                 }
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/PartitionedLSMInvertedIndex.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/PartitionedLSMInvertedIndex.java
new file mode 100644
index 0000000..d963dbc
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/PartitionedLSMInvertedIndex.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.storage.am.common.api.IInMemoryFreePageManager;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.IInMemoryBufferCache;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushController;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexFileManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BTreeFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.inmemory.InMemoryInvertedIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndexFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.InvertedIndexUtils;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+
+public class PartitionedLSMInvertedIndex extends LSMInvertedIndex {
+
+    public PartitionedLSMInvertedIndex(IInMemoryBufferCache memBufferCache,
+            IInMemoryFreePageManager memFreePageManager, OnDiskInvertedIndexFactory diskInvIndexFactory,
+            BTreeFactory deletedKeysBTreeFactory, ILSMIndexFileManager fileManager,
+            IFileMapProvider diskFileMapProvider, ITypeTraits[] invListTypeTraits,
+            IBinaryComparatorFactory[] invListCmpFactories, ITypeTraits[] tokenTypeTraits,
+            IBinaryComparatorFactory[] tokenCmpFactories, IBinaryTokenizerFactory tokenizerFactory,
+            ILSMFlushController flushController, ILSMMergePolicy mergePolicy,
+            ILSMOperationTrackerFactory opTrackerFactory, ILSMIOOperationScheduler ioScheduler) throws IndexException {
+        super(memBufferCache, memFreePageManager, diskInvIndexFactory, deletedKeysBTreeFactory, fileManager,
+                diskFileMapProvider, invListTypeTraits, invListCmpFactories, tokenTypeTraits, tokenCmpFactories,
+                tokenizerFactory, flushController, mergePolicy, opTrackerFactory, ioScheduler);
+    }
+
+    protected InMemoryInvertedIndex createInMemoryInvertedIndex(IInMemoryBufferCache memBufferCache)
+            throws IndexException {
+        return InvertedIndexUtils.createPartitionedInMemoryBTreeInvertedindex(memBufferCache, memFreePageManager,
+                invListTypeTraits, invListCmpFactories, tokenTypeTraits, tokenCmpFactories, tokenizerFactory);
+    }
+
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java
index f0af06a..668250c 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java
@@ -44,16 +44,16 @@
 
 public class InMemoryInvertedIndex implements IInvertedIndex {
 
-    private final BTree btree;
-    private final FileReference memBTreeFile = new FileReference(new File("memBTree"));
-    private final ITypeTraits[] tokenTypeTraits;
-    private final IBinaryComparatorFactory[] tokenCmpFactories;
-    private final ITypeTraits[] invListTypeTraits;
-    private final IBinaryComparatorFactory[] invListCmpFactories;
-    private final IBinaryTokenizerFactory tokenizerFactory;
+    protected final BTree btree;
+    protected final FileReference memBTreeFile = new FileReference(new File("memBTree"));
+    protected final ITypeTraits[] tokenTypeTraits;
+    protected final IBinaryComparatorFactory[] tokenCmpFactories;
+    protected final ITypeTraits[] invListTypeTraits;
+    protected final IBinaryComparatorFactory[] invListCmpFactories;
+    protected final IBinaryTokenizerFactory tokenizerFactory;
 
-    private final ITypeTraits[] btreeTypeTraits;
-    private final IBinaryComparatorFactory[] btreeCmpFactories;
+    protected final ITypeTraits[] btreeTypeTraits;
+    protected final IBinaryComparatorFactory[] btreeCmpFactories;
 
     public InMemoryInvertedIndex(IBufferCache memBufferCache, IFreePageManager memFreePageManager,
             ITypeTraits[] invListTypeTraits, IBinaryComparatorFactory[] invListCmpFactories,
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndexAccessor.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndexAccessor.java
index 5a352c3..a62aaf1 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndexAccessor.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndexAccessor.java
@@ -46,7 +46,7 @@
     public InMemoryInvertedIndexAccessor(InMemoryInvertedIndex index, IIndexOperationContext opCtx) {
         this.opCtx = opCtx;
         this.index = index;
-        this.searcher = new TOccurrenceSearcher(hyracksCtx, index);
+        this.searcher = createSearcher();
         this.btreeAccessor = (BTreeAccessor) index.getBTree().createAccessor(NoOpOperationCallback.INSTANCE,
                 NoOpOperationCallback.INSTANCE);
     }
@@ -62,7 +62,7 @@
         opCtx.setOperation(IndexOperation.DELETE);
         index.delete(tuple, btreeAccessor, opCtx);
     }
-    
+
     @Override
     public IIndexCursor createSearchCursor() {
         return new OnDiskInvertedIndexSearchCursor(searcher, index.getInvListTypeTraits().length);
@@ -89,13 +89,13 @@
         IBTreeLeafFrame leafFrame = (IBTreeLeafFrame) index.getBTree().getLeafFrameFactory().createFrame();
         return new BTreeRangeSearchCursor(leafFrame, false);
     }
-    
+
     @Override
     public void rangeSearch(IIndexCursor cursor, ISearchPredicate searchPred) throws IndexException,
             HyracksDataException {
         btreeAccessor.search(cursor, searchPred);
     }
-    
+
     public BTreeAccessor getBTreeAccessor() {
         return btreeAccessor;
     }
@@ -109,4 +109,8 @@
     public void upsert(ITupleReference tuple) throws HyracksDataException, IndexException {
         throw new UnsupportedOperationException("Upsert not supported by in-memory inverted index.");
     }
+
+    protected IInvertedIndexSearcher createSearcher() {
+        return new TOccurrenceSearcher(hyracksCtx, index);
+    }
 }
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndexOpContext.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndexOpContext.java
index 4a77e52..c109cfd 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndexOpContext.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndexOpContext.java
@@ -39,7 +39,7 @@
     public MultiComparator tokenFieldsCmp;
 
     // To generate in-memory BTree tuples for insertions.
-    private final IBinaryTokenizerFactory tokenizerFactory;
+    protected final IBinaryTokenizerFactory tokenizerFactory;
     public InvertedIndexTokenizingTupleIterator tupleIter;
 
     public InMemoryInvertedIndexOpContext(BTree btree, IBinaryComparatorFactory[] tokenCmpFactories,
@@ -52,19 +52,16 @@
     @Override
     public void setOperation(IndexOperation newOp) {
         switch (newOp) {
-            case INSERT: 
+            case INSERT:
             case DELETE: {
                 if (tupleIter == null) {
-                    IBinaryTokenizer tokenizer = tokenizerFactory.createTokenizer();
-                    tupleIter = new InvertedIndexTokenizingTupleIterator(tokenCmpFactories.length,
-                            btree.getFieldCount() - tokenCmpFactories.length, tokenizer);
+                    setTokenizingTupleIterator();
                 }
                 break;
             }
             case SEARCH: {
                 if (btreePred == null) {
                     btreePred = new RangePredicate(null, null, true, true, null, null);
-                    // TODO: Ignore opcallbacks for now.
                     btreeAccessor = (BTreeAccessor) btree.createAccessor(NoOpOperationCallback.INSTANCE,
                             NoOpOperationCallback.INSTANCE);
                     btreeCmp = MultiComparator.create(btree.getComparatorFactories());
@@ -88,4 +85,10 @@
     public IndexOperation getOperation() {
         return op;
     }
+
+    protected void setTokenizingTupleIterator() {
+        IBinaryTokenizer tokenizer = tokenizerFactory.createTokenizer();
+        tupleIter = new InvertedIndexTokenizingTupleIterator(tokenCmpFactories.length, btree.getFieldCount()
+                - tokenCmpFactories.length, tokenizer);
+    }
 }
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedListCursor.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedListCursor.java
index 21e2bf7..ceae345 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedListCursor.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedListCursor.java
@@ -63,15 +63,18 @@
     }
     
     public void prepare(BTreeAccessor btreeAccessor, RangePredicate btreePred, MultiComparator tokenFieldsCmp,
-            MultiComparator btreeCmp) {
-        this.btreeAccessor = btreeAccessor;
-        this.btreeCursor = btreeAccessor.createSearchCursor();
-        this.countingCursor = btreeAccessor.createCountingSearchCursor();
-        this.btreePred = btreePred;
-        this.btreePred.setLowKeyComparator(tokenFieldsCmp);
-        this.btreePred.setHighKeyComparator(tokenFieldsCmp);
-        this.tokenFieldsCmp = tokenFieldsCmp;
-        this.btreeCmp = btreeCmp;
+            MultiComparator btreeCmp) throws HyracksDataException, IndexException {
+        // Avoid object creation if this.btreeAccessor == btreeAccessor.
+        if (this.btreeAccessor != btreeAccessor) {
+            this.btreeAccessor = btreeAccessor;
+            this.btreeCursor = btreeAccessor.createSearchCursor();
+            this.countingCursor = btreeAccessor.createCountingSearchCursor();
+            this.btreePred = btreePred;
+            this.btreePred.setLowKeyComparator(tokenFieldsCmp);
+            this.btreePred.setHighKeyComparator(tokenFieldsCmp);
+            this.tokenFieldsCmp = tokenFieldsCmp;
+            this.btreeCmp = btreeCmp;
+        }
     }
     
     @Override
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndex.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndex.java
new file mode 100644
index 0000000..21fc0fb
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndex.java
@@ -0,0 +1,141 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.inmemory;
+
+import java.util.ArrayList;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeException;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree.BTreeAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IFreePageManager;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexOperationContext;
+import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexSearcher;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IPartitionedInvertedIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search.InvertedListPartitions;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search.PartitionedTOccurrenceSearcher;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.PartitionedInvertedIndexTokenizingTupleIterator;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+
+public class PartitionedInMemoryInvertedIndex extends InMemoryInvertedIndex implements IPartitionedInvertedIndex {
+
+    protected final ReentrantReadWriteLock partitionIndexLock = new ReentrantReadWriteLock(true);
+    protected int minPartitionIndex = Integer.MAX_VALUE;
+    protected int maxPartitionIndex = Integer.MIN_VALUE;
+
+    public PartitionedInMemoryInvertedIndex(IBufferCache memBufferCache, IFreePageManager memFreePageManager,
+            ITypeTraits[] invListTypeTraits, IBinaryComparatorFactory[] invListCmpFactories,
+            ITypeTraits[] tokenTypeTraits, IBinaryComparatorFactory[] tokenCmpFactories,
+            IBinaryTokenizerFactory tokenizerFactory) throws BTreeException {
+        super(memBufferCache, memFreePageManager, invListTypeTraits, invListCmpFactories, tokenTypeTraits,
+                tokenCmpFactories, tokenizerFactory);
+    }
+
+    @Override
+    public void insert(ITupleReference tuple, BTreeAccessor btreeAccessor, IIndexOperationContext ictx)
+            throws HyracksDataException, IndexException {
+        super.insert(tuple, btreeAccessor, ictx);
+        PartitionedInMemoryInvertedIndexOpContext ctx = (PartitionedInMemoryInvertedIndexOpContext) ictx;
+        PartitionedInvertedIndexTokenizingTupleIterator tupleIter = (PartitionedInvertedIndexTokenizingTupleIterator) ctx.tupleIter;
+        updatePartitionIndexes(tupleIter.getNumTokens());
+    }
+
+    @Override
+    public void clear() throws HyracksDataException {
+        super.clear();
+        minPartitionIndex = Integer.MAX_VALUE;
+        maxPartitionIndex = Integer.MIN_VALUE;
+    }
+
+    public void updatePartitionIndexes(int numTokens) {
+        partitionIndexLock.writeLock().lock();
+        if (numTokens < minPartitionIndex) {
+            minPartitionIndex = numTokens;
+        }
+        if (numTokens > maxPartitionIndex) {
+            maxPartitionIndex = numTokens;
+        }
+        partitionIndexLock.writeLock().unlock();
+    }
+
+    @Override
+    public IIndexAccessor createAccessor(IModificationOperationCallback modificationCallback,
+            ISearchOperationCallback searchCallback) {
+        return new PartitionedInMemoryInvertedIndexAccessor(this, new PartitionedInMemoryInvertedIndexOpContext(btree,
+                tokenCmpFactories, tokenizerFactory));
+    }
+
+    @Override
+    public void openInvertedListPartitionCursors(IInvertedIndexSearcher searcher, IIndexOperationContext ictx,
+            int numTokensLowerBound, int numTokensUpperBound, InvertedListPartitions invListPartitions)
+            throws HyracksDataException, IndexException {
+        int minPartitionIndex;
+        int maxPartitionIndex;
+        partitionIndexLock.readLock().lock();
+        minPartitionIndex = this.minPartitionIndex;
+        maxPartitionIndex = this.maxPartitionIndex;
+        partitionIndexLock.readLock().unlock();
+
+        if (minPartitionIndex == Integer.MAX_VALUE || maxPartitionIndex == Integer.MIN_VALUE) {
+            // Index must be empty.
+            return;
+        }
+        int partitionStartIndex = minPartitionIndex;
+        int partitionEndIndex = maxPartitionIndex;
+        if (numTokensLowerBound >= 0) {
+            partitionStartIndex = Math.max(minPartitionIndex, numTokensLowerBound);
+        }
+        if (numTokensUpperBound >= 0) {
+            partitionEndIndex = Math.min(maxPartitionIndex, numTokensUpperBound);
+        }
+
+        PartitionedTOccurrenceSearcher partSearcher = (PartitionedTOccurrenceSearcher) searcher;
+        PartitionedInMemoryInvertedIndexOpContext ctx = (PartitionedInMemoryInvertedIndexOpContext) ictx;
+        ctx.setOperation(IndexOperation.SEARCH);
+        // We can pick either of the full low or high search key, since they should be identical here.
+        ITupleReference searchKey = partSearcher.getFullLowSearchKey();
+        ctx.btreePred.setLowKey(searchKey, true);
+        ctx.btreePred.setHighKey(searchKey, true);
+        // Go through all possibly partitions and see if the token matches.
+        // TODO: This procedure could be made more efficient by determining the next partition to search
+        // using the last existing partition and re-searching the BTree with an open interval as low key.
+        for (int i = partitionStartIndex; i <= partitionEndIndex; i++) {
+            partSearcher.setNumTokensBoundsInSearchKeys(i, i);
+            InMemoryInvertedListCursor inMemListCursor = (InMemoryInvertedListCursor) partSearcher
+                    .getCachedInvertedListCursor();
+            inMemListCursor.prepare(ctx.btreeAccessor, ctx.btreePred, ctx.tokenFieldsCmp, ctx.btreeCmp);
+            inMemListCursor.reset(searchKey);
+            invListPartitions.addInvertedListCursor(inMemListCursor, i);
+        }
+    }
+
+    @Override
+    public void cleanupPartitionState(ArrayList<IInvertedListCursor> invListCursors) throws HyracksDataException {
+        // Make sure to unpin/close cursors if the entire partition is pruned.
+        //for (IInvertedListCursor cursor : invListCursors) {
+        //    cursor.unpinPages();
+        //}
+    }
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndexAccessor.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndexAccessor.java
new file mode 100644
index 0000000..813961c
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndexAccessor.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.inmemory;
+
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexOperationContext;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexSearcher;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search.PartitionedTOccurrenceSearcher;
+
+public class PartitionedInMemoryInvertedIndexAccessor extends InMemoryInvertedIndexAccessor {
+
+    public PartitionedInMemoryInvertedIndexAccessor(InMemoryInvertedIndex index, IIndexOperationContext opCtx) {
+        super(index, opCtx);
+    }
+
+    protected IInvertedIndexSearcher createSearcher() {
+        return new PartitionedTOccurrenceSearcher(hyracksCtx, index);
+    }
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndexOpContext.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndexOpContext.java
new file mode 100644
index 0000000..f0e5046
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndexOpContext.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.inmemory;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizer;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.PartitionedInvertedIndexTokenizingTupleIterator;
+
+public class PartitionedInMemoryInvertedIndexOpContext extends InMemoryInvertedIndexOpContext {
+
+    public PartitionedInMemoryInvertedIndexOpContext(BTree btree, IBinaryComparatorFactory[] tokenCmpFactories,
+            IBinaryTokenizerFactory tokenizerFactory) {
+        super(btree, tokenCmpFactories, tokenizerFactory);
+    }
+
+    protected void setTokenizingTupleIterator() {
+        IBinaryTokenizer tokenizer = tokenizerFactory.createTokenizer();
+        tupleIter = new PartitionedInvertedIndexTokenizingTupleIterator(tokenCmpFactories.length, btree.getFieldCount()
+                - tokenCmpFactories.length, tokenizer);
+    }
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
index 69e57be..d1aa854 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
@@ -15,6 +15,8 @@
 
 package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk;
 
+import java.io.DataOutput;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
@@ -67,17 +69,16 @@
  * cannot exceed the size of a Hyracks frame.
  */
 public class OnDiskInvertedIndex implements IInvertedIndex {
-    private final IHyracksCommonContext ctx = new DefaultHyracksCommonContext();
+    protected final IHyracksCommonContext ctx = new DefaultHyracksCommonContext();
 
-    // Schema of BTree tuples.
-    public final int TOKEN_FIELD = 0;
-    public final int INVLIST_START_PAGE_ID_FIELD = 1;
-    public final int INVLIST_END_PAGE_ID_FIELD = 2;
-    public final int INVLIST_START_OFF_FIELD = 3;
-    public final int INVLIST_NUM_ELEMENTS_FIELD = 4;
-
+    // Schema of BTree tuples, set in constructor.    
+    protected final int invListStartPageIdField;
+    protected final int invListEndPageIdField;
+    protected final int invListStartOffField;
+    protected final int invListNumElementsField;
+    
     // Type traits to be appended to the token type trait which finally form the BTree field type traits.
-    private static final ITypeTraits[] btreeValueTypeTraits = new ITypeTraits[4];
+    protected static final ITypeTraits[] btreeValueTypeTraits = new ITypeTraits[4];
     static {
         // startPageId
         btreeValueTypeTraits[0] = IntegerPointable.TYPE_TRAITS;
@@ -89,23 +90,22 @@
         btreeValueTypeTraits[3] = IntegerPointable.TYPE_TRAITS;
     }
 
-    private BTree btree;
-    private int rootPageId = 0;
-    private IBufferCache bufferCache;
-    private IFileMapProvider fileMapProvider;
-    private int fileId = -1;
-    private final ITypeTraits[] invListTypeTraits;
-    private final IBinaryComparatorFactory[] invListCmpFactories;
-    private final ITypeTraits[] tokenTypeTraits;
-    private final IBinaryComparatorFactory[] tokenCmpFactories;
-    private final IInvertedListBuilder invListBuilder;
-    private final int numTokenFields;
-    private final int numInvListKeys;
-    private final FileReference invListsFile;
+    protected BTree btree;
+    protected int rootPageId = 0;
+    protected IBufferCache bufferCache;
+    protected IFileMapProvider fileMapProvider;
+    protected int fileId = -1;
+    protected final ITypeTraits[] invListTypeTraits;
+    protected final IBinaryComparatorFactory[] invListCmpFactories;
+    protected final ITypeTraits[] tokenTypeTraits;
+    protected final IBinaryComparatorFactory[] tokenCmpFactories;
+    protected final IInvertedListBuilder invListBuilder;
+    protected final int numTokenFields;
+    protected final int numInvListKeys;
+    protected final FileReference invListsFile;
     // Last page id of inverted-lists file (inclusive). Set during bulk load.
-    private int invListsMaxPageId = -1;
-
-    private boolean isOpen = false;
+    protected int invListsMaxPageId = -1;
+    protected boolean isOpen = false;
 
     public OnDiskInvertedIndex(IBufferCache bufferCache, IFileMapProvider fileMapProvider,
             IInvertedListBuilder invListBuilder, ITypeTraits[] invListTypeTraits,
@@ -124,6 +124,10 @@
         this.numTokenFields = btree.getComparatorFactories().length;
         this.numInvListKeys = invListCmpFactories.length;
         this.invListsFile = invListsFile;
+        this.invListStartPageIdField = numTokenFields;
+        this.invListEndPageIdField = numTokenFields + 1;
+        this.invListStartOffField = numTokenFields + 2;
+        this.invListNumElementsField = numTokenFields + 3;
     }
 
     @Override
@@ -258,19 +262,7 @@
         try {
             if (ctx.btreeCursor.hasNext()) {
                 ctx.btreeCursor.next();
-                ITupleReference frameTuple = ctx.btreeCursor.getTuple();
-                int startPageId = IntegerSerializerDeserializer.getInt(
-                        frameTuple.getFieldData(INVLIST_START_PAGE_ID_FIELD),
-                        frameTuple.getFieldStart(INVLIST_START_PAGE_ID_FIELD));
-                int endPageId = IntegerSerializerDeserializer.getInt(
-                        frameTuple.getFieldData(INVLIST_END_PAGE_ID_FIELD),
-                        frameTuple.getFieldStart(INVLIST_END_PAGE_ID_FIELD));
-                int startOff = IntegerSerializerDeserializer.getInt(frameTuple.getFieldData(INVLIST_START_OFF_FIELD),
-                        frameTuple.getFieldStart(INVLIST_START_OFF_FIELD));
-                int numElements = IntegerSerializerDeserializer.getInt(
-                        frameTuple.getFieldData(INVLIST_NUM_ELEMENTS_FIELD),
-                        frameTuple.getFieldStart(INVLIST_NUM_ELEMENTS_FIELD));
-                listCursor.reset(startPageId, endPageId, startOff, numElements);
+                resetInvertedListCursor(ctx.btreeCursor.getTuple(), listCursor);
             } else {
                 listCursor.reset(0, 0, 0, 0);
             }
@@ -280,6 +272,18 @@
         }
     }
 
+    public void resetInvertedListCursor(ITupleReference btreeTuple, IInvertedListCursor listCursor) {
+        int startPageId = IntegerSerializerDeserializer.getInt(btreeTuple.getFieldData(invListStartPageIdField),
+                btreeTuple.getFieldStart(invListStartPageIdField));
+        int endPageId = IntegerSerializerDeserializer.getInt(btreeTuple.getFieldData(invListEndPageIdField),
+                btreeTuple.getFieldStart(invListEndPageIdField));
+        int startOff = IntegerSerializerDeserializer.getInt(btreeTuple.getFieldData(invListStartOffField),
+                btreeTuple.getFieldStart(invListStartOffField));
+        int numElements = IntegerSerializerDeserializer.getInt(btreeTuple.getFieldData(invListNumElementsField),
+                btreeTuple.getFieldStart(invListNumElementsField));
+        listCursor.reset(startPageId, endPageId, startOff, numElements);
+    }
+    
     public final class OnDiskInvertedIndexBulkLoader implements IIndexBulkLoader {
         private final ArrayTupleBuilder btreeTupleBuilder;
         private final ArrayTupleReference btreeTupleReference;
@@ -330,14 +334,26 @@
         private void createAndInsertBTreeTuple() throws IndexException, HyracksDataException {
             // Build tuple.        
             btreeTupleBuilder.reset();
-            btreeTupleBuilder.addField(lastTuple.getFieldData(0), lastTuple.getFieldStart(0),
-                    lastTuple.getFieldLength(0));
-            // TODO: Boxing integers here. Fix it.
-            btreeTupleBuilder.addField(IntegerSerializerDeserializer.INSTANCE, currentInvListStartPageId);
-            btreeTupleBuilder.addField(IntegerSerializerDeserializer.INSTANCE, currentPageId);
-            btreeTupleBuilder.addField(IntegerSerializerDeserializer.INSTANCE, currentInvListStartOffset);
-            btreeTupleBuilder.addField(IntegerSerializerDeserializer.INSTANCE, invListBuilder.getListSize());
-            // Reset tuple reference and add it.
+            DataOutput output = btreeTupleBuilder.getDataOutput();
+            // Add key fields.
+            for (int i = 0; i < numTokenFields; i++) {
+                btreeTupleBuilder.addField(lastTuple.getFieldData(i), lastTuple.getFieldStart(i),
+                        lastTuple.getFieldLength(i));
+            }
+            // Add inverted-list 'pointer' value fields.
+            try {
+                output.writeInt(currentInvListStartPageId);
+                btreeTupleBuilder.addFieldEndOffset();
+                output.writeInt(currentPageId);
+                btreeTupleBuilder.addFieldEndOffset();
+                output.writeInt(currentInvListStartOffset);
+                btreeTupleBuilder.addFieldEndOffset();
+                output.writeInt(invListBuilder.getListSize());
+                btreeTupleBuilder.addFieldEndOffset();
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+            // Reset tuple reference and add it into the BTree load.
             btreeTupleReference.reset(btreeTupleBuilder.getFieldEndOffsets(), btreeTupleBuilder.getByteArray());
             btreeBulkloader.add(btreeTupleReference);
         }
@@ -456,6 +472,12 @@
             this.searcher = new TOccurrenceSearcher(ctx, index);
         }
 
+        // Let subclasses initialize.
+        protected OnDiskInvertedIndexAccessor(OnDiskInvertedIndex index, IInvertedIndexSearcher searcher) {
+            this.index = index;
+            this.searcher = searcher;
+        }
+        
         @Override
         public IIndexCursor createSearchCursor() {
             return new OnDiskInvertedIndexSearchCursor(searcher, index.getInvListTypeTraits().length);
@@ -615,7 +637,7 @@
         return 0;
     }
 
-    private static ITypeTraits[] getBTreeTypeTraits(ITypeTraits[] tokenTypeTraits) {
+    protected static ITypeTraits[] getBTreeTypeTraits(ITypeTraits[] tokenTypeTraits) {
         ITypeTraits[] btreeTypeTraits = new ITypeTraits[tokenTypeTraits.length + btreeValueTypeTraits.length];
         // Set key type traits.
         for (int i = 0; i < tokenTypeTraits.length; i++) {
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexOpContext.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexOpContext.java
index fd784f3..8ed4cd1 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexOpContext.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexOpContext.java
@@ -30,12 +30,17 @@
     public IIndexAccessor btreeAccessor;
     public IIndexCursor btreeCursor;
     public MultiComparator searchCmp;
+    // For prefix search on partitioned indexes.
+    public MultiComparator prefixSearchCmp;
 
     public OnDiskInvertedIndexOpContext(BTree btree) {
         // TODO: Ignore opcallbacks for now.
         btreeAccessor = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
         btreeCursor = btreeAccessor.createSearchCursor();
         searchCmp = MultiComparator.create(btree.getComparatorFactories());
+        if (btree.getComparatorFactories().length > 1) {
+            prefixSearchCmp = MultiComparator.create(btree.getComparatorFactories(), 0, 1);
+        }
     }
 
     @Override
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndex.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndex.java
new file mode 100644
index 0000000..f9a146d
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndex.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk;
+
+import java.util.ArrayList;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexOperationContext;
+import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexSearcher;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListBuilder;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IPartitionedInvertedIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search.InvertedListPartitions;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search.PartitionedTOccurrenceSearcher;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+
+public class PartitionedOnDiskInvertedIndex extends OnDiskInvertedIndex implements IPartitionedInvertedIndex {
+
+    protected final int PARTITIONING_NUM_TOKENS_FIELD = 1;
+
+    public PartitionedOnDiskInvertedIndex(IBufferCache bufferCache, IFileMapProvider fileMapProvider,
+            IInvertedListBuilder invListBuilder, ITypeTraits[] invListTypeTraits,
+            IBinaryComparatorFactory[] invListCmpFactories, ITypeTraits[] tokenTypeTraits,
+            IBinaryComparatorFactory[] tokenCmpFactories, FileReference btreeFile, FileReference invListsFile)
+            throws IndexException {
+        super(bufferCache, fileMapProvider, invListBuilder, invListTypeTraits, invListCmpFactories, tokenTypeTraits,
+                tokenCmpFactories, btreeFile, invListsFile);
+    }
+
+    public class PartitionedOnDiskInvertedIndexAccessor extends OnDiskInvertedIndexAccessor {
+        public PartitionedOnDiskInvertedIndexAccessor(OnDiskInvertedIndex index) {
+            super(index, new PartitionedTOccurrenceSearcher(ctx, index));
+        }
+    }
+
+    @Override
+    public IIndexAccessor createAccessor(IModificationOperationCallback modificationCallback,
+            ISearchOperationCallback searchCallback) {
+        return new PartitionedOnDiskInvertedIndexAccessor(this);
+    }
+
+    @Override
+    public void openInvertedListPartitionCursors(IInvertedIndexSearcher searcher, IIndexOperationContext ictx,
+            int numTokensLowerBound, int numTokensUpperBound, InvertedListPartitions invListPartitions)
+            throws HyracksDataException, IndexException {
+        PartitionedTOccurrenceSearcher partSearcher = (PartitionedTOccurrenceSearcher) searcher;
+        OnDiskInvertedIndexOpContext ctx = (OnDiskInvertedIndexOpContext) ictx;
+        ITupleReference lowSearchKey = null;
+        ITupleReference highSearchKey = null;
+        partSearcher.setNumTokensBoundsInSearchKeys(numTokensLowerBound, numTokensUpperBound);
+        if (numTokensLowerBound < 0) {
+            ctx.btreePred.setLowKeyComparator(ctx.prefixSearchCmp);
+            lowSearchKey = partSearcher.getPrefixSearchKey();
+        } else {
+            ctx.btreePred.setLowKeyComparator(ctx.searchCmp);
+            lowSearchKey = partSearcher.getFullLowSearchKey();
+        }
+        if (numTokensUpperBound < 0) {
+            ctx.btreePred.setHighKeyComparator(ctx.prefixSearchCmp);
+            highSearchKey = partSearcher.getPrefixSearchKey();
+        } else {
+            ctx.btreePred.setHighKeyComparator(ctx.searchCmp);
+            highSearchKey = partSearcher.getFullHighSearchKey();
+        }
+        ctx.btreePred.setLowKey(lowSearchKey, true);
+        ctx.btreePred.setHighKey(highSearchKey, true);
+        ctx.btreeAccessor.search(ctx.btreeCursor, ctx.btreePred);
+        try {
+            while (ctx.btreeCursor.hasNext()) {
+                ctx.btreeCursor.next();
+                ITupleReference btreeTuple = ctx.btreeCursor.getTuple();
+                int numTokens = IntegerSerializerDeserializer.getInt(
+                        btreeTuple.getFieldData(PARTITIONING_NUM_TOKENS_FIELD),
+                        btreeTuple.getFieldStart(PARTITIONING_NUM_TOKENS_FIELD));
+                IInvertedListCursor invListCursor = partSearcher.getCachedInvertedListCursor();
+                resetInvertedListCursor(btreeTuple, invListCursor);
+                invListPartitions.addInvertedListCursor(invListCursor, numTokens);
+            }
+        } finally {
+            ctx.btreeCursor.close();
+            ctx.btreeCursor.reset();
+        }
+    }
+
+    @Override
+    public void cleanupPartitionState(ArrayList<IInvertedListCursor> invListCursors) throws HyracksDataException {
+        // Do nothing.
+    }
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndexFactory.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndexFactory.java
new file mode 100644
index 0000000..854a30f
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndexFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk;
+
+import java.io.File;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexFileNameMapper;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListBuilder;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListBuilderFactory;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+
+public class PartitionedOnDiskInvertedIndexFactory extends OnDiskInvertedIndexFactory {
+    
+    public PartitionedOnDiskInvertedIndexFactory(IBufferCache bufferCache, IFileMapProvider fileMapProvider,
+            IInvertedListBuilderFactory invListBuilderFactory, ITypeTraits[] invListTypeTraits,
+            IBinaryComparatorFactory[] invListCmpFactories, ITypeTraits[] tokenTypeTraits,
+            IBinaryComparatorFactory[] tokenCmpFactories, IInvertedIndexFileNameMapper fileNameMapper) {
+        super(bufferCache, fileMapProvider, invListBuilderFactory, invListTypeTraits, invListCmpFactories, tokenTypeTraits,
+                tokenCmpFactories, fileNameMapper);
+    }
+
+    @Override
+    public IInvertedIndex createIndexInstance(FileReference dictBTreeFile) throws IndexException {
+        String invListsFilePath = fileNameMapper.getInvListsFilePath(dictBTreeFile.getFile().getPath());
+        FileReference invListsFile = new FileReference(new File(invListsFilePath));
+        IInvertedListBuilder invListBuilder = invListBuilderFactory.create();
+        return new PartitionedOnDiskInvertedIndex(bufferCache, fileMapProvider, invListBuilder, invListTypeTraits,
+                invListCmpFactories, tokenTypeTraits, tokenCmpFactories, dictBTreeFile, invListsFile);
+    }
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/AbstractTOccurrenceSearcher.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/AbstractTOccurrenceSearcher.java
new file mode 100644
index 0000000..af597bc
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/AbstractTOccurrenceSearcher.java
@@ -0,0 +1,154 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexSearcher;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IObjectFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.exceptions.OccurrenceThresholdPanicException;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeFrameTupleAccessor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeTupleReference;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizer;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IToken;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.ObjectCache;
+
+public abstract class AbstractTOccurrenceSearcher implements IInvertedIndexSearcher {
+    protected static final RecordDescriptor QUERY_TOKEN_REC_DESC = new RecordDescriptor(
+            new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
+
+    protected final int OBJECT_CACHE_INIT_SIZE = 10;
+    protected final int OBJECT_CACHE_EXPAND_SIZE = 10;
+
+    protected final IHyracksCommonContext ctx;
+
+    protected final InvertedListMerger invListMerger;
+    protected final SearchResult searchResult;
+    protected final IInvertedIndex invIndex;
+    protected final MultiComparator invListCmp;
+
+    protected final ArrayTupleBuilder queryTokenBuilder = new ArrayTupleBuilder(QUERY_TOKEN_REC_DESC.getFieldCount());
+    protected final ByteBuffer queryTokenFrame;
+    protected final FrameTupleAppender queryTokenAppender;
+    protected final FrameTupleAccessor queryTokenAccessor;
+    protected final FrameTupleReference searchKey = new FrameTupleReference();
+
+    protected int occurrenceThreshold;
+
+    protected final IObjectFactory<IInvertedListCursor> invListCursorFactory;
+    protected final ObjectCache<IInvertedListCursor> invListCursorCache;
+
+    public AbstractTOccurrenceSearcher(IHyracksCommonContext ctx, IInvertedIndex invIndex) {
+        this.ctx = ctx;
+        this.invListMerger = new InvertedListMerger(ctx, invIndex);
+        this.searchResult = new SearchResult(invIndex.getInvListTypeTraits(), ctx);
+        this.invIndex = invIndex;
+        this.invListCmp = MultiComparator.create(invIndex.getInvListCmpFactories());
+        this.invListCursorFactory = new InvertedListCursorFactory(invIndex);
+        this.invListCursorCache = new ObjectCache<IInvertedListCursor>(invListCursorFactory, OBJECT_CACHE_INIT_SIZE,
+                OBJECT_CACHE_EXPAND_SIZE);
+        this.queryTokenFrame = ctx.allocateFrame();
+        this.queryTokenAppender = new FrameTupleAppender(ctx.getFrameSize());
+        this.queryTokenAccessor = new FrameTupleAccessor(ctx.getFrameSize(), QUERY_TOKEN_REC_DESC);
+        this.queryTokenAccessor.reset(queryTokenFrame);
+    }
+
+    public void reset() {
+        searchResult.clear();
+        invListMerger.reset();
+    }
+
+    protected void tokenizeQuery(InvertedIndexSearchPredicate searchPred) throws HyracksDataException,
+            OccurrenceThresholdPanicException {
+        ITupleReference queryTuple = searchPred.getQueryTuple();
+        int queryFieldIndex = searchPred.getQueryFieldIndex();
+        IBinaryTokenizer queryTokenizer = searchPred.getQueryTokenizer();
+
+        queryTokenAppender.reset(queryTokenFrame, true);
+        queryTokenizer.reset(queryTuple.getFieldData(queryFieldIndex), queryTuple.getFieldStart(queryFieldIndex),
+                queryTuple.getFieldLength(queryFieldIndex));
+
+        while (queryTokenizer.hasNext()) {
+            queryTokenizer.next();
+            queryTokenBuilder.reset();
+            try {
+                IToken token = queryTokenizer.getToken();
+                token.serializeToken(queryTokenBuilder.getDataOutput());
+                queryTokenBuilder.addFieldEndOffset();
+                // WARNING: assuming one frame is big enough to hold all tokens
+                queryTokenAppender.append(queryTokenBuilder.getFieldEndOffsets(), queryTokenBuilder.getByteArray(), 0,
+                        queryTokenBuilder.getSize());
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+    }
+
+    public IFrameTupleAccessor createResultFrameTupleAccessor() {
+        return new FixedSizeFrameTupleAccessor(ctx.getFrameSize(), searchResult.getTypeTraits());
+    }
+
+    public ITupleReference createResultFrameTupleReference() {
+        return new FixedSizeTupleReference(searchResult.getTypeTraits());
+    }
+
+    @Override
+    public List<ByteBuffer> getResultBuffers() {
+        return searchResult.getBuffers();
+    }
+
+    @Override
+    public int getNumValidResultBuffers() {
+        return searchResult.getCurrentBufferIndex() + 1;
+    }
+
+    public int getOccurrenceThreshold() {
+        return occurrenceThreshold;
+    }
+
+    public void printNewResults(int maxResultBufIdx, List<ByteBuffer> buffer) {
+        StringBuffer strBuffer = new StringBuffer();
+        FixedSizeFrameTupleAccessor resultFrameTupleAcc = searchResult.getAccessor();
+        for (int i = 0; i <= maxResultBufIdx; i++) {
+            ByteBuffer testBuf = buffer.get(i);
+            resultFrameTupleAcc.reset(testBuf);
+            for (int j = 0; j < resultFrameTupleAcc.getTupleCount(); j++) {
+                strBuffer.append(IntegerSerializerDeserializer.getInt(resultFrameTupleAcc.getBuffer().array(),
+                        resultFrameTupleAcc.getFieldStartOffset(j, 0)) + ",");
+                strBuffer.append(IntegerSerializerDeserializer.getInt(resultFrameTupleAcc.getBuffer().array(),
+                        resultFrameTupleAcc.getFieldStartOffset(j, 1)) + " ");
+            }
+        }
+        System.out.println(strBuffer.toString());
+    }
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/ArrayListFactory.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/ArrayListFactory.java
new file mode 100644
index 0000000..493063e
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/ArrayListFactory.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search;
+
+import java.util.ArrayList;
+
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IObjectFactory;
+
+public class ArrayListFactory<T> implements IObjectFactory<ArrayList<T>>{
+    @Override
+    public ArrayList<T> create() {
+        return new ArrayList<T>();
+    }
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/ConjunctiveSearchModifier.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/ConjunctiveSearchModifier.java
index 1d260f0..55c5f30 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/ConjunctiveSearchModifier.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/ConjunctiveSearchModifier.java
@@ -25,7 +25,7 @@
     }
 
     @Override
-    public int getNumPrefixLists(int numQueryTokens) {
+    public int getNumPrefixLists(int occurrenceThreshold, int numInvLists) {
         return 1;
     }
     
@@ -33,4 +33,14 @@
     public String toString() {
         return "Conjunctive Search Modifier";
     }
+
+    @Override
+    public int getNumTokensLowerBound(int numQueryTokens) {
+        return -1;
+    }
+
+    @Override
+    public int getNumTokensUpperBound(int numQueryTokens) {
+        return -1;
+    }
 }
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/EditDistanceSearchModifier.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/EditDistanceSearchModifier.java
index 0580319..26e11ba 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/EditDistanceSearchModifier.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/EditDistanceSearchModifier.java
@@ -33,10 +33,20 @@
     }
 
     @Override
-    public int getNumPrefixLists(int numQueryTokens) {
-        return numQueryTokens - getOccurrenceThreshold(numQueryTokens) + 1;
+    public int getNumPrefixLists(int occurrenceThreshold, int numInvLists) {
+        return numInvLists - occurrenceThreshold + 1;
     }
 
+    @Override
+    public int getNumTokensLowerBound(int numQueryTokens) {
+        return numQueryTokens - edThresh;
+    }
+
+    @Override
+    public int getNumTokensUpperBound(int numQueryTokens) {
+        return numQueryTokens + edThresh;
+    }
+    
     public int getGramLength() {
         return gramLength;
     }
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/InvertedListCursorFactory.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/InvertedListCursorFactory.java
new file mode 100644
index 0000000..b4b3c43
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/InvertedListCursorFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search;
+
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IObjectFactory;
+
+public class InvertedListCursorFactory implements IObjectFactory<IInvertedListCursor> {
+
+    private final IInvertedIndex invIndex;
+
+    public InvertedListCursorFactory(IInvertedIndex invIndex) {
+        this.invIndex = invIndex;
+    }
+
+    @Override
+    public IInvertedListCursor create() {
+        return invIndex.createInvertedListCursor();
+    }
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/InvertedListMerger.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/InvertedListMerger.java
new file mode 100644
index 0000000..18ed73c
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/InvertedListMerger.java
@@ -0,0 +1,330 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+
+import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeFrameTupleAccessor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeTupleReference;
+
+// TODO: The merge procedure is rather confusing regarding cursor positions, hasNext() calls etc.
+// Needs an overhaul some time.
+public class InvertedListMerger {
+
+    protected final MultiComparator invListCmp;
+    protected SearchResult prevSearchResult;
+    protected SearchResult newSearchResult;
+
+    public InvertedListMerger(IHyracksCommonContext ctx, IInvertedIndex invIndex) {
+        this.invListCmp = MultiComparator.create(invIndex.getInvListCmpFactories());
+        this.prevSearchResult = new SearchResult(invIndex.getInvListTypeTraits(), ctx);
+        this.newSearchResult = new SearchResult(prevSearchResult);
+    }
+
+    public void merge(ArrayList<IInvertedListCursor> invListCursors, int occurrenceThreshold, int numPrefixLists,
+            SearchResult searchResult) throws HyracksDataException, IndexException {
+        Collections.sort(invListCursors);
+        int numInvLists = invListCursors.size();
+        SearchResult result = null;
+        for (int i = 0; i < numInvLists; i++) {
+            SearchResult swapTemp = prevSearchResult;
+            prevSearchResult = newSearchResult;
+            newSearchResult = swapTemp;
+            newSearchResult.reset();
+            if (i + 1 != numInvLists) {
+                // Use temporary search results when not merging last list.
+                result = newSearchResult;
+            } else {
+                // When merging the last list, append results to the final search result.
+                result = searchResult;
+            }
+            IInvertedListCursor invListCursor = invListCursors.get(i);
+            invListCursor.pinPages();
+            if (i < numPrefixLists) {
+                // Merge prefix list.
+                mergePrefixList(invListCursor, prevSearchResult, result);
+            } else {
+                // Merge suffix list.
+                int numInvListElements = invListCursor.size();
+                int currentNumResults = prevSearchResult.getNumResults();
+                // Should we binary search the next list or should we sort-merge it?
+                if (currentNumResults * Math.log(numInvListElements) < currentNumResults + numInvListElements) {
+                    mergeSuffixListProbe(invListCursor, prevSearchResult, result, i, numInvLists,
+                            occurrenceThreshold);
+                } else {
+                    mergeSuffixListScan(invListCursor, prevSearchResult, result, i, numInvLists,
+                            occurrenceThreshold);
+                }
+            }
+            invListCursor.unpinPages();
+        }
+    }
+
+    protected void mergeSuffixListProbe(IInvertedListCursor invListCursor, SearchResult prevSearchResult,
+            SearchResult newSearchResult, int invListIx, int numInvLists, int occurrenceThreshold)
+            throws HyracksDataException, IndexException {
+
+        int prevBufIdx = 0;
+        int maxPrevBufIdx = prevSearchResult.getCurrentBufferIndex();
+        ByteBuffer prevCurrentBuffer = prevSearchResult.getBuffers().get(0);
+
+        FixedSizeFrameTupleAccessor resultFrameTupleAcc = prevSearchResult.getAccessor();
+        FixedSizeTupleReference resultTuple = prevSearchResult.getTuple();
+
+        int resultTidx = 0;
+
+        resultFrameTupleAcc.reset(prevCurrentBuffer);
+
+        while (resultTidx < resultFrameTupleAcc.getTupleCount()) {
+
+            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
+            int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
+                    resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
+
+            if (invListCursor.containsKey(resultTuple, invListCmp)) {
+                count++;
+                newSearchResult.append(resultTuple, count);
+            } else {
+                if (count + numInvLists - invListIx > occurrenceThreshold) {
+                    newSearchResult.append(resultTuple, count);
+                }
+            }
+
+            resultTidx++;
+            if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
+                prevBufIdx++;
+                if (prevBufIdx <= maxPrevBufIdx) {
+                    prevCurrentBuffer = prevSearchResult.getBuffers().get(prevBufIdx);
+                    resultFrameTupleAcc.reset(prevCurrentBuffer);
+                    resultTidx = 0;
+                }
+            }
+        }
+    }
+
+    protected void mergeSuffixListScan(IInvertedListCursor invListCursor, SearchResult prevSearchResult,
+            SearchResult newSearchResult, int invListIx, int numInvLists, int occurrenceThreshold)
+            throws HyracksDataException, IndexException {
+
+        int prevBufIdx = 0;
+        int maxPrevBufIdx = prevSearchResult.getCurrentBufferIndex();
+        ByteBuffer prevCurrentBuffer = prevSearchResult.getBuffers().get(0);
+
+        FixedSizeFrameTupleAccessor resultFrameTupleAcc = prevSearchResult.getAccessor();
+        FixedSizeTupleReference resultTuple = prevSearchResult.getTuple();
+
+        boolean advanceCursor = true;
+        boolean advancePrevResult = false;
+        int resultTidx = 0;
+
+        resultFrameTupleAcc.reset(prevCurrentBuffer);
+
+        int invListTidx = 0;
+        int invListNumTuples = invListCursor.size();
+
+        if (invListCursor.hasNext())
+            invListCursor.next();
+
+        while (invListTidx < invListNumTuples && resultTidx < resultFrameTupleAcc.getTupleCount()) {
+
+            ITupleReference invListTuple = invListCursor.getTuple();
+
+            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
+
+            int cmp = invListCmp.compare(invListTuple, resultTuple);
+            if (cmp == 0) {
+                int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
+                        resultTuple.getFieldStart(resultTuple.getFieldCount() - 1)) + 1;
+                newSearchResult.append(resultTuple, count);
+                advanceCursor = true;
+                advancePrevResult = true;
+            } else {
+                if (cmp < 0) {
+                    advanceCursor = true;
+                    advancePrevResult = false;
+                } else {
+                    int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
+                            resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
+                    if (count + numInvLists - invListIx > occurrenceThreshold) {
+                        newSearchResult.append(resultTuple, count);
+                    }
+                    advanceCursor = false;
+                    advancePrevResult = true;
+                }
+            }
+
+            if (advancePrevResult) {
+                resultTidx++;
+                if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
+                    prevBufIdx++;
+                    if (prevBufIdx <= maxPrevBufIdx) {
+                        prevCurrentBuffer = prevSearchResult.getBuffers().get(prevBufIdx);
+                        resultFrameTupleAcc.reset(prevCurrentBuffer);
+                        resultTidx = 0;
+                    }
+                }
+            }
+
+            if (advanceCursor) {
+                invListTidx++;
+                if (invListCursor.hasNext()) {
+                    invListCursor.next();
+                }
+            }
+        }
+
+        // append remaining elements from previous result set
+        while (resultTidx < resultFrameTupleAcc.getTupleCount()) {
+
+            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
+
+            int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
+                    resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
+            if (count + numInvLists - invListIx > occurrenceThreshold) {
+                newSearchResult.append(resultTuple, count);
+            }
+
+            resultTidx++;
+            if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
+                prevBufIdx++;
+                if (prevBufIdx <= maxPrevBufIdx) {
+                    prevCurrentBuffer = prevSearchResult.getBuffers().get(prevBufIdx);
+                    resultFrameTupleAcc.reset(prevCurrentBuffer);
+                    resultTidx = 0;
+                }
+            }
+        }
+    }
+
+    protected void mergePrefixList(IInvertedListCursor invListCursor, SearchResult prevSearchResult,
+            SearchResult newSearchResult) throws HyracksDataException, IndexException {
+
+        int prevBufIdx = 0;
+        int maxPrevBufIdx = prevSearchResult.getCurrentBufferIndex();
+        ByteBuffer prevCurrentBuffer = prevSearchResult.getBuffers().get(0);
+
+        FixedSizeFrameTupleAccessor resultFrameTupleAcc = prevSearchResult.getAccessor();
+        FixedSizeTupleReference resultTuple = prevSearchResult.getTuple();
+
+        boolean advanceCursor = true;
+        boolean advancePrevResult = false;
+        int resultTidx = 0;
+
+        resultFrameTupleAcc.reset(prevCurrentBuffer);
+
+        int invListTidx = 0;
+        int invListNumTuples = invListCursor.size();
+
+        if (invListCursor.hasNext())
+            invListCursor.next();
+
+        while (invListTidx < invListNumTuples && resultTidx < resultFrameTupleAcc.getTupleCount()) {
+
+            ITupleReference invListTuple = invListCursor.getTuple();
+            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
+
+            int cmp = invListCmp.compare(invListTuple, resultTuple);
+            if (cmp == 0) {
+                int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
+                        resultTuple.getFieldStart(resultTuple.getFieldCount() - 1)) + 1;
+                newSearchResult.append(resultTuple, count);
+                advanceCursor = true;
+                advancePrevResult = true;
+            } else {
+                if (cmp < 0) {
+                    int count = 1;
+                    newSearchResult.append(invListTuple, count);
+                    advanceCursor = true;
+                    advancePrevResult = false;
+                } else {
+                    int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
+                            resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
+                    newSearchResult.append(resultTuple, count);
+                    advanceCursor = false;
+                    advancePrevResult = true;
+                }
+            }
+
+            if (advancePrevResult) {
+                resultTidx++;
+                if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
+                    prevBufIdx++;
+                    if (prevBufIdx <= maxPrevBufIdx) {
+                        prevCurrentBuffer = prevSearchResult.getBuffers().get(prevBufIdx);
+                        resultFrameTupleAcc.reset(prevCurrentBuffer);
+                        resultTidx = 0;
+                    }
+                }
+            }
+
+            if (advanceCursor) {
+                invListTidx++;
+                if (invListCursor.hasNext()) {
+                    invListCursor.next();
+                }
+            }
+        }
+
+        // append remaining new elements from inverted list
+        while (invListTidx < invListNumTuples) {
+            ITupleReference invListTuple = invListCursor.getTuple();
+            newSearchResult.append(invListTuple, 1);
+            invListTidx++;
+            if (invListCursor.hasNext()) {
+                invListCursor.next();
+            }
+        }
+
+        // append remaining elements from previous result set
+        while (resultTidx < resultFrameTupleAcc.getTupleCount()) {
+
+            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
+
+            int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
+                    resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
+            newSearchResult.append(resultTuple, count);
+
+            resultTidx++;
+            if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
+                prevBufIdx++;
+                if (prevBufIdx <= maxPrevBufIdx) {
+                    prevCurrentBuffer = prevSearchResult.getBuffers().get(prevBufIdx);
+                    resultFrameTupleAcc.reset(prevCurrentBuffer);
+                    resultTidx = 0;
+                }
+            }
+        }
+    }
+
+    public SearchResult createSearchResult() {
+        return new SearchResult(prevSearchResult);
+    }
+
+    public void reset() {
+        prevSearchResult.clear();
+        newSearchResult.clear();
+    }
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/InvertedListPartitions.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/InvertedListPartitions.java
new file mode 100644
index 0000000..99e61bd
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/InvertedListPartitions.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IObjectFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.ObjectCache;
+
+public class InvertedListPartitions {
+    private final int DEFAULT_NUM_PARTITIONS = 10;
+    private final int PARTITIONS_SLACK_SIZE = 10;
+    private final int OBJECT_CACHE_INIT_SIZE = 10;
+    private final int OBJECT_CACHE_EXPAND_SIZE = 10;
+    private final IObjectFactory<ArrayList<IInvertedListCursor>> arrayListFactory;
+    private final ObjectCache<ArrayList<IInvertedListCursor>> arrayListCache;
+    private ArrayList<IInvertedListCursor>[] partitions;
+    private int minValidPartitionIndex;
+    private int maxValidPartitionIndex;
+
+    public InvertedListPartitions() {
+        this.arrayListFactory = new ArrayListFactory<IInvertedListCursor>();
+        this.arrayListCache = new ObjectCache<ArrayList<IInvertedListCursor>>(arrayListFactory, OBJECT_CACHE_INIT_SIZE,
+                OBJECT_CACHE_EXPAND_SIZE);
+    }
+
+    @SuppressWarnings("unchecked")
+    public void reset(int numTokensLowerBound, int numTokensUpperBound) {
+        if (partitions == null) {
+            int initialSize;
+            if (numTokensUpperBound < 0) {
+                initialSize = DEFAULT_NUM_PARTITIONS;
+            } else {
+                initialSize = numTokensUpperBound + 1;
+            }
+            partitions = (ArrayList<IInvertedListCursor>[]) new ArrayList[initialSize];
+        } else {
+            if (numTokensUpperBound + 1 >= partitions.length) {
+                partitions = Arrays.copyOf(partitions, numTokensUpperBound + 1);
+            }
+            Arrays.fill(partitions, null);
+        }
+        arrayListCache.reset();
+        minValidPartitionIndex = Integer.MAX_VALUE;
+        maxValidPartitionIndex = Integer.MIN_VALUE;
+    }
+
+    public void addInvertedListCursor(IInvertedListCursor listCursor, int numTokens) {
+        if (numTokens + 1 >= partitions.length) {
+            partitions = Arrays.copyOf(partitions, numTokens + PARTITIONS_SLACK_SIZE);
+        }
+        ArrayList<IInvertedListCursor> partitionCursors = partitions[numTokens];
+        if (partitionCursors == null) {
+            partitionCursors = arrayListCache.getNext();
+            partitionCursors.clear();
+            partitions[numTokens] = partitionCursors;
+            // Update range of valid partitions.
+            if (numTokens < minValidPartitionIndex) {
+                minValidPartitionIndex = numTokens;
+            }
+            if (numTokens > maxValidPartitionIndex) {
+                maxValidPartitionIndex = numTokens;
+            }
+        }
+        partitionCursors.add(listCursor);
+    }
+
+    public ArrayList<IInvertedListCursor>[] getPartitions() {
+        return partitions;
+    }
+
+    public int getMinValidPartitionIndex() {
+        return minValidPartitionIndex;
+    }
+
+    public int getMaxValidPartitionIndex() {
+        return maxValidPartitionIndex;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/JaccardSearchModifier.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/JaccardSearchModifier.java
index 4cf7e40..b005905 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/JaccardSearchModifier.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/JaccardSearchModifier.java
@@ -31,13 +31,23 @@
     }
 
     @Override
-    public int getNumPrefixLists(int numQueryTokens) {
-        if (numQueryTokens == 0) {
+    public int getNumPrefixLists(int occurrenceThreshold, int numInvLists) {
+        if (numInvLists == 0) {
             return 0;
         }
-        return numQueryTokens - getOccurrenceThreshold(numQueryTokens) + 1;
+        return numInvLists - occurrenceThreshold + 1;
     }
 
+    @Override
+    public int getNumTokensLowerBound(int numQueryTokens) {
+        return (int) Math.floor(numQueryTokens * jaccThresh);
+    }
+
+    @Override
+    public int getNumTokensUpperBound(int numQueryTokens) {
+        return (int) Math.ceil(numQueryTokens / jaccThresh);
+    }
+    
     public float getJaccThresh() {
         return jaccThresh;
     }
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/PartitionedTOccurrenceSearcher.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/PartitionedTOccurrenceSearcher.java
new file mode 100644
index 0000000..81591e6
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/PartitionedTOccurrenceSearcher.java
@@ -0,0 +1,149 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexOperationContext;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.tuples.ConcatenatingTupleReference;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexSearchModifier;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IPartitionedInvertedIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.exceptions.OccurrenceThresholdPanicException;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndexSearchCursor;
+
+public class PartitionedTOccurrenceSearcher extends AbstractTOccurrenceSearcher {
+
+    protected final ArrayTupleBuilder lowerBoundTupleBuilder = new ArrayTupleBuilder(1);
+    protected final ArrayTupleReference lowerBoundTuple = new ArrayTupleReference();
+    protected final ArrayTupleBuilder upperBoundTupleBuilder = new ArrayTupleBuilder(1);
+    protected final ArrayTupleReference upperBoundTuple = new ArrayTupleReference();
+    protected final ConcatenatingTupleReference fullLowSearchKey = new ConcatenatingTupleReference(2);
+    protected final ConcatenatingTupleReference fullHighSearchKey = new ConcatenatingTupleReference(2);
+
+    protected final InvertedListPartitions partitions = new InvertedListPartitions();
+
+    public PartitionedTOccurrenceSearcher(IHyracksCommonContext ctx, IInvertedIndex invIndex) {
+        super(ctx, invIndex);
+        initHelperTuples();
+    }
+
+    private void initHelperTuples() {
+        try {
+            lowerBoundTupleBuilder.reset();
+            // Write dummy value.
+            lowerBoundTupleBuilder.getDataOutput().writeInt(Integer.MIN_VALUE);
+            lowerBoundTupleBuilder.addFieldEndOffset();
+            lowerBoundTuple.reset(lowerBoundTupleBuilder.getFieldEndOffsets(), lowerBoundTupleBuilder.getByteArray());
+            // Only needed for setting the number of fields in searchKey.
+            searchKey.reset(queryTokenAccessor, 0);
+            fullLowSearchKey.reset();
+            fullLowSearchKey.addTuple(searchKey);
+            fullLowSearchKey.addTuple(lowerBoundTuple);
+
+            upperBoundTupleBuilder.reset();
+            // Write dummy value.
+            upperBoundTupleBuilder.getDataOutput().writeInt(Integer.MAX_VALUE);
+            upperBoundTupleBuilder.addFieldEndOffset();
+            upperBoundTuple.reset(upperBoundTupleBuilder.getFieldEndOffsets(), upperBoundTupleBuilder.getByteArray());
+            // Only needed for setting the number of fields in searchKey.
+            searchKey.reset(queryTokenAccessor, 0);
+            fullHighSearchKey.reset();
+            fullHighSearchKey.addTuple(searchKey);
+            fullHighSearchKey.addTuple(upperBoundTuple);
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    public void search(OnDiskInvertedIndexSearchCursor resultCursor, InvertedIndexSearchPredicate searchPred,
+            IIndexOperationContext ictx) throws HyracksDataException, IndexException {
+        tokenizeQuery(searchPred);
+        int numQueryTokens = queryTokenAccessor.getTupleCount();
+
+        IInvertedIndexSearchModifier searchModifier = searchPred.getSearchModifier();
+        int numTokensLowerBound = searchModifier.getNumTokensLowerBound(numQueryTokens);
+        int numTokensUpperBound = searchModifier.getNumTokensUpperBound(numQueryTokens);
+
+        IPartitionedInvertedIndex partInvIndex = (IPartitionedInvertedIndex) invIndex;
+        invListCursorCache.reset();
+        partitions.reset(numTokensLowerBound, numTokensUpperBound);
+        for (int i = 0; i < numQueryTokens; i++) {
+            searchKey.reset(queryTokenAccessor, i);
+            partInvIndex.openInvertedListPartitionCursors(this, ictx, numTokensLowerBound, numTokensUpperBound,
+                    partitions);
+        }
+
+        occurrenceThreshold = searchModifier.getOccurrenceThreshold(numQueryTokens);
+        if (occurrenceThreshold <= 0) {
+            throw new OccurrenceThresholdPanicException("Merge Threshold is <= 0. Failing Search.");
+        }
+
+        // Process the partitions one-by-one.
+        ArrayList<IInvertedListCursor>[] partitionCursors = partitions.getPartitions();
+        int start = partitions.getMinValidPartitionIndex();
+        int end = partitions.getMaxValidPartitionIndex();
+        searchResult.reset();
+        for (int i = start; i <= end; i++) {
+            if (partitionCursors[i] == null) {
+                continue;
+            }
+            // Prune partition because no element in it can satisfy the occurrence threshold.
+            if (partitionCursors[i].size() < occurrenceThreshold) {
+                partInvIndex.cleanupPartitionState(partitionCursors[i]);
+                continue;
+            }
+            // Merge inverted lists of current partition.
+            int numPrefixLists = searchModifier.getNumPrefixLists(occurrenceThreshold, partitionCursors[i].size());
+            invListMerger.reset();
+            invListMerger.merge(partitionCursors[i], occurrenceThreshold, numPrefixLists, searchResult);
+        }
+
+        resultCursor.open(null, searchPred);
+    }
+
+    public void setNumTokensBoundsInSearchKeys(int numTokensLowerBound, int numTokensUpperBound) {
+        IntegerSerializerDeserializer.putInt(numTokensLowerBound, lowerBoundTuple.getFieldData(0),
+                lowerBoundTuple.getFieldStart(0));
+        IntegerSerializerDeserializer.putInt(numTokensUpperBound, upperBoundTuple.getFieldData(0),
+                upperBoundTuple.getFieldStart(0));
+    }
+
+    public ITupleReference getPrefixSearchKey() {
+        return searchKey;
+    }
+
+    public ITupleReference getFullLowSearchKey() {
+        return fullLowSearchKey;
+    }
+
+    public ITupleReference getFullHighSearchKey() {
+        return fullHighSearchKey;
+    }
+
+    public IInvertedListCursor getCachedInvertedListCursor() {
+        return invListCursorCache.getNext();
+    }
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/SearchResult.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/SearchResult.java
new file mode 100644
index 0000000..aa0d3f2
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/SearchResult.java
@@ -0,0 +1,182 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+
+import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeFrameTupleAccessor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeFrameTupleAppender;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeTupleReference;
+
+/**
+ * Byte-buffer backed storage for intermediate and final results of inverted-index searches.
+ */
+// TODO: Rename members.
+public class SearchResult {
+    protected final ArrayList<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
+    protected final IHyracksCommonContext ctx;
+    protected final FixedSizeFrameTupleAppender appender;
+    protected final FixedSizeFrameTupleAccessor accessor;
+    protected final FixedSizeTupleReference tuple;
+    protected final ITypeTraits[] typeTraits;
+    protected final int invListElementSize;
+
+    protected int currBufIdx;
+    protected int numResults;
+
+    public SearchResult(ITypeTraits[] invListFields, IHyracksCommonContext ctx) {
+        typeTraits = new ITypeTraits[invListFields.length + 1];
+        int tmp = 0;
+        for (int i = 0; i < invListFields.length; i++) {
+            typeTraits[i] = invListFields[i];
+            tmp += invListFields[i].getFixedLength();
+        }
+        invListElementSize = tmp;
+        // Integer for counting occurrences.
+        typeTraits[invListFields.length] = IntegerPointable.TYPE_TRAITS;
+        this.ctx = ctx;
+        appender = new FixedSizeFrameTupleAppender(ctx.getFrameSize(), typeTraits);
+        accessor = new FixedSizeFrameTupleAccessor(ctx.getFrameSize(), typeTraits);
+        tuple = new FixedSizeTupleReference(typeTraits);
+        buffers.add(ctx.allocateFrame());
+    }
+
+    /**
+     * Initialize from other search-result object to share member instances except for result buffers.
+     */
+    public SearchResult(SearchResult other) {
+        this.ctx = other.ctx;
+        this.appender = other.appender;
+        this.accessor = other.accessor;
+        this.tuple = other.tuple;
+        this.typeTraits = other.typeTraits;
+        this.invListElementSize = other.invListElementSize;
+        buffers.add(ctx.allocateFrame());
+    }
+
+    public FixedSizeFrameTupleAccessor getAccessor() {
+        return accessor;
+    }
+
+    public FixedSizeFrameTupleAppender getAppender() {
+        return appender;
+    }
+
+    public FixedSizeTupleReference getTuple() {
+        return tuple;
+    }
+
+    public ArrayList<ByteBuffer> getBuffers() {
+        return buffers;
+    }
+
+    public void reset() {
+        currBufIdx = 0;
+        numResults = 0;
+        appender.reset(buffers.get(0), true);
+    }
+
+    public void clear() {
+        currBufIdx = 0;
+        numResults = 0;
+        for (ByteBuffer buffer : buffers) {
+            appender.reset(buffer, true);
+        }
+    }
+
+    public void append(ITupleReference invListElement, int count) {
+        ByteBuffer currentBuffer = buffers.get(currBufIdx);
+        if (!appender.hasSpace()) {
+            currBufIdx++;
+            if (currBufIdx >= buffers.size()) {
+                buffers.add(ctx.allocateFrame());
+            }
+            currentBuffer = buffers.get(currBufIdx);
+            appender.reset(currentBuffer, true);
+        }
+        // Append inverted-list element.
+        if (!appender.append(invListElement.getFieldData(0), invListElement.getFieldStart(0), invListElementSize)) {
+            throw new IllegalStateException();
+        }
+        // Append count.
+        if (!appender.append(count)) {
+            throw new IllegalStateException();
+        }
+        appender.incrementTupleCount(1);
+        numResults++;
+    }
+
+    public int getCurrentBufferIndex() {
+        return currBufIdx;
+    }
+
+    public ITypeTraits[] getTypeTraits() {
+        return typeTraits;
+    }
+
+    public int getNumResults() {
+        return numResults;
+    }
+
+    // TODO: This code may help to clean up the core list-merging algorithms.
+    /*
+    public SearchResultCursor getCursor() {
+        cursor.reset();
+        return cursor;
+    }
+    
+    public class SearchResultCursor {
+        private int bufferIndex;
+        private int resultIndex;
+        private int frameResultIndex;
+        private ByteBuffer currentBuffer;
+
+        public void reset() {
+            bufferIndex = 0;
+            resultIndex = 0;
+            frameResultIndex = 0;
+            currentBuffer = buffers.get(0);
+            resultFrameTupleAcc.reset(currentBuffer);
+        }
+
+        public boolean hasNext() {
+            return resultIndex < numResults;
+        }
+
+        public void next() {
+            resultTuple.reset(currentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(frameResultIndex));            
+            if (frameResultIndex < resultFrameTupleAcc.getTupleCount()) {
+                frameResultIndex++;
+            } else {
+                bufferIndex++;
+                currentBuffer = buffers.get(bufferIndex);
+                resultFrameTupleAcc.reset(currentBuffer);
+                frameResultIndex = 0;
+            }            
+            resultIndex++;
+        }
+
+        public ITupleReference getTuple() {
+            return resultTuple;
+        }
+    }
+    */
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/TOccurrenceSearcher.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/TOccurrenceSearcher.java
index 419d5aa..4513540 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/TOccurrenceSearcher.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/TOccurrenceSearcher.java
@@ -15,522 +15,49 @@
 
 package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search;
 
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
 
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexOperationContext;
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
-import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndex;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexSearchModifier;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexSearcher;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.exceptions.OccurrenceThresholdPanicException;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeFrameTupleAccessor;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeFrameTupleAppender;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeTupleReference;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndexSearchCursor;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizer;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IToken;
 
-// TODO: The search procedure is rather confusing regarding cursor positions, hasNext() calls etc.
-// Needs an overhaul some time.
-public class TOccurrenceSearcher implements IInvertedIndexSearcher {
+public class TOccurrenceSearcher extends AbstractTOccurrenceSearcher {
 
-    protected final IHyracksCommonContext ctx;
-    protected final FixedSizeFrameTupleAppender resultFrameTupleApp;
-    protected final FixedSizeFrameTupleAccessor resultFrameTupleAcc;
-    protected final FixedSizeTupleReference resultTuple;
-    protected final int invListKeyLength;
-    protected int currentNumResults;
-
-    protected List<ByteBuffer> newResultBuffers = new ArrayList<ByteBuffer>();
-    protected List<ByteBuffer> prevResultBuffers = new ArrayList<ByteBuffer>();
-    protected List<ByteBuffer> swap = null;
-    protected int maxResultBufIdx = 0;
-
-    protected RecordDescriptor queryTokenRecDesc = new RecordDescriptor(
-            new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
-    protected ArrayTupleBuilder queryTokenBuilder = new ArrayTupleBuilder(queryTokenRecDesc.getFieldCount());
-    protected DataOutput queryTokenDos = queryTokenBuilder.getDataOutput();
-    protected FrameTupleAppender queryTokenAppender;
-    protected ByteBuffer queryTokenFrame;
-    protected final FrameTupleReference searchKey = new FrameTupleReference();
-
-    protected final IInvertedIndex invIndex;
-    protected final MultiComparator invListCmp;
-    protected final ITypeTraits[] invListFieldsWithCount;
-    protected int occurrenceThreshold;
-
-    protected final int cursorCacheSize = 10;
-    protected List<IInvertedListCursor> invListCursorCache = new ArrayList<IInvertedListCursor>(cursorCacheSize);
-    protected List<IInvertedListCursor> invListCursors = new ArrayList<IInvertedListCursor>(cursorCacheSize);
+    protected final ArrayList<IInvertedListCursor> invListCursors = new ArrayList<IInvertedListCursor>();
 
     public TOccurrenceSearcher(IHyracksCommonContext ctx, IInvertedIndex invIndex) {
-        this.ctx = ctx;
-        this.invIndex = invIndex;
-        this.invListCmp = MultiComparator.create(invIndex.getInvListCmpFactories());
-
-        ITypeTraits[] invListFields = invIndex.getInvListTypeTraits();
-        invListFieldsWithCount = new ITypeTraits[invListFields.length + 1];
-        int tmp = 0;
-        for (int i = 0; i < invListFields.length; i++) {
-            invListFieldsWithCount[i] = invListFields[i];
-            tmp += invListFields[i].getFixedLength();
-        }
-        // using an integer for counting occurrences
-        invListFieldsWithCount[invListFields.length] = IntegerPointable.TYPE_TRAITS;
-        invListKeyLength = tmp;
-
-        resultFrameTupleApp = new FixedSizeFrameTupleAppender(ctx.getFrameSize(), invListFieldsWithCount);
-        resultFrameTupleAcc = new FixedSizeFrameTupleAccessor(ctx.getFrameSize(), invListFieldsWithCount);
-        resultTuple = new FixedSizeTupleReference(invListFieldsWithCount);
-        newResultBuffers.add(ctx.allocateFrame());
-        prevResultBuffers.add(ctx.allocateFrame());
-
-        // Pre-create cursor objects.
-        for (int i = 0; i < cursorCacheSize; i++) {
-            invListCursorCache.add(invIndex.createInvertedListCursor());
-        }
-
-        queryTokenAppender = new FrameTupleAppender(ctx.getFrameSize());
-        queryTokenFrame = ctx.allocateFrame();
-
-        currentNumResults = 0;
-    }
-
-    public void reset() {
-        for (ByteBuffer b : newResultBuffers) {
-            resultFrameTupleApp.reset(b, true);
-        }
-        for (ByteBuffer b : prevResultBuffers) {
-            resultFrameTupleApp.reset(b, true);
-        }
-        currentNumResults = 0;
+        super(ctx, invIndex);
     }
 
     public void search(OnDiskInvertedIndexSearchCursor resultCursor, InvertedIndexSearchPredicate searchPred,
             IIndexOperationContext ictx) throws HyracksDataException, IndexException {
-        ITupleReference queryTuple = searchPred.getQueryTuple();
-        int queryFieldIndex = searchPred.getQueryFieldIndex();
-        IInvertedIndexSearchModifier searchModifier = searchPred.getSearchModifier();
-        IBinaryTokenizer queryTokenizer = searchPred.getQueryTokenizer();
-
-        queryTokenAppender.reset(queryTokenFrame, true);
-        queryTokenizer.reset(queryTuple.getFieldData(queryFieldIndex), queryTuple.getFieldStart(queryFieldIndex),
-                queryTuple.getFieldLength(queryFieldIndex));
-
-        while (queryTokenizer.hasNext()) {
-            queryTokenizer.next();
-            queryTokenBuilder.reset();
-            try {
-                IToken token = queryTokenizer.getToken();
-                token.serializeToken(queryTokenDos);
-                queryTokenBuilder.addFieldEndOffset();
-                // WARNING: assuming one frame is big enough to hold all tokens
-                queryTokenAppender.append(queryTokenBuilder.getFieldEndOffsets(), queryTokenBuilder.getByteArray(), 0,
-                        queryTokenBuilder.getSize());
-            } catch (IOException e) {
-                throw new HyracksDataException(e);
-            }
-        }
-
-        FrameTupleAccessor queryTokenAccessor = new FrameTupleAccessor(ctx.getFrameSize(), queryTokenRecDesc);
-        queryTokenAccessor.reset(queryTokenFrame);
+        tokenizeQuery(searchPred);
         int numQueryTokens = queryTokenAccessor.getTupleCount();
 
-        // Expand cursor cache if necessary.
-        if (numQueryTokens > invListCursorCache.size()) {
-            int diff = numQueryTokens - invListCursorCache.size();
-            for (int i = 0; i < diff; i++) {
-                invListCursorCache.add(invIndex.createInvertedListCursor());
-            }
-        }
-
         invListCursors.clear();
+        invListCursorCache.reset();
         for (int i = 0; i < numQueryTokens; i++) {
             searchKey.reset(queryTokenAccessor, i);
-            invIndex.openInvertedListCursor(invListCursorCache.get(i), searchKey, ictx);
-            invListCursors.add(invListCursorCache.get(i));
+            IInvertedListCursor invListCursor = invListCursorCache.getNext();
+            invIndex.openInvertedListCursor(invListCursor, searchKey, ictx);
+            invListCursors.add(invListCursor);
         }
-        Collections.sort(invListCursors);
-        
-        occurrenceThreshold = searchModifier.getOccurrenceThreshold(invListCursors.size());
-        // TODO: deal with panic cases properly
+
+        IInvertedIndexSearchModifier searchModifier = searchPred.getSearchModifier();
+        occurrenceThreshold = searchModifier.getOccurrenceThreshold(numQueryTokens);
         if (occurrenceThreshold <= 0) {
-            throw new OccurrenceThresholdPanicException("Merge Threshold is <= 0. Failing Search.");
+            throw new OccurrenceThresholdPanicException("Merge threshold is <= 0. Failing Search.");
         }
-        
-        int numPrefixLists = searchModifier.getNumPrefixLists(invListCursors.size());
-        maxResultBufIdx = mergePrefixLists(numPrefixLists, numQueryTokens);
-        maxResultBufIdx = mergeSuffixLists(numPrefixLists, numQueryTokens, maxResultBufIdx);
+        int numPrefixLists = searchModifier.getNumPrefixLists(occurrenceThreshold, invListCursors.size());
 
+        searchResult.reset();
+        invListMerger.merge(invListCursors, occurrenceThreshold, numPrefixLists, searchResult);
         resultCursor.open(null, searchPred);
     }
-
-    protected int mergePrefixLists(int numPrefixTokens, int numQueryTokens) throws HyracksDataException, IndexException {
-        int maxPrevBufIdx = 0;
-        for (int i = 0; i < numPrefixTokens; i++) {
-            swap = prevResultBuffers;
-            prevResultBuffers = newResultBuffers;
-            newResultBuffers = swap;
-            currentNumResults = 0;
-
-            invListCursors.get(i).pinPages();
-            maxPrevBufIdx = mergePrefixList(invListCursors.get(i), prevResultBuffers, maxPrevBufIdx, newResultBuffers);
-            invListCursors.get(i).unpinPages();
-        }
-        return maxPrevBufIdx;
-    }
-
-    protected int mergeSuffixLists(int numPrefixTokens, int numQueryTokens, int maxPrevBufIdx)
-            throws HyracksDataException, IndexException {
-        for (int i = numPrefixTokens; i < numQueryTokens; i++) {
-            swap = prevResultBuffers;
-            prevResultBuffers = newResultBuffers;
-            newResultBuffers = swap;
-
-            invListCursors.get(i).pinPages();
-            int numInvListElements = invListCursors.get(i).size();
-            // should we binary search the next list or should we sort-merge it?
-            if (currentNumResults * Math.log(numInvListElements) < currentNumResults + numInvListElements) {
-                maxPrevBufIdx = mergeSuffixListProbe(invListCursors.get(i), prevResultBuffers, maxPrevBufIdx,
-                        newResultBuffers, i, numQueryTokens);
-            } else {
-                maxPrevBufIdx = mergeSuffixListScan(invListCursors.get(i), prevResultBuffers, maxPrevBufIdx,
-                        newResultBuffers, i, numQueryTokens);
-            }
-            invListCursors.get(i).unpinPages();
-        }
-        return maxPrevBufIdx;
-    }
-
-    protected int mergeSuffixListProbe(IInvertedListCursor invListCursor, List<ByteBuffer> prevResultBuffers,
-            int maxPrevBufIdx, List<ByteBuffer> newResultBuffers, int invListIx, int numQueryTokens) throws HyracksDataException, IndexException {
-
-        int newBufIdx = 0;
-        ByteBuffer newCurrentBuffer = newResultBuffers.get(0);
-
-        int prevBufIdx = 0;
-        ByteBuffer prevCurrentBuffer = prevResultBuffers.get(0);
-
-        int resultTidx = 0;
-
-        currentNumResults = 0;
-
-        resultFrameTupleAcc.reset(prevCurrentBuffer);
-        resultFrameTupleApp.reset(newCurrentBuffer, true);
-
-        while (resultTidx < resultFrameTupleAcc.getTupleCount()) {
-
-            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
-            int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
-                    resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
-
-            if (invListCursor.containsKey(resultTuple, invListCmp)) {
-                count++;
-                newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
-            } else {
-                if (count + numQueryTokens - invListIx > occurrenceThreshold) {
-                    newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
-                }
-            }
-
-            resultTidx++;
-            if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
-                prevBufIdx++;
-                if (prevBufIdx <= maxPrevBufIdx) {
-                    prevCurrentBuffer = prevResultBuffers.get(prevBufIdx);
-                    resultFrameTupleAcc.reset(prevCurrentBuffer);
-                    resultTidx = 0;
-                }
-            }
-        }
-
-        return newBufIdx;
-    }
-
-    protected int mergeSuffixListScan(IInvertedListCursor invListCursor, List<ByteBuffer> prevResultBuffers,
-            int maxPrevBufIdx, List<ByteBuffer> newResultBuffers, int invListIx, int numQueryTokens)
-            throws HyracksDataException, IndexException {
-        
-        int newBufIdx = 0;
-        ByteBuffer newCurrentBuffer = newResultBuffers.get(0);
-
-        int prevBufIdx = 0;
-        ByteBuffer prevCurrentBuffer = prevResultBuffers.get(0);
-
-        boolean advanceCursor = true;
-        boolean advancePrevResult = false;
-        int resultTidx = 0;
-
-        currentNumResults = 0;
-        
-        resultFrameTupleAcc.reset(prevCurrentBuffer);
-        resultFrameTupleApp.reset(newCurrentBuffer, true);
-
-        int invListTidx = 0;
-        int invListNumTuples = invListCursor.size();
-
-        if (invListCursor.hasNext())
-            invListCursor.next();
-
-        while (invListTidx < invListNumTuples && resultTidx < resultFrameTupleAcc.getTupleCount()) {
-
-            ITupleReference invListTuple = invListCursor.getTuple();
-
-            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
-
-            int cmp = invListCmp.compare(invListTuple, resultTuple);
-            if (cmp == 0) {
-                int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
-                        resultTuple.getFieldStart(resultTuple.getFieldCount() - 1)) + 1;
-                newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
-                advanceCursor = true;
-                advancePrevResult = true;
-            } else {
-                if (cmp < 0) {
-                    advanceCursor = true;
-                    advancePrevResult = false;
-                } else {
-                    int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
-                            resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
-                    if (count + numQueryTokens - invListIx > occurrenceThreshold) {
-                        newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
-                    }
-                    advanceCursor = false;
-                    advancePrevResult = true;
-                }
-            }
-
-            if (advancePrevResult) {
-                resultTidx++;
-                if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
-                    prevBufIdx++;
-                    if (prevBufIdx <= maxPrevBufIdx) {
-                        prevCurrentBuffer = prevResultBuffers.get(prevBufIdx);
-                        resultFrameTupleAcc.reset(prevCurrentBuffer);
-                        resultTidx = 0;
-                    }
-                }
-            }
-
-            if (advanceCursor) {
-                invListTidx++;
-                if (invListCursor.hasNext()) {
-                    invListCursor.next();
-                }
-            }
-        }
-
-        // append remaining elements from previous result set
-        while (resultTidx < resultFrameTupleAcc.getTupleCount()) {
-
-            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
-
-            int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
-                    resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
-            if (count + numQueryTokens - invListIx > occurrenceThreshold) {
-                newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
-            }
-
-            resultTidx++;
-            if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
-                prevBufIdx++;
-                if (prevBufIdx <= maxPrevBufIdx) {
-                    prevCurrentBuffer = prevResultBuffers.get(prevBufIdx);
-                    resultFrameTupleAcc.reset(prevCurrentBuffer);
-                    resultTidx = 0;
-                }
-            }
-        }
-
-        return newBufIdx;
-    }
-
-    protected int mergePrefixList(IInvertedListCursor invListCursor, List<ByteBuffer> prevResultBuffers,
-            int maxPrevBufIdx, List<ByteBuffer> newResultBuffers) throws HyracksDataException, IndexException {
-        
-        int newBufIdx = 0;
-        ByteBuffer newCurrentBuffer = newResultBuffers.get(0);
-
-        int prevBufIdx = 0;
-        ByteBuffer prevCurrentBuffer = prevResultBuffers.get(0);
-
-        boolean advanceCursor = true;
-        boolean advancePrevResult = false;
-        int resultTidx = 0;
-
-        resultFrameTupleAcc.reset(prevCurrentBuffer);
-        resultFrameTupleApp.reset(newCurrentBuffer, true);
-
-        int invListTidx = 0;
-        int invListNumTuples = invListCursor.size();
-
-        if (invListCursor.hasNext())
-            invListCursor.next();
-
-        while (invListTidx < invListNumTuples && resultTidx < resultFrameTupleAcc.getTupleCount()) {
-
-            ITupleReference invListTuple = invListCursor.getTuple();
-            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
-
-            int cmp = invListCmp.compare(invListTuple, resultTuple);
-            if (cmp == 0) {
-                int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
-                        resultTuple.getFieldStart(resultTuple.getFieldCount() - 1)) + 1;
-                newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
-                advanceCursor = true;
-                advancePrevResult = true;
-            } else {
-                if (cmp < 0) {
-                    int count = 1;
-                    newBufIdx = appendTupleToNewResults(invListTuple, count, newBufIdx);
-                    advanceCursor = true;
-                    advancePrevResult = false;
-                } else {
-                    int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
-                            resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
-                    newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
-                    advanceCursor = false;
-                    advancePrevResult = true;
-                }
-            }
-
-            if (advancePrevResult) {
-                resultTidx++;
-                if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
-                    prevBufIdx++;
-                    if (prevBufIdx <= maxPrevBufIdx) {
-                        prevCurrentBuffer = prevResultBuffers.get(prevBufIdx);
-                        resultFrameTupleAcc.reset(prevCurrentBuffer);
-                        resultTidx = 0;
-                    }
-                }
-            }
-
-            if (advanceCursor) {
-                invListTidx++;
-                if (invListCursor.hasNext()) {
-                    invListCursor.next();
-                }
-            }
-        }
-
-        // append remaining new elements from inverted list
-        while (invListTidx < invListNumTuples) {
-            ITupleReference invListTuple = invListCursor.getTuple();
-            newBufIdx = appendTupleToNewResults(invListTuple, 1, newBufIdx);
-            invListTidx++;
-            if (invListCursor.hasNext()) {
-                invListCursor.next();
-            }
-        }
-
-        // append remaining elements from previous result set
-        while (resultTidx < resultFrameTupleAcc.getTupleCount()) {
-
-            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
-
-            int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
-                    resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
-            newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
-
-            resultTidx++;
-            if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
-                prevBufIdx++;
-                if (prevBufIdx <= maxPrevBufIdx) {
-                    prevCurrentBuffer = prevResultBuffers.get(prevBufIdx);
-                    resultFrameTupleAcc.reset(prevCurrentBuffer);
-                    resultTidx = 0;
-                }
-            }
-        }
-
-        return newBufIdx;
-    }
-
-    protected int appendTupleToNewResults(ITupleReference tuple, int newCount, int newBufIdx) {
-        ByteBuffer newCurrentBuffer = newResultBuffers.get(newBufIdx);
-
-        if (!resultFrameTupleApp.hasSpace()) {
-            newBufIdx++;
-            if (newBufIdx >= newResultBuffers.size()) {
-                newResultBuffers.add(ctx.allocateFrame());
-            }
-            newCurrentBuffer = newResultBuffers.get(newBufIdx);
-            resultFrameTupleApp.reset(newCurrentBuffer, true);
-        }
-
-        // append key
-        if (!resultFrameTupleApp.append(tuple.getFieldData(0), tuple.getFieldStart(0), invListKeyLength)) {
-            throw new IllegalStateException();
-        }
-
-        // append new count
-        if (!resultFrameTupleApp.append(newCount)) {
-            throw new IllegalStateException();
-        }
-
-        resultFrameTupleApp.incrementTupleCount(1);
-
-        currentNumResults++;
-
-        return newBufIdx;
-    }
-
-    public IFrameTupleAccessor createResultFrameTupleAccessor() {
-        return new FixedSizeFrameTupleAccessor(ctx.getFrameSize(), invListFieldsWithCount);
-    }
-
-    public ITupleReference createResultFrameTupleReference() {
-        return new FixedSizeTupleReference(invListFieldsWithCount);
-    }
-
-    @Override
-    public List<ByteBuffer> getResultBuffers() {
-        return newResultBuffers;
-    }
-
-    @Override
-    public int getNumValidResultBuffers() {
-        return maxResultBufIdx + 1;
-    }
-
-    public int getOccurrenceThreshold() {
-        return occurrenceThreshold;
-    }
-
-    public void printNewResults(int maxResultBufIdx, List<ByteBuffer> buffer) {
-        StringBuffer strBuffer = new StringBuffer();
-        for (int i = 0; i <= maxResultBufIdx; i++) {
-            ByteBuffer testBuf = buffer.get(i);
-            resultFrameTupleAcc.reset(testBuf);
-            for (int j = 0; j < resultFrameTupleAcc.getTupleCount(); j++) {
-                strBuffer.append(IntegerSerializerDeserializer.getInt(resultFrameTupleAcc.getBuffer().array(),
-                        resultFrameTupleAcc.getFieldStartOffset(j, 0)) + ",");
-                strBuffer.append(IntegerSerializerDeserializer.getInt(resultFrameTupleAcc.getBuffer().array(),
-                        resultFrameTupleAcc.getFieldStartOffset(j, 1)) + " ");
-            }
-        }
-        System.out.println(strBuffer.toString());
-    }
 }
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/TOccurrenceSearcherSuffixProbeOnly.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/TOccurrenceSearcherSuffixProbeOnly.java
deleted file mode 100644
index 630b810..0000000
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/TOccurrenceSearcherSuffixProbeOnly.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
-import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndex;
-
-public class TOccurrenceSearcherSuffixProbeOnly extends TOccurrenceSearcher {
-
-	protected final MultiComparator invListCmp;
-	
-    public TOccurrenceSearcherSuffixProbeOnly(IHyracksTaskContext ctx, OnDiskInvertedIndex invIndex) {
-        super(ctx, invIndex);
-        this.invListCmp = MultiComparator.create(invIndex.getInvListCmpFactories());
-    }
-
-    protected int mergeSuffixLists(int numPrefixTokens, int numQueryTokens, int maxPrevBufIdx) throws HyracksDataException, IndexException {
-        for (int i = numPrefixTokens; i < numQueryTokens; i++) {
-            swap = prevResultBuffers;
-            prevResultBuffers = newResultBuffers;
-            newResultBuffers = swap;
-            currentNumResults = 0;
-
-            invListCursors.get(i).pinPages();
-            maxPrevBufIdx = mergeSuffixListProbe(invListCursors.get(i), prevResultBuffers, maxPrevBufIdx,
-                    newResultBuffers, i, numQueryTokens);
-            invListCursors.get(i).unpinPages();
-        }
-        return maxPrevBufIdx;
-    }
-
-    protected int mergeSuffixListProbe(IInvertedListCursor invListCursor, List<ByteBuffer> prevResultBuffers,
-            int maxPrevBufIdx, List<ByteBuffer> newResultBuffers, int invListIx, int numQueryTokens) throws HyracksDataException, IndexException {
-
-        int newBufIdx = 0;
-        ByteBuffer newCurrentBuffer = newResultBuffers.get(0);
-
-        int prevBufIdx = 0;
-        ByteBuffer prevCurrentBuffer = prevResultBuffers.get(0);
-
-        int resultTidx = 0;
-
-        resultFrameTupleAcc.reset(prevCurrentBuffer);
-        resultFrameTupleApp.reset(newCurrentBuffer, true);
-
-        while (resultTidx < resultFrameTupleAcc.getTupleCount()) {
-
-            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
-            int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
-                    resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
-
-            if (invListCursor.containsKey(resultTuple, invListCmp)) {
-                count++;
-                newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
-            } else {
-                if (count + numQueryTokens - invListIx > occurrenceThreshold) {
-                    newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
-                }
-            }
-
-            resultTidx++;
-            if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
-                prevBufIdx++;
-                if (prevBufIdx <= maxPrevBufIdx) {
-                    prevCurrentBuffer = prevResultBuffers.get(prevBufIdx);
-                    resultFrameTupleAcc.reset(prevCurrentBuffer);
-                    resultTidx = 0;
-                }
-            }
-        }
-
-        return newBufIdx;
-    }
-}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/TOccurrenceSearcherSuffixScanOnly.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/TOccurrenceSearcherSuffixScanOnly.java
deleted file mode 100644
index 3640511..0000000
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/TOccurrenceSearcherSuffixScanOnly.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
-import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndex;
-
-public class TOccurrenceSearcherSuffixScanOnly extends TOccurrenceSearcher {
-
-	protected final MultiComparator invListCmp;
-	
-    public TOccurrenceSearcherSuffixScanOnly(IHyracksTaskContext ctx, OnDiskInvertedIndex invIndex) {
-        super(ctx, invIndex);
-        this.invListCmp = MultiComparator.create(invIndex.getInvListCmpFactories());
-    }
-
-    protected int mergeSuffixLists(int numPrefixTokens, int numQueryTokens, int maxPrevBufIdx) throws HyracksDataException, IndexException {
-        for (int i = numPrefixTokens; i < numQueryTokens; i++) {
-            swap = prevResultBuffers;
-            prevResultBuffers = newResultBuffers;
-            newResultBuffers = swap;
-            currentNumResults = 0;
-
-            invListCursors.get(i).pinPages();
-            maxPrevBufIdx = mergeSuffixListScan(invListCursors.get(i), prevResultBuffers, maxPrevBufIdx,
-                    newResultBuffers, i, numQueryTokens);
-            invListCursors.get(i).unpinPages();
-        }
-        return maxPrevBufIdx;
-    }
-
-    protected int mergeSuffixListScan(IInvertedListCursor invListCursor, List<ByteBuffer> prevResultBuffers,
-            int maxPrevBufIdx, List<ByteBuffer> newResultBuffers, int invListIx, int numQueryTokens) throws HyracksDataException, IndexException {
-
-        int newBufIdx = 0;
-        ByteBuffer newCurrentBuffer = newResultBuffers.get(0);
-
-        int prevBufIdx = 0;
-        ByteBuffer prevCurrentBuffer = prevResultBuffers.get(0);
-
-        boolean advanceCursor = true;
-        boolean advancePrevResult = false;
-        int resultTidx = 0;
-
-        resultFrameTupleAcc.reset(prevCurrentBuffer);
-        resultFrameTupleApp.reset(newCurrentBuffer, true);
-
-        while (invListCursor.hasNext() && resultTidx < resultFrameTupleAcc.getTupleCount()) {
-
-            if (advanceCursor)
-                invListCursor.next();
-
-            ITupleReference invListTuple = invListCursor.getTuple();
-
-            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
-
-            int cmp = invListCmp.compare(invListTuple, resultTuple);
-            if (cmp == 0) {
-                int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
-                        resultTuple.getFieldStart(resultTuple.getFieldCount() - 1)) + 1;
-                newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
-                advanceCursor = true;
-                advancePrevResult = true;
-            } else {
-                if (cmp < 0) {
-                    advanceCursor = true;
-                    advancePrevResult = false;
-                } else {
-                    int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
-                            resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
-                    if (count + numQueryTokens - invListIx > occurrenceThreshold) {
-                        newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
-                    }
-                    advanceCursor = false;
-                    advancePrevResult = true;
-                }
-            }
-
-            if (advancePrevResult) {
-                resultTidx++;
-                if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
-                    prevBufIdx++;
-                    if (prevBufIdx <= maxPrevBufIdx) {
-                        prevCurrentBuffer = prevResultBuffers.get(prevBufIdx);
-                        resultFrameTupleAcc.reset(prevCurrentBuffer);
-                        resultTidx = 0;
-                    }
-                }
-            }
-        }
-
-        // append remaining elements from previous result set
-        while (resultTidx < resultFrameTupleAcc.getTupleCount()) {
-
-            int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
-                    resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
-            newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
-
-            resultTidx++;
-            if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
-                prevBufIdx++;
-                if (prevBufIdx <= maxPrevBufIdx) {
-                    prevCurrentBuffer = prevResultBuffers.get(prevBufIdx);
-                    resultFrameTupleAcc.reset(prevCurrentBuffer);
-                    resultTidx = 0;
-                }
-            }
-        }
-
-        return newBufIdx;
-    }
-}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/InvertedIndexTokenizingTupleIterator.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/InvertedIndexTokenizingTupleIterator.java
index 633213a..4f8f635 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/InvertedIndexTokenizingTupleIterator.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/InvertedIndexTokenizingTupleIterator.java
@@ -27,13 +27,13 @@
 // TODO: We can possibly avoid copying the data into a new tuple here.
 public class InvertedIndexTokenizingTupleIterator {
     // Field that is expected to be tokenized.
-    private final int DOC_FIELD_INDEX = 0;
+    protected final int DOC_FIELD_INDEX = 0;
 
-    private final int invListFieldCount;
-    private final ArrayTupleBuilder tupleBuilder;
-    private final ArrayTupleReference tupleReference;
-    private final IBinaryTokenizer tokenizer;
-    private ITupleReference inputTuple;
+    protected final int invListFieldCount;
+    protected final ArrayTupleBuilder tupleBuilder;
+    protected final ArrayTupleReference tupleReference;
+    protected final IBinaryTokenizer tokenizer;
+    protected ITupleReference inputTuple;
 
     public InvertedIndexTokenizingTupleIterator(int tokensFieldCount, int invListFieldCount, IBinaryTokenizer tokenizer) {
         this.invListFieldCount = invListFieldCount;
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/InvertedIndexUtils.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/InvertedIndexUtils.java
index 7da9416..953d72b 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/InvertedIndexUtils.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/InvertedIndexUtils.java
@@ -43,11 +43,15 @@
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListBuilderFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndex;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndexFileManager;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls.PartitionedLSMInvertedIndex;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.inmemory.InMemoryInvertedIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.inmemory.PartitionedInMemoryInvertedIndex;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeElementInvertedListBuilder;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeElementInvertedListBuilderFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndex;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndexFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.PartitionedOnDiskInvertedIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.PartitionedOnDiskInvertedIndexFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
 import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
 import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
@@ -63,6 +67,15 @@
                 tokenTypeTraits, tokenCmpFactories, tokenizerFactory);
     }
 
+    public static InMemoryInvertedIndex createPartitionedInMemoryBTreeInvertedindex(IBufferCache memBufferCache,
+            IFreePageManager memFreePageManager, ITypeTraits[] invListTypeTraits,
+            IBinaryComparatorFactory[] invListCmpFactories, ITypeTraits[] tokenTypeTraits,
+            IBinaryComparatorFactory[] tokenCmpFactories, IBinaryTokenizerFactory tokenizerFactory)
+            throws BTreeException {
+        return new PartitionedInMemoryInvertedIndex(memBufferCache, memFreePageManager, invListTypeTraits,
+                invListCmpFactories, tokenTypeTraits, tokenCmpFactories, tokenizerFactory);
+    }
+
     public static OnDiskInvertedIndex createOnDiskInvertedIndex(IBufferCache bufferCache,
             IFileMapProvider fileMapProvider, ITypeTraits[] invListTypeTraits,
             IBinaryComparatorFactory[] invListCmpFactories, ITypeTraits[] tokenTypeTraits,
@@ -73,18 +86,23 @@
                 tokenTypeTraits, tokenCmpFactories, btreeFile, invListsFile);
     }
 
+    public static PartitionedOnDiskInvertedIndex createPartitionedOnDiskInvertedIndex(IBufferCache bufferCache,
+            IFileMapProvider fileMapProvider, ITypeTraits[] invListTypeTraits,
+            IBinaryComparatorFactory[] invListCmpFactories, ITypeTraits[] tokenTypeTraits,
+            IBinaryComparatorFactory[] tokenCmpFactories, FileReference invListsFile) throws IndexException {
+        IInvertedListBuilder builder = new FixedSizeElementInvertedListBuilder(invListTypeTraits);
+        FileReference btreeFile = getBTreeFile(invListsFile);
+        return new PartitionedOnDiskInvertedIndex(bufferCache, fileMapProvider, builder, invListTypeTraits,
+                invListCmpFactories, tokenTypeTraits, tokenCmpFactories, btreeFile, invListsFile);
+    }
+
     public static FileReference getBTreeFile(FileReference invListsFile) {
         return new FileReference(new File(invListsFile.getFile().getPath() + "_btree"));
     }
 
-    public static LSMInvertedIndex createLSMInvertedIndex(IInMemoryBufferCache memBufferCache,
-            IInMemoryFreePageManager memFreePageManager, IFileMapProvider diskFileMapProvider,
+    public static BTreeFactory createDeletedKeysBTreeFactory(IFileMapProvider diskFileMapProvider,
             ITypeTraits[] invListTypeTraits, IBinaryComparatorFactory[] invListCmpFactories,
-            ITypeTraits[] tokenTypeTraits, IBinaryComparatorFactory[] tokenCmpFactories,
-            IBinaryTokenizerFactory tokenizerFactory, IBufferCache diskBufferCache, IIOManager ioManager,
-            String onDiskDir, ILSMFlushController flushController, ILSMMergePolicy mergePolicy,
-            ILSMOperationTrackerFactory opTrackerFactory, ILSMIOOperationScheduler ioScheduler) throws IndexException {
-
+            IBufferCache diskBufferCache) throws BTreeException {
         TypeAwareTupleWriterFactory tupleWriterFactory = new TypeAwareTupleWriterFactory(invListTypeTraits);
         ITreeIndexFrameFactory leafFrameFactory = BTreeUtils.getLeafFrameFactory(tupleWriterFactory,
                 BTreeLeafFrameType.REGULAR_NSM);
@@ -95,6 +113,19 @@
         BTreeFactory deletedKeysBTreeFactory = new BTreeFactory(diskBufferCache, diskFileMapProvider,
                 freePageManagerFactory, interiorFrameFactory, leafFrameFactory, invListCmpFactories,
                 invListCmpFactories.length);
+        return deletedKeysBTreeFactory;
+    }
+
+    public static LSMInvertedIndex createLSMInvertedIndex(IInMemoryBufferCache memBufferCache,
+            IInMemoryFreePageManager memFreePageManager, IFileMapProvider diskFileMapProvider,
+            ITypeTraits[] invListTypeTraits, IBinaryComparatorFactory[] invListCmpFactories,
+            ITypeTraits[] tokenTypeTraits, IBinaryComparatorFactory[] tokenCmpFactories,
+            IBinaryTokenizerFactory tokenizerFactory, IBufferCache diskBufferCache, IIOManager ioManager,
+            String onDiskDir, ILSMFlushController flushController, ILSMMergePolicy mergePolicy,
+            ILSMOperationTrackerFactory opTrackerFactory, ILSMIOOperationScheduler ioScheduler) throws IndexException {
+
+        BTreeFactory deletedKeysBTreeFactory = createDeletedKeysBTreeFactory(diskFileMapProvider, invListTypeTraits,
+                invListCmpFactories, diskBufferCache);
 
         FileReference onDiskDirFileRef = new FileReference(new File(onDiskDir));
         LSMInvertedIndexFileManager fileManager = new LSMInvertedIndexFileManager(ioManager, diskFileMapProvider,
@@ -112,4 +143,32 @@
                 ioScheduler);
         return invIndex;
     }
+
+    public static PartitionedLSMInvertedIndex createPartitionedLSMInvertedIndex(IInMemoryBufferCache memBufferCache,
+            IInMemoryFreePageManager memFreePageManager, IFileMapProvider diskFileMapProvider,
+            ITypeTraits[] invListTypeTraits, IBinaryComparatorFactory[] invListCmpFactories,
+            ITypeTraits[] tokenTypeTraits, IBinaryComparatorFactory[] tokenCmpFactories,
+            IBinaryTokenizerFactory tokenizerFactory, IBufferCache diskBufferCache, IIOManager ioManager,
+            String onDiskDir, ILSMFlushController flushController, ILSMMergePolicy mergePolicy,
+            ILSMOperationTrackerFactory opTrackerFactory, ILSMIOOperationScheduler ioScheduler) throws IndexException {
+
+        BTreeFactory deletedKeysBTreeFactory = createDeletedKeysBTreeFactory(diskFileMapProvider, invListTypeTraits,
+                invListCmpFactories, diskBufferCache);
+
+        FileReference onDiskDirFileRef = new FileReference(new File(onDiskDir));
+        LSMInvertedIndexFileManager fileManager = new LSMInvertedIndexFileManager(ioManager, diskFileMapProvider,
+                onDiskDirFileRef, deletedKeysBTreeFactory);
+
+        IInvertedListBuilderFactory invListBuilderFactory = new FixedSizeElementInvertedListBuilderFactory(
+                invListTypeTraits);
+        PartitionedOnDiskInvertedIndexFactory invIndexFactory = new PartitionedOnDiskInvertedIndexFactory(
+                diskBufferCache, diskFileMapProvider, invListBuilderFactory, invListTypeTraits, invListCmpFactories,
+                tokenTypeTraits, tokenCmpFactories, fileManager);
+
+        PartitionedLSMInvertedIndex invIndex = new PartitionedLSMInvertedIndex(memBufferCache, memFreePageManager,
+                invIndexFactory, deletedKeysBTreeFactory, fileManager, diskFileMapProvider, invListTypeTraits,
+                invListCmpFactories, tokenTypeTraits, tokenCmpFactories, tokenizerFactory, flushController,
+                mergePolicy, opTrackerFactory, ioScheduler);
+        return invIndex;
+    }
 }
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/ObjectCache.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/ObjectCache.java
new file mode 100644
index 0000000..b073f20
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/ObjectCache.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util;
+
+import java.util.ArrayList;
+
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IObjectFactory;
+
+public class ObjectCache<T> {
+    protected final int expandSize;
+    protected final IObjectFactory<T> objFactory;
+    protected final ArrayList<T> cache;
+    protected int lastReturned = 0;
+
+    public ObjectCache(IObjectFactory<T> objFactory, int initialSize, int expandSize) {
+        this.objFactory = objFactory;
+        this.cache = new ArrayList<T>(initialSize);
+        this.expandSize = expandSize;
+        expand(initialSize);
+    }
+
+    private void expand(int expandSize) {
+        for (int i = 0; i < expandSize; i++) {
+            cache.add(objFactory.create());
+        }
+    }
+
+    public void reset() {
+        lastReturned = 0;
+    }
+
+    public T getNext() {
+        if (lastReturned >= cache.size()) {
+            expand(expandSize);
+        }
+        return cache.get(lastReturned++);
+    }
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/PartitionedInvertedIndexTokenizingTupleIterator.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/PartitionedInvertedIndexTokenizingTupleIterator.java
new file mode 100644
index 0000000..e0477b2
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/PartitionedInvertedIndexTokenizingTupleIterator.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util;
+
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizer;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IToken;
+
+// TODO: We can possibly avoid copying the data into a new tuple here.
+public class PartitionedInvertedIndexTokenizingTupleIterator extends InvertedIndexTokenizingTupleIterator {
+
+    protected int numTokens = 0;
+
+    public PartitionedInvertedIndexTokenizingTupleIterator(int tokensFieldCount, int invListFieldCount,
+            IBinaryTokenizer tokenizer) {
+        super(tokensFieldCount, invListFieldCount, tokenizer);
+    }
+
+    public void reset(ITupleReference inputTuple) {
+        super.reset(inputTuple);
+        // Run through the tokenizer once to get the total number of tokens.
+        numTokens = 0;
+        while (tokenizer.hasNext()) {
+            tokenizer.next();
+            numTokens++;
+        }
+        super.reset(inputTuple);
+    }
+
+    public void next() throws HyracksDataException {
+        tokenizer.next();
+        IToken token = tokenizer.getToken();
+        tupleBuilder.reset();
+        try {
+            // Add token field.
+            token.serializeToken(tupleBuilder.getDataOutput());
+            tupleBuilder.addFieldEndOffset();
+            // Add field with number of tokens.
+            tupleBuilder.getDataOutput().writeInt(numTokens);
+            tupleBuilder.addFieldEndOffset();
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+        // Add inverted-list element fields.
+        for (int i = 0; i < invListFieldCount; i++) {
+            tupleBuilder.addField(inputTuple.getFieldData(i + 1), inputTuple.getFieldStart(i + 1),
+                    inputTuple.getFieldLength(i + 1));
+        }
+        // Reset tuple reference for insert operation.
+        tupleReference.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
+    }
+    
+    public int getNumTokens() {
+        return numTokens;
+    }
+}
diff --git a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexBulkLoadTest.java b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexBulkLoadTest.java
new file mode 100644
index 0000000..f7a36f0
--- /dev/null
+++ b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexBulkLoadTest.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex;
+
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.common.AbstractInvertedIndexLoadTest;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestContext.InvertedIndexType;
+
+public class PartitionedLSMInvertedIndexBulkLoadTest extends AbstractInvertedIndexLoadTest {
+
+    public PartitionedLSMInvertedIndexBulkLoadTest() {
+        super(InvertedIndexType.PARTITIONED_LSM, true, 1);
+    }
+}
diff --git a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexDeleteTest.java b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexDeleteTest.java
new file mode 100644
index 0000000..4fd529b
--- /dev/null
+++ b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexDeleteTest.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex;
+
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.common.AbstractInvertedIndexDeleteTest;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestContext.InvertedIndexType;
+
+public class PartitionedLSMInvertedIndexDeleteTest extends AbstractInvertedIndexDeleteTest {
+
+    public PartitionedLSMInvertedIndexDeleteTest() {
+        super(InvertedIndexType.PARTITIONED_LSM, false);
+    }
+}
\ No newline at end of file
diff --git a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexInsertTest.java b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexInsertTest.java
new file mode 100644
index 0000000..4608f81
--- /dev/null
+++ b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexInsertTest.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex;
+
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.common.AbstractInvertedIndexLoadTest;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestContext.InvertedIndexType;
+
+public class PartitionedLSMInvertedIndexInsertTest extends AbstractInvertedIndexLoadTest {
+
+    public PartitionedLSMInvertedIndexInsertTest() {
+        super(InvertedIndexType.PARTITIONED_LSM, false, 1);
+    }
+}
diff --git a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexMergeTest.java b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexMergeTest.java
new file mode 100644
index 0000000..52bb3f8
--- /dev/null
+++ b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexMergeTest.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex;
+
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.datagen.TupleGenerator;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.config.AccessMethodTestsConfig;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.common.AbstractInvertedIndexLoadTest;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestContext;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestContext.InvertedIndexType;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestUtils;
+
+public class PartitionedLSMInvertedIndexMergeTest extends AbstractInvertedIndexLoadTest {
+
+    private final int maxTreesToMerge = AccessMethodTestsConfig.LSM_INVINDEX_MAX_TREES_TO_MERGE;
+    
+    public PartitionedLSMInvertedIndexMergeTest() {
+        super(InvertedIndexType.PARTITIONED_LSM, true, 1);
+    }
+
+    @Override
+    protected void runTest(LSMInvertedIndexTestContext testCtx, TupleGenerator tupleGen) throws IOException,
+            IndexException {
+        IIndex invIndex = testCtx.getIndex();        
+        invIndex.create();
+        invIndex.activate();
+        ILSMIndexAccessor invIndexAccessor = (ILSMIndexAccessor) invIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+
+        for (int i = 0; i < maxTreesToMerge; i++) {
+            for (int j = 0; j < i; j++) {
+                if (bulkLoad) {
+                    LSMInvertedIndexTestUtils.bulkLoadInvIndex(testCtx, tupleGen, NUM_DOCS_TO_INSERT);
+                } else {
+                    LSMInvertedIndexTestUtils.insertIntoInvIndex(testCtx, tupleGen, NUM_DOCS_TO_INSERT);
+                }
+            }
+            // Perform merge.
+            ILSMIOOperation ioop = invIndexAccessor.createMergeOperation(NoOpIOOperationCallback.INSTANCE);
+            if (ioop != null) {
+                invIndexAccessor.merge(ioop);
+            }
+            validateAndCheckIndex(testCtx);
+            runTinySearchWorkload(testCtx, tupleGen);
+        }
+        
+        invIndex.deactivate();
+        invIndex.destroy();
+    }
+}
diff --git a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexMultiBulkLoadTest.java b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexMultiBulkLoadTest.java
new file mode 100644
index 0000000..80a3c0b
--- /dev/null
+++ b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexMultiBulkLoadTest.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex;
+
+import edu.uci.ics.hyracks.storage.am.config.AccessMethodTestsConfig;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.common.AbstractInvertedIndexLoadTest;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestContext.InvertedIndexType;
+
+public class PartitionedLSMInvertedIndexMultiBulkLoadTest extends AbstractInvertedIndexLoadTest {
+
+    public PartitionedLSMInvertedIndexMultiBulkLoadTest() {
+        super(InvertedIndexType.PARTITIONED_LSM, true, AccessMethodTestsConfig.LSM_INVINDEX_NUM_BULKLOAD_ROUNDS);
+    }
+}
diff --git a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexSearchTest.java b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexSearchTest.java
new file mode 100644
index 0000000..c8a7667
--- /dev/null
+++ b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexSearchTest.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex;
+
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.common.AbstractInvertedIndexSearchTest;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestContext.InvertedIndexType;
+
+public class PartitionedLSMInvertedIndexSearchTest extends AbstractInvertedIndexSearchTest {
+
+    public PartitionedLSMInvertedIndexSearchTest() {
+        super(InvertedIndexType.PARTITIONED_LSM, false);
+    }
+}
diff --git a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndexDeleteTest.java b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndexDeleteTest.java
new file mode 100644
index 0000000..eac7765
--- /dev/null
+++ b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndexDeleteTest.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.inmemory;
+
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.common.AbstractInvertedIndexDeleteTest;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestContext.InvertedIndexType;
+
+public class PartitionedInMemoryInvertedIndexDeleteTest extends AbstractInvertedIndexDeleteTest {
+    
+    public PartitionedInMemoryInvertedIndexDeleteTest() {
+        super(InvertedIndexType.PARTITIONED_INMEMORY, false);
+    }
+}
diff --git a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndexInsertTest.java b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndexInsertTest.java
new file mode 100644
index 0000000..8342efd
--- /dev/null
+++ b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndexInsertTest.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.inmemory;
+
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.common.AbstractInvertedIndexLoadTest;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestContext.InvertedIndexType;
+
+public class PartitionedInMemoryInvertedIndexInsertTest extends AbstractInvertedIndexLoadTest {
+
+    public PartitionedInMemoryInvertedIndexInsertTest() {
+        super(InvertedIndexType.PARTITIONED_INMEMORY, false, 1);
+    }
+}
diff --git a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndexSearchTest.java b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndexSearchTest.java
new file mode 100644
index 0000000..385d65d
--- /dev/null
+++ b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndexSearchTest.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.inmemory;
+
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.common.AbstractInvertedIndexSearchTest;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestContext.InvertedIndexType;
+
+public class PartitionedInMemoryInvertedIndexSearchTest extends AbstractInvertedIndexSearchTest {
+
+    public PartitionedInMemoryInvertedIndexSearchTest() {
+        super(InvertedIndexType.PARTITIONED_INMEMORY, false);
+    }
+}
diff --git a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/multithread/LSMInvertedIndexMultiThreadTest.java b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/multithread/LSMInvertedIndexMultiThreadTest.java
index 8b01a40..bd48068 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/multithread/LSMInvertedIndexMultiThreadTest.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/multithread/LSMInvertedIndexMultiThreadTest.java
@@ -39,17 +39,17 @@
 public class LSMInvertedIndexMultiThreadTest {
 
     protected final Logger LOGGER = Logger.getLogger(LSMInvertedIndexMultiThreadTest.class.getName());
-    
+
     // Machine-specific number of threads to use for testing.
     protected final int REGULAR_NUM_THREADS = Runtime.getRuntime().availableProcessors();
     // Excessive number of threads for testing.
     protected final int EXCESSIVE_NUM_THREADS = Runtime.getRuntime().availableProcessors() * 4;
     protected final int NUM_OPERATIONS = AccessMethodTestsConfig.LSM_INVINDEX_MULTITHREAD_NUM_OPERATIONS;
-    
-    private final LSMInvertedIndexTestHarness harness = new LSMInvertedIndexTestHarness();
-    private final LSMInvertedIndexWorkerFactory workerFactory = new LSMInvertedIndexWorkerFactory();
-    private final ArrayList<TestWorkloadConf> workloadConfs = getTestWorkloadConf();
-    
+
+    protected final LSMInvertedIndexTestHarness harness = new LSMInvertedIndexTestHarness();
+    protected final LSMInvertedIndexWorkerFactory workerFactory = new LSMInvertedIndexWorkerFactory();
+    protected final ArrayList<TestWorkloadConf> workloadConfs = getTestWorkloadConf();
+
     protected void setUp() throws HyracksException {
         harness.setUp();
     }
@@ -58,8 +58,8 @@
         harness.tearDown();
     }
 
-    protected void runTest(LSMInvertedIndexTestContext testCtx, TupleGenerator tupleGen, int numThreads, TestWorkloadConf conf,
-            String dataMsg) throws InterruptedException, TreeIndexException, HyracksException {
+    protected void runTest(LSMInvertedIndexTestContext testCtx, TupleGenerator tupleGen, int numThreads,
+            TestWorkloadConf conf, String dataMsg) throws InterruptedException, TreeIndexException, HyracksException {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("LSMInvertedIndex MultiThread Test:\nData: " + dataMsg + "; Threads: " + numThreads
                     + "; Workload: " + conf.toString() + ".");
@@ -123,23 +123,23 @@
 
         return workloadConfs;
     }
-    
+
     @Test
     public void wordTokensInvIndexTest() throws IOException, IndexException, InterruptedException {
         String dataMsg = "Documents";
         int[] numThreads = new int[] { REGULAR_NUM_THREADS, EXCESSIVE_NUM_THREADS };
         for (int i = 0; i < numThreads.length; i++) {
-            for (TestWorkloadConf conf : workloadConfs) {                
+            for (TestWorkloadConf conf : workloadConfs) {
                 setUp();
                 LSMInvertedIndexTestContext testCtx = LSMInvertedIndexTestUtils.createWordInvIndexTestContext(harness,
-                        InvertedIndexType.LSM);
+                        getIndexType());
                 TupleGenerator tupleGen = LSMInvertedIndexTestUtils.createStringDocumentTupleGen(harness.getRandom());
                 runTest(testCtx, tupleGen, numThreads[i], conf, dataMsg);
                 tearDown();
             }
         }
     }
-    
+
     @Test
     public void hashedNGramTokensInvIndexTest() throws IOException, IndexException, InterruptedException {
         String dataMsg = "Person Names";
@@ -148,11 +148,15 @@
             for (TestWorkloadConf conf : workloadConfs) {
                 setUp();
                 LSMInvertedIndexTestContext testCtx = LSMInvertedIndexTestUtils.createHashedNGramInvIndexTestContext(
-                        harness, InvertedIndexType.LSM);
+                        harness, getIndexType());
                 TupleGenerator tupleGen = LSMInvertedIndexTestUtils.createPersonNamesTupleGen(harness.getRandom());
                 runTest(testCtx, tupleGen, numThreads[i], conf, dataMsg);
                 tearDown();
             }
         }
     }
+
+    protected InvertedIndexType getIndexType() {
+        return InvertedIndexType.LSM;
+    }
 }
diff --git a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/multithread/PartitionedLSMInvertedIndexMultiThreadTest.java b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/multithread/PartitionedLSMInvertedIndexMultiThreadTest.java
new file mode 100644
index 0000000..bc6964f
--- /dev/null
+++ b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/multithread/PartitionedLSMInvertedIndexMultiThreadTest.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.multithread;
+
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestContext.InvertedIndexType;
+
+public class PartitionedLSMInvertedIndexMultiThreadTest extends LSMInvertedIndexMultiThreadTest {
+
+    protected final Logger LOGGER = Logger.getLogger(PartitionedLSMInvertedIndexMultiThreadTest.class.getName());
+    
+    protected InvertedIndexType getIndexType() {
+        return InvertedIndexType.PARTITIONED_LSM;
+    }
+    
+}
diff --git a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndexBulkLoadTest.java b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndexBulkLoadTest.java
new file mode 100644
index 0000000..f641630
--- /dev/null
+++ b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndexBulkLoadTest.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk;
+
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.common.AbstractInvertedIndexLoadTest;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestContext.InvertedIndexType;
+
+public class PartitionedOnDiskInvertedIndexBulkLoadTest extends AbstractInvertedIndexLoadTest {
+
+    public PartitionedOnDiskInvertedIndexBulkLoadTest() {
+        super(InvertedIndexType.PARTITIONED_ONDISK, true, 1);
+    }
+}
diff --git a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndexSearchTest.java b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndexSearchTest.java
new file mode 100644
index 0000000..4fa25ed
--- /dev/null
+++ b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndexSearchTest.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk;
+
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.common.AbstractInvertedIndexSearchTest;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestContext.InvertedIndexType;
+
+public class PartitionedOnDiskInvertedIndexSearchTest extends AbstractInvertedIndexSearchTest {
+
+    public PartitionedOnDiskInvertedIndexSearchTest() {
+        super(InvertedIndexType.PARTITIONED_ONDISK, true);
+    }
+}
diff --git a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestContext.java b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestContext.java
index 34b11c7..22c010d 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestContext.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestContext.java
@@ -45,23 +45,28 @@
     public static enum InvertedIndexType {
         INMEMORY,
         ONDISK,
-        LSM
+        LSM,
+        PARTITIONED_INMEMORY,
+        PARTITIONED_ONDISK,
+        PARTITIONED_LSM
     };
 
     protected IInvertedIndex invIndex;
     protected IBinaryComparatorFactory[] allCmpFactories;
     protected IBinaryTokenizerFactory tokenizerFactory;
+    protected InvertedIndexType invIndexType;
     protected InvertedIndexTokenizingTupleIterator indexTupleIter;
     protected HashSet<Comparable> allTokens = new HashSet<Comparable>();
     protected List<ITupleReference> documentCorpus = new ArrayList<ITupleReference>();
     
     public LSMInvertedIndexTestContext(ISerializerDeserializer[] fieldSerdes, IIndex index,
-            IBinaryTokenizerFactory tokenizerFactory) {
+            IBinaryTokenizerFactory tokenizerFactory, InvertedIndexType invIndexType,
+            InvertedIndexTokenizingTupleIterator indexTupleIter) {
         super(fieldSerdes, index);
-        this.tokenizerFactory = tokenizerFactory;
         invIndex = (IInvertedIndex) index;
-        indexTupleIter = new InvertedIndexTokenizingTupleIterator(invIndex.getTokenTypeTraits().length,
-                invIndex.getInvListTypeTraits().length, tokenizerFactory.createTokenizer());
+        this.tokenizerFactory = tokenizerFactory;
+        this.invIndexType = invIndexType;
+        this.indexTupleIter = indexTupleIter;
     }
 
     @Override
@@ -88,8 +93,9 @@
         return allCmpFactories;
     }
 
-    public static LSMInvertedIndexTestContext create(LSMInvertedIndexTestHarness harness, ISerializerDeserializer[] fieldSerdes,
-            int tokenFieldCount, IBinaryTokenizerFactory tokenizerFactory, InvertedIndexType invIndexType) throws IndexException {
+    public static LSMInvertedIndexTestContext create(LSMInvertedIndexTestHarness harness,
+            ISerializerDeserializer[] fieldSerdes, int tokenFieldCount, IBinaryTokenizerFactory tokenizerFactory,
+            InvertedIndexType invIndexType) throws IndexException {
         ITypeTraits[] allTypeTraits = SerdeUtils.serdesToTypeTraits(fieldSerdes);
         IBinaryComparatorFactory[] allCmpFactories = SerdeUtils.serdesToComparatorFactories(fieldSerdes,
                 fieldSerdes.length);
@@ -108,7 +114,7 @@
             invListTypeTraits[i] = allTypeTraits[i + tokenFieldCount];
             invListCmpFactories[i] = allCmpFactories[i + tokenFieldCount];
         }
-        // Create index and test context.
+        // Create index and test context.        
         IInvertedIndex invIndex;
         switch (invIndexType) {
             case INMEMORY: {
@@ -117,12 +123,24 @@
                         tokenCmpFactories, tokenizerFactory);
                 break;
             }
+            case PARTITIONED_INMEMORY: {
+                invIndex = InvertedIndexUtils.createPartitionedInMemoryBTreeInvertedindex(harness.getMemBufferCache(),
+                        harness.getMemFreePageManager(), invListTypeTraits, invListCmpFactories, tokenTypeTraits,
+                        tokenCmpFactories, tokenizerFactory);
+                break;
+            }
             case ONDISK: {
                 invIndex = InvertedIndexUtils.createOnDiskInvertedIndex(harness.getDiskBufferCache(),
                         harness.getDiskFileMapProvider(), invListTypeTraits, invListCmpFactories, tokenTypeTraits,
                         tokenCmpFactories, harness.getInvListsFileRef());
                 break;
             }
+            case PARTITIONED_ONDISK: {
+                invIndex = InvertedIndexUtils.createPartitionedOnDiskInvertedIndex(harness.getDiskBufferCache(),
+                        harness.getDiskFileMapProvider(), invListTypeTraits, invListCmpFactories, tokenTypeTraits,
+                        tokenCmpFactories, harness.getInvListsFileRef());
+                break;
+            }
             case LSM: {
                 invIndex = InvertedIndexUtils.createLSMInvertedIndex(harness.getMemBufferCache(),
                         harness.getMemFreePageManager(), harness.getDiskFileMapProvider(), invListTypeTraits,
@@ -132,11 +150,42 @@
                         harness.getIOScheduler());
                 break;
             }
+            case PARTITIONED_LSM: {
+                invIndex = InvertedIndexUtils.createPartitionedLSMInvertedIndex(harness.getMemBufferCache(),
+                        harness.getMemFreePageManager(), harness.getDiskFileMapProvider(), invListTypeTraits,
+                        invListCmpFactories, tokenTypeTraits, tokenCmpFactories, tokenizerFactory,
+                        harness.getDiskBufferCache(), harness.getIOManager(), harness.getOnDiskDir(),
+                        harness.getFlushController(), harness.getMergePolicy(), harness.getOperationTrackerFactory(),
+                        harness.getIOScheduler());
+                break;
+            }
             default: {
                 throw new InvertedIndexException("Unknow inverted-index type '" + invIndexType + "'.");
             }
         }
-        LSMInvertedIndexTestContext testCtx = new LSMInvertedIndexTestContext(fieldSerdes, invIndex, tokenizerFactory);
+        InvertedIndexTokenizingTupleIterator indexTupleIter = null;
+        switch (invIndexType) {
+            case INMEMORY:
+            case ONDISK:
+            case LSM: {
+                indexTupleIter = new InvertedIndexTokenizingTupleIterator(invIndex.getTokenTypeTraits().length,
+                        invIndex.getInvListTypeTraits().length, tokenizerFactory.createTokenizer());
+                break;
+            }
+            case PARTITIONED_INMEMORY:
+            case PARTITIONED_ONDISK:
+            case PARTITIONED_LSM: {
+                indexTupleIter = new PartitionedInvertedIndexTokenizingTupleIterator(
+                        invIndex.getTokenTypeTraits().length, invIndex.getInvListTypeTraits().length,
+                        tokenizerFactory.createTokenizer());
+                break;
+            }
+            default: {
+                throw new InvertedIndexException("Unknow inverted-index type '" + invIndexType + "'.");
+            }
+        }
+        LSMInvertedIndexTestContext testCtx = new LSMInvertedIndexTestContext(fieldSerdes, invIndex, tokenizerFactory,
+                invIndexType, indexTupleIter);
         return testCtx;
     }
 
@@ -192,4 +241,8 @@
     public List<ITupleReference> getDocumentCorpus() {
         return documentCorpus;
     }
+    
+    public InvertedIndexType getInvertedIndexType() {
+        return invIndexType;
+    }
 }
diff --git a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestUtils.java b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestUtils.java
index 68824f5..38896fb 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestUtils.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestUtils.java
@@ -99,57 +99,105 @@
         return tupleGen;
     }
 
+    private static ISerializerDeserializer[] getNonHashedIndexFieldSerdes(InvertedIndexType invIndexType)
+            throws IndexException {
+        ISerializerDeserializer[] fieldSerdes = null;
+        switch (invIndexType) {
+            case INMEMORY:
+            case ONDISK:
+            case LSM: {
+                fieldSerdes = new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE };
+                break;
+            }
+            case PARTITIONED_INMEMORY:
+            case PARTITIONED_ONDISK:
+            case PARTITIONED_LSM: {
+                // Such indexes also include the set-size for partitioning.
+                fieldSerdes = new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE };
+                break;
+            }
+            default: {
+                throw new IndexException("Unhandled inverted index type '" + invIndexType + "'.");
+            }
+        }
+        return fieldSerdes;
+    }
+
+    private static ISerializerDeserializer[] getHashedIndexFieldSerdes(InvertedIndexType invIndexType)
+            throws IndexException {
+        ISerializerDeserializer[] fieldSerdes = null;
+        switch (invIndexType) {
+            case INMEMORY:
+            case ONDISK:
+            case LSM: {
+                fieldSerdes = new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE };
+                break;
+            }
+            case PARTITIONED_INMEMORY:
+            case PARTITIONED_ONDISK:
+            case PARTITIONED_LSM: {
+                // Such indexes also include the set-size for partitioning.
+                fieldSerdes = new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE };
+                break;
+            }
+            default: {
+                throw new IndexException("Unhandled inverted index type '" + invIndexType + "'.");
+            }
+        }
+        return fieldSerdes;
+    }
+
     public static LSMInvertedIndexTestContext createWordInvIndexTestContext(LSMInvertedIndexTestHarness harness,
             InvertedIndexType invIndexType) throws IOException, IndexException {
-        ISerializerDeserializer[] fieldSerdes = new ISerializerDeserializer[] {
-                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE };
+        ISerializerDeserializer[] fieldSerdes = getNonHashedIndexFieldSerdes(invIndexType);
         ITokenFactory tokenFactory = new UTF8WordTokenFactory();
         IBinaryTokenizerFactory tokenizerFactory = new DelimitedUTF8StringBinaryTokenizerFactory(true, false,
                 tokenFactory);
-        LSMInvertedIndexTestContext testCtx = LSMInvertedIndexTestContext.create(harness, fieldSerdes, 1, tokenizerFactory,
-                invIndexType);
+        LSMInvertedIndexTestContext testCtx = LSMInvertedIndexTestContext.create(harness, fieldSerdes,
+                fieldSerdes.length - 1, tokenizerFactory, invIndexType);
         return testCtx;
     }
 
     public static LSMInvertedIndexTestContext createHashedWordInvIndexTestContext(LSMInvertedIndexTestHarness harness,
             InvertedIndexType invIndexType) throws IOException, IndexException {
-        ISerializerDeserializer[] fieldSerdes = new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE,
-                IntegerSerializerDeserializer.INSTANCE };
+        ISerializerDeserializer[] fieldSerdes = getHashedIndexFieldSerdes(invIndexType);
         ITokenFactory tokenFactory = new HashedUTF8WordTokenFactory();
         IBinaryTokenizerFactory tokenizerFactory = new DelimitedUTF8StringBinaryTokenizerFactory(true, false,
                 tokenFactory);
-        LSMInvertedIndexTestContext testCtx = LSMInvertedIndexTestContext.create(harness, fieldSerdes, 1, tokenizerFactory,
-                invIndexType);
+        LSMInvertedIndexTestContext testCtx = LSMInvertedIndexTestContext.create(harness, fieldSerdes,
+                fieldSerdes.length - 1, tokenizerFactory, invIndexType);
         return testCtx;
     }
 
     public static LSMInvertedIndexTestContext createNGramInvIndexTestContext(LSMInvertedIndexTestHarness harness,
             InvertedIndexType invIndexType) throws IOException, IndexException {
-        ISerializerDeserializer[] fieldSerdes = new ISerializerDeserializer[] {
-                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE };
+        ISerializerDeserializer[] fieldSerdes = getNonHashedIndexFieldSerdes(invIndexType);
         ITokenFactory tokenFactory = new UTF8NGramTokenFactory();
         IBinaryTokenizerFactory tokenizerFactory = new NGramUTF8StringBinaryTokenizerFactory(TEST_GRAM_LENGTH, true,
                 true, false, tokenFactory);
-        LSMInvertedIndexTestContext testCtx = LSMInvertedIndexTestContext.create(harness, fieldSerdes, 1, tokenizerFactory,
-                invIndexType);
+        LSMInvertedIndexTestContext testCtx = LSMInvertedIndexTestContext.create(harness, fieldSerdes,
+                fieldSerdes.length - 1, tokenizerFactory, invIndexType);
         return testCtx;
     }
 
     public static LSMInvertedIndexTestContext createHashedNGramInvIndexTestContext(LSMInvertedIndexTestHarness harness,
             InvertedIndexType invIndexType) throws IOException, IndexException {
-        ISerializerDeserializer[] fieldSerdes = new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE,
-                IntegerSerializerDeserializer.INSTANCE };
+        ISerializerDeserializer[] fieldSerdes = getHashedIndexFieldSerdes(invIndexType);
         ITokenFactory tokenFactory = new HashedUTF8NGramTokenFactory();
         IBinaryTokenizerFactory tokenizerFactory = new NGramUTF8StringBinaryTokenizerFactory(TEST_GRAM_LENGTH, true,
                 true, false, tokenFactory);
-        LSMInvertedIndexTestContext testCtx = LSMInvertedIndexTestContext.create(harness, fieldSerdes, 1, tokenizerFactory,
-                invIndexType);
+        LSMInvertedIndexTestContext testCtx = LSMInvertedIndexTestContext.create(harness, fieldSerdes,
+                fieldSerdes.length - 1, tokenizerFactory, invIndexType);
         return testCtx;
     }
 
     public static void bulkLoadInvIndex(LSMInvertedIndexTestContext testCtx, TupleGenerator tupleGen, int numDocs)
             throws IndexException, IOException {
-        SortedSet<CheckTuple> tmpMemIndex = new TreeSet<CheckTuple>();;
+        SortedSet<CheckTuple> tmpMemIndex = new TreeSet<CheckTuple>();
         // First generate the expected index by inserting the documents one-by-one.
         for (int i = 0; i < numDocs; i++) {
             ITupleReference tuple = tupleGen.next();
@@ -254,8 +302,8 @@
      * Compares actual and expected indexes by comparing their inverted-lists one by one. Exercises the openInvertedListCursor() method of the inverted-index accessor.
      */
     @SuppressWarnings("unchecked")
-    public static void compareActualAndExpectedIndexes(LSMInvertedIndexTestContext testCtx) throws HyracksDataException,
-            IndexException {
+    public static void compareActualAndExpectedIndexes(LSMInvertedIndexTestContext testCtx)
+            throws HyracksDataException, IndexException {
         IInvertedIndex invIndex = (IInvertedIndex) testCtx.getIndex();
         ISerializerDeserializer[] fieldSerdes = testCtx.getFieldSerdes();
         MultiComparator invListCmp = MultiComparator.create(invIndex.getInvListCmpFactories());
@@ -330,10 +378,34 @@
     /**
      * Determine the expected results with the simple ScanCount algorithm.
      */
+    public static void getExpectedResults(int[] scanCountArray, TreeSet<CheckTuple> checkTuples,
+            ITupleReference searchDocument, IBinaryTokenizer tokenizer, ISerializerDeserializer tokenSerde,
+            IInvertedIndexSearchModifier searchModifier, List<Integer> expectedResults, InvertedIndexType invIndexType)
+            throws IOException {
+        boolean isPartitioned = false;
+        switch (invIndexType) {
+            case INMEMORY:
+            case ONDISK:
+            case LSM: {
+                isPartitioned = false;
+                break;
+            }
+            case PARTITIONED_INMEMORY:
+            case PARTITIONED_ONDISK:
+            case PARTITIONED_LSM: {
+                isPartitioned = true;
+                break;
+            }
+        }
+        getExpectedResults(scanCountArray, checkTuples, searchDocument, tokenizer, tokenSerde, searchModifier,
+                expectedResults, isPartitioned);
+    }
+
     @SuppressWarnings("unchecked")
     public static void getExpectedResults(int[] scanCountArray, TreeSet<CheckTuple> checkTuples,
             ITupleReference searchDocument, IBinaryTokenizer tokenizer, ISerializerDeserializer tokenSerde,
-            IInvertedIndexSearchModifier searchModifier, List<Integer> expectedResults) throws IOException {
+            IInvertedIndexSearchModifier searchModifier, List<Integer> expectedResults, boolean isPartitioned)
+            throws IOException {
         // Reset scan count array.
         Arrays.fill(scanCountArray, 0);
         expectedResults.clear();
@@ -341,9 +413,25 @@
         ByteArrayAccessibleOutputStream baaos = new ByteArrayAccessibleOutputStream();
         tokenizer.reset(searchDocument.getFieldData(0), searchDocument.getFieldStart(0),
                 searchDocument.getFieldLength(0));
+        // Run though tokenizer to get number of tokens.
         int numQueryTokens = 0;
         while (tokenizer.hasNext()) {
             tokenizer.next();
+            numQueryTokens++;
+        }
+        int numTokensLowerBound = -1;
+        int numTokensUpperBound = -1;
+        int invListElementField = 1;
+        if (isPartitioned) {
+            numTokensLowerBound = searchModifier.getNumTokensLowerBound(numQueryTokens);
+            numTokensUpperBound = searchModifier.getNumTokensUpperBound(numQueryTokens);
+            invListElementField = 2;
+        }
+        int occurrenceThreshold = searchModifier.getOccurrenceThreshold(numQueryTokens);
+        tokenizer.reset(searchDocument.getFieldData(0), searchDocument.getFieldStart(0),
+                searchDocument.getFieldLength(0));
+        while (tokenizer.hasNext()) {
+            tokenizer.next();
             IToken token = tokenizer.getToken();
             baaos.reset();
             DataOutput out = new DataOutputStream(baaos);
@@ -351,10 +439,28 @@
             ByteArrayInputStream inStream = new ByteArrayInputStream(baaos.getByteArray(), 0, baaos.size());
             DataInput dataIn = new DataInputStream(inStream);
             Comparable tokenObj = (Comparable) tokenSerde.deserialize(dataIn);
-            CheckTuple lowKey = new CheckTuple(1, 1);
-            lowKey.appendField(tokenObj);
-            CheckTuple highKey = new CheckTuple(1, 1);
-            highKey.appendField(tokenObj);
+            CheckTuple lowKey;
+            if (numTokensLowerBound < 0) {
+                // Index is not partitioned, or no length filtering is possible for this search modifier.
+                lowKey = new CheckTuple(1, 1);
+                lowKey.appendField(tokenObj);
+            } else {
+                // Index is length partitioned, and search modifier supports length filtering.
+                lowKey = new CheckTuple(2, 2);
+                lowKey.appendField(tokenObj);
+                lowKey.appendField(Integer.valueOf(numTokensLowerBound));
+            }
+            CheckTuple highKey;
+            if (numTokensUpperBound < 0) {
+                // Index is not partitioned, or no length filtering is possible for this search modifier.
+                highKey = new CheckTuple(1, 1);
+                highKey.appendField(tokenObj);
+            } else {
+                // Index is length partitioned, and search modifier supports length filtering.
+                highKey = new CheckTuple(2, 2);
+                highKey.appendField(tokenObj);
+                highKey.appendField(Integer.valueOf(numTokensUpperBound));
+            }
 
             // Get view over check tuples containing inverted-list corresponding to token. 
             SortedSet<CheckTuple> invList = OrderedIndexTestUtils.getPrefixExpectedSubset(checkTuples, lowKey, highKey);
@@ -362,13 +468,11 @@
             // Iterate over inverted list and update scan count array.
             while (invListIter.hasNext()) {
                 CheckTuple checkTuple = invListIter.next();
-                Integer element = (Integer) checkTuple.getField(1);
+                Integer element = (Integer) checkTuple.getField(invListElementField);
                 scanCountArray[element]++;
             }
-            numQueryTokens++;
         }
 
-        int occurrenceThreshold = searchModifier.getOccurrenceThreshold(numQueryTokens);
         // Run through scan count array, and see whether elements satisfy the given occurrence threshold.
         expectedResults.clear();
         for (int i = 0; i < scanCountArray.length; i++) {
@@ -394,7 +498,7 @@
         IIndexCursor resultCursor = accessor.createSearchCursor();
         int numQueries = numDocQueries + numRandomQueries;
         for (int i = 0; i < numQueries; i++) {
-            // If number of documents in the corpus io less than numDocQueries, then replace the remaining ones with random queries.
+            // If number of documents in the corpus is less than numDocQueries, then replace the remaining ones with random queries.
             if (i >= numDocQueries || i >= documentCorpus.size()) {
                 // Generate a random query.
                 ITupleReference randomQuery = tupleGen.next();
@@ -438,8 +542,9 @@
 
                     // Get expected results.
                     List<Integer> expectedResults = new ArrayList<Integer>();
-                    LSMInvertedIndexTestUtils.getExpectedResults(scanCountArray, testCtx.getCheckTuples(), searchDocument,
-                            tokenizer, testCtx.getFieldSerdes()[0], searchModifier, expectedResults);
+                    LSMInvertedIndexTestUtils.getExpectedResults(scanCountArray, testCtx.getCheckTuples(),
+                            searchDocument, tokenizer, testCtx.getFieldSerdes()[0], searchModifier, expectedResults,
+                            testCtx.getInvertedIndexType());
 
                     Iterator<Integer> expectedIter = expectedResults.iterator();
                     Iterator<Integer> actualIter = actualResults.iterator();