Implemented bulk loading and basic search for the on-disk components of length-partitioned inverted indexes.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_length_filter@2462 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/api/IInvertedIndexSearchModifier.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/api/IInvertedIndexSearchModifier.java
index 315b29e..afe082d 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/api/IInvertedIndexSearchModifier.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/api/IInvertedIndexSearchModifier.java
@@ -19,7 +19,7 @@
 public interface IInvertedIndexSearchModifier {
     public int getOccurrenceThreshold(int numQueryTokens);
 
-    public int getNumPrefixLists(int numQueryTokens);
+    public int getNumPrefixLists(int occurrenceThreshold, int numInvLists);
     
     public int getNumTokensLowerBound(int numQueryTokens);
     
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndex.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndex.java
index 95ee45e..b7063fb 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndex.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndex.java
@@ -67,6 +67,8 @@
         private final ObjectCache<IInvertedListCursor> invListCursorCache;
         private final ObjectCache<ArrayList<IInvertedListCursor>> arrayListCache;
         private ArrayList<IInvertedListCursor>[] partitions;
+        private int minValidPartitionIndex;
+        private int maxValidPartitionIndex;
 
         public InvertedListPartitions(ObjectCache<IInvertedListCursor> invListCursorCache,
                 ObjectCache<ArrayList<IInvertedListCursor>> arrayListCache) {
@@ -77,19 +79,23 @@
         @SuppressWarnings("unchecked")
         public void reset(int numTokensLowerBound, int numTokensUpperBound) {
             if (partitions == null) {
-                int initialSize = numTokensUpperBound;
+                int initialSize;
                 if (numTokensUpperBound < 0) {
                     initialSize = DEFAULT_NUM_PARTITIONS;
+                } else {
+                    initialSize = numTokensUpperBound + 1;
                 }
                 partitions = (ArrayList<IInvertedListCursor>[]) new ArrayList[initialSize];
             } else {
-                if (numTokensUpperBound >= partitions.length) {
-                    Arrays.copyOf(partitions, numTokensUpperBound);
+                if (numTokensUpperBound + 1 >= partitions.length) {
+                    partitions = Arrays.copyOf(partitions, numTokensUpperBound + 1);
                 }
                 Arrays.fill(partitions, null);
             }
             invListCursorCache.reset();
             arrayListCache.reset();
+            minValidPartitionIndex = Integer.MAX_VALUE;
+            maxValidPartitionIndex = Integer.MIN_VALUE;
         }
 
         public void addInvertedListCursor(ITupleReference btreeTuple) {
@@ -98,13 +104,21 @@
             int numTokens = IntegerSerializerDeserializer.getInt(
                     btreeTuple.getFieldData(PARTITIONING_NUM_TOKENS_FIELD),
                     btreeTuple.getFieldStart(PARTITIONING_NUM_TOKENS_FIELD));
-            if (numTokens >= partitions.length) {
+            if (numTokens + 1 >= partitions.length) {
                 partitions = Arrays.copyOf(partitions, numTokens + PARTITIONS_SLACK_SIZE);
             }
             ArrayList<IInvertedListCursor> partitionCursors = partitions[numTokens];
             if (partitionCursors == null) {
                 partitionCursors = arrayListCache.getNext();
+                partitionCursors.clear();
                 partitions[numTokens] = partitionCursors;
+                // Update range of valid partitions.
+                if (numTokens < minValidPartitionIndex) {
+                    minValidPartitionIndex = numTokens;
+                }
+                if (numTokens > maxValidPartitionIndex) {
+                    maxValidPartitionIndex = numTokens;
+                }
             }
             partitionCursors.add(listCursor);
         }
@@ -112,6 +126,14 @@
         public ArrayList<IInvertedListCursor>[] getPartitions() {
             return partitions;
         }
+        
+        public int getMinValidPartitionIndex() {
+            return minValidPartitionIndex;
+        }
+
+        public int getMaxValidPartitionIndex() {
+            return maxValidPartitionIndex;
+        }
     }
     
     public void openInvertedListPartitionCursors(InvertedListPartitions invListPartitions, ITupleReference lowSearchKey,
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/ConjunctiveSearchModifier.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/ConjunctiveSearchModifier.java
index 14d8414..55c5f30 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/ConjunctiveSearchModifier.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/ConjunctiveSearchModifier.java
@@ -25,7 +25,7 @@
     }
 
     @Override
-    public int getNumPrefixLists(int numQueryTokens) {
+    public int getNumPrefixLists(int occurrenceThreshold, int numInvLists) {
         return 1;
     }
     
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/EditDistanceSearchModifier.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/EditDistanceSearchModifier.java
index a9f4471..26e11ba 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/EditDistanceSearchModifier.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/EditDistanceSearchModifier.java
@@ -33,8 +33,8 @@
     }
 
     @Override
-    public int getNumPrefixLists(int numQueryTokens) {
-        return numQueryTokens - getOccurrenceThreshold(numQueryTokens) + 1;
+    public int getNumPrefixLists(int occurrenceThreshold, int numInvLists) {
+        return numInvLists - occurrenceThreshold + 1;
     }
 
     @Override
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/InvertedListMerger.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/InvertedListMerger.java
new file mode 100644
index 0000000..18ed73c
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/InvertedListMerger.java
@@ -0,0 +1,330 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+
+import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeFrameTupleAccessor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeTupleReference;
+
+// TODO: The merge procedure is rather confusing regarding cursor positions, hasNext() calls etc.
+// Needs an overhaul some time.
+public class InvertedListMerger {
+
+    protected final MultiComparator invListCmp;
+    protected SearchResult prevSearchResult;
+    protected SearchResult newSearchResult;
+
+    public InvertedListMerger(IHyracksCommonContext ctx, IInvertedIndex invIndex) {
+        this.invListCmp = MultiComparator.create(invIndex.getInvListCmpFactories());
+        this.prevSearchResult = new SearchResult(invIndex.getInvListTypeTraits(), ctx);
+        this.newSearchResult = new SearchResult(prevSearchResult);
+    }
+
+    public void merge(ArrayList<IInvertedListCursor> invListCursors, int occurrenceThreshold, int numPrefixLists,
+            SearchResult searchResult) throws HyracksDataException, IndexException {
+        Collections.sort(invListCursors);
+        int numInvLists = invListCursors.size();
+        SearchResult result = null;
+        for (int i = 0; i < numInvLists; i++) {
+            SearchResult swapTemp = prevSearchResult;
+            prevSearchResult = newSearchResult;
+            newSearchResult = swapTemp;
+            newSearchResult.reset();
+            if (i + 1 != numInvLists) {
+                // Use temporary search results when not merging last list.
+                result = newSearchResult;
+            } else {
+                // When merging the last list, append results to the final search result.
+                result = searchResult;
+            }
+            IInvertedListCursor invListCursor = invListCursors.get(i);
+            invListCursor.pinPages();
+            if (i < numPrefixLists) {
+                // Merge prefix list.
+                mergePrefixList(invListCursor, prevSearchResult, result);
+            } else {
+                // Merge suffix list.
+                int numInvListElements = invListCursor.size();
+                int currentNumResults = prevSearchResult.getNumResults();
+                // Should we binary search the next list or should we sort-merge it?
+                if (currentNumResults * Math.log(numInvListElements) < currentNumResults + numInvListElements) {
+                    mergeSuffixListProbe(invListCursor, prevSearchResult, result, i, numInvLists,
+                            occurrenceThreshold);
+                } else {
+                    mergeSuffixListScan(invListCursor, prevSearchResult, result, i, numInvLists,
+                            occurrenceThreshold);
+                }
+            }
+            invListCursor.unpinPages();
+        }
+    }
+
+    protected void mergeSuffixListProbe(IInvertedListCursor invListCursor, SearchResult prevSearchResult,
+            SearchResult newSearchResult, int invListIx, int numInvLists, int occurrenceThreshold)
+            throws HyracksDataException, IndexException {
+
+        int prevBufIdx = 0;
+        int maxPrevBufIdx = prevSearchResult.getCurrentBufferIndex();
+        ByteBuffer prevCurrentBuffer = prevSearchResult.getBuffers().get(0);
+
+        FixedSizeFrameTupleAccessor resultFrameTupleAcc = prevSearchResult.getAccessor();
+        FixedSizeTupleReference resultTuple = prevSearchResult.getTuple();
+
+        int resultTidx = 0;
+
+        resultFrameTupleAcc.reset(prevCurrentBuffer);
+
+        while (resultTidx < resultFrameTupleAcc.getTupleCount()) {
+
+            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
+            int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
+                    resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
+
+            if (invListCursor.containsKey(resultTuple, invListCmp)) {
+                count++;
+                newSearchResult.append(resultTuple, count);
+            } else {
+                if (count + numInvLists - invListIx > occurrenceThreshold) {
+                    newSearchResult.append(resultTuple, count);
+                }
+            }
+
+            resultTidx++;
+            if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
+                prevBufIdx++;
+                if (prevBufIdx <= maxPrevBufIdx) {
+                    prevCurrentBuffer = prevSearchResult.getBuffers().get(prevBufIdx);
+                    resultFrameTupleAcc.reset(prevCurrentBuffer);
+                    resultTidx = 0;
+                }
+            }
+        }
+    }
+
+    protected void mergeSuffixListScan(IInvertedListCursor invListCursor, SearchResult prevSearchResult,
+            SearchResult newSearchResult, int invListIx, int numInvLists, int occurrenceThreshold)
+            throws HyracksDataException, IndexException {
+
+        int prevBufIdx = 0;
+        int maxPrevBufIdx = prevSearchResult.getCurrentBufferIndex();
+        ByteBuffer prevCurrentBuffer = prevSearchResult.getBuffers().get(0);
+
+        FixedSizeFrameTupleAccessor resultFrameTupleAcc = prevSearchResult.getAccessor();
+        FixedSizeTupleReference resultTuple = prevSearchResult.getTuple();
+
+        boolean advanceCursor = true;
+        boolean advancePrevResult = false;
+        int resultTidx = 0;
+
+        resultFrameTupleAcc.reset(prevCurrentBuffer);
+
+        int invListTidx = 0;
+        int invListNumTuples = invListCursor.size();
+
+        if (invListCursor.hasNext())
+            invListCursor.next();
+
+        while (invListTidx < invListNumTuples && resultTidx < resultFrameTupleAcc.getTupleCount()) {
+
+            ITupleReference invListTuple = invListCursor.getTuple();
+
+            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
+
+            int cmp = invListCmp.compare(invListTuple, resultTuple);
+            if (cmp == 0) {
+                int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
+                        resultTuple.getFieldStart(resultTuple.getFieldCount() - 1)) + 1;
+                newSearchResult.append(resultTuple, count);
+                advanceCursor = true;
+                advancePrevResult = true;
+            } else {
+                if (cmp < 0) {
+                    advanceCursor = true;
+                    advancePrevResult = false;
+                } else {
+                    int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
+                            resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
+                    if (count + numInvLists - invListIx > occurrenceThreshold) {
+                        newSearchResult.append(resultTuple, count);
+                    }
+                    advanceCursor = false;
+                    advancePrevResult = true;
+                }
+            }
+
+            if (advancePrevResult) {
+                resultTidx++;
+                if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
+                    prevBufIdx++;
+                    if (prevBufIdx <= maxPrevBufIdx) {
+                        prevCurrentBuffer = prevSearchResult.getBuffers().get(prevBufIdx);
+                        resultFrameTupleAcc.reset(prevCurrentBuffer);
+                        resultTidx = 0;
+                    }
+                }
+            }
+
+            if (advanceCursor) {
+                invListTidx++;
+                if (invListCursor.hasNext()) {
+                    invListCursor.next();
+                }
+            }
+        }
+
+        // append remaining elements from previous result set
+        while (resultTidx < resultFrameTupleAcc.getTupleCount()) {
+
+            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
+
+            int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
+                    resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
+            if (count + numInvLists - invListIx > occurrenceThreshold) {
+                newSearchResult.append(resultTuple, count);
+            }
+
+            resultTidx++;
+            if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
+                prevBufIdx++;
+                if (prevBufIdx <= maxPrevBufIdx) {
+                    prevCurrentBuffer = prevSearchResult.getBuffers().get(prevBufIdx);
+                    resultFrameTupleAcc.reset(prevCurrentBuffer);
+                    resultTidx = 0;
+                }
+            }
+        }
+    }
+
+    protected void mergePrefixList(IInvertedListCursor invListCursor, SearchResult prevSearchResult,
+            SearchResult newSearchResult) throws HyracksDataException, IndexException {
+
+        int prevBufIdx = 0;
+        int maxPrevBufIdx = prevSearchResult.getCurrentBufferIndex();
+        ByteBuffer prevCurrentBuffer = prevSearchResult.getBuffers().get(0);
+
+        FixedSizeFrameTupleAccessor resultFrameTupleAcc = prevSearchResult.getAccessor();
+        FixedSizeTupleReference resultTuple = prevSearchResult.getTuple();
+
+        boolean advanceCursor = true;
+        boolean advancePrevResult = false;
+        int resultTidx = 0;
+
+        resultFrameTupleAcc.reset(prevCurrentBuffer);
+
+        int invListTidx = 0;
+        int invListNumTuples = invListCursor.size();
+
+        if (invListCursor.hasNext())
+            invListCursor.next();
+
+        while (invListTidx < invListNumTuples && resultTidx < resultFrameTupleAcc.getTupleCount()) {
+
+            ITupleReference invListTuple = invListCursor.getTuple();
+            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
+
+            int cmp = invListCmp.compare(invListTuple, resultTuple);
+            if (cmp == 0) {
+                int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
+                        resultTuple.getFieldStart(resultTuple.getFieldCount() - 1)) + 1;
+                newSearchResult.append(resultTuple, count);
+                advanceCursor = true;
+                advancePrevResult = true;
+            } else {
+                if (cmp < 0) {
+                    int count = 1;
+                    newSearchResult.append(invListTuple, count);
+                    advanceCursor = true;
+                    advancePrevResult = false;
+                } else {
+                    int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
+                            resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
+                    newSearchResult.append(resultTuple, count);
+                    advanceCursor = false;
+                    advancePrevResult = true;
+                }
+            }
+
+            if (advancePrevResult) {
+                resultTidx++;
+                if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
+                    prevBufIdx++;
+                    if (prevBufIdx <= maxPrevBufIdx) {
+                        prevCurrentBuffer = prevSearchResult.getBuffers().get(prevBufIdx);
+                        resultFrameTupleAcc.reset(prevCurrentBuffer);
+                        resultTidx = 0;
+                    }
+                }
+            }
+
+            if (advanceCursor) {
+                invListTidx++;
+                if (invListCursor.hasNext()) {
+                    invListCursor.next();
+                }
+            }
+        }
+
+        // append remaining new elements from inverted list
+        while (invListTidx < invListNumTuples) {
+            ITupleReference invListTuple = invListCursor.getTuple();
+            newSearchResult.append(invListTuple, 1);
+            invListTidx++;
+            if (invListCursor.hasNext()) {
+                invListCursor.next();
+            }
+        }
+
+        // append remaining elements from previous result set
+        while (resultTidx < resultFrameTupleAcc.getTupleCount()) {
+
+            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
+
+            int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
+                    resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
+            newSearchResult.append(resultTuple, count);
+
+            resultTidx++;
+            if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
+                prevBufIdx++;
+                if (prevBufIdx <= maxPrevBufIdx) {
+                    prevCurrentBuffer = prevSearchResult.getBuffers().get(prevBufIdx);
+                    resultFrameTupleAcc.reset(prevCurrentBuffer);
+                    resultTidx = 0;
+                }
+            }
+        }
+    }
+
+    public SearchResult createSearchResult() {
+        return new SearchResult(prevSearchResult);
+    }
+
+    public void reset() {
+        prevSearchResult.clear();
+        newSearchResult.clear();
+    }
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/JaccardSearchModifier.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/JaccardSearchModifier.java
index d1facc4..b005905 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/JaccardSearchModifier.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/JaccardSearchModifier.java
@@ -31,11 +31,11 @@
     }
 
     @Override
