Merged hyracks_lsm_tree r2452:r2463.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_length_filter@2464 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/MultiComparator.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/MultiComparator.java
index 01c6cc8..280bb41 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/MultiComparator.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/MultiComparator.java
@@ -96,6 +96,14 @@
         return new MultiComparator(cmps);
     }
     
+    public static MultiComparator create(IBinaryComparatorFactory[] cmpFactories, int startIndex, int numCmps) {
+        IBinaryComparator[] cmps = new IBinaryComparator[numCmps];
+        for (int i = startIndex; i < startIndex + numCmps; i++) {
+            cmps[i] = cmpFactories[i].createBinaryComparator();
+        }
+        return new MultiComparator(cmps);
+    }
+    
     public static MultiComparator create(IBinaryComparatorFactory[]... cmpFactories) {
         int size = 0;
         for (int i = 0; i < cmpFactories.length; i++) {
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/api/IInvertedIndexSearchModifier.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/api/IInvertedIndexSearchModifier.java
index 619937f..afe082d 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/api/IInvertedIndexSearchModifier.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/api/IInvertedIndexSearchModifier.java
@@ -19,5 +19,9 @@
 public interface IInvertedIndexSearchModifier {
     public int getOccurrenceThreshold(int numQueryTokens);
 
-    public int getNumPrefixLists(int numQueryTokens);
+    public int getNumPrefixLists(int occurrenceThreshold, int numInvLists);
+    
+    public int getNumTokensLowerBound(int numQueryTokens);
+    
+    public int getNumTokensUpperBound(int numQueryTokens);
 }
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/api/IObjectFactory.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/api/IObjectFactory.java
new file mode 100644
index 0000000..9068a2b
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/api/IObjectFactory.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api;
+
+public interface IObjectFactory<T> {
+    public T create();
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
index 69e57be..bb43028 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
@@ -15,6 +15,8 @@
 
 package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk;
 
+import java.io.DataOutput;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
@@ -67,17 +69,16 @@
  * cannot exceed the size of a Hyracks frame.
  */
 public class OnDiskInvertedIndex implements IInvertedIndex {
-    private final IHyracksCommonContext ctx = new DefaultHyracksCommonContext();
+    protected final IHyracksCommonContext ctx = new DefaultHyracksCommonContext();
 
-    // Schema of BTree tuples.
-    public final int TOKEN_FIELD = 0;
-    public final int INVLIST_START_PAGE_ID_FIELD = 1;
-    public final int INVLIST_END_PAGE_ID_FIELD = 2;
-    public final int INVLIST_START_OFF_FIELD = 3;
-    public final int INVLIST_NUM_ELEMENTS_FIELD = 4;
-
+    // Schema of BTree tuples, set in constructor.    
+    protected final int invListStartPageIdField;
+    protected final int invListEndPageIdField;
+    protected final int invListStartOffField;
+    protected final int invListNumElementsField;
+    
     // Type traits to be appended to the token type trait which finally form the BTree field type traits.
-    private static final ITypeTraits[] btreeValueTypeTraits = new ITypeTraits[4];
+    protected static final ITypeTraits[] btreeValueTypeTraits = new ITypeTraits[4];
     static {
         // startPageId
         btreeValueTypeTraits[0] = IntegerPointable.TYPE_TRAITS;
@@ -89,23 +90,22 @@
         btreeValueTypeTraits[3] = IntegerPointable.TYPE_TRAITS;
     }
 
-    private BTree btree;
-    private int rootPageId = 0;
-    private IBufferCache bufferCache;
-    private IFileMapProvider fileMapProvider;
-    private int fileId = -1;
-    private final ITypeTraits[] invListTypeTraits;
-    private final IBinaryComparatorFactory[] invListCmpFactories;
-    private final ITypeTraits[] tokenTypeTraits;
-    private final IBinaryComparatorFactory[] tokenCmpFactories;
-    private final IInvertedListBuilder invListBuilder;
-    private final int numTokenFields;
-    private final int numInvListKeys;
-    private final FileReference invListsFile;
+    protected BTree btree;
+    protected int rootPageId = 0;
+    protected IBufferCache bufferCache;
+    protected IFileMapProvider fileMapProvider;
+    protected int fileId = -1;
+    protected final ITypeTraits[] invListTypeTraits;
+    protected final IBinaryComparatorFactory[] invListCmpFactories;
+    protected final ITypeTraits[] tokenTypeTraits;
+    protected final IBinaryComparatorFactory[] tokenCmpFactories;
+    protected final IInvertedListBuilder invListBuilder;
+    protected final int numTokenFields;
+    protected final int numInvListKeys;
+    protected final FileReference invListsFile;
     // Last page id of inverted-lists file (inclusive). Set during bulk load.
-    private int invListsMaxPageId = -1;
-
-    private boolean isOpen = false;
+    protected int invListsMaxPageId = -1;
+    protected boolean isOpen = false;
 
     public OnDiskInvertedIndex(IBufferCache bufferCache, IFileMapProvider fileMapProvider,
             IInvertedListBuilder invListBuilder, ITypeTraits[] invListTypeTraits,
@@ -124,6 +124,10 @@
         this.numTokenFields = btree.getComparatorFactories().length;
         this.numInvListKeys = invListCmpFactories.length;
         this.invListsFile = invListsFile;
+        this.invListStartPageIdField = numTokenFields;
+        this.invListEndPageIdField = numTokenFields + 1;
+        this.invListStartOffField = numTokenFields + 2;
+        this.invListNumElementsField = numTokenFields + 3;
     }
 
     @Override
@@ -258,19 +262,7 @@
         try {
             if (ctx.btreeCursor.hasNext()) {
                 ctx.btreeCursor.next();
-                ITupleReference frameTuple = ctx.btreeCursor.getTuple();
-                int startPageId = IntegerSerializerDeserializer.getInt(
-                        frameTuple.getFieldData(INVLIST_START_PAGE_ID_FIELD),
-                        frameTuple.getFieldStart(INVLIST_START_PAGE_ID_FIELD));
-                int endPageId = IntegerSerializerDeserializer.getInt(
-                        frameTuple.getFieldData(INVLIST_END_PAGE_ID_FIELD),
-                        frameTuple.getFieldStart(INVLIST_END_PAGE_ID_FIELD));
-                int startOff = IntegerSerializerDeserializer.getInt(frameTuple.getFieldData(INVLIST_START_OFF_FIELD),
-                        frameTuple.getFieldStart(INVLIST_START_OFF_FIELD));
-                int numElements = IntegerSerializerDeserializer.getInt(
-                        frameTuple.getFieldData(INVLIST_NUM_ELEMENTS_FIELD),
-                        frameTuple.getFieldStart(INVLIST_NUM_ELEMENTS_FIELD));
-                listCursor.reset(startPageId, endPageId, startOff, numElements);
+                resetInvertedListCursor(ctx.btreeCursor.getTuple(), listCursor);
             } else {
                 listCursor.reset(0, 0, 0, 0);
             }
@@ -280,6 +272,18 @@
         }
     }
 
+    protected void resetInvertedListCursor(ITupleReference btreeTuple, IInvertedListCursor listCursor) {
+        int startPageId = IntegerSerializerDeserializer.getInt(btreeTuple.getFieldData(invListStartPageIdField),
+                btreeTuple.getFieldStart(invListStartPageIdField));
+        int endPageId = IntegerSerializerDeserializer.getInt(btreeTuple.getFieldData(invListEndPageIdField),
+                btreeTuple.getFieldStart(invListEndPageIdField));
+        int startOff = IntegerSerializerDeserializer.getInt(btreeTuple.getFieldData(invListStartOffField),
+                btreeTuple.getFieldStart(invListStartOffField));
+        int numElements = IntegerSerializerDeserializer.getInt(btreeTuple.getFieldData(invListNumElementsField),
+                btreeTuple.getFieldStart(invListNumElementsField));
+        listCursor.reset(startPageId, endPageId, startOff, numElements);
+    }
+    
     public final class OnDiskInvertedIndexBulkLoader implements IIndexBulkLoader {
         private final ArrayTupleBuilder btreeTupleBuilder;
         private final ArrayTupleReference btreeTupleReference;
@@ -330,14 +334,26 @@
         private void createAndInsertBTreeTuple() throws IndexException, HyracksDataException {
             // Build tuple.        
             btreeTupleBuilder.reset();
-            btreeTupleBuilder.addField(lastTuple.getFieldData(0), lastTuple.getFieldStart(0),
-                    lastTuple.getFieldLength(0));
-            // TODO: Boxing integers here. Fix it.
-            btreeTupleBuilder.addField(IntegerSerializerDeserializer.INSTANCE, currentInvListStartPageId);
-            btreeTupleBuilder.addField(IntegerSerializerDeserializer.INSTANCE, currentPageId);
-            btreeTupleBuilder.addField(IntegerSerializerDeserializer.INSTANCE, currentInvListStartOffset);
-            btreeTupleBuilder.addField(IntegerSerializerDeserializer.INSTANCE, invListBuilder.getListSize());
-            // Reset tuple reference and add it.
+            DataOutput output = btreeTupleBuilder.getDataOutput();
+            // Add key fields.
+            for (int i = 0; i < numTokenFields; i++) {
+                btreeTupleBuilder.addField(lastTuple.getFieldData(i), lastTuple.getFieldStart(i),
+                        lastTuple.getFieldLength(i));
+            }
+            // Add inverted-list 'pointer' value fields.
+            try {
+                output.writeInt(currentInvListStartPageId);
+                btreeTupleBuilder.addFieldEndOffset();
+                output.writeInt(currentPageId);
+                btreeTupleBuilder.addFieldEndOffset();
+                output.writeInt(currentInvListStartOffset);
+                btreeTupleBuilder.addFieldEndOffset();
+                output.writeInt(invListBuilder.getListSize());
+                btreeTupleBuilder.addFieldEndOffset();
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+            // Reset tuple reference and add it into the BTree load.
             btreeTupleReference.reset(btreeTupleBuilder.getFieldEndOffsets(), btreeTupleBuilder.getByteArray());
             btreeBulkloader.add(btreeTupleReference);
         }
@@ -456,6 +472,12 @@
             this.searcher = new TOccurrenceSearcher(ctx, index);
         }
 
+        // Let subclasses initialize.
+        protected OnDiskInvertedIndexAccessor(OnDiskInvertedIndex index, IInvertedIndexSearcher searcher) {
+            this.index = index;
+            this.searcher = searcher;
+        }
+        
         @Override
         public IIndexCursor createSearchCursor() {
             return new OnDiskInvertedIndexSearchCursor(searcher, index.getInvListTypeTraits().length);
@@ -615,7 +637,7 @@
         return 0;
     }
 
-    private static ITypeTraits[] getBTreeTypeTraits(ITypeTraits[] tokenTypeTraits) {
+    protected static ITypeTraits[] getBTreeTypeTraits(ITypeTraits[] tokenTypeTraits) {
         ITypeTraits[] btreeTypeTraits = new ITypeTraits[tokenTypeTraits.length + btreeValueTypeTraits.length];
         // Set key type traits.
         for (int i = 0; i < tokenTypeTraits.length; i++) {
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexOpContext.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexOpContext.java
index fd784f3..8ed4cd1 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexOpContext.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexOpContext.java
@@ -30,12 +30,17 @@
     public IIndexAccessor btreeAccessor;
     public IIndexCursor btreeCursor;
     public MultiComparator searchCmp;
+    // For prefix search on partitioned indexes.
+    public MultiComparator prefixSearchCmp;
 
     public OnDiskInvertedIndexOpContext(BTree btree) {
         // TODO: Ignore opcallbacks for now.
         btreeAccessor = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
         btreeCursor = btreeAccessor.createSearchCursor();
         searchCmp = MultiComparator.create(btree.getComparatorFactories());
+        if (btree.getComparatorFactories().length > 1) {
+            prefixSearchCmp = MultiComparator.create(btree.getComparatorFactories(), 0, 1);
+        }
     }
 
     @Override
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndex.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndex.java
new file mode 100644
index 0000000..b7063fb
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndex.java
@@ -0,0 +1,167 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexOperationContext;
+import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListBuilder;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search.PartitionedTOccurrenceSearcher;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.ObjectCache;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+
+public class PartitionedOnDiskInvertedIndex extends OnDiskInvertedIndex {
+
+    protected final int PARTITIONING_NUM_TOKENS_FIELD = 1;
+    
+    public PartitionedOnDiskInvertedIndex(IBufferCache bufferCache, IFileMapProvider fileMapProvider,
+            IInvertedListBuilder invListBuilder, ITypeTraits[] invListTypeTraits,
+            IBinaryComparatorFactory[] invListCmpFactories, ITypeTraits[] tokenTypeTraits,
+            IBinaryComparatorFactory[] tokenCmpFactories, FileReference btreeFile, FileReference invListsFile)
+            throws IndexException {
+        super(bufferCache, fileMapProvider, invListBuilder, invListTypeTraits, invListCmpFactories, tokenTypeTraits,
+                tokenCmpFactories, btreeFile, invListsFile);
+    }
+
+    public class PartitionedOnDiskInvertedIndexAccessor extends OnDiskInvertedIndexAccessor {
+        public PartitionedOnDiskInvertedIndexAccessor(OnDiskInvertedIndex index) {
+            super(index, new PartitionedTOccurrenceSearcher(ctx, index));
+        }
+    }
+
+    @Override
+    public IIndexAccessor createAccessor(IModificationOperationCallback modificationCallback,
+            ISearchOperationCallback searchCallback) {
+        return new PartitionedOnDiskInvertedIndexAccessor(this);
+    }
+    
+    public class InvertedListPartitions {
+        private final int DEFAULT_NUM_PARTITIONS = 10;
+        private final int PARTITIONS_SLACK_SIZE = 10;
+        private final ObjectCache<IInvertedListCursor> invListCursorCache;
+        private final ObjectCache<ArrayList<IInvertedListCursor>> arrayListCache;
+        private ArrayList<IInvertedListCursor>[] partitions;
+        private int minValidPartitionIndex;
+        private int maxValidPartitionIndex;
+
+        public InvertedListPartitions(ObjectCache<IInvertedListCursor> invListCursorCache,
+                ObjectCache<ArrayList<IInvertedListCursor>> arrayListCache) {
+            this.invListCursorCache = invListCursorCache;
+            this.arrayListCache = arrayListCache;
+        }
+
+        @SuppressWarnings("unchecked")
+        public void reset(int numTokensLowerBound, int numTokensUpperBound) {
+            if (partitions == null) {
+                int initialSize;
+                if (numTokensUpperBound < 0) {
+                    initialSize = DEFAULT_NUM_PARTITIONS;
+                } else {
+                    initialSize = numTokensUpperBound + 1;
+                }
+                partitions = (ArrayList<IInvertedListCursor>[]) new ArrayList[initialSize];
+            } else {
+                if (numTokensUpperBound + 1 >= partitions.length) {
+                    partitions = Arrays.copyOf(partitions, numTokensUpperBound + 1);
+                }
+                Arrays.fill(partitions, null);
+            }
+            invListCursorCache.reset();
+            arrayListCache.reset();
+            minValidPartitionIndex = Integer.MAX_VALUE;
+            maxValidPartitionIndex = Integer.MIN_VALUE;
+        }
+
+        public void addInvertedListCursor(ITupleReference btreeTuple) {
+            IInvertedListCursor listCursor = invListCursorCache.getNext();
+            resetInvertedListCursor(btreeTuple, listCursor);
+            int numTokens = IntegerSerializerDeserializer.getInt(
+                    btreeTuple.getFieldData(PARTITIONING_NUM_TOKENS_FIELD),
+                    btreeTuple.getFieldStart(PARTITIONING_NUM_TOKENS_FIELD));
+            if (numTokens + 1 >= partitions.length) {
+                partitions = Arrays.copyOf(partitions, numTokens + PARTITIONS_SLACK_SIZE);
+            }
+            ArrayList<IInvertedListCursor> partitionCursors = partitions[numTokens];
+            if (partitionCursors == null) {
+                partitionCursors = arrayListCache.getNext();
+                partitionCursors.clear();
+                partitions[numTokens] = partitionCursors;
+                // Update range of valid partitions.
+                if (numTokens < minValidPartitionIndex) {
+                    minValidPartitionIndex = numTokens;
+                }
+                if (numTokens > maxValidPartitionIndex) {
+                    maxValidPartitionIndex = numTokens;
+                }
+            }
+            partitionCursors.add(listCursor);
+        }
+
+        public ArrayList<IInvertedListCursor>[] getPartitions() {
+            return partitions;
+        }
+        
+        public int getMinValidPartitionIndex() {
+            return minValidPartitionIndex;
+        }
+
+        public int getMaxValidPartitionIndex() {
+            return maxValidPartitionIndex;
+        }
+    }
+    
+    public void openInvertedListPartitionCursors(InvertedListPartitions invListPartitions, ITupleReference lowSearchKey,
+            ITupleReference highSearchKey,
+            IIndexOperationContext ictx) throws HyracksDataException, IndexException {
+        OnDiskInvertedIndexOpContext ctx = (OnDiskInvertedIndexOpContext) ictx;
+        if (lowSearchKey.getFieldCount() == 1) {
+            ctx.btreePred.setLowKeyComparator(ctx.prefixSearchCmp);
+        } else {
+            ctx.btreePred.setLowKeyComparator(ctx.searchCmp);
+        }
+        if (highSearchKey.getFieldCount() == 1) {
+            ctx.btreePred.setHighKeyComparator(ctx.prefixSearchCmp);
+        } else {
+            ctx.btreePred.setHighKeyComparator(ctx.searchCmp);
+        }
+        ctx.btreePred.setLowKey(lowSearchKey, true);
+        ctx.btreePred.setHighKey(highSearchKey, true);
+        ctx.btreeAccessor.search(ctx.btreeCursor, ctx.btreePred);
+        try {
+            while (ctx.btreeCursor.hasNext()) {
+                ctx.btreeCursor.next();
+                ITupleReference btreeTuple = ctx.btreeCursor.getTuple();
+                invListPartitions.addInvertedListCursor(btreeTuple);
+            }
+        } finally {
+            ctx.btreeCursor.close();
+            ctx.btreeCursor.reset();
+        }
+    }
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/ArrayListFactory.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/ArrayListFactory.java
new file mode 100644
index 0000000..493063e
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/ArrayListFactory.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search;
+
+import java.util.ArrayList;
+
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IObjectFactory;
+
+public class ArrayListFactory<T> implements IObjectFactory<ArrayList<T>>{
+    @Override
+    public ArrayList<T> create() {
+        return new ArrayList<T>();
+    }
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/ConjunctiveSearchModifier.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/ConjunctiveSearchModifier.java
index 1d260f0..55c5f30 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/ConjunctiveSearchModifier.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/ConjunctiveSearchModifier.java
@@ -25,7 +25,7 @@
     }
 
     @Override
-    public int getNumPrefixLists(int numQueryTokens) {
+    public int getNumPrefixLists(int occurrenceThreshold, int numInvLists) {
         return 1;
     }
     
@@ -33,4 +33,14 @@
     public String toString() {
         return "Conjunctive Search Modifier";
     }
+
+    @Override
+    public int getNumTokensLowerBound(int numQueryTokens) {
+        return -1;
+    }
+
+    @Override
+    public int getNumTokensUpperBound(int numQueryTokens) {
+        return -1;
+    }
 }
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/EditDistanceSearchModifier.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/EditDistanceSearchModifier.java
index 0580319..26e11ba 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/EditDistanceSearchModifier.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/EditDistanceSearchModifier.java
@@ -33,10 +33,20 @@
     }
 
     @Override
-    public int getNumPrefixLists(int numQueryTokens) {
-        return numQueryTokens - getOccurrenceThreshold(numQueryTokens) + 1;
+    public int getNumPrefixLists(int occurrenceThreshold, int numInvLists) {
+        return numInvLists - occurrenceThreshold + 1;
     }
 
+    @Override
+    public int getNumTokensLowerBound(int numQueryTokens) {
+        return numQueryTokens - edThresh;
+    }
+
+    @Override
+    public int getNumTokensUpperBound(int numQueryTokens) {
+        return numQueryTokens + edThresh;
+    }
+    
     public int getGramLength() {
         return gramLength;
     }
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/InvertedListCursorFactory.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/InvertedListCursorFactory.java
new file mode 100644
index 0000000..b4b3c43
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/InvertedListCursorFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search;
+
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IObjectFactory;
+
+public class InvertedListCursorFactory implements IObjectFactory<IInvertedListCursor> {
+
+    private final IInvertedIndex invIndex;
+
+    public InvertedListCursorFactory(IInvertedIndex invIndex) {
+        this.invIndex = invIndex;
+    }
+
+    @Override
+    public IInvertedListCursor create() {
+        return invIndex.createInvertedListCursor();
+    }
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/InvertedListMerger.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/InvertedListMerger.java
new file mode 100644
index 0000000..18ed73c
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/InvertedListMerger.java
@@ -0,0 +1,330 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+
+import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeFrameTupleAccessor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeTupleReference;
+
+// TODO: The merge procedure is rather confusing regarding cursor positions, hasNext() calls etc.
+// Needs an overhaul some time.
+public class InvertedListMerger {
+
+    protected final MultiComparator invListCmp;
+    protected SearchResult prevSearchResult;
+    protected SearchResult newSearchResult;
+
+    public InvertedListMerger(IHyracksCommonContext ctx, IInvertedIndex invIndex) {
+        this.invListCmp = MultiComparator.create(invIndex.getInvListCmpFactories());
+        this.prevSearchResult = new SearchResult(invIndex.getInvListTypeTraits(), ctx);
+        this.newSearchResult = new SearchResult(prevSearchResult);
+    }
+
+    public void merge(ArrayList<IInvertedListCursor> invListCursors, int occurrenceThreshold, int numPrefixLists,
+            SearchResult searchResult) throws HyracksDataException, IndexException {
+        Collections.sort(invListCursors);
+        int numInvLists = invListCursors.size();
+        SearchResult result = null;
+        for (int i = 0; i < numInvLists; i++) {
+            SearchResult swapTemp = prevSearchResult;
+            prevSearchResult = newSearchResult;
+            newSearchResult = swapTemp;
+            newSearchResult.reset();
+            if (i + 1 != numInvLists) {
+                // Use temporary search results when not merging last list.
+                result = newSearchResult;
+            } else {
+                // When merging the last list, append results to the final search result.
+                result = searchResult;
+            }
+            IInvertedListCursor invListCursor = invListCursors.get(i);
+            invListCursor.pinPages();
+            if (i < numPrefixLists) {
+                // Merge prefix list.
+                mergePrefixList(invListCursor, prevSearchResult, result);
+            } else {
+                // Merge suffix list.
+                int numInvListElements = invListCursor.size();
+                int currentNumResults = prevSearchResult.getNumResults();
+                // Should we binary search the next list or should we sort-merge it?
+                if (currentNumResults * Math.log(numInvListElements) < currentNumResults + numInvListElements) {
+                    mergeSuffixListProbe(invListCursor, prevSearchResult, result, i, numInvLists,
+                            occurrenceThreshold);
+                } else {
+                    mergeSuffixListScan(invListCursor, prevSearchResult, result, i, numInvLists,
+                            occurrenceThreshold);
+                }
+            }
+            invListCursor.unpinPages();
+        }
+    }
+
+    protected void mergeSuffixListProbe(IInvertedListCursor invListCursor, SearchResult prevSearchResult,
+            SearchResult newSearchResult, int invListIx, int numInvLists, int occurrenceThreshold)
+            throws HyracksDataException, IndexException {
+
+        int prevBufIdx = 0;
+        int maxPrevBufIdx = prevSearchResult.getCurrentBufferIndex();
+        ByteBuffer prevCurrentBuffer = prevSearchResult.getBuffers().get(0);
+
+        FixedSizeFrameTupleAccessor resultFrameTupleAcc = prevSearchResult.getAccessor();
+        FixedSizeTupleReference resultTuple = prevSearchResult.getTuple();
+
+        int resultTidx = 0;
+
+        resultFrameTupleAcc.reset(prevCurrentBuffer);
+
+        while (resultTidx < resultFrameTupleAcc.getTupleCount()) {
+
+            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
+            int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
+                    resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
+
+            if (invListCursor.containsKey(resultTuple, invListCmp)) {
+                count++;
+                newSearchResult.append(resultTuple, count);
+            } else {
+                if (count + numInvLists - invListIx > occurrenceThreshold) {
+                    newSearchResult.append(resultTuple, count);
+                }
+            }
+
+            resultTidx++;
+            if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
+                prevBufIdx++;
+                if (prevBufIdx <= maxPrevBufIdx) {
+                    prevCurrentBuffer = prevSearchResult.getBuffers().get(prevBufIdx);
+                    resultFrameTupleAcc.reset(prevCurrentBuffer);
+                    resultTidx = 0;
+                }
+            }
+        }
+    }
+
+    protected void mergeSuffixListScan(IInvertedListCursor invListCursor, SearchResult prevSearchResult,
+            SearchResult newSearchResult, int invListIx, int numInvLists, int occurrenceThreshold)
+            throws HyracksDataException, IndexException {
+
+        int prevBufIdx = 0;
+        int maxPrevBufIdx = prevSearchResult.getCurrentBufferIndex();
+        ByteBuffer prevCurrentBuffer = prevSearchResult.getBuffers().get(0);
+
+        FixedSizeFrameTupleAccessor resultFrameTupleAcc = prevSearchResult.getAccessor();
+        FixedSizeTupleReference resultTuple = prevSearchResult.getTuple();
+
+        boolean advanceCursor = true;
+        boolean advancePrevResult = false;
+        int resultTidx = 0;
+
+        resultFrameTupleAcc.reset(prevCurrentBuffer);
+
+        int invListTidx = 0;
+        int invListNumTuples = invListCursor.size();
+
+        if (invListCursor.hasNext())
+            invListCursor.next();
+
+        while (invListTidx < invListNumTuples && resultTidx < resultFrameTupleAcc.getTupleCount()) {
+
+            ITupleReference invListTuple = invListCursor.getTuple();
+
+            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
+
+            int cmp = invListCmp.compare(invListTuple, resultTuple);
+            if (cmp == 0) {
+                int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
+                        resultTuple.getFieldStart(resultTuple.getFieldCount() - 1)) + 1;
+                newSearchResult.append(resultTuple, count);
+                advanceCursor = true;
+                advancePrevResult = true;
+            } else {
+                if (cmp < 0) {
+                    advanceCursor = true;
+                    advancePrevResult = false;
+                } else {
+                    int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
+                            resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
+                    if (count + numInvLists - invListIx > occurrenceThreshold) {
+                        newSearchResult.append(resultTuple, count);
+                    }
+                    advanceCursor = false;
+                    advancePrevResult = true;
+                }
+            }
+
+            if (advancePrevResult) {
+                resultTidx++;
+                if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
+                    prevBufIdx++;
+                    if (prevBufIdx <= maxPrevBufIdx) {
+                        prevCurrentBuffer = prevSearchResult.getBuffers().get(prevBufIdx);
+                        resultFrameTupleAcc.reset(prevCurrentBuffer);
+                        resultTidx = 0;
+                    }
+                }
+            }
+
+            if (advanceCursor) {
+                invListTidx++;
+                if (invListCursor.hasNext()) {
+                    invListCursor.next();
+                }
+            }
+        }
+
+        // append remaining elements from previous result set
+        while (resultTidx < resultFrameTupleAcc.getTupleCount()) {
+
+            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
+
+            int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
+                    resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
+            if (count + numInvLists - invListIx > occurrenceThreshold) {
+                newSearchResult.append(resultTuple, count);
+            }
+
+            resultTidx++;
+            if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
+                prevBufIdx++;
+                if (prevBufIdx <= maxPrevBufIdx) {
+                    prevCurrentBuffer = prevSearchResult.getBuffers().get(prevBufIdx);
+                    resultFrameTupleAcc.reset(prevCurrentBuffer);
+                    resultTidx = 0;
+                }
+            }
+        }
+    }
+
+    protected void mergePrefixList(IInvertedListCursor invListCursor, SearchResult prevSearchResult,
+            SearchResult newSearchResult) throws HyracksDataException, IndexException {
+
+        int prevBufIdx = 0;
+        int maxPrevBufIdx = prevSearchResult.getCurrentBufferIndex();
+        ByteBuffer prevCurrentBuffer = prevSearchResult.getBuffers().get(0);
+
+        FixedSizeFrameTupleAccessor resultFrameTupleAcc = prevSearchResult.getAccessor();
+        FixedSizeTupleReference resultTuple = prevSearchResult.getTuple();
+
+        boolean advanceCursor = true;
+        boolean advancePrevResult = false;
+        int resultTidx = 0;
+
+        resultFrameTupleAcc.reset(prevCurrentBuffer);
+
+        int invListTidx = 0;
+        int invListNumTuples = invListCursor.size();
+
+        if (invListCursor.hasNext())
+            invListCursor.next();
+
+        while (invListTidx < invListNumTuples && resultTidx < resultFrameTupleAcc.getTupleCount()) {
+
+            ITupleReference invListTuple = invListCursor.getTuple();
+            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
+
+            int cmp = invListCmp.compare(invListTuple, resultTuple);
+            if (cmp == 0) {
+                int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
+                        resultTuple.getFieldStart(resultTuple.getFieldCount() - 1)) + 1;
+                newSearchResult.append(resultTuple, count);
+                advanceCursor = true;
+                advancePrevResult = true;
+            } else {
+                if (cmp < 0) {
+                    int count = 1;
+                    newSearchResult.append(invListTuple, count);
+                    advanceCursor = true;
+                    advancePrevResult = false;
+                } else {
+                    int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
+                            resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
+                    newSearchResult.append(resultTuple, count);
+                    advanceCursor = false;
+                    advancePrevResult = true;
+                }
+            }
+
+            if (advancePrevResult) {
+                resultTidx++;
+                if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
+                    prevBufIdx++;
+                    if (prevBufIdx <= maxPrevBufIdx) {
+                        prevCurrentBuffer = prevSearchResult.getBuffers().get(prevBufIdx);
+                        resultFrameTupleAcc.reset(prevCurrentBuffer);
+                        resultTidx = 0;
+                    }
+                }
+            }
+
+            if (advanceCursor) {
+                invListTidx++;
+                if (invListCursor.hasNext()) {
+                    invListCursor.next();
+                }
+            }
+        }
+
+        // append remaining new elements from inverted list
+        while (invListTidx < invListNumTuples) {
+            ITupleReference invListTuple = invListCursor.getTuple();
+            newSearchResult.append(invListTuple, 1);
+            invListTidx++;
+            if (invListCursor.hasNext()) {
+                invListCursor.next();
+            }
+        }
+
+        // append remaining elements from previous result set
+        while (resultTidx < resultFrameTupleAcc.getTupleCount()) {
+
+            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
+
+            int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
+                    resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
+            newSearchResult.append(resultTuple, count);
+
+            resultTidx++;
+            if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
+                prevBufIdx++;
+                if (prevBufIdx <= maxPrevBufIdx) {
+                    prevCurrentBuffer = prevSearchResult.getBuffers().get(prevBufIdx);
+                    resultFrameTupleAcc.reset(prevCurrentBuffer);
+                    resultTidx = 0;
+                }
+            }
+        }
+    }
+
+    public SearchResult createSearchResult() {
+        return new SearchResult(prevSearchResult);
+    }
+
+    public void reset() {
+        prevSearchResult.clear();
+        newSearchResult.clear();
+    }
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/JaccardSearchModifier.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/JaccardSearchModifier.java
index 4cf7e40..b005905 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/JaccardSearchModifier.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/JaccardSearchModifier.java
@@ -31,13 +31,23 @@
     }
 
     @Override
-    public int getNumPrefixLists(int numQueryTokens) {
-        if (numQueryTokens == 0) {
+    public int getNumPrefixLists(int occurrenceThreshold, int numInvLists) {
+        if (numInvLists == 0) {
             return 0;
         }
-        return numQueryTokens - getOccurrenceThreshold(numQueryTokens) + 1;
+        return numInvLists - occurrenceThreshold + 1;
     }
 
+    @Override
+    public int getNumTokensLowerBound(int numQueryTokens) {
+        return (int) Math.floor(numQueryTokens * jaccThresh);
+    }
+
+    @Override
+    public int getNumTokensUpperBound(int numQueryTokens) {
+        return (int) Math.ceil(numQueryTokens / jaccThresh);
+    }
+    
     public float getJaccThresh() {
         return jaccThresh;
     }
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/PartitionedTOccurrenceSearcher.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/PartitionedTOccurrenceSearcher.java
new file mode 100644
index 0000000..439f1ad
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/PartitionedTOccurrenceSearcher.java
@@ -0,0 +1,259 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleReference;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexOperationContext;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.common.tuples.ConcatenatingTupleReference;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexSearchModifier;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexSearcher;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IObjectFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.exceptions.OccurrenceThresholdPanicException;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeFrameTupleAccessor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeTupleReference;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndexSearchCursor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.PartitionedOnDiskInvertedIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.PartitionedOnDiskInvertedIndex.InvertedListPartitions;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizer;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IToken;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.ObjectCache;
+
+public class PartitionedTOccurrenceSearcher implements IInvertedIndexSearcher {
+
+    protected final IHyracksCommonContext ctx;
+
+    protected final InvertedListMerger invListMerger;
+    protected final SearchResult searchResult;
+
+    protected RecordDescriptor queryTokenRecDesc = new RecordDescriptor(
+            new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
+    protected ArrayTupleBuilder queryTokenBuilder = new ArrayTupleBuilder(queryTokenRecDesc.getFieldCount());
+    protected DataOutput queryTokenDos = queryTokenBuilder.getDataOutput();
+    protected FrameTupleAppender queryTokenAppender;
+    protected ByteBuffer queryTokenFrame;
+    protected final FrameTupleReference searchKey = new FrameTupleReference();
+
+    protected final IInvertedIndex invIndex;
+    protected final MultiComparator invListCmp;
+    protected int occurrenceThreshold;
+
+    protected final int cursorCacheSize = 10;
+    //protected ArrayList<IInvertedListCursor> invListCursorCache = new ArrayList<IInvertedListCursor>(cursorCacheSize);
+    protected ArrayList<IInvertedListCursor> invListCursors = new ArrayList<IInvertedListCursor>(cursorCacheSize);
+
+    protected final ArrayTupleBuilder lowerBoundTupleBuilder = new ArrayTupleBuilder(1);
+    protected final ArrayTupleReference lowerBoundTuple = new ArrayTupleReference();
+    protected final ArrayTupleBuilder upperBoundTupleBuilder = new ArrayTupleBuilder(1);
+    protected final ArrayTupleReference upperBoundTuple = new ArrayTupleReference();
+    protected final ConcatenatingTupleReference partLowSearchKey = new ConcatenatingTupleReference(2);
+    protected final ConcatenatingTupleReference partHighSearchKey = new ConcatenatingTupleReference(2);
+
+    protected final IObjectFactory<IInvertedListCursor> invListCursorFactory;
+    protected final IObjectFactory<ArrayList<IInvertedListCursor>> arrayListFactory;
+    protected final ObjectCache<IInvertedListCursor> invListCursorCache;
+    protected final ObjectCache<ArrayList<IInvertedListCursor>> arrayListCache;
+
+    protected final InvertedListPartitions partitions;
+
+    public PartitionedTOccurrenceSearcher(IHyracksCommonContext ctx, IInvertedIndex invIndex) {
+        this.ctx = ctx;
+        this.invListMerger = new InvertedListMerger(ctx, invIndex);
+        this.searchResult = new SearchResult(invIndex.getInvListTypeTraits(), ctx);
+        this.invIndex = invIndex;
+        this.invListCmp = MultiComparator.create(invIndex.getInvListCmpFactories());
+
+        queryTokenAppender = new FrameTupleAppender(ctx.getFrameSize());
+        queryTokenFrame = ctx.allocateFrame();
+
+        invListCursorFactory = new InvertedListCursorFactory(invIndex);
+        arrayListFactory = new ArrayListFactory<IInvertedListCursor>();
+        invListCursorCache = new ObjectCache<IInvertedListCursor>(invListCursorFactory, 10, 10);
+        arrayListCache = new ObjectCache<ArrayList<IInvertedListCursor>>(arrayListFactory, 10, 10);
+
+        PartitionedOnDiskInvertedIndex partInvIndex = (PartitionedOnDiskInvertedIndex) invIndex;
+        partitions = partInvIndex.new InvertedListPartitions(invListCursorCache, arrayListCache);
+    }
+
+    public void reset() {
+        searchResult.clear();
+        invListMerger.reset();
+    }
+
+    public void search(OnDiskInvertedIndexSearchCursor resultCursor, InvertedIndexSearchPredicate searchPred,
+            IIndexOperationContext ictx) throws HyracksDataException, IndexException {
+        ITupleReference queryTuple = searchPred.getQueryTuple();
+        int queryFieldIndex = searchPred.getQueryFieldIndex();
+        IInvertedIndexSearchModifier searchModifier = searchPred.getSearchModifier();
+        IBinaryTokenizer queryTokenizer = searchPred.getQueryTokenizer();
+
+        queryTokenAppender.reset(queryTokenFrame, true);
+        queryTokenizer.reset(queryTuple.getFieldData(queryFieldIndex), queryTuple.getFieldStart(queryFieldIndex),
+                queryTuple.getFieldLength(queryFieldIndex));
+
+        while (queryTokenizer.hasNext()) {
+            queryTokenizer.next();
+            queryTokenBuilder.reset();
+            try {
+                IToken token = queryTokenizer.getToken();
+                token.serializeToken(queryTokenDos);
+                queryTokenBuilder.addFieldEndOffset();
+                // WARNING: assuming one frame is big enough to hold all tokens
+                queryTokenAppender.append(queryTokenBuilder.getFieldEndOffsets(), queryTokenBuilder.getByteArray(), 0,
+                        queryTokenBuilder.getSize());
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+
+        FrameTupleAccessor queryTokenAccessor = new FrameTupleAccessor(ctx.getFrameSize(), queryTokenRecDesc);
+        queryTokenAccessor.reset(queryTokenFrame);
+        int numQueryTokens = queryTokenAccessor.getTupleCount();
+
+        // ALEX NEW CODE STARTS HERE
+        int numTokensLowerBound = searchModifier.getNumTokensLowerBound(numQueryTokens);
+        int numTokensUpperBound = searchModifier.getNumTokensUpperBound(numQueryTokens);
+        ITupleReference lowSearchKey = null;
+        ITupleReference highSearchKey = null;
+        try {
+            if (numTokensLowerBound >= 0) {
+                lowerBoundTupleBuilder.reset();
+                lowerBoundTupleBuilder.getDataOutput().writeInt(numTokensLowerBound);
+                lowerBoundTupleBuilder.addFieldEndOffset();
+                lowerBoundTuple.reset(lowerBoundTupleBuilder.getFieldEndOffsets(),
+                        lowerBoundTupleBuilder.getByteArray());
+                // Only needed for setting the number of fields in searchKey.
+                searchKey.reset(queryTokenAccessor, 0);
+                partLowSearchKey.reset();
+                partLowSearchKey.addTuple(searchKey);
+                partLowSearchKey.addTuple(lowerBoundTuple);
+                lowSearchKey = partLowSearchKey;
+            } else {
+                lowSearchKey = searchKey;
+            }
+            if (numTokensUpperBound >= 0) {
+                upperBoundTupleBuilder.reset();
+                upperBoundTupleBuilder.getDataOutput().writeInt(numTokensUpperBound);
+                upperBoundTupleBuilder.addFieldEndOffset();
+                upperBoundTuple.reset(upperBoundTupleBuilder.getFieldEndOffsets(),
+                        upperBoundTupleBuilder.getByteArray());
+                // Only needed for setting the number of fields in searchKey.
+                searchKey.reset(queryTokenAccessor, 0);
+                partHighSearchKey.reset();
+                partHighSearchKey.addTuple(searchKey);
+                partHighSearchKey.addTuple(upperBoundTuple);
+                highSearchKey = partHighSearchKey;
+            } else {
+                highSearchKey = searchKey;
+            }
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+
+        PartitionedOnDiskInvertedIndex partInvIndex = (PartitionedOnDiskInvertedIndex) invIndex;
+        partitions.reset(numTokensLowerBound, numTokensUpperBound);
+        for (int i = 0; i < numQueryTokens; i++) {
+            searchKey.reset(queryTokenAccessor, i);
+            partInvIndex.openInvertedListPartitionCursors(partitions, lowSearchKey, highSearchKey, ictx);
+        }
+
+        occurrenceThreshold = searchModifier.getOccurrenceThreshold(numQueryTokens);
+        // TODO: deal with panic cases properly
+        if (occurrenceThreshold <= 0) {
+            throw new OccurrenceThresholdPanicException("Merge Threshold is <= 0. Failing Search.");
+        }
+
+        // Process the partitions one-by-one.
+        ArrayList<IInvertedListCursor>[] partitionCursors = partitions.getPartitions();
+        int start = partitions.getMinValidPartitionIndex();
+        int end = partitions.getMaxValidPartitionIndex();
+        searchResult.reset();
+        for (int i = start; i <= end; i++) {
+            if (partitionCursors[i] == null) {
+                continue;
+            }
+            // Prune partition because no element in it can satisfy the occurrence threshold.
+            if (partitionCursors[i].size() < occurrenceThreshold) {
+                continue;
+            }
+            // Merge inverted lists of current partition.
+            int numPrefixLists = searchModifier.getNumPrefixLists(occurrenceThreshold, partitionCursors[i].size());
+            invListMerger.reset();
+            invListMerger.merge(partitionCursors[i], occurrenceThreshold, numPrefixLists, searchResult);
+        }
+
+        resultCursor.open(null, searchPred);
+    }
+
+    public IFrameTupleAccessor createResultFrameTupleAccessor() {
+        return new FixedSizeFrameTupleAccessor(ctx.getFrameSize(), searchResult.getTypeTraits());
+    }
+
+    public ITupleReference createResultFrameTupleReference() {
+        return new FixedSizeTupleReference(searchResult.getTypeTraits());
+    }
+
+    @Override
+    public List<ByteBuffer> getResultBuffers() {
+        return searchResult.getBuffers();
+    }
+
+    @Override
+    public int getNumValidResultBuffers() {
+        return searchResult.getCurrentBufferIndex() + 1;
+    }
+
+    public int getOccurrenceThreshold() {
+        return occurrenceThreshold;
+    }
+
+    public void printNewResults(int maxResultBufIdx, List<ByteBuffer> buffer) {
+        StringBuffer strBuffer = new StringBuffer();
+        FixedSizeFrameTupleAccessor resultFrameTupleAcc = searchResult.getAccessor();
+        for (int i = 0; i <= maxResultBufIdx; i++) {
+            ByteBuffer testBuf = buffer.get(i);
+            resultFrameTupleAcc.reset(testBuf);
+            for (int j = 0; j < resultFrameTupleAcc.getTupleCount(); j++) {
+                strBuffer.append(IntegerSerializerDeserializer.getInt(resultFrameTupleAcc.getBuffer().array(),
+                        resultFrameTupleAcc.getFieldStartOffset(j, 0)) + ",");
+                strBuffer.append(IntegerSerializerDeserializer.getInt(resultFrameTupleAcc.getBuffer().array(),
+                        resultFrameTupleAcc.getFieldStartOffset(j, 1)) + " ");
+            }
+        }
+        System.out.println(strBuffer.toString());
+    }
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/SearchResult.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/SearchResult.java
new file mode 100644
index 0000000..aa0d3f2
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/SearchResult.java
@@ -0,0 +1,182 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+
+import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeFrameTupleAccessor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeFrameTupleAppender;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeTupleReference;
+
+/**
+ * Byte-buffer backed storage for intermediate and final results of inverted-index searches.
+ */
+// TODO: Rename members.
+public class SearchResult {
+    protected final ArrayList<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
+    protected final IHyracksCommonContext ctx;
+    protected final FixedSizeFrameTupleAppender appender;
+    protected final FixedSizeFrameTupleAccessor accessor;
+    protected final FixedSizeTupleReference tuple;
+    protected final ITypeTraits[] typeTraits;
+    protected final int invListElementSize;
+
+    protected int currBufIdx;
+    protected int numResults;
+
+    public SearchResult(ITypeTraits[] invListFields, IHyracksCommonContext ctx) {
+        typeTraits = new ITypeTraits[invListFields.length + 1];
+        int tmp = 0;
+        for (int i = 0; i < invListFields.length; i++) {
+            typeTraits[i] = invListFields[i];
+            tmp += invListFields[i].getFixedLength();
+        }
+        invListElementSize = tmp;
+        // Integer for counting occurrences.
+        typeTraits[invListFields.length] = IntegerPointable.TYPE_TRAITS;
+        this.ctx = ctx;
+        appender = new FixedSizeFrameTupleAppender(ctx.getFrameSize(), typeTraits);
+        accessor = new FixedSizeFrameTupleAccessor(ctx.getFrameSize(), typeTraits);
+        tuple = new FixedSizeTupleReference(typeTraits);
+        buffers.add(ctx.allocateFrame());
+    }
+
+    /**
+     * Initialize from other search-result object to share member instances except for result buffers.
+     */
+    public SearchResult(SearchResult other) {
+        this.ctx = other.ctx;
+        this.appender = other.appender;
+        this.accessor = other.accessor;
+        this.tuple = other.tuple;
+        this.typeTraits = other.typeTraits;
+        this.invListElementSize = other.invListElementSize;
+        buffers.add(ctx.allocateFrame());
+    }
+
+    public FixedSizeFrameTupleAccessor getAccessor() {
+        return accessor;
+    }
+
+    public FixedSizeFrameTupleAppender getAppender() {
+        return appender;
+    }
+
+    public FixedSizeTupleReference getTuple() {
+        return tuple;
+    }
+
+    public ArrayList<ByteBuffer> getBuffers() {
+        return buffers;
+    }
+
+    public void reset() {
+        currBufIdx = 0;
+        numResults = 0;
+        appender.reset(buffers.get(0), true);
+    }
+
+    public void clear() {
+        currBufIdx = 0;
+        numResults = 0;
+        for (ByteBuffer buffer : buffers) {
+            appender.reset(buffer, true);
+        }
+    }
+
+    public void append(ITupleReference invListElement, int count) {
+        ByteBuffer currentBuffer = buffers.get(currBufIdx);
+        if (!appender.hasSpace()) {
+            currBufIdx++;
+            if (currBufIdx >= buffers.size()) {
+                buffers.add(ctx.allocateFrame());
+            }
+            currentBuffer = buffers.get(currBufIdx);
+            appender.reset(currentBuffer, true);
+        }
+        // Append inverted-list element.
+        if (!appender.append(invListElement.getFieldData(0), invListElement.getFieldStart(0), invListElementSize)) {
+            throw new IllegalStateException();
+        }
+        // Append count.
+        if (!appender.append(count)) {
+            throw new IllegalStateException();
+        }
+        appender.incrementTupleCount(1);
+        numResults++;
+    }
+
+    public int getCurrentBufferIndex() {
+        return currBufIdx;
+    }
+
+    public ITypeTraits[] getTypeTraits() {
+        return typeTraits;
+    }
+
+    public int getNumResults() {
+        return numResults;
+    }
+
+    // TODO: This code may help to clean up the core list-merging algorithms.
+    /*
+    public SearchResultCursor getCursor() {
+        cursor.reset();
+        return cursor;
+    }
+    
+    public class SearchResultCursor {
+        private int bufferIndex;
+        private int resultIndex;
+        private int frameResultIndex;
+        private ByteBuffer currentBuffer;
+
+        public void reset() {
+            bufferIndex = 0;
+            resultIndex = 0;
+            frameResultIndex = 0;
+            currentBuffer = buffers.get(0);
+            resultFrameTupleAcc.reset(currentBuffer);
+        }
+
+        public boolean hasNext() {
+            return resultIndex < numResults;
+        }
+
+        public void next() {
+            resultTuple.reset(currentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(frameResultIndex));            
+            if (frameResultIndex < resultFrameTupleAcc.getTupleCount()) {
+                frameResultIndex++;
+            } else {
+                bufferIndex++;
+                currentBuffer = buffers.get(bufferIndex);
+                resultFrameTupleAcc.reset(currentBuffer);
+                frameResultIndex = 0;
+            }            
+            resultIndex++;
+        }
+
+        public ITupleReference getTuple() {
+            return resultTuple;
+        }
+    }
+    */
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/TOccurrenceSearcher.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/TOccurrenceSearcher.java
index 419d5aa..e0119a9 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/TOccurrenceSearcher.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/TOccurrenceSearcher.java
@@ -19,16 +19,13 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -45,27 +42,17 @@
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.exceptions.OccurrenceThresholdPanicException;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeFrameTupleAccessor;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeFrameTupleAppender;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeTupleReference;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndexSearchCursor;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizer;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IToken;
 
-// TODO: The search procedure is rather confusing regarding cursor positions, hasNext() calls etc.
-// Needs an overhaul some time.
 public class TOccurrenceSearcher implements IInvertedIndexSearcher {
 
     protected final IHyracksCommonContext ctx;
-    protected final FixedSizeFrameTupleAppender resultFrameTupleApp;
-    protected final FixedSizeFrameTupleAccessor resultFrameTupleAcc;
-    protected final FixedSizeTupleReference resultTuple;
-    protected final int invListKeyLength;
-    protected int currentNumResults;
 
-    protected List<ByteBuffer> newResultBuffers = new ArrayList<ByteBuffer>();
-    protected List<ByteBuffer> prevResultBuffers = new ArrayList<ByteBuffer>();
-    protected List<ByteBuffer> swap = null;
-    protected int maxResultBufIdx = 0;
+    protected final InvertedListMerger invListMerger;
+    protected final SearchResult searchResult;
 
     protected RecordDescriptor queryTokenRecDesc = new RecordDescriptor(
             new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
@@ -77,35 +64,19 @@
 
     protected final IInvertedIndex invIndex;
     protected final MultiComparator invListCmp;
-    protected final ITypeTraits[] invListFieldsWithCount;
     protected int occurrenceThreshold;
 
     protected final int cursorCacheSize = 10;
-    protected List<IInvertedListCursor> invListCursorCache = new ArrayList<IInvertedListCursor>(cursorCacheSize);
-    protected List<IInvertedListCursor> invListCursors = new ArrayList<IInvertedListCursor>(cursorCacheSize);
+    protected ArrayList<IInvertedListCursor> invListCursorCache = new ArrayList<IInvertedListCursor>(cursorCacheSize);
+    protected ArrayList<IInvertedListCursor> invListCursors = new ArrayList<IInvertedListCursor>(cursorCacheSize);
 
     public TOccurrenceSearcher(IHyracksCommonContext ctx, IInvertedIndex invIndex) {
         this.ctx = ctx;
+        this.invListMerger = new InvertedListMerger(ctx, invIndex);
+        this.searchResult = new SearchResult(invIndex.getInvListTypeTraits(), ctx);
         this.invIndex = invIndex;
         this.invListCmp = MultiComparator.create(invIndex.getInvListCmpFactories());
 
-        ITypeTraits[] invListFields = invIndex.getInvListTypeTraits();
-        invListFieldsWithCount = new ITypeTraits[invListFields.length + 1];
-        int tmp = 0;
-        for (int i = 0; i < invListFields.length; i++) {
-            invListFieldsWithCount[i] = invListFields[i];
-            tmp += invListFields[i].getFixedLength();
-        }
-        // using an integer for counting occurrences
-        invListFieldsWithCount[invListFields.length] = IntegerPointable.TYPE_TRAITS;
-        invListKeyLength = tmp;
-
-        resultFrameTupleApp = new FixedSizeFrameTupleAppender(ctx.getFrameSize(), invListFieldsWithCount);
-        resultFrameTupleAcc = new FixedSizeFrameTupleAccessor(ctx.getFrameSize(), invListFieldsWithCount);
-        resultTuple = new FixedSizeTupleReference(invListFieldsWithCount);
-        newResultBuffers.add(ctx.allocateFrame());
-        prevResultBuffers.add(ctx.allocateFrame());
-
         // Pre-create cursor objects.
         for (int i = 0; i < cursorCacheSize; i++) {
             invListCursorCache.add(invIndex.createInvertedListCursor());
@@ -113,18 +84,11 @@
 
         queryTokenAppender = new FrameTupleAppender(ctx.getFrameSize());
         queryTokenFrame = ctx.allocateFrame();
-
-        currentNumResults = 0;
     }
 
     public void reset() {
-        for (ByteBuffer b : newResultBuffers) {
-            resultFrameTupleApp.reset(b, true);
-        }
-        for (ByteBuffer b : prevResultBuffers) {
-            resultFrameTupleApp.reset(b, true);
-        }
-        currentNumResults = 0;
+        searchResult.clear();
+        invListMerger.reset();
     }
 
     public void search(OnDiskInvertedIndexSearchCursor resultCursor, InvertedIndexSearchPredicate searchPred,
@@ -171,348 +135,34 @@
             invIndex.openInvertedListCursor(invListCursorCache.get(i), searchKey, ictx);
             invListCursors.add(invListCursorCache.get(i));
         }
-        Collections.sort(invListCursors);
-        
-        occurrenceThreshold = searchModifier.getOccurrenceThreshold(invListCursors.size());
-        // TODO: deal with panic cases properly
-        if (occurrenceThreshold <= 0) {
-            throw new OccurrenceThresholdPanicException("Merge Threshold is <= 0. Failing Search.");
-        }
-        
-        int numPrefixLists = searchModifier.getNumPrefixLists(invListCursors.size());
-        maxResultBufIdx = mergePrefixLists(numPrefixLists, numQueryTokens);
-        maxResultBufIdx = mergeSuffixLists(numPrefixLists, numQueryTokens, maxResultBufIdx);
 
+        occurrenceThreshold = searchModifier.getOccurrenceThreshold(numQueryTokens);
+        if (occurrenceThreshold <= 0) {
+            throw new OccurrenceThresholdPanicException("Merge threshold is <= 0. Failing Search.");
+        }
+        int numPrefixLists = searchModifier.getNumPrefixLists(occurrenceThreshold, invListCursors.size());
+
+        searchResult.reset();
+        invListMerger.merge(invListCursors, occurrenceThreshold, numPrefixLists, searchResult);
         resultCursor.open(null, searchPred);
     }
 
-    protected int mergePrefixLists(int numPrefixTokens, int numQueryTokens) throws HyracksDataException, IndexException {
-        int maxPrevBufIdx = 0;
-        for (int i = 0; i < numPrefixTokens; i++) {
-            swap = prevResultBuffers;
-            prevResultBuffers = newResultBuffers;
-            newResultBuffers = swap;
-            currentNumResults = 0;
-
-            invListCursors.get(i).pinPages();
-            maxPrevBufIdx = mergePrefixList(invListCursors.get(i), prevResultBuffers, maxPrevBufIdx, newResultBuffers);
-            invListCursors.get(i).unpinPages();
-        }
-        return maxPrevBufIdx;
-    }
-
-    protected int mergeSuffixLists(int numPrefixTokens, int numQueryTokens, int maxPrevBufIdx)
-            throws HyracksDataException, IndexException {
-        for (int i = numPrefixTokens; i < numQueryTokens; i++) {
-            swap = prevResultBuffers;
-            prevResultBuffers = newResultBuffers;
-            newResultBuffers = swap;
-
-            invListCursors.get(i).pinPages();
-            int numInvListElements = invListCursors.get(i).size();
-            // should we binary search the next list or should we sort-merge it?
-            if (currentNumResults * Math.log(numInvListElements) < currentNumResults + numInvListElements) {
-                maxPrevBufIdx = mergeSuffixListProbe(invListCursors.get(i), prevResultBuffers, maxPrevBufIdx,
-                        newResultBuffers, i, numQueryTokens);
-            } else {
-                maxPrevBufIdx = mergeSuffixListScan(invListCursors.get(i), prevResultBuffers, maxPrevBufIdx,
-                        newResultBuffers, i, numQueryTokens);
-            }
-            invListCursors.get(i).unpinPages();
-        }
-        return maxPrevBufIdx;
-    }
-
-    protected int mergeSuffixListProbe(IInvertedListCursor invListCursor, List<ByteBuffer> prevResultBuffers,
-            int maxPrevBufIdx, List<ByteBuffer> newResultBuffers, int invListIx, int numQueryTokens) throws HyracksDataException, IndexException {
-
-        int newBufIdx = 0;
-        ByteBuffer newCurrentBuffer = newResultBuffers.get(0);
-
-        int prevBufIdx = 0;
-        ByteBuffer prevCurrentBuffer = prevResultBuffers.get(0);
-
-        int resultTidx = 0;
-
-        currentNumResults = 0;
-
-        resultFrameTupleAcc.reset(prevCurrentBuffer);
-        resultFrameTupleApp.reset(newCurrentBuffer, true);
-
-        while (resultTidx < resultFrameTupleAcc.getTupleCount()) {
-
-            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
-            int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
-                    resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
-
-            if (invListCursor.containsKey(resultTuple, invListCmp)) {
-                count++;
-                newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
-            } else {
-                if (count + numQueryTokens - invListIx > occurrenceThreshold) {
-                    newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
-                }
-            }
-
-            resultTidx++;
-            if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
-                prevBufIdx++;
-                if (prevBufIdx <= maxPrevBufIdx) {
-                    prevCurrentBuffer = prevResultBuffers.get(prevBufIdx);
-                    resultFrameTupleAcc.reset(prevCurrentBuffer);
-                    resultTidx = 0;
-                }
-            }
-        }
-
-        return newBufIdx;
-    }
-
-    protected int mergeSuffixListScan(IInvertedListCursor invListCursor, List<ByteBuffer> prevResultBuffers,
-            int maxPrevBufIdx, List<ByteBuffer> newResultBuffers, int invListIx, int numQueryTokens)
-            throws HyracksDataException, IndexException {
-        
-        int newBufIdx = 0;
-        ByteBuffer newCurrentBuffer = newResultBuffers.get(0);
-
-        int prevBufIdx = 0;
-        ByteBuffer prevCurrentBuffer = prevResultBuffers.get(0);
-
-        boolean advanceCursor = true;
-        boolean advancePrevResult = false;
-        int resultTidx = 0;
-
-        currentNumResults = 0;
-        
-        resultFrameTupleAcc.reset(prevCurrentBuffer);
-        resultFrameTupleApp.reset(newCurrentBuffer, true);
-
-        int invListTidx = 0;
-        int invListNumTuples = invListCursor.size();
-
-        if (invListCursor.hasNext())
-            invListCursor.next();
-
-        while (invListTidx < invListNumTuples && resultTidx < resultFrameTupleAcc.getTupleCount()) {
-
-            ITupleReference invListTuple = invListCursor.getTuple();
-
-            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
-
-            int cmp = invListCmp.compare(invListTuple, resultTuple);
-            if (cmp == 0) {
-                int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
-                        resultTuple.getFieldStart(resultTuple.getFieldCount() - 1)) + 1;
-                newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
-                advanceCursor = true;
-                advancePrevResult = true;
-            } else {
-                if (cmp < 0) {
-                    advanceCursor = true;
-                    advancePrevResult = false;
-                } else {
-                    int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
-                            resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
-                    if (count + numQueryTokens - invListIx > occurrenceThreshold) {
-                        newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
-                    }
-                    advanceCursor = false;
-                    advancePrevResult = true;
-                }
-            }
-
-            if (advancePrevResult) {
-                resultTidx++;
-                if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
-                    prevBufIdx++;
-                    if (prevBufIdx <= maxPrevBufIdx) {
-                        prevCurrentBuffer = prevResultBuffers.get(prevBufIdx);
-                        resultFrameTupleAcc.reset(prevCurrentBuffer);
-                        resultTidx = 0;
-                    }
-                }
-            }
-
-            if (advanceCursor) {
-                invListTidx++;
-                if (invListCursor.hasNext()) {
-                    invListCursor.next();
-                }
-            }
-        }
-
-        // append remaining elements from previous result set
-        while (resultTidx < resultFrameTupleAcc.getTupleCount()) {
-
-            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
-
-            int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
-                    resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
-            if (count + numQueryTokens - invListIx > occurrenceThreshold) {
-                newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
-            }
-
-            resultTidx++;
-            if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
-                prevBufIdx++;
-                if (prevBufIdx <= maxPrevBufIdx) {
-                    prevCurrentBuffer = prevResultBuffers.get(prevBufIdx);
-                    resultFrameTupleAcc.reset(prevCurrentBuffer);
-                    resultTidx = 0;
-                }
-            }
-        }
-
-        return newBufIdx;
-    }
-
-    protected int mergePrefixList(IInvertedListCursor invListCursor, List<ByteBuffer> prevResultBuffers,
-            int maxPrevBufIdx, List<ByteBuffer> newResultBuffers) throws HyracksDataException, IndexException {
-        
-        int newBufIdx = 0;
-        ByteBuffer newCurrentBuffer = newResultBuffers.get(0);
-
-        int prevBufIdx = 0;
-        ByteBuffer prevCurrentBuffer = prevResultBuffers.get(0);
-
-        boolean advanceCursor = true;
-        boolean advancePrevResult = false;
-        int resultTidx = 0;
-
-        resultFrameTupleAcc.reset(prevCurrentBuffer);
-        resultFrameTupleApp.reset(newCurrentBuffer, true);
-
-        int invListTidx = 0;
-        int invListNumTuples = invListCursor.size();
-
-        if (invListCursor.hasNext())
-            invListCursor.next();
-
-        while (invListTidx < invListNumTuples && resultTidx < resultFrameTupleAcc.getTupleCount()) {
-
-            ITupleReference invListTuple = invListCursor.getTuple();
-            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
-
-            int cmp = invListCmp.compare(invListTuple, resultTuple);
-            if (cmp == 0) {
-                int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
-                        resultTuple.getFieldStart(resultTuple.getFieldCount() - 1)) + 1;
-                newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
-                advanceCursor = true;
-                advancePrevResult = true;
-            } else {
-                if (cmp < 0) {
-                    int count = 1;
-                    newBufIdx = appendTupleToNewResults(invListTuple, count, newBufIdx);
-                    advanceCursor = true;
-                    advancePrevResult = false;
-                } else {
-                    int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
-                            resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
-                    newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
-                    advanceCursor = false;
-                    advancePrevResult = true;
-                }
-            }
-
-            if (advancePrevResult) {
-                resultTidx++;
-                if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
-                    prevBufIdx++;
-                    if (prevBufIdx <= maxPrevBufIdx) {
-                        prevCurrentBuffer = prevResultBuffers.get(prevBufIdx);
-                        resultFrameTupleAcc.reset(prevCurrentBuffer);
-                        resultTidx = 0;
-                    }
-                }
-            }
-
-            if (advanceCursor) {
-                invListTidx++;
-                if (invListCursor.hasNext()) {
-                    invListCursor.next();
-                }
-            }
-        }
-
-        // append remaining new elements from inverted list
-        while (invListTidx < invListNumTuples) {
-            ITupleReference invListTuple = invListCursor.getTuple();
-            newBufIdx = appendTupleToNewResults(invListTuple, 1, newBufIdx);
-            invListTidx++;
-            if (invListCursor.hasNext()) {
-                invListCursor.next();
-            }
-        }
-
-        // append remaining elements from previous result set
-        while (resultTidx < resultFrameTupleAcc.getTupleCount()) {
-
-            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
-
-            int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
-                    resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
-            newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
-
-            resultTidx++;
-            if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
-                prevBufIdx++;
-                if (prevBufIdx <= maxPrevBufIdx) {
-                    prevCurrentBuffer = prevResultBuffers.get(prevBufIdx);
-                    resultFrameTupleAcc.reset(prevCurrentBuffer);
-                    resultTidx = 0;
-                }
-            }
-        }
-
-        return newBufIdx;
-    }
-
-    protected int appendTupleToNewResults(ITupleReference tuple, int newCount, int newBufIdx) {
-        ByteBuffer newCurrentBuffer = newResultBuffers.get(newBufIdx);
-
-        if (!resultFrameTupleApp.hasSpace()) {
-            newBufIdx++;
-            if (newBufIdx >= newResultBuffers.size()) {
-                newResultBuffers.add(ctx.allocateFrame());
-            }
-            newCurrentBuffer = newResultBuffers.get(newBufIdx);
-            resultFrameTupleApp.reset(newCurrentBuffer, true);
-        }
-
-        // append key
-        if (!resultFrameTupleApp.append(tuple.getFieldData(0), tuple.getFieldStart(0), invListKeyLength)) {
-            throw new IllegalStateException();
-        }
-
-        // append new count
-        if (!resultFrameTupleApp.append(newCount)) {
-            throw new IllegalStateException();
-        }
-
-        resultFrameTupleApp.incrementTupleCount(1);
-
-        currentNumResults++;
-
-        return newBufIdx;
-    }
-
     public IFrameTupleAccessor createResultFrameTupleAccessor() {
-        return new FixedSizeFrameTupleAccessor(ctx.getFrameSize(), invListFieldsWithCount);
+        return new FixedSizeFrameTupleAccessor(ctx.getFrameSize(), searchResult.getTypeTraits());
     }
 
     public ITupleReference createResultFrameTupleReference() {
-        return new FixedSizeTupleReference(invListFieldsWithCount);
+        return new FixedSizeTupleReference(searchResult.getTypeTraits());
     }
 
     @Override
     public List<ByteBuffer> getResultBuffers() {
-        return newResultBuffers;
+        return searchResult.getBuffers();
     }
 
     @Override
     public int getNumValidResultBuffers() {
-        return maxResultBufIdx + 1;
+        return searchResult.getCurrentBufferIndex() + 1;
     }
 
     public int getOccurrenceThreshold() {
@@ -521,6 +171,7 @@
 
     public void printNewResults(int maxResultBufIdx, List<ByteBuffer> buffer) {
         StringBuffer strBuffer = new StringBuffer();
+        FixedSizeFrameTupleAccessor resultFrameTupleAcc = searchResult.getAccessor();
         for (int i = 0; i <= maxResultBufIdx; i++) {
             ByteBuffer testBuf = buffer.get(i);
             resultFrameTupleAcc.reset(testBuf);
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/TOccurrenceSearcherSuffixProbeOnly.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/TOccurrenceSearcherSuffixProbeOnly.java
deleted file mode 100644
index 630b810..0000000
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/TOccurrenceSearcherSuffixProbeOnly.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
-import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndex;
-
-public class TOccurrenceSearcherSuffixProbeOnly extends TOccurrenceSearcher {
-
-	protected final MultiComparator invListCmp;
-	
-    public TOccurrenceSearcherSuffixProbeOnly(IHyracksTaskContext ctx, OnDiskInvertedIndex invIndex) {
-        super(ctx, invIndex);
-        this.invListCmp = MultiComparator.create(invIndex.getInvListCmpFactories());
-    }
-
-    protected int mergeSuffixLists(int numPrefixTokens, int numQueryTokens, int maxPrevBufIdx) throws HyracksDataException, IndexException {
-        for (int i = numPrefixTokens; i < numQueryTokens; i++) {
-            swap = prevResultBuffers;
-            prevResultBuffers = newResultBuffers;
-            newResultBuffers = swap;
-            currentNumResults = 0;
-
-            invListCursors.get(i).pinPages();
-            maxPrevBufIdx = mergeSuffixListProbe(invListCursors.get(i), prevResultBuffers, maxPrevBufIdx,
-                    newResultBuffers, i, numQueryTokens);
-            invListCursors.get(i).unpinPages();
-        }
-        return maxPrevBufIdx;
-    }
-
-    protected int mergeSuffixListProbe(IInvertedListCursor invListCursor, List<ByteBuffer> prevResultBuffers,
-            int maxPrevBufIdx, List<ByteBuffer> newResultBuffers, int invListIx, int numQueryTokens) throws HyracksDataException, IndexException {
-
-        int newBufIdx = 0;
-        ByteBuffer newCurrentBuffer = newResultBuffers.get(0);
-
-        int prevBufIdx = 0;
-        ByteBuffer prevCurrentBuffer = prevResultBuffers.get(0);
-
-        int resultTidx = 0;
-
-        resultFrameTupleAcc.reset(prevCurrentBuffer);
-        resultFrameTupleApp.reset(newCurrentBuffer, true);
-
-        while (resultTidx < resultFrameTupleAcc.getTupleCount()) {
-
-            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
-            int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
-                    resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
-
-            if (invListCursor.containsKey(resultTuple, invListCmp)) {
-                count++;
-                newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
-            } else {
-                if (count + numQueryTokens - invListIx > occurrenceThreshold) {
-                    newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
-                }
-            }
-
-            resultTidx++;
-            if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
-                prevBufIdx++;
-                if (prevBufIdx <= maxPrevBufIdx) {
-                    prevCurrentBuffer = prevResultBuffers.get(prevBufIdx);
-                    resultFrameTupleAcc.reset(prevCurrentBuffer);
-                    resultTidx = 0;
-                }
-            }
-        }
-
-        return newBufIdx;
-    }
-}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/TOccurrenceSearcherSuffixScanOnly.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/TOccurrenceSearcherSuffixScanOnly.java
deleted file mode 100644
index 3640511..0000000
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/TOccurrenceSearcherSuffixScanOnly.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
-import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndex;
-
-public class TOccurrenceSearcherSuffixScanOnly extends TOccurrenceSearcher {
-
-	protected final MultiComparator invListCmp;
-	
-    public TOccurrenceSearcherSuffixScanOnly(IHyracksTaskContext ctx, OnDiskInvertedIndex invIndex) {
-        super(ctx, invIndex);
-        this.invListCmp = MultiComparator.create(invIndex.getInvListCmpFactories());
-    }
-
-    protected int mergeSuffixLists(int numPrefixTokens, int numQueryTokens, int maxPrevBufIdx) throws HyracksDataException, IndexException {
-        for (int i = numPrefixTokens; i < numQueryTokens; i++) {
-            swap = prevResultBuffers;
-            prevResultBuffers = newResultBuffers;
-            newResultBuffers = swap;
-            currentNumResults = 0;
-
-            invListCursors.get(i).pinPages();
-            maxPrevBufIdx = mergeSuffixListScan(invListCursors.get(i), prevResultBuffers, maxPrevBufIdx,
-                    newResultBuffers, i, numQueryTokens);
-            invListCursors.get(i).unpinPages();
-        }
-        return maxPrevBufIdx;
-    }
-
-    protected int mergeSuffixListScan(IInvertedListCursor invListCursor, List<ByteBuffer> prevResultBuffers,
-            int maxPrevBufIdx, List<ByteBuffer> newResultBuffers, int invListIx, int numQueryTokens) throws HyracksDataException, IndexException {
-
-        int newBufIdx = 0;
-        ByteBuffer newCurrentBuffer = newResultBuffers.get(0);
-
-        int prevBufIdx = 0;
-        ByteBuffer prevCurrentBuffer = prevResultBuffers.get(0);
-
-        boolean advanceCursor = true;
-        boolean advancePrevResult = false;
-        int resultTidx = 0;
-
-        resultFrameTupleAcc.reset(prevCurrentBuffer);
-        resultFrameTupleApp.reset(newCurrentBuffer, true);
-
-        while (invListCursor.hasNext() && resultTidx < resultFrameTupleAcc.getTupleCount()) {
-
-            if (advanceCursor)
-                invListCursor.next();
-
-            ITupleReference invListTuple = invListCursor.getTuple();
-
-            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
-
-            int cmp = invListCmp.compare(invListTuple, resultTuple);
-            if (cmp == 0) {
-                int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
-                        resultTuple.getFieldStart(resultTuple.getFieldCount() - 1)) + 1;
-                newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
-                advanceCursor = true;
-                advancePrevResult = true;
-            } else {
-                if (cmp < 0) {
-                    advanceCursor = true;
-                    advancePrevResult = false;
-                } else {
-                    int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
-                            resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
-                    if (count + numQueryTokens - invListIx > occurrenceThreshold) {
-                        newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
-                    }
-                    advanceCursor = false;
-                    advancePrevResult = true;
-                }
-            }
-
-            if (advancePrevResult) {
-                resultTidx++;
-                if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
-                    prevBufIdx++;
-                    if (prevBufIdx <= maxPrevBufIdx) {
-                        prevCurrentBuffer = prevResultBuffers.get(prevBufIdx);
-                        resultFrameTupleAcc.reset(prevCurrentBuffer);
-                        resultTidx = 0;
-                    }
-                }
-            }
-        }
-
-        // append remaining elements from previous result set
-        while (resultTidx < resultFrameTupleAcc.getTupleCount()) {
-
-            int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
-                    resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
-            newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
-
-            resultTidx++;
-            if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
-                prevBufIdx++;
-                if (prevBufIdx <= maxPrevBufIdx) {
-                    prevCurrentBuffer = prevResultBuffers.get(prevBufIdx);
-                    resultFrameTupleAcc.reset(prevCurrentBuffer);
-                    resultTidx = 0;
-                }
-            }
-        }
-
-        return newBufIdx;
-    }
-}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/InvertedIndexTokenizingNumTokensTupleIterator.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/InvertedIndexTokenizingNumTokensTupleIterator.java
new file mode 100644
index 0000000..347ea73
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/InvertedIndexTokenizingNumTokensTupleIterator.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util;
+
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizer;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IToken;
+
+// TODO: We can possibly avoid copying the data into a new tuple here.
+public class InvertedIndexTokenizingNumTokensTupleIterator extends InvertedIndexTokenizingTupleIterator {
+
+    protected int numTokens = 0;
+
+    public InvertedIndexTokenizingNumTokensTupleIterator(int tokensFieldCount, int invListFieldCount,
+            IBinaryTokenizer tokenizer) {
+        super(tokensFieldCount, invListFieldCount, tokenizer);
+    }
+
+    public void reset(ITupleReference inputTuple) {
+        super.reset(inputTuple);
+        // Run through the tokenizer once to get the total number of tokens.
+        numTokens = 0;
+        while (tokenizer.hasNext()) {
+            tokenizer.next();
+            numTokens++;
+        }
+        super.reset(inputTuple);
+    }
+
+    public void next() throws HyracksDataException {
+        tokenizer.next();
+        IToken token = tokenizer.getToken();
+        tupleBuilder.reset();
+        try {
+            // Add token field.
+            token.serializeToken(tupleBuilder.getDataOutput());
+            tupleBuilder.addFieldEndOffset();
+            // Add field with number of tokens.
+            tupleBuilder.getDataOutput().writeInt(numTokens);
+            tupleBuilder.addFieldEndOffset();
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+        // Add inverted-list element fields.
+        for (int i = 0; i < invListFieldCount; i++) {
+            tupleBuilder.addField(inputTuple.getFieldData(i + 1), inputTuple.getFieldStart(i + 1),
+                    inputTuple.getFieldLength(i + 1));
+        }
+        // Reset tuple reference for insert operation.
+        tupleReference.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
+    }
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/InvertedIndexTokenizingTupleIterator.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/InvertedIndexTokenizingTupleIterator.java
index 633213a..4f8f635 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/InvertedIndexTokenizingTupleIterator.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/InvertedIndexTokenizingTupleIterator.java
@@ -27,13 +27,13 @@
 // TODO: We can possibly avoid copying the data into a new tuple here.
 public class InvertedIndexTokenizingTupleIterator {
     // Field that is expected to be tokenized.
-    private final int DOC_FIELD_INDEX = 0;
+    protected final int DOC_FIELD_INDEX = 0;
 
-    private final int invListFieldCount;
-    private final ArrayTupleBuilder tupleBuilder;
-    private final ArrayTupleReference tupleReference;
-    private final IBinaryTokenizer tokenizer;
-    private ITupleReference inputTuple;
+    protected final int invListFieldCount;
+    protected final ArrayTupleBuilder tupleBuilder;
+    protected final ArrayTupleReference tupleReference;
+    protected final IBinaryTokenizer tokenizer;
+    protected ITupleReference inputTuple;
 
     public InvertedIndexTokenizingTupleIterator(int tokensFieldCount, int invListFieldCount, IBinaryTokenizer tokenizer) {
         this.invListFieldCount = invListFieldCount;
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/InvertedIndexUtils.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/InvertedIndexUtils.java
index 7da9416..3ffea85 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/InvertedIndexUtils.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/InvertedIndexUtils.java
@@ -48,6 +48,7 @@
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeElementInvertedListBuilderFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndex;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndexFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.PartitionedOnDiskInvertedIndex;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
 import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
 import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
@@ -72,6 +73,16 @@
         return new OnDiskInvertedIndex(bufferCache, fileMapProvider, builder, invListTypeTraits, invListCmpFactories,
                 tokenTypeTraits, tokenCmpFactories, btreeFile, invListsFile);
     }
+    
+    public static PartitionedOnDiskInvertedIndex createPartitionedOnDiskInvertedIndex(IBufferCache bufferCache,
+            IFileMapProvider fileMapProvider, ITypeTraits[] invListTypeTraits,
+            IBinaryComparatorFactory[] invListCmpFactories, ITypeTraits[] tokenTypeTraits,
+            IBinaryComparatorFactory[] tokenCmpFactories, FileReference invListsFile) throws IndexException {
+        IInvertedListBuilder builder = new FixedSizeElementInvertedListBuilder(invListTypeTraits);
+        FileReference btreeFile = getBTreeFile(invListsFile);
+        return new PartitionedOnDiskInvertedIndex(bufferCache, fileMapProvider, builder, invListTypeTraits,
+                invListCmpFactories, tokenTypeTraits, tokenCmpFactories, btreeFile, invListsFile);
+    }
 
     public static FileReference getBTreeFile(FileReference invListsFile) {
         return new FileReference(new File(invListsFile.getFile().getPath() + "_btree"));
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/ObjectCache.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/ObjectCache.java
new file mode 100644
index 0000000..b073f20
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/ObjectCache.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util;
+
+import java.util.ArrayList;
+
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IObjectFactory;
+
+public class ObjectCache<T> {
+    protected final int expandSize;
+    protected final IObjectFactory<T> objFactory;
+    protected final ArrayList<T> cache;
+    protected int lastReturned = 0;
+
+    public ObjectCache(IObjectFactory<T> objFactory, int initialSize, int expandSize) {
+        this.objFactory = objFactory;
+        this.cache = new ArrayList<T>(initialSize);
+        this.expandSize = expandSize;
+        expand(initialSize);
+    }
+
+    private void expand(int expandSize) {
+        for (int i = 0; i < expandSize; i++) {
+            cache.add(objFactory.create());
+        }
+    }
+
+    public void reset() {
+        lastReturned = 0;
+    }
+
+    public T getNext() {
+        if (lastReturned >= cache.size()) {
+            expand(expandSize);
+        }
+        return cache.get(lastReturned++);
+    }
+}
diff --git a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndexBulkLoadTest.java b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndexBulkLoadTest.java
new file mode 100644
index 0000000..f641630
--- /dev/null
+++ b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndexBulkLoadTest.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk;
+
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.common.AbstractInvertedIndexLoadTest;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestContext.InvertedIndexType;
+
+public class PartitionedOnDiskInvertedIndexBulkLoadTest extends AbstractInvertedIndexLoadTest {
+
+    public PartitionedOnDiskInvertedIndexBulkLoadTest() {
+        super(InvertedIndexType.PARTITIONED_ONDISK, true, 1);
+    }
+}
diff --git a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestContext.java b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestContext.java
index 34b11c7..eb75a8a 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestContext.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestContext.java
@@ -45,23 +45,28 @@
     public static enum InvertedIndexType {
         INMEMORY,
         ONDISK,
-        LSM
+        LSM,
+        PARTITIONED_INMEMORY,
+        PARTITIONED_ONDISK,
+        PARTITIONED_LSM
     };
 
     protected IInvertedIndex invIndex;
     protected IBinaryComparatorFactory[] allCmpFactories;
     protected IBinaryTokenizerFactory tokenizerFactory;
+    protected InvertedIndexType invIndexType;
     protected InvertedIndexTokenizingTupleIterator indexTupleIter;
     protected HashSet<Comparable> allTokens = new HashSet<Comparable>();
     protected List<ITupleReference> documentCorpus = new ArrayList<ITupleReference>();
     
     public LSMInvertedIndexTestContext(ISerializerDeserializer[] fieldSerdes, IIndex index,
-            IBinaryTokenizerFactory tokenizerFactory) {
+            IBinaryTokenizerFactory tokenizerFactory, InvertedIndexType invIndexType,
+            InvertedIndexTokenizingTupleIterator indexTupleIter) {
         super(fieldSerdes, index);
-        this.tokenizerFactory = tokenizerFactory;
         invIndex = (IInvertedIndex) index;
-        indexTupleIter = new InvertedIndexTokenizingTupleIterator(invIndex.getTokenTypeTraits().length,
-                invIndex.getInvListTypeTraits().length, tokenizerFactory.createTokenizer());
+        this.tokenizerFactory = tokenizerFactory;
+        this.invIndexType = invIndexType;
+        this.indexTupleIter = indexTupleIter;
     }
 
     @Override
@@ -88,8 +93,9 @@
         return allCmpFactories;
     }
 
-    public static LSMInvertedIndexTestContext create(LSMInvertedIndexTestHarness harness, ISerializerDeserializer[] fieldSerdes,
-            int tokenFieldCount, IBinaryTokenizerFactory tokenizerFactory, InvertedIndexType invIndexType) throws IndexException {
+    public static LSMInvertedIndexTestContext create(LSMInvertedIndexTestHarness harness,
+            ISerializerDeserializer[] fieldSerdes, int tokenFieldCount, IBinaryTokenizerFactory tokenizerFactory,
+            InvertedIndexType invIndexType) throws IndexException {
         ITypeTraits[] allTypeTraits = SerdeUtils.serdesToTypeTraits(fieldSerdes);
         IBinaryComparatorFactory[] allCmpFactories = SerdeUtils.serdesToComparatorFactories(fieldSerdes,
                 fieldSerdes.length);
@@ -108,10 +114,11 @@
             invListTypeTraits[i] = allTypeTraits[i + tokenFieldCount];
             invListCmpFactories[i] = allCmpFactories[i + tokenFieldCount];
         }
-        // Create index and test context.
+        // Create index and test context.        
         IInvertedIndex invIndex;
         switch (invIndexType) {
-            case INMEMORY: {
+            case INMEMORY:
+            case PARTITIONED_INMEMORY: {
                 invIndex = InvertedIndexUtils.createInMemoryBTreeInvertedindex(harness.getMemBufferCache(),
                         harness.getMemFreePageManager(), invListTypeTraits, invListCmpFactories, tokenTypeTraits,
                         tokenCmpFactories, tokenizerFactory);
@@ -123,7 +130,14 @@
                         tokenCmpFactories, harness.getInvListsFileRef());
                 break;
             }
-            case LSM: {
+            case PARTITIONED_ONDISK: {
+                invIndex = InvertedIndexUtils.createPartitionedOnDiskInvertedIndex(harness.getDiskBufferCache(),
+                        harness.getDiskFileMapProvider(), invListTypeTraits, invListCmpFactories, tokenTypeTraits,
+                        tokenCmpFactories, harness.getInvListsFileRef());
+                break;
+            }
+            case LSM:
+            case PARTITIONED_LSM: {
                 invIndex = InvertedIndexUtils.createLSMInvertedIndex(harness.getMemBufferCache(),
                         harness.getMemFreePageManager(), harness.getDiskFileMapProvider(), invListTypeTraits,
                         invListCmpFactories, tokenTypeTraits, tokenCmpFactories, tokenizerFactory,
@@ -136,7 +150,29 @@
                 throw new InvertedIndexException("Unknow inverted-index type '" + invIndexType + "'.");
             }
         }
-        LSMInvertedIndexTestContext testCtx = new LSMInvertedIndexTestContext(fieldSerdes, invIndex, tokenizerFactory);
+        InvertedIndexTokenizingTupleIterator indexTupleIter = null;
+        switch (invIndexType) {
+            case INMEMORY:
+            case ONDISK:
+            case LSM: {
+                indexTupleIter = new InvertedIndexTokenizingTupleIterator(invIndex.getTokenTypeTraits().length,
+                        invIndex.getInvListTypeTraits().length, tokenizerFactory.createTokenizer());
+                break;
+            }
+            case PARTITIONED_INMEMORY:
+            case PARTITIONED_ONDISK:
+            case PARTITIONED_LSM: {
+                indexTupleIter = new InvertedIndexTokenizingNumTokensTupleIterator(
+                        invIndex.getTokenTypeTraits().length, invIndex.getInvListTypeTraits().length,
+                        tokenizerFactory.createTokenizer());
+                break;
+            }
+            default: {
+                throw new InvertedIndexException("Unknow inverted-index type '" + invIndexType + "'.");
+            }
+        }
+        LSMInvertedIndexTestContext testCtx = new LSMInvertedIndexTestContext(fieldSerdes, invIndex, tokenizerFactory,
+                invIndexType, indexTupleIter);
         return testCtx;
     }
 
@@ -192,4 +228,8 @@
     public List<ITupleReference> getDocumentCorpus() {
         return documentCorpus;
     }
+    
+    public InvertedIndexType getInvertedIndexType() {
+        return invIndexType;
+    }
 }
diff --git a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestUtils.java b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestUtils.java
index 68824f5..53237d9 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestUtils.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestUtils.java
@@ -99,57 +99,105 @@
         return tupleGen;
     }
 
+    private static ISerializerDeserializer[] getNonHashedIndexFieldSerdes(InvertedIndexType invIndexType)
+            throws IndexException {
+        ISerializerDeserializer[] fieldSerdes = null;
+        switch (invIndexType) {
+            case INMEMORY:
+            case ONDISK:
+            case LSM: {
+                fieldSerdes = new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE };
+                break;
+            }
+            case PARTITIONED_INMEMORY:
+            case PARTITIONED_ONDISK:
+            case PARTITIONED_LSM: {
+                // Such indexes also include the set-size for partitioning.
+                fieldSerdes = new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE };
+                break;
+            }
+            default: {
+                throw new IndexException("Unhandled inverted index type '" + invIndexType + "'.");
+            }
+        }
+        return fieldSerdes;
+    }
+    
+    private static ISerializerDeserializer[] getHashedIndexFieldSerdes(InvertedIndexType invIndexType)
+            throws IndexException {
+        ISerializerDeserializer[] fieldSerdes = null;
+        switch (invIndexType) {
+            case INMEMORY:
+            case ONDISK:
+            case LSM: {
+                fieldSerdes = new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE };
+                break;
+            }
+            case PARTITIONED_INMEMORY:
+            case PARTITIONED_ONDISK:
+            case PARTITIONED_LSM: {
+                // Such indexes also include the set-size for partitioning.
+                fieldSerdes = new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE };
+                break;
+            }
+            default: {
+                throw new IndexException("Unhandled inverted index type '" + invIndexType + "'.");
+            }
+        }
+        return fieldSerdes;
+    }
+    
     public static LSMInvertedIndexTestContext createWordInvIndexTestContext(LSMInvertedIndexTestHarness harness,
             InvertedIndexType invIndexType) throws IOException, IndexException {
-        ISerializerDeserializer[] fieldSerdes = new ISerializerDeserializer[] {
-                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE };
+        ISerializerDeserializer[] fieldSerdes = getNonHashedIndexFieldSerdes(invIndexType);
         ITokenFactory tokenFactory = new UTF8WordTokenFactory();
         IBinaryTokenizerFactory tokenizerFactory = new DelimitedUTF8StringBinaryTokenizerFactory(true, false,
                 tokenFactory);
-        LSMInvertedIndexTestContext testCtx = LSMInvertedIndexTestContext.create(harness, fieldSerdes, 1, tokenizerFactory,
-                invIndexType);
+        LSMInvertedIndexTestContext testCtx = LSMInvertedIndexTestContext.create(harness, fieldSerdes,
+                fieldSerdes.length - 1, tokenizerFactory, invIndexType);
         return testCtx;
     }
-
+    
     public static LSMInvertedIndexTestContext createHashedWordInvIndexTestContext(LSMInvertedIndexTestHarness harness,
             InvertedIndexType invIndexType) throws IOException, IndexException {
-        ISerializerDeserializer[] fieldSerdes = new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE,
-                IntegerSerializerDeserializer.INSTANCE };
+        ISerializerDeserializer[] fieldSerdes = getHashedIndexFieldSerdes(invIndexType);
         ITokenFactory tokenFactory = new HashedUTF8WordTokenFactory();
         IBinaryTokenizerFactory tokenizerFactory = new DelimitedUTF8StringBinaryTokenizerFactory(true, false,
                 tokenFactory);
-        LSMInvertedIndexTestContext testCtx = LSMInvertedIndexTestContext.create(harness, fieldSerdes, 1, tokenizerFactory,
-                invIndexType);
+        LSMInvertedIndexTestContext testCtx = LSMInvertedIndexTestContext.create(harness, fieldSerdes,
+                fieldSerdes.length - 1, tokenizerFactory, invIndexType);
         return testCtx;
     }
 
     public static LSMInvertedIndexTestContext createNGramInvIndexTestContext(LSMInvertedIndexTestHarness harness,
             InvertedIndexType invIndexType) throws IOException, IndexException {
-        ISerializerDeserializer[] fieldSerdes = new ISerializerDeserializer[] {
-                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE };
+        ISerializerDeserializer[] fieldSerdes = getNonHashedIndexFieldSerdes(invIndexType);
         ITokenFactory tokenFactory = new UTF8NGramTokenFactory();
         IBinaryTokenizerFactory tokenizerFactory = new NGramUTF8StringBinaryTokenizerFactory(TEST_GRAM_LENGTH, true,
                 true, false, tokenFactory);
-        LSMInvertedIndexTestContext testCtx = LSMInvertedIndexTestContext.create(harness, fieldSerdes, 1, tokenizerFactory,
-                invIndexType);
+        LSMInvertedIndexTestContext testCtx = LSMInvertedIndexTestContext.create(harness, fieldSerdes,
+                fieldSerdes.length - 1, tokenizerFactory, invIndexType);
         return testCtx;
     }
 
     public static LSMInvertedIndexTestContext createHashedNGramInvIndexTestContext(LSMInvertedIndexTestHarness harness,
             InvertedIndexType invIndexType) throws IOException, IndexException {
-        ISerializerDeserializer[] fieldSerdes = new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE,
-                IntegerSerializerDeserializer.INSTANCE };
+        ISerializerDeserializer[] fieldSerdes = getHashedIndexFieldSerdes(invIndexType);
         ITokenFactory tokenFactory = new HashedUTF8NGramTokenFactory();
         IBinaryTokenizerFactory tokenizerFactory = new NGramUTF8StringBinaryTokenizerFactory(TEST_GRAM_LENGTH, true,
                 true, false, tokenFactory);
-        LSMInvertedIndexTestContext testCtx = LSMInvertedIndexTestContext.create(harness, fieldSerdes, 1, tokenizerFactory,
-                invIndexType);
+        LSMInvertedIndexTestContext testCtx = LSMInvertedIndexTestContext.create(harness, fieldSerdes,
+                fieldSerdes.length - 1, tokenizerFactory, invIndexType);
         return testCtx;
     }
 
     public static void bulkLoadInvIndex(LSMInvertedIndexTestContext testCtx, TupleGenerator tupleGen, int numDocs)
             throws IndexException, IOException {
-        SortedSet<CheckTuple> tmpMemIndex = new TreeSet<CheckTuple>();;
+        SortedSet<CheckTuple> tmpMemIndex = new TreeSet<CheckTuple>();
         // First generate the expected index by inserting the documents one-by-one.
         for (int i = 0; i < numDocs; i++) {
             ITupleReference tuple = tupleGen.next();
@@ -330,10 +378,33 @@
     /**
      * Determine the expected results with the simple ScanCount algorithm.
      */
+    public static void getExpectedResults(int[] scanCountArray, TreeSet<CheckTuple> checkTuples,
+            ITupleReference searchDocument, IBinaryTokenizer tokenizer, ISerializerDeserializer tokenSerde,
+            IInvertedIndexSearchModifier searchModifier, List<Integer> expectedResults, InvertedIndexType invIndexType)
+            throws IOException {
+        boolean isPartitioned = false;
+        switch (invIndexType) {
+            case INMEMORY:
+            case ONDISK:
+            case LSM: {
+                isPartitioned = false;
+                break;
+            }
+            case PARTITIONED_INMEMORY:
+            case PARTITIONED_ONDISK:
+            case PARTITIONED_LSM: {
+                isPartitioned = true;
+                break;
+            }
+        }
+        getExpectedResults(scanCountArray, checkTuples, searchDocument, tokenizer, tokenSerde,
+                searchModifier, expectedResults, isPartitioned);
+    }
+    
     @SuppressWarnings("unchecked")
     public static void getExpectedResults(int[] scanCountArray, TreeSet<CheckTuple> checkTuples,
             ITupleReference searchDocument, IBinaryTokenizer tokenizer, ISerializerDeserializer tokenSerde,
-            IInvertedIndexSearchModifier searchModifier, List<Integer> expectedResults) throws IOException {
+            IInvertedIndexSearchModifier searchModifier, List<Integer> expectedResults, boolean isPartitioned) throws IOException {
         // Reset scan count array.
         Arrays.fill(scanCountArray, 0);
         expectedResults.clear();
@@ -341,9 +412,25 @@
         ByteArrayAccessibleOutputStream baaos = new ByteArrayAccessibleOutputStream();
         tokenizer.reset(searchDocument.getFieldData(0), searchDocument.getFieldStart(0),
                 searchDocument.getFieldLength(0));
+        // Run though tokenizer to get number of tokens.
         int numQueryTokens = 0;
         while (tokenizer.hasNext()) {
             tokenizer.next();
+            numQueryTokens++;
+        }
+        int numTokensLowerBound = -1;
+        int numTokensUpperBound = -1;
+        int invListElementField = 1;
+        if (isPartitioned) {
+            numTokensLowerBound = searchModifier.getNumTokensLowerBound(numQueryTokens);
+            numTokensUpperBound = searchModifier.getNumTokensUpperBound(numQueryTokens);
+            invListElementField = 2;
+        }
+        int occurrenceThreshold = searchModifier.getOccurrenceThreshold(numQueryTokens);
+        tokenizer.reset(searchDocument.getFieldData(0), searchDocument.getFieldStart(0),
+                searchDocument.getFieldLength(0));
+        while (tokenizer.hasNext()) {
+            tokenizer.next();
             IToken token = tokenizer.getToken();
             baaos.reset();
             DataOutput out = new DataOutputStream(baaos);
@@ -351,10 +438,28 @@
             ByteArrayInputStream inStream = new ByteArrayInputStream(baaos.getByteArray(), 0, baaos.size());
             DataInput dataIn = new DataInputStream(inStream);
             Comparable tokenObj = (Comparable) tokenSerde.deserialize(dataIn);
-            CheckTuple lowKey = new CheckTuple(1, 1);
-            lowKey.appendField(tokenObj);
-            CheckTuple highKey = new CheckTuple(1, 1);
-            highKey.appendField(tokenObj);
+            CheckTuple lowKey;
+            if (numTokensLowerBound < 0) {
+                // Index is not partitioned, or no length filtering is possible for this search modifier.
+                lowKey = new CheckTuple(1, 1);
+                lowKey.appendField(tokenObj);
+            } else {
+                // Index is length partitioned, and search modifier supports length filtering.
+                lowKey = new CheckTuple(2, 2);
+                lowKey.appendField(tokenObj);
+                lowKey.appendField(Integer.valueOf(numTokensLowerBound));
+            }
+            CheckTuple highKey;
+            if (numTokensUpperBound < 0) {
+                // Index is not partitioned, or no length filtering is possible for this search modifier.
+                highKey = new CheckTuple(1, 1);
+                highKey.appendField(tokenObj);
+            } else {
+                // Index is length partitioned, and search modifier supports length filtering.
+                highKey = new CheckTuple(2, 2);
+                highKey.appendField(tokenObj);
+                highKey.appendField(Integer.valueOf(numTokensUpperBound));
+            }                        
 
             // Get view over check tuples containing inverted-list corresponding to token. 
             SortedSet<CheckTuple> invList = OrderedIndexTestUtils.getPrefixExpectedSubset(checkTuples, lowKey, highKey);
@@ -362,13 +467,11 @@
             // Iterate over inverted list and update scan count array.
             while (invListIter.hasNext()) {
                 CheckTuple checkTuple = invListIter.next();
-                Integer element = (Integer) checkTuple.getField(1);
+                Integer element = (Integer) checkTuple.getField(invListElementField);
                 scanCountArray[element]++;
             }
-            numQueryTokens++;
         }
-
-        int occurrenceThreshold = searchModifier.getOccurrenceThreshold(numQueryTokens);
+        
         // Run through scan count array, and see whether elements satisfy the given occurrence threshold.
         expectedResults.clear();
         for (int i = 0; i < scanCountArray.length; i++) {
@@ -394,7 +497,7 @@
         IIndexCursor resultCursor = accessor.createSearchCursor();
         int numQueries = numDocQueries + numRandomQueries;
         for (int i = 0; i < numQueries; i++) {
-            // If number of documents in the corpus io less than numDocQueries, then replace the remaining ones with random queries.
+            // If number of documents in the corpus is less than numDocQueries, then replace the remaining ones with random queries.
             if (i >= numDocQueries || i >= documentCorpus.size()) {
                 // Generate a random query.
                 ITupleReference randomQuery = tupleGen.next();
@@ -404,7 +507,7 @@
                 int queryIndex = Math.abs(rnd.nextInt() % documentCorpus.size());
                 searchDocument.reset(documentCorpus.get(queryIndex));
             }
-
+            
             // Set query tuple in search predicate.
             searchPred.setQueryTuple(searchDocument);
             searchPred.setQueryFieldIndex(0);
@@ -438,8 +541,9 @@
 
                     // Get expected results.
                     List<Integer> expectedResults = new ArrayList<Integer>();
-                    LSMInvertedIndexTestUtils.getExpectedResults(scanCountArray, testCtx.getCheckTuples(), searchDocument,
-                            tokenizer, testCtx.getFieldSerdes()[0], searchModifier, expectedResults);
+                    LSMInvertedIndexTestUtils.getExpectedResults(scanCountArray, testCtx.getCheckTuples(),
+                            searchDocument, tokenizer, testCtx.getFieldSerdes()[0], searchModifier, expectedResults,
+                            testCtx.getInvertedIndexType());
 
                     Iterator<Integer> expectedIter = expectedResults.iterator();
                     Iterator<Integer> actualIter = actualResults.iterator();