-    public int getNumPrefixLists(int numQueryTokens) {
-        if (numQueryTokens == 0) {
+    public int getNumPrefixLists(int occurrenceThreshold, int numInvLists) {
+        if (numInvLists == 0) {
             return 0;
         }
-        return numQueryTokens - getOccurrenceThreshold(numQueryTokens) + 1;
+        return numInvLists - occurrenceThreshold + 1;
     }
 
     @Override
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/PartitionedTOccurrenceSearcher.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/PartitionedTOccurrenceSearcher.java
index db5bcff..439f1ad 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/PartitionedTOccurrenceSearcher.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/PartitionedTOccurrenceSearcher.java
@@ -19,16 +19,13 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleReference;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
@@ -48,7 +45,6 @@
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IObjectFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.exceptions.OccurrenceThresholdPanicException;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeFrameTupleAccessor;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeFrameTupleAppender;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeTupleReference;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndexSearchCursor;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.PartitionedOnDiskInvertedIndex;
@@ -57,21 +53,12 @@
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IToken;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.ObjectCache;
 
-// TODO: The search procedure is rather confusing regarding cursor positions, hasNext() calls etc.
-// Needs an overhaul some time.
 public class PartitionedTOccurrenceSearcher implements IInvertedIndexSearcher {
 
     protected final IHyracksCommonContext ctx;
-    protected final FixedSizeFrameTupleAppender resultFrameTupleApp;
-    protected final FixedSizeFrameTupleAccessor resultFrameTupleAcc;
-    protected final FixedSizeTupleReference resultTuple;
-    protected final int invListKeyLength;
-    protected int currentNumResults;
 
-    protected List<ByteBuffer> newResultBuffers = new ArrayList<ByteBuffer>();
-    protected List<ByteBuffer> prevResultBuffers = new ArrayList<ByteBuffer>();
-    protected List<ByteBuffer> swap = null;
-    protected int maxResultBufIdx = 0;
+    protected final InvertedListMerger invListMerger;
+    protected final SearchResult searchResult;
 
     protected RecordDescriptor queryTokenRecDesc = new RecordDescriptor(
             new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
@@ -83,12 +70,11 @@
 
     protected final IInvertedIndex invIndex;
     protected final MultiComparator invListCmp;
-    protected final ITypeTraits[] invListFieldsWithCount;
     protected int occurrenceThreshold;
 
     protected final int cursorCacheSize = 10;
-    //protected List<IInvertedListCursor> invListCursorCache = new ArrayList<IInvertedListCursor>(cursorCacheSize);
-    protected List<IInvertedListCursor> invListCursors = new ArrayList<IInvertedListCursor>(cursorCacheSize);
+    //protected ArrayList<IInvertedListCursor> invListCursorCache = new ArrayList<IInvertedListCursor>(cursorCacheSize);
+    protected ArrayList<IInvertedListCursor> invListCursors = new ArrayList<IInvertedListCursor>(cursorCacheSize);
 
     protected final ArrayTupleBuilder lowerBoundTupleBuilder = new ArrayTupleBuilder(1);
     protected final ArrayTupleReference lowerBoundTuple = new ArrayTupleReference();
@@ -96,58 +82,36 @@
     protected final ArrayTupleReference upperBoundTuple = new ArrayTupleReference();
     protected final ConcatenatingTupleReference partLowSearchKey = new ConcatenatingTupleReference(2);
     protected final ConcatenatingTupleReference partHighSearchKey = new ConcatenatingTupleReference(2);
-    
+
     protected final IObjectFactory<IInvertedListCursor> invListCursorFactory;
     protected final IObjectFactory<ArrayList<IInvertedListCursor>> arrayListFactory;
     protected final ObjectCache<IInvertedListCursor> invListCursorCache;
     protected final ObjectCache<ArrayList<IInvertedListCursor>> arrayListCache;
-    
+
     protected final InvertedListPartitions partitions;
-    
+
     public PartitionedTOccurrenceSearcher(IHyracksCommonContext ctx, IInvertedIndex invIndex) {
         this.ctx = ctx;
+        this.invListMerger = new InvertedListMerger(ctx, invIndex);
+        this.searchResult = new SearchResult(invIndex.getInvListTypeTraits(), ctx);
         this.invIndex = invIndex;
         this.invListCmp = MultiComparator.create(invIndex.getInvListCmpFactories());
 
-        ITypeTraits[] invListFields = invIndex.getInvListTypeTraits();
-        invListFieldsWithCount = new ITypeTraits[invListFields.length + 1];
-        int tmp = 0;
-        for (int i = 0; i < invListFields.length; i++) {
-            invListFieldsWithCount[i] = invListFields[i];
-            tmp += invListFields[i].getFixedLength();
-        }
-        // using an integer for counting occurrences
-        invListFieldsWithCount[invListFields.length] = IntegerPointable.TYPE_TRAITS;
-        invListKeyLength = tmp;
-
-        resultFrameTupleApp = new FixedSizeFrameTupleAppender(ctx.getFrameSize(), invListFieldsWithCount);
-        resultFrameTupleAcc = new FixedSizeFrameTupleAccessor(ctx.getFrameSize(), invListFieldsWithCount);
-        resultTuple = new FixedSizeTupleReference(invListFieldsWithCount);
-        newResultBuffers.add(ctx.allocateFrame());
-        prevResultBuffers.add(ctx.allocateFrame());
-
         queryTokenAppender = new FrameTupleAppender(ctx.getFrameSize());
         queryTokenFrame = ctx.allocateFrame();
 
         invListCursorFactory = new InvertedListCursorFactory(invIndex);
-        arrayListFactory = new ArrayListFactory<IInvertedListCursor>();        
+        arrayListFactory = new ArrayListFactory<IInvertedListCursor>();
         invListCursorCache = new ObjectCache<IInvertedListCursor>(invListCursorFactory, 10, 10);
         arrayListCache = new ObjectCache<ArrayList<IInvertedListCursor>>(arrayListFactory, 10, 10);
-        
+
         PartitionedOnDiskInvertedIndex partInvIndex = (PartitionedOnDiskInvertedIndex) invIndex;
         partitions = partInvIndex.new InvertedListPartitions(invListCursorCache, arrayListCache);
-        
-        currentNumResults = 0;
     }
 
     public void reset() {
-        for (ByteBuffer b : newResultBuffers) {
-            resultFrameTupleApp.reset(b, true);
-        }
-        for (ByteBuffer b : prevResultBuffers) {
-            resultFrameTupleApp.reset(b, true);
-        }
-        currentNumResults = 0;
+        searchResult.clear();
+        invListMerger.reset();
     }
 
     public void search(OnDiskInvertedIndexSearchCursor resultCursor, InvertedIndexSearchPredicate searchPred,
@@ -179,10 +143,10 @@
         FrameTupleAccessor queryTokenAccessor = new FrameTupleAccessor(ctx.getFrameSize(), queryTokenRecDesc);
         queryTokenAccessor.reset(queryTokenFrame);
         int numQueryTokens = queryTokenAccessor.getTupleCount();
-        
+
         // ALEX NEW CODE STARTS HERE
         int numTokensLowerBound = searchModifier.getNumTokensLowerBound(numQueryTokens);
-        int numTokensUpperBound = searchModifier.getNumTokensLowerBound(numQueryTokens);
+        int numTokensUpperBound = searchModifier.getNumTokensUpperBound(numQueryTokens);
         ITupleReference lowSearchKey = null;
         ITupleReference highSearchKey = null;
         try {
@@ -192,6 +156,8 @@
                 lowerBoundTupleBuilder.addFieldEndOffset();
                 lowerBoundTuple.reset(lowerBoundTupleBuilder.getFieldEndOffsets(),
                         lowerBoundTupleBuilder.getByteArray());
+                // Only needed for setting the number of fields in searchKey.
+                searchKey.reset(queryTokenAccessor, 0);
                 partLowSearchKey.reset();
                 partLowSearchKey.addTuple(searchKey);
                 partLowSearchKey.addTuple(lowerBoundTuple);
@@ -205,6 +171,8 @@
                 upperBoundTupleBuilder.addFieldEndOffset();
                 upperBoundTuple.reset(upperBoundTupleBuilder.getFieldEndOffsets(),
                         upperBoundTupleBuilder.getByteArray());
+                // Only needed for setting the number of fields in searchKey.
+                searchKey.reset(queryTokenAccessor, 0);
                 partHighSearchKey.reset();
                 partHighSearchKey.addTuple(searchKey);
                 partHighSearchKey.addTuple(upperBoundTuple);
@@ -215,11 +183,6 @@
         } catch (IOException e) {
             throw new HyracksDataException(e);
         }
-        occurrenceThreshold = searchModifier.getOccurrenceThreshold(numQueryTokens);
-        // TODO: deal with panic cases properly
-        if (occurrenceThreshold <= 0) {
-            throw new OccurrenceThresholdPanicException("Merge Threshold is <= 0. Failing Search.");
-        }
 
         PartitionedOnDiskInvertedIndex partInvIndex = (PartitionedOnDiskInvertedIndex) invIndex;
         partitions.reset(numTokensLowerBound, numTokensUpperBound);
@@ -228,10 +191,17 @@
             partInvIndex.openInvertedListPartitionCursors(partitions, lowSearchKey, highSearchKey, ictx);
         }
 
+        occurrenceThreshold = searchModifier.getOccurrenceThreshold(numQueryTokens);
+        // TODO: deal with panic cases properly
+        if (occurrenceThreshold <= 0) {
+            throw new OccurrenceThresholdPanicException("Merge Threshold is <= 0. Failing Search.");
+        }
+
         // Process the partitions one-by-one.
         ArrayList<IInvertedListCursor>[] partitionCursors = partitions.getPartitions();
-        int start = (numTokensLowerBound >= 0) ? numTokensLowerBound : 0;
-        int end = (numTokensUpperBound >= 0) ? numTokensUpperBound : partitionCursors.length - 1;
+        int start = partitions.getMinValidPartitionIndex();
+        int end = partitions.getMaxValidPartitionIndex();
+        searchResult.reset();
         for (int i = start; i <= end; i++) {
             if (partitionCursors[i] == null) {
                 continue;
@@ -240,350 +210,31 @@
             if (partitionCursors[i].size() < occurrenceThreshold) {
                 continue;
             }
-            
-            // Process partition.
-            Collections.sort(partitionCursors[i]);
-            // TODO: Continue here.
-            
+            // Merge inverted lists of current partition.
+            int numPrefixLists = searchModifier.getNumPrefixLists(occurrenceThreshold, partitionCursors[i].size());
+            invListMerger.reset();
+            invListMerger.merge(partitionCursors[i], occurrenceThreshold, numPrefixLists, searchResult);
         }
-        
-        
-        
-        
-        int numPrefixLists = searchModifier.getNumPrefixLists(invListCursors.size());
-        maxResultBufIdx = mergePrefixLists(numPrefixLists, numQueryTokens);
-        maxResultBufIdx = mergeSuffixLists(numPrefixLists, numQueryTokens, maxResultBufIdx);
 
         resultCursor.open(null, searchPred);
     }
 
-    protected int mergePrefixLists(int numPrefixTokens, int numQueryTokens) throws HyracksDataException, IndexException {
-        int maxPrevBufIdx = 0;
-        for (int i = 0; i < numPrefixTokens; i++) {
-            swap = prevResultBuffers;
-            prevResultBuffers = newResultBuffers;
-            newResultBuffers = swap;
-            currentNumResults = 0;
-
-            invListCursors.get(i).pinPages();
-            maxPrevBufIdx = mergePrefixList(invListCursors.get(i), prevResultBuffers, maxPrevBufIdx, newResultBuffers);
-            invListCursors.get(i).unpinPages();
-        }
-        return maxPrevBufIdx;
-    }
-
-    protected int mergeSuffixLists(int numPrefixTokens, int numQueryTokens, int maxPrevBufIdx)
-            throws HyracksDataException, IndexException {
-        for (int i = numPrefixTokens; i < numQueryTokens; i++) {
-            swap = prevResultBuffers;
-            prevResultBuffers = newResultBuffers;
-            newResultBuffers = swap;
-
-            invListCursors.get(i).pinPages();
-            int numInvListElements = invListCursors.get(i).size();
-            // should we binary search the next list or should we sort-merge it?
-            if (currentNumResults * Math.log(numInvListElements) < currentNumResults + numInvListElements) {
-                maxPrevBufIdx = mergeSuffixListProbe(invListCursors.get(i), prevResultBuffers, maxPrevBufIdx,
-                        newResultBuffers, i, numQueryTokens);
-            } else {
-                maxPrevBufIdx = mergeSuffixListScan(invListCursors.get(i), prevResultBuffers, maxPrevBufIdx,
-                        newResultBuffers, i, numQueryTokens);
-            }
-            invListCursors.get(i).unpinPages();
-        }
-        return maxPrevBufIdx;
-    }
-
-    protected int mergeSuffixListProbe(IInvertedListCursor invListCursor, List<ByteBuffer> prevResultBuffers,
-            int maxPrevBufIdx, List<ByteBuffer> newResultBuffers, int invListIx, int numQueryTokens) throws HyracksDataException, IndexException {
-
-        int newBufIdx = 0;
-        ByteBuffer newCurrentBuffer = newResultBuffers.get(0);
-
-        int prevBufIdx = 0;
-        ByteBuffer prevCurrentBuffer = prevResultBuffers.get(0);
-
-        int resultTidx = 0;
-
-        currentNumResults = 0;
-
-        resultFrameTupleAcc.reset(prevCurrentBuffer);
-        resultFrameTupleApp.reset(newCurrentBuffer, true);
-
-        while (resultTidx < resultFrameTupleAcc.getTupleCount()) {
-
-            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
-            int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
-                    resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
-
-            if (invListCursor.containsKey(resultTuple, invListCmp)) {
-                count++;
-                newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
-            } else {
-                if (count + numQueryTokens - invListIx > occurrenceThreshold) {
-                    newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
-                }
-            }
-
-            resultTidx++;
-            if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
-                prevBufIdx++;
-                if (prevBufIdx <= maxPrevBufIdx) {
-                    prevCurrentBuffer = prevResultBuffers.get(prevBufIdx);
-                    resultFrameTupleAcc.reset(prevCurrentBuffer);
-                    resultTidx = 0;
-                }
-            }
-        }
-
-        return newBufIdx;
-    }
-
-    protected int mergeSuffixListScan(IInvertedListCursor invListCursor, List<ByteBuffer> prevResultBuffers,
-            int maxPrevBufIdx, List<ByteBuffer> newResultBuffers, int invListIx, int numQueryTokens)
-            throws HyracksDataException, IndexException {
-        
-        int newBufIdx = 0;
-        ByteBuffer newCurrentBuffer = newResultBuffers.get(0);
-
-        int prevBufIdx = 0;
-        ByteBuffer prevCurrentBuffer = prevResultBuffers.get(0);
-
-        boolean advanceCursor = true;
-        boolean advancePrevResult = false;
-        int resultTidx = 0;
-
-        currentNumResults = 0;
-        
-        resultFrameTupleAcc.reset(prevCurrentBuffer);
-        resultFrameTupleApp.reset(newCurrentBuffer, true);
-
-        int invListTidx = 0;
-        int invListNumTuples = invListCursor.size();
-
-        if (invListCursor.hasNext())
-            invListCursor.next();
-
-        while (invListTidx < invListNumTuples && resultTidx < resultFrameTupleAcc.getTupleCount()) {
-
-            ITupleReference invListTuple = invListCursor.getTuple();
-
-            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
-
-            int cmp = invListCmp.compare(invListTuple, resultTuple);
-            if (cmp == 0) {
-                int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
-                        resultTuple.getFieldStart(resultTuple.getFieldCount() - 1)) + 1;
-                newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
-                advanceCursor = true;
-                advancePrevResult = true;
-            } else {
-                if (cmp < 0) {
-                    advanceCursor = true;
-                    advancePrevResult = false;
-                } else {
-                    int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
-                            resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
-                    if (count + numQueryTokens - invListIx > occurrenceThreshold) {
-                        newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
-                    }
-                    advanceCursor = false;
-                    advancePrevResult = true;
-                }
-            }
-
-            if (advancePrevResult) {
-                resultTidx++;
-                if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
-                    prevBufIdx++;
-                    if (prevBufIdx <= maxPrevBufIdx) {
-                        prevCurrentBuffer = prevResultBuffers.get(prevBufIdx);
-                        resultFrameTupleAcc.reset(prevCurrentBuffer);
-                        resultTidx = 0;
-                    }
-                }
-            }
-
-            if (advanceCursor) {
-                invListTidx++;
-                if (invListCursor.hasNext()) {
-                    invListCursor.next();
-                }
-            }
-        }
-
-        // append remaining elements from previous result set
-        while (resultTidx < resultFrameTupleAcc.getTupleCount()) {
-
-            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
-
-            int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
-                    resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
-            if (count + numQueryTokens - invListIx > occurrenceThreshold) {
-                newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
-            }
-
-            resultTidx++;
-            if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
-                prevBufIdx++;
-                if (prevBufIdx <= maxPrevBufIdx) {
-                    prevCurrentBuffer = prevResultBuffers.get(prevBufIdx);
-                    resultFrameTupleAcc.reset(prevCurrentBuffer);
-                    resultTidx = 0;
-                }
-            }
-        }
-
-        return newBufIdx;
-    }
-
-    protected int mergePrefixList(IInvertedListCursor invListCursor, List<ByteBuffer> prevResultBuffers,
-            int maxPrevBufIdx, List<ByteBuffer> newResultBuffers) throws HyracksDataException, IndexException {
-        
-        int newBufIdx = 0;
-        ByteBuffer newCurrentBuffer = newResultBuffers.get(0);
-
-        int prevBufIdx = 0;
-        ByteBuffer prevCurrentBuffer = prevResultBuffers.get(0);
-
-        boolean advanceCursor = true;
-        boolean advancePrevResult = false;
-        int resultTidx = 0;
-
-        resultFrameTupleAcc.reset(prevCurrentBuffer);
-        resultFrameTupleApp.reset(newCurrentBuffer, true);
-
-        int invListTidx = 0;
-        int invListNumTuples = invListCursor.size();
-
-        if (invListCursor.hasNext())
-            invListCursor.next();
-
-        while (invListTidx < invListNumTuples && resultTidx < resultFrameTupleAcc.getTupleCount()) {
-
-            ITupleReference invListTuple = invListCursor.getTuple();
-            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
-
-            int cmp = invListCmp.compare(invListTuple, resultTuple);
-            if (cmp == 0) {
-                int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
-                        resultTuple.getFieldStart(resultTuple.getFieldCount() - 1)) + 1;
-                newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
-                advanceCursor = true;
-                advancePrevResult = true;
-            } else {
-                if (cmp < 0) {
-                    int count = 1;
-                    newBufIdx = appendTupleToNewResults(invListTuple, count, newBufIdx);
-                    advanceCursor = true;
-                    advancePrevResult = false;
-                } else {
-                    int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
-                            resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
-                    newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
-                    advanceCursor = false;
-                    advancePrevResult = true;
-                }
-            }
-
-            if (advancePrevResult) {
-                resultTidx++;
-                if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
-                    prevBufIdx++;
-                    if (prevBufIdx <= maxPrevBufIdx) {
-                        prevCurrentBuffer = prevResultBuffers.get(prevBufIdx);
-                        resultFrameTupleAcc.reset(prevCurrentBuffer);
-                        resultTidx = 0;
-                    }
-                }
-            }
-
-            if (advanceCursor) {
-                invListTidx++;
-                if (invListCursor.hasNext()) {
-                    invListCursor.next();
-                }
-            }
-        }
-
-        // append remaining new elements from inverted list
-        while (invListTidx < invListNumTuples) {
-            ITupleReference invListTuple = invListCursor.getTuple();
-            newBufIdx = appendTupleToNewResults(invListTuple, 1, newBufIdx);
-            invListTidx++;
-            if (invListCursor.hasNext()) {
-                invListCursor.next();
-            }
-        }
-
-        // append remaining elements from previous result set
-        while (resultTidx < resultFrameTupleAcc.getTupleCount()) {
-
-            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
-
-            int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
-                    resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
-            newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
-
-            resultTidx++;
-            if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
-                prevBufIdx++;
-                if (prevBufIdx <= maxPrevBufIdx) {
-                    prevCurrentBuffer = prevResultBuffers.get(prevBufIdx);
-                    resultFrameTupleAcc.reset(prevCurrentBuffer);
-                    resultTidx = 0;
-                }
-            }
-        }
-
-        return newBufIdx;
-    }
-
-    protected int appendTupleToNewResults(ITupleReference tuple, int newCount, int newBufIdx) {
-        ByteBuffer newCurrentBuffer = newResultBuffers.get(newBufIdx);
-
-        if (!resultFrameTupleApp.hasSpace()) {
-            newBufIdx++;
-            if (newBufIdx >= newResultBuffers.size()) {
-                newResultBuffers.add(ctx.allocateFrame());
-            }
-            newCurrentBuffer = newResultBuffers.get(newBufIdx);
-            resultFrameTupleApp.reset(newCurrentBuffer, true);
-        }
-
-        // append key
-        if (!resultFrameTupleApp.append(tuple.getFieldData(0), tuple.getFieldStart(0), invListKeyLength)) {
-            throw new IllegalStateException();
-        }
-
-        // append new count
-        if (!resultFrameTupleApp.append(newCount)) {
-            throw new IllegalStateException();
-        }
-
-        resultFrameTupleApp.incrementTupleCount(1);
-
-        currentNumResults++;
-
-        return newBufIdx;
-    }
-
     public IFrameTupleAccessor createResultFrameTupleAccessor() {
-        return new FixedSizeFrameTupleAccessor(ctx.getFrameSize(), invListFieldsWithCount);
+        return new FixedSizeFrameTupleAccessor(ctx.getFrameSize(), searchResult.getTypeTraits());
     }
 
     public ITupleReference createResultFrameTupleReference() {
-        return new FixedSizeTupleReference(invListFieldsWithCount);
+        return new FixedSizeTupleReference(searchResult.getTypeTraits());
     }
 
     @Override
     public List<ByteBuffer> getResultBuffers() {
-        return newResultBuffers;
+        return searchResult.getBuffers();
     }
 
     @Override
     public int getNumValidResultBuffers() {
-        return maxResultBufIdx + 1;
+        return searchResult.getCurrentBufferIndex() + 1;
     }
 
     public int getOccurrenceThreshold() {
@@ -592,6 +243,7 @@
 
     public void printNewResults(int maxResultBufIdx, List<ByteBuffer> buffer) {
         StringBuffer strBuffer = new StringBuffer();
+        FixedSizeFrameTupleAccessor resultFrameTupleAcc = searchResult.getAccessor();
         for (int i = 0; i <= maxResultBufIdx; i++) {
             ByteBuffer testBuf = buffer.get(i);
             resultFrameTupleAcc.reset(testBuf);
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/SearchResult.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/SearchResult.java
index def477d..aa0d3f2 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/SearchResult.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/SearchResult.java
@@ -33,12 +33,12 @@
 public class SearchResult {
     protected final ArrayList<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
     protected final IHyracksCommonContext ctx;
-    protected final FixedSizeFrameTupleAppender resultFrameTupleApp;
-    protected final FixedSizeFrameTupleAccessor resultFrameTupleAcc;
-    protected final FixedSizeTupleReference resultTuple;
+    protected final FixedSizeFrameTupleAppender appender;
+    protected final FixedSizeFrameTupleAccessor accessor;
+    protected final FixedSizeTupleReference tuple;
     protected final ITypeTraits[] typeTraits;
     protected final int invListElementSize;
-    
+
     protected int currBufIdx;
     protected int numResults;
 
@@ -53,9 +53,9 @@
         // Integer for counting occurrences.
         typeTraits[invListFields.length] = IntegerPointable.TYPE_TRAITS;
         this.ctx = ctx;
-        resultFrameTupleApp = new FixedSizeFrameTupleAppender(ctx.getFrameSize(), typeTraits);
-        resultFrameTupleAcc = new FixedSizeFrameTupleAccessor(ctx.getFrameSize(), typeTraits);
-        resultTuple = new FixedSizeTupleReference(typeTraits);
+        appender = new FixedSizeFrameTupleAppender(ctx.getFrameSize(), typeTraits);
+        accessor = new FixedSizeFrameTupleAccessor(ctx.getFrameSize(), typeTraits);
+        tuple = new FixedSizeTupleReference(typeTraits);
         buffers.add(ctx.allocateFrame());
     }
 
@@ -64,64 +64,63 @@
      */
     public SearchResult(SearchResult other) {
         this.ctx = other.ctx;
-        this.resultFrameTupleApp = other.resultFrameTupleApp;
-        this.resultFrameTupleAcc = other.resultFrameTupleAcc;
-        this.resultTuple = other.resultTuple;
+        this.appender = other.appender;
+        this.accessor = other.accessor;
+        this.tuple = other.tuple;
         this.typeTraits = other.typeTraits;
         this.invListElementSize = other.invListElementSize;
         buffers.add(ctx.allocateFrame());
     }
 
     public FixedSizeFrameTupleAccessor getAccessor() {
-        return resultFrameTupleAcc;
+        return accessor;
     }
-    
+
     public FixedSizeFrameTupleAppender getAppender() {
-        return resultFrameTupleApp;
+        return appender;
     }
-    
+
     public FixedSizeTupleReference getTuple() {
-        return resultTuple;
+        return tuple;
     }
-    
+
     public ArrayList<ByteBuffer> getBuffers() {
         return buffers;
     }
-    
+
     public void reset() {
         currBufIdx = 0;
         numResults = 0;
-        resultFrameTupleApp.reset(buffers.get(0), true);
+        appender.reset(buffers.get(0), true);
     }
-    
+
     public void clear() {
         currBufIdx = 0;
         numResults = 0;
         for (ByteBuffer buffer : buffers) {
-            resultFrameTupleApp.reset(buffer, true);
+            appender.reset(buffer, true);
         }
     }
 
     public void append(ITupleReference invListElement, int count) {
         ByteBuffer currentBuffer = buffers.get(currBufIdx);
-        if (!resultFrameTupleApp.hasSpace()) {
+        if (!appender.hasSpace()) {
             currBufIdx++;
             if (currBufIdx >= buffers.size()) {
                 buffers.add(ctx.allocateFrame());
             }
             currentBuffer = buffers.get(currBufIdx);
-            resultFrameTupleApp.reset(currentBuffer, true);
+            appender.reset(currentBuffer, true);
         }
         // Append inverted-list element.
-        if (!resultFrameTupleApp.append(invListElement.getFieldData(0), invListElement.getFieldStart(0),
-                invListElementSize)) {
+        if (!appender.append(invListElement.getFieldData(0), invListElement.getFieldStart(0), invListElementSize)) {
             throw new IllegalStateException();
         }
         // Append count.
-        if (!resultFrameTupleApp.append(count)) {
+        if (!appender.append(count)) {
             throw new IllegalStateException();
         }
-        resultFrameTupleApp.incrementTupleCount(1);
+        appender.incrementTupleCount(1);
         numResults++;
     }
 
@@ -132,8 +131,52 @@
     public ITypeTraits[] getTypeTraits() {
         return typeTraits;
     }
-    
+
     public int getNumResults() {
         return numResults;
     }
+
+    // TODO: This code may help to clean up the core list-merging algorithms.
+    /*
+    public SearchResultCursor getCursor() {
+        cursor.reset();
+        return cursor;
+    }
+    
+    public class SearchResultCursor {
+        private int bufferIndex;
+        private int resultIndex;
+        private int frameResultIndex;
+        private ByteBuffer currentBuffer;
+
+        public void reset() {
+            bufferIndex = 0;
+            resultIndex = 0;
+            frameResultIndex = 0;
+            currentBuffer = buffers.get(0);
+            resultFrameTupleAcc.reset(currentBuffer);
+        }
+
+        public boolean hasNext() {
+            return resultIndex < numResults;
+        }
+
+        public void next() {
+            resultTuple.reset(currentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(frameResultIndex));            
+            if (frameResultIndex < resultFrameTupleAcc.getTupleCount()) {
+                frameResultIndex++;
+            } else {
+                bufferIndex++;
+                currentBuffer = buffers.get(bufferIndex);
+                resultFrameTupleAcc.reset(currentBuffer);
+                frameResultIndex = 0;
+            }            
+            resultIndex++;
+        }
+
+        public ITupleReference getTuple() {
+            return resultTuple;
+        }
+    }
+    */
 }
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/TOccurrenceSearcher.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/TOccurrenceSearcher.java
index b642d1b..e0119a9 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/TOccurrenceSearcher.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/TOccurrenceSearcher.java
@@ -19,7 +19,6 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
@@ -48,14 +47,12 @@
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizer;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IToken;
 
-// TODO: The search procedure is rather confusing regarding cursor positions, hasNext() calls etc.
-// Needs an overhaul some time.
 public class TOccurrenceSearcher implements IInvertedIndexSearcher {
 
     protected final IHyracksCommonContext ctx;
-    
-    protected SearchResult newSearchResult;
-    protected SearchResult prevSearchResult;    
+
+    protected final InvertedListMerger invListMerger;
+    protected final SearchResult searchResult;
 
     protected RecordDescriptor queryTokenRecDesc = new RecordDescriptor(
             new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
@@ -70,17 +67,16 @@
     protected int occurrenceThreshold;
 
     protected final int cursorCacheSize = 10;
-    protected List<IInvertedListCursor> invListCursorCache = new ArrayList<IInvertedListCursor>(cursorCacheSize);
-    protected List<IInvertedListCursor> invListCursors = new ArrayList<IInvertedListCursor>(cursorCacheSize);
-    
+    protected ArrayList<IInvertedListCursor> invListCursorCache = new ArrayList<IInvertedListCursor>(cursorCacheSize);
+    protected ArrayList<IInvertedListCursor> invListCursors = new ArrayList<IInvertedListCursor>(cursorCacheSize);
+
     public TOccurrenceSearcher(IHyracksCommonContext ctx, IInvertedIndex invIndex) {
         this.ctx = ctx;
+        this.invListMerger = new InvertedListMerger(ctx, invIndex);
+        this.searchResult = new SearchResult(invIndex.getInvListTypeTraits(), ctx);
         this.invIndex = invIndex;
         this.invListCmp = MultiComparator.create(invIndex.getInvListCmpFactories());
 
-        this.prevSearchResult = new SearchResult(invIndex.getInvListTypeTraits(), ctx);
-        this.newSearchResult = new SearchResult(prevSearchResult);
-
         // Pre-create cursor objects.
         for (int i = 0; i < cursorCacheSize; i++) {
             invListCursorCache.add(invIndex.createInvertedListCursor());
@@ -91,8 +87,8 @@
     }
 
     public void reset() {
-        prevSearchResult.clear();
-        newSearchResult.clear();
+        searchResult.clear();
+        invListMerger.reset();
     }
 
     public void search(OnDiskInvertedIndexSearchCursor resultCursor, InvertedIndexSearchPredicate searchPred,
@@ -139,307 +135,34 @@
             invIndex.openInvertedListCursor(invListCursorCache.get(i), searchKey, ictx);
             invListCursors.add(invListCursorCache.get(i));
         }
-        Collections.sort(invListCursors);
-        
-        occurrenceThreshold = searchModifier.getOccurrenceThreshold(invListCursors.size());
-        // TODO: deal with panic cases properly
-        if (occurrenceThreshold <= 0) {
-            throw new OccurrenceThresholdPanicException("Merge Threshold is <= 0. Failing Search.");
-        }
-        
-        int numPrefixLists = searchModifier.getNumPrefixLists(invListCursors.size());
-        mergePrefixLists(numPrefixLists, numQueryTokens);
-        mergeSuffixLists(numPrefixLists, numQueryTokens);
 
+        occurrenceThreshold = searchModifier.getOccurrenceThreshold(numQueryTokens);
+        if (occurrenceThreshold <= 0) {
+            throw new OccurrenceThresholdPanicException("Merge threshold is <= 0. Failing Search.");
+        }
+        int numPrefixLists = searchModifier.getNumPrefixLists(occurrenceThreshold, invListCursors.size());
+
+        searchResult.reset();
+        invListMerger.merge(invListCursors, occurrenceThreshold, numPrefixLists, searchResult);
         resultCursor.open(null, searchPred);
     }
 
-    protected void mergePrefixLists(int numPrefixTokens, int numQueryTokens) throws HyracksDataException,
-            IndexException {
-        for (int i = 0; i < numPrefixTokens; i++) {
-            SearchResult swapTemp = prevSearchResult;
-            prevSearchResult = newSearchResult;
-            newSearchResult = swapTemp;
-            newSearchResult.reset();
-
-            invListCursors.get(i).pinPages();
-            mergePrefixList(invListCursors.get(i), prevSearchResult, newSearchResult);
-            invListCursors.get(i).unpinPages();
-        }
-    }
-
-    protected void mergeSuffixLists(int numPrefixTokens, int numQueryTokens) throws HyracksDataException,
-            IndexException {
-        for (int i = numPrefixTokens; i < numQueryTokens; i++) {
-            SearchResult swapTemp = prevSearchResult;
-            prevSearchResult = newSearchResult;
-            newSearchResult = swapTemp;
-            newSearchResult.reset();
-
-            invListCursors.get(i).pinPages();
-            int numInvListElements = invListCursors.get(i).size();
-            int currentNumResults = prevSearchResult.getNumResults();
-            // Should we binary search the next list or should we sort-merge it?
-            if (currentNumResults * Math.log(numInvListElements) < currentNumResults + numInvListElements) {
-                mergeSuffixListProbe(invListCursors.get(i), prevSearchResult, newSearchResult, i, numQueryTokens);
-            } else {
-                mergeSuffixListScan(invListCursors.get(i), prevSearchResult, newSearchResult, i, numQueryTokens);
-            }
-            invListCursors.get(i).unpinPages();
-        }
-    }
-
-    protected void mergeSuffixListProbe(IInvertedListCursor invListCursor, SearchResult prevSearchResult,
-            SearchResult newSearchResult, int invListIx, int numQueryTokens) throws HyracksDataException, IndexException {
-
-        int prevBufIdx = 0;
-        int maxPrevBufIdx = prevSearchResult.getCurrentBufferIndex();
-        ByteBuffer prevCurrentBuffer = prevSearchResult.getBuffers().get(0);
-
-        FixedSizeFrameTupleAccessor resultFrameTupleAcc = prevSearchResult.getAccessor();
-        FixedSizeTupleReference resultTuple = prevSearchResult.getTuple();
-
-        int resultTidx = 0;
-
-        resultFrameTupleAcc.reset(prevCurrentBuffer);
-
-        while (resultTidx < resultFrameTupleAcc.getTupleCount()) {
-
-            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
-            int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
-                    resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
-
-            if (invListCursor.containsKey(resultTuple, invListCmp)) {
-                count++;
-                newSearchResult.append(resultTuple, count);
-            } else {
-                if (count + numQueryTokens - invListIx > occurrenceThreshold) {
-                    newSearchResult.append(resultTuple, count);
-                }
-            }
-
-            resultTidx++;
-            if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
-                prevBufIdx++;
-                if (prevBufIdx <= maxPrevBufIdx) {
-                    prevCurrentBuffer = prevSearchResult.getBuffers().get(prevBufIdx);
-                    resultFrameTupleAcc.reset(prevCurrentBuffer);
-                    resultTidx = 0;
-                }
-            }
-        }
-    }
-
-    protected void mergeSuffixListScan(IInvertedListCursor invListCursor, SearchResult prevSearchResult,
-            SearchResult newSearchResult, int invListIx, int numQueryTokens)
-            throws HyracksDataException, IndexException {
-        
-        int prevBufIdx = 0;
-        int maxPrevBufIdx = prevSearchResult.getCurrentBufferIndex();
-        ByteBuffer prevCurrentBuffer = prevSearchResult.getBuffers().get(0);
-
-        FixedSizeFrameTupleAccessor resultFrameTupleAcc = prevSearchResult.getAccessor();
-        FixedSizeTupleReference resultTuple = prevSearchResult.getTuple();
-        
-        boolean advanceCursor = true;
-        boolean advancePrevResult = false;
-        int resultTidx = 0;
-
-        resultFrameTupleAcc.reset(prevCurrentBuffer);
-
-        int invListTidx = 0;
-        int invListNumTuples = invListCursor.size();
-
-        if (invListCursor.hasNext())
-            invListCursor.next();
-
-        while (invListTidx < invListNumTuples && resultTidx < resultFrameTupleAcc.getTupleCount()) {
-
-            ITupleReference invListTuple = invListCursor.getTuple();
-
-            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
-
-            int cmp = invListCmp.compare(invListTuple, resultTuple);
-            if (cmp == 0) {
-                int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
-                        resultTuple.getFieldStart(resultTuple.getFieldCount() - 1)) + 1;
-                newSearchResult.append(resultTuple, count);
-                advanceCursor = true;
-                advancePrevResult = true;
-            } else {
-                if (cmp < 0) {
-                    advanceCursor = true;
-                    advancePrevResult = false;
-                } else {
-                    int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
-                            resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
-                    if (count + numQueryTokens - invListIx > occurrenceThreshold) {
-                        newSearchResult.append(resultTuple, count);
-                    }
-                    advanceCursor = false;
-                    advancePrevResult = true;
-                }
-            }
-
-            if (advancePrevResult) {
-                resultTidx++;
-                if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
-                    prevBufIdx++;
-                    if (prevBufIdx <= maxPrevBufIdx) {
-                        prevCurrentBuffer = prevSearchResult.getBuffers().get(prevBufIdx);
-                        resultFrameTupleAcc.reset(prevCurrentBuffer);
-                        resultTidx = 0;
-                    }
-                }
-            }
-
-            if (advanceCursor) {
-                invListTidx++;
-                if (invListCursor.hasNext()) {
-                    invListCursor.next();
-                }
-            }
-        }
-
-        // append remaining elements from previous result set
-        while (resultTidx < resultFrameTupleAcc.getTupleCount()) {
-
-            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
-
-            int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
-                    resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
-            if (count + numQueryTokens - invListIx > occurrenceThreshold) {
-                newSearchResult.append(resultTuple, count);
-            }
-
-            resultTidx++;
-            if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
-                prevBufIdx++;
-                if (prevBufIdx <= maxPrevBufIdx) {
-                    prevCurrentBuffer = prevSearchResult.getBuffers().get(prevBufIdx);
-                    resultFrameTupleAcc.reset(prevCurrentBuffer);
-                    resultTidx = 0;
-                }
-            }
-        }
-    }
-
-    protected void mergePrefixList(IInvertedListCursor invListCursor, SearchResult prevSearchResult,
-            SearchResult newSearchResult) throws HyracksDataException, IndexException {
-        
-        int prevBufIdx = 0;
-        int maxPrevBufIdx = prevSearchResult.getCurrentBufferIndex();
-        ByteBuffer prevCurrentBuffer = prevSearchResult.getBuffers().get(0);
-
-        FixedSizeFrameTupleAccessor resultFrameTupleAcc = prevSearchResult.getAccessor();
-        FixedSizeTupleReference resultTuple = prevSearchResult.getTuple();
-        
-        boolean advanceCursor = true;
-        boolean advancePrevResult = false;
-        int resultTidx = 0;
-
-        resultFrameTupleAcc.reset(prevCurrentBuffer);
-
-        int invListTidx = 0;
-        int invListNumTuples = invListCursor.size();
-
-        if (invListCursor.hasNext())
-            invListCursor.next();
-
-        while (invListTidx < invListNumTuples && resultTidx < resultFrameTupleAcc.getTupleCount()) {
-
-            ITupleReference invListTuple = invListCursor.getTuple();
-            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
-
-            int cmp = invListCmp.compare(invListTuple, resultTuple);
-            if (cmp == 0) {
-                int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
-                        resultTuple.getFieldStart(resultTuple.getFieldCount() - 1)) + 1;
-                newSearchResult.append(resultTuple, count);
-                advanceCursor = true;
-                advancePrevResult = true;
-            } else {
-                if (cmp < 0) {
-                    int count = 1;
-                    newSearchResult.append(invListTuple, count);
-                    advanceCursor = true;
-                    advancePrevResult = false;
-                } else {
-                    int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
-                            resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
-                    newSearchResult.append(resultTuple, count);
-                    advanceCursor = false;
-                    advancePrevResult = true;
-                }
-            }
-
-            if (advancePrevResult) {
-                resultTidx++;
-                if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
-                    prevBufIdx++;
-                    if (prevBufIdx <= maxPrevBufIdx) {
-                        prevCurrentBuffer = prevSearchResult.getBuffers().get(prevBufIdx);
-                        resultFrameTupleAcc.reset(prevCurrentBuffer);
-                        resultTidx = 0;
-                    }
-                }
-            }
-
-            if (advanceCursor) {
-                invListTidx++;
-                if (invListCursor.hasNext()) {
-                    invListCursor.next();
-                }
-            }
-        }
-
-        // append remaining new elements from inverted list
-        while (invListTidx < invListNumTuples) {
-            ITupleReference invListTuple = invListCursor.getTuple();
-            newSearchResult.append(invListTuple, 1);
-            invListTidx++;
-            if (invListCursor.hasNext()) {
-                invListCursor.next();
-            }
-        }
-
-        // append remaining elements from previous result set
-        while (resultTidx < resultFrameTupleAcc.getTupleCount()) {
-
-            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
-
-            int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
-                    resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
-            newSearchResult.append(resultTuple, count);
-
-            resultTidx++;
-            if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
-                prevBufIdx++;
-                if (prevBufIdx <= maxPrevBufIdx) {
-                    prevCurrentBuffer = prevSearchResult.getBuffers().get(prevBufIdx);
-                    resultFrameTupleAcc.reset(prevCurrentBuffer);
-                    resultTidx = 0;
-                }
-            }
-        }
-    }
-
     public IFrameTupleAccessor createResultFrameTupleAccessor() {
-        return new FixedSizeFrameTupleAccessor(ctx.getFrameSize(), newSearchResult.getTypeTraits());
+        return new FixedSizeFrameTupleAccessor(ctx.getFrameSize(), searchResult.getTypeTraits());
     }
 
     public ITupleReference createResultFrameTupleReference() {
-        return new FixedSizeTupleReference(newSearchResult.getTypeTraits());
+        return new FixedSizeTupleReference(searchResult.getTypeTraits());
     }
 
     @Override
     public List<ByteBuffer> getResultBuffers() {
-        return newSearchResult.getBuffers();
+        return searchResult.getBuffers();
     }
 
     @Override
     public int getNumValidResultBuffers() {
-        return newSearchResult.getCurrentBufferIndex() + 1;
+        return searchResult.getCurrentBufferIndex() + 1;
     }
 
     public int getOccurrenceThreshold() {
@@ -448,7 +171,7 @@
 
     public void printNewResults(int maxResultBufIdx, List<ByteBuffer> buffer) {
         StringBuffer strBuffer = new StringBuffer();
-        FixedSizeFrameTupleAccessor resultFrameTupleAcc = prevSearchResult.getAccessor();
+        FixedSizeFrameTupleAccessor resultFrameTupleAcc = searchResult.getAccessor();
         for (int i = 0; i <= maxResultBufIdx; i++) {
             ByteBuffer testBuf = buffer.get(i);
             resultFrameTupleAcc.reset(testBuf);
diff --git a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndexBulkLoadTest.java b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndexBulkLoadTest.java
new file mode 100644
index 0000000..f641630
--- /dev/null
+++ b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndexBulkLoadTest.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk;
+
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.common.AbstractInvertedIndexLoadTest;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestContext.InvertedIndexType;
+
+public class PartitionedOnDiskInvertedIndexBulkLoadTest extends AbstractInvertedIndexLoadTest {
+
+    public PartitionedOnDiskInvertedIndexBulkLoadTest() {
+        super(InvertedIndexType.PARTITIONED_ONDISK, true, 1);
+    }
+}
diff --git a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestUtils.java b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestUtils.java
index e518816..53237d9 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestUtils.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestUtils.java
@@ -99,17 +99,15 @@
         return tupleGen;
     }
 
-    public static LSMInvertedIndexTestContext createWordInvIndexTestContext(LSMInvertedIndexTestHarness harness,
-            InvertedIndexType invIndexType) throws IOException, IndexException {
+    private static ISerializerDeserializer[] getNonHashedIndexFieldSerdes(InvertedIndexType invIndexType)
+            throws IndexException {
         ISerializerDeserializer[] fieldSerdes = null;
-        int tokenFieldCount = 0;
         switch (invIndexType) {
             case INMEMORY:
             case ONDISK:
             case LSM: {
                 fieldSerdes = new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE,
                         IntegerSerializerDeserializer.INSTANCE };
-                tokenFieldCount = 1;
                 break;
             }
             case PARTITIONED_INMEMORY:
@@ -118,54 +116,82 @@
                 // Such indexes also include the set-size for partitioning.
                 fieldSerdes = new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE,
                         IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE };
-                tokenFieldCount = 2;
                 break;
             }
             default: {
                 throw new IndexException("Unhandled inverted index type '" + invIndexType + "'.");
             }
         }
+        return fieldSerdes;
+    }
+    
+    private static ISerializerDeserializer[] getHashedIndexFieldSerdes(InvertedIndexType invIndexType)
+            throws IndexException {
+        ISerializerDeserializer[] fieldSerdes = null;
+        switch (invIndexType) {
+            case INMEMORY:
+            case ONDISK:
+            case LSM: {
+                fieldSerdes = new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE };
+                break;
+            }
+            case PARTITIONED_INMEMORY:
+            case PARTITIONED_ONDISK:
+            case PARTITIONED_LSM: {
+                // Such indexes also include the set-size for partitioning.
+                fieldSerdes = new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE };
+                break;
+            }
+            default: {
+                throw new IndexException("Unhandled inverted index type '" + invIndexType + "'.");
+            }
+        }
+        return fieldSerdes;
+    }
+    
+    public static LSMInvertedIndexTestContext createWordInvIndexTestContext(LSMInvertedIndexTestHarness harness,
+            InvertedIndexType invIndexType) throws IOException, IndexException {
+        ISerializerDeserializer[] fieldSerdes = getNonHashedIndexFieldSerdes(invIndexType);
         ITokenFactory tokenFactory = new UTF8WordTokenFactory();
         IBinaryTokenizerFactory tokenizerFactory = new DelimitedUTF8StringBinaryTokenizerFactory(true, false,
                 tokenFactory);
-        LSMInvertedIndexTestContext testCtx = LSMInvertedIndexTestContext.create(harness, fieldSerdes, tokenFieldCount,
-                tokenizerFactory, invIndexType);
+        LSMInvertedIndexTestContext testCtx = LSMInvertedIndexTestContext.create(harness, fieldSerdes,
+                fieldSerdes.length - 1, tokenizerFactory, invIndexType);
         return testCtx;
     }
     
     public static LSMInvertedIndexTestContext createHashedWordInvIndexTestContext(LSMInvertedIndexTestHarness harness,
             InvertedIndexType invIndexType) throws IOException, IndexException {
-        ISerializerDeserializer[] fieldSerdes = new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE,
-                IntegerSerializerDeserializer.INSTANCE };
+        ISerializerDeserializer[] fieldSerdes = getHashedIndexFieldSerdes(invIndexType);
         ITokenFactory tokenFactory = new HashedUTF8WordTokenFactory();
         IBinaryTokenizerFactory tokenizerFactory = new DelimitedUTF8StringBinaryTokenizerFactory(true, false,
                 tokenFactory);
-        LSMInvertedIndexTestContext testCtx = LSMInvertedIndexTestContext.create(harness, fieldSerdes, 1, tokenizerFactory,
-                invIndexType);
+        LSMInvertedIndexTestContext testCtx = LSMInvertedIndexTestContext.create(harness, fieldSerdes,
+                fieldSerdes.length - 1, tokenizerFactory, invIndexType);
         return testCtx;
     }
 
     public static LSMInvertedIndexTestContext createNGramInvIndexTestContext(LSMInvertedIndexTestHarness harness,
             InvertedIndexType invIndexType) throws IOException, IndexException {
-        ISerializerDeserializer[] fieldSerdes = new ISerializerDeserializer[] {
-                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE };
+        ISerializerDeserializer[] fieldSerdes = getNonHashedIndexFieldSerdes(invIndexType);
         ITokenFactory tokenFactory = new UTF8NGramTokenFactory();
         IBinaryTokenizerFactory tokenizerFactory = new NGramUTF8StringBinaryTokenizerFactory(TEST_GRAM_LENGTH, true,
                 true, false, tokenFactory);
-        LSMInvertedIndexTestContext testCtx = LSMInvertedIndexTestContext.create(harness, fieldSerdes, 1, tokenizerFactory,
-                invIndexType);
+        LSMInvertedIndexTestContext testCtx = LSMInvertedIndexTestContext.create(harness, fieldSerdes,
+                fieldSerdes.length - 1, tokenizerFactory, invIndexType);
         return testCtx;
     }
 
     public static LSMInvertedIndexTestContext createHashedNGramInvIndexTestContext(LSMInvertedIndexTestHarness harness,
             InvertedIndexType invIndexType) throws IOException, IndexException {
-        ISerializerDeserializer[] fieldSerdes = new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE,
-                IntegerSerializerDeserializer.INSTANCE };
+        ISerializerDeserializer[] fieldSerdes = getHashedIndexFieldSerdes(invIndexType);
         ITokenFactory tokenFactory = new HashedUTF8NGramTokenFactory();
         IBinaryTokenizerFactory tokenizerFactory = new NGramUTF8StringBinaryTokenizerFactory(TEST_GRAM_LENGTH, true,
                 true, false, tokenFactory);
-        LSMInvertedIndexTestContext testCtx = LSMInvertedIndexTestContext.create(harness, fieldSerdes, 1, tokenizerFactory,
-                invIndexType);
+        LSMInvertedIndexTestContext testCtx = LSMInvertedIndexTestContext.create(harness, fieldSerdes,
+                fieldSerdes.length - 1, tokenizerFactory, invIndexType);
         return testCtx;
     }
 
@@ -481,7 +507,7 @@
                 int queryIndex = Math.abs(rnd.nextInt() % documentCorpus.size());
                 searchDocument.reset(documentCorpus.get(queryIndex));
             }
-
+            
             // Set query tuple in search predicate.
             searchPred.setQueryTuple(searchDocument);
             searchPred.setQueryFieldIndex(0);