Checkpointing progress on implementing a length-partitioned inverted index.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_length_filter@2455 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/MultiComparator.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/MultiComparator.java
index 01c6cc8..280bb41 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/MultiComparator.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/MultiComparator.java
@@ -96,6 +96,14 @@
         return new MultiComparator(cmps);
     }
     
+    public static MultiComparator create(IBinaryComparatorFactory[] cmpFactories, int startIndex, int numCmps) {
+        IBinaryComparator[] cmps = new IBinaryComparator[numCmps];
+        for (int i = startIndex; i < startIndex + numCmps; i++) {
+            cmps[i] = cmpFactories[i].createBinaryComparator();
+        }
+        return new MultiComparator(cmps);
+    }
+    
     public static MultiComparator create(IBinaryComparatorFactory[]... cmpFactories) {
         int size = 0;
         for (int i = 0; i < cmpFactories.length; i++) {
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/api/IObjectFactory.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/api/IObjectFactory.java
new file mode 100644
index 0000000..9068a2b
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/api/IObjectFactory.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api;
+
+public interface IObjectFactory<T> {
+    public T create();
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
index 2fd5d63..bb43028 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
@@ -69,16 +69,16 @@
  * cannot exceed the size of a Hyracks frame.
  */
 public class OnDiskInvertedIndex implements IInvertedIndex {
-    private final IHyracksCommonContext ctx = new DefaultHyracksCommonContext();
+    protected final IHyracksCommonContext ctx = new DefaultHyracksCommonContext();
 
     // Schema of BTree tuples, set in constructor.    
-    public final int invListStartPageIdField;
-    public final int invListEndPageIdField;
-    public final int invListStartOffField;
-    public final int invListNumElementsField;
+    protected final int invListStartPageIdField;
+    protected final int invListEndPageIdField;
+    protected final int invListStartOffField;
+    protected final int invListNumElementsField;
     
     // Type traits to be appended to the token type trait which finally form the BTree field type traits.
-    private static final ITypeTraits[] btreeValueTypeTraits = new ITypeTraits[4];
+    protected static final ITypeTraits[] btreeValueTypeTraits = new ITypeTraits[4];
     static {
         // startPageId
         btreeValueTypeTraits[0] = IntegerPointable.TYPE_TRAITS;
@@ -90,22 +90,22 @@
         btreeValueTypeTraits[3] = IntegerPointable.TYPE_TRAITS;
     }
 
-    private BTree btree;
-    private int rootPageId = 0;
-    private IBufferCache bufferCache;
-    private IFileMapProvider fileMapProvider;
-    private int fileId = -1;
-    private final ITypeTraits[] invListTypeTraits;
-    private final IBinaryComparatorFactory[] invListCmpFactories;
-    private final ITypeTraits[] tokenTypeTraits;
-    private final IBinaryComparatorFactory[] tokenCmpFactories;
-    private final IInvertedListBuilder invListBuilder;
-    private final int numTokenFields;
-    private final int numInvListKeys;
-    private final FileReference invListsFile;
+    protected BTree btree;
+    protected int rootPageId = 0;
+    protected IBufferCache bufferCache;
+    protected IFileMapProvider fileMapProvider;
+    protected int fileId = -1;
+    protected final ITypeTraits[] invListTypeTraits;
+    protected final IBinaryComparatorFactory[] invListCmpFactories;
+    protected final ITypeTraits[] tokenTypeTraits;
+    protected final IBinaryComparatorFactory[] tokenCmpFactories;
+    protected final IInvertedListBuilder invListBuilder;
+    protected final int numTokenFields;
+    protected final int numInvListKeys;
+    protected final FileReference invListsFile;
     // Last page id of inverted-lists file (inclusive). Set during bulk load.
-    private int invListsMaxPageId = -1;
-    private boolean isOpen = false;
+    protected int invListsMaxPageId = -1;
+    protected boolean isOpen = false;
 
     public OnDiskInvertedIndex(IBufferCache bufferCache, IFileMapProvider fileMapProvider,
             IInvertedListBuilder invListBuilder, ITypeTraits[] invListTypeTraits,
@@ -262,19 +262,7 @@
         try {
             if (ctx.btreeCursor.hasNext()) {
                 ctx.btreeCursor.next();
-                ITupleReference frameTuple = ctx.btreeCursor.getTuple();
-                int startPageId = IntegerSerializerDeserializer.getInt(
-                        frameTuple.getFieldData(invListStartPageIdField),
-                        frameTuple.getFieldStart(invListStartPageIdField));
-                int endPageId = IntegerSerializerDeserializer.getInt(
-                        frameTuple.getFieldData(invListEndPageIdField),
-                        frameTuple.getFieldStart(invListEndPageIdField));
-                int startOff = IntegerSerializerDeserializer.getInt(frameTuple.getFieldData(invListStartOffField),
-                        frameTuple.getFieldStart(invListStartOffField));
-                int numElements = IntegerSerializerDeserializer.getInt(
-                        frameTuple.getFieldData(invListNumElementsField),
-                        frameTuple.getFieldStart(invListNumElementsField));
-                listCursor.reset(startPageId, endPageId, startOff, numElements);
+                resetInvertedListCursor(ctx.btreeCursor.getTuple(), listCursor);
             } else {
                 listCursor.reset(0, 0, 0, 0);
             }
@@ -284,6 +272,18 @@
         }
     }
 
+    protected void resetInvertedListCursor(ITupleReference btreeTuple, IInvertedListCursor listCursor) {
+        int startPageId = IntegerSerializerDeserializer.getInt(btreeTuple.getFieldData(invListStartPageIdField),
+                btreeTuple.getFieldStart(invListStartPageIdField));
+        int endPageId = IntegerSerializerDeserializer.getInt(btreeTuple.getFieldData(invListEndPageIdField),
+                btreeTuple.getFieldStart(invListEndPageIdField));
+        int startOff = IntegerSerializerDeserializer.getInt(btreeTuple.getFieldData(invListStartOffField),
+                btreeTuple.getFieldStart(invListStartOffField));
+        int numElements = IntegerSerializerDeserializer.getInt(btreeTuple.getFieldData(invListNumElementsField),
+                btreeTuple.getFieldStart(invListNumElementsField));
+        listCursor.reset(startPageId, endPageId, startOff, numElements);
+    }
+    
     public final class OnDiskInvertedIndexBulkLoader implements IIndexBulkLoader {
         private final ArrayTupleBuilder btreeTupleBuilder;
         private final ArrayTupleReference btreeTupleReference;
@@ -472,6 +472,12 @@
             this.searcher = new TOccurrenceSearcher(ctx, index);
         }
 
+        // Let subclasses initialize.
+        protected OnDiskInvertedIndexAccessor(OnDiskInvertedIndex index, IInvertedIndexSearcher searcher) {
+            this.index = index;
+            this.searcher = searcher;
+        }
+        
         @Override
         public IIndexCursor createSearchCursor() {
             return new OnDiskInvertedIndexSearchCursor(searcher, index.getInvListTypeTraits().length);
@@ -631,7 +637,7 @@
         return 0;
     }
 
-    private static ITypeTraits[] getBTreeTypeTraits(ITypeTraits[] tokenTypeTraits) {
+    protected static ITypeTraits[] getBTreeTypeTraits(ITypeTraits[] tokenTypeTraits) {
         ITypeTraits[] btreeTypeTraits = new ITypeTraits[tokenTypeTraits.length + btreeValueTypeTraits.length];
         // Set key type traits.
         for (int i = 0; i < tokenTypeTraits.length; i++) {
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexOpContext.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexOpContext.java
index fd784f3..8ed4cd1 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexOpContext.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexOpContext.java
@@ -30,12 +30,17 @@
     public IIndexAccessor btreeAccessor;
     public IIndexCursor btreeCursor;
     public MultiComparator searchCmp;
+    // For prefix search on partitioned indexes.
+    public MultiComparator prefixSearchCmp;
 
     public OnDiskInvertedIndexOpContext(BTree btree) {
         // TODO: Ignore opcallbacks for now.
         btreeAccessor = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
         btreeCursor = btreeAccessor.createSearchCursor();
         searchCmp = MultiComparator.create(btree.getComparatorFactories());
+        if (btree.getComparatorFactories().length > 1) {
+            prefixSearchCmp = MultiComparator.create(btree.getComparatorFactories(), 0, 1);
+        }
     }
 
     @Override
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndex.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndex.java
new file mode 100644
index 0000000..95ee45e
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndex.java
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexOperationContext;
+import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListBuilder;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search.PartitionedTOccurrenceSearcher;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.ObjectCache;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+
+public class PartitionedOnDiskInvertedIndex extends OnDiskInvertedIndex {
+
+    protected final int PARTITIONING_NUM_TOKENS_FIELD = 1;
+    
+    public PartitionedOnDiskInvertedIndex(IBufferCache bufferCache, IFileMapProvider fileMapProvider,
+            IInvertedListBuilder invListBuilder, ITypeTraits[] invListTypeTraits,
+            IBinaryComparatorFactory[] invListCmpFactories, ITypeTraits[] tokenTypeTraits,
+            IBinaryComparatorFactory[] tokenCmpFactories, FileReference btreeFile, FileReference invListsFile)
+            throws IndexException {
+        super(bufferCache, fileMapProvider, invListBuilder, invListTypeTraits, invListCmpFactories, tokenTypeTraits,
+                tokenCmpFactories, btreeFile, invListsFile);
+    }
+
+    public class PartitionedOnDiskInvertedIndexAccessor extends OnDiskInvertedIndexAccessor {
+        public PartitionedOnDiskInvertedIndexAccessor(OnDiskInvertedIndex index) {
+            super(index, new PartitionedTOccurrenceSearcher(ctx, index));
+        }
+    }
+
+    @Override
+    public IIndexAccessor createAccessor(IModificationOperationCallback modificationCallback,
+            ISearchOperationCallback searchCallback) {
+        return new PartitionedOnDiskInvertedIndexAccessor(this);
+    }
+    
+    public class InvertedListPartitions {
+        private final int DEFAULT_NUM_PARTITIONS = 10;
+        private final int PARTITIONS_SLACK_SIZE = 10;
+        private final ObjectCache<IInvertedListCursor> invListCursorCache;
+        private final ObjectCache<ArrayList<IInvertedListCursor>> arrayListCache;
+        private ArrayList<IInvertedListCursor>[] partitions;
+
+        public InvertedListPartitions(ObjectCache<IInvertedListCursor> invListCursorCache,
+                ObjectCache<ArrayList<IInvertedListCursor>> arrayListCache) {
+            this.invListCursorCache = invListCursorCache;
+            this.arrayListCache = arrayListCache;
+        }
+
+        @SuppressWarnings("unchecked")
+        public void reset(int numTokensLowerBound, int numTokensUpperBound) {
+            if (partitions == null) {
+                int initialSize = numTokensUpperBound;
+                if (numTokensUpperBound < 0) {
+                    initialSize = DEFAULT_NUM_PARTITIONS;
+                }
+                partitions = (ArrayList<IInvertedListCursor>[]) new ArrayList[initialSize];
+            } else {
+                if (numTokensUpperBound >= partitions.length) {
+                    Arrays.copyOf(partitions, numTokensUpperBound);
+                }
+                Arrays.fill(partitions, null);
+            }
+            invListCursorCache.reset();
+            arrayListCache.reset();
+        }
+
+        public void addInvertedListCursor(ITupleReference btreeTuple) {
+            IInvertedListCursor listCursor = invListCursorCache.getNext();
+            resetInvertedListCursor(btreeTuple, listCursor);
+            int numTokens = IntegerSerializerDeserializer.getInt(
+                    btreeTuple.getFieldData(PARTITIONING_NUM_TOKENS_FIELD),
+                    btreeTuple.getFieldStart(PARTITIONING_NUM_TOKENS_FIELD));
+            if (numTokens >= partitions.length) {
+                partitions = Arrays.copyOf(partitions, numTokens + PARTITIONS_SLACK_SIZE);
+            }
+            ArrayList<IInvertedListCursor> partitionCursors = partitions[numTokens];
+            if (partitionCursors == null) {
+                partitionCursors = arrayListCache.getNext();
+                partitions[numTokens] = partitionCursors;
+            }
+            partitionCursors.add(listCursor);
+        }
+
+        public ArrayList<IInvertedListCursor>[] getPartitions() {
+            return partitions;
+        }
+    }
+    
+    public void openInvertedListPartitionCursors(InvertedListPartitions invListPartitions, ITupleReference lowSearchKey,
+            ITupleReference highSearchKey,
+            IIndexOperationContext ictx) throws HyracksDataException, IndexException {
+        OnDiskInvertedIndexOpContext ctx = (OnDiskInvertedIndexOpContext) ictx;
+        if (lowSearchKey.getFieldCount() == 1) {
+            ctx.btreePred.setLowKeyComparator(ctx.prefixSearchCmp);
+        } else {
+            ctx.btreePred.setLowKeyComparator(ctx.searchCmp);
+        }
+        if (highSearchKey.getFieldCount() == 1) {
+            ctx.btreePred.setHighKeyComparator(ctx.prefixSearchCmp);
+        } else {
+            ctx.btreePred.setHighKeyComparator(ctx.searchCmp);
+        }
+        ctx.btreePred.setLowKey(lowSearchKey, true);
+        ctx.btreePred.setHighKey(highSearchKey, true);
+        ctx.btreeAccessor.search(ctx.btreeCursor, ctx.btreePred);
+        try {
+            while (ctx.btreeCursor.hasNext()) {
+                ctx.btreeCursor.next();
+                ITupleReference btreeTuple = ctx.btreeCursor.getTuple();
+                invListPartitions.addInvertedListCursor(btreeTuple);
+            }
+        } finally {
+            ctx.btreeCursor.close();
+            ctx.btreeCursor.reset();
+        }
+    }
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/ArrayListFactory.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/ArrayListFactory.java
new file mode 100644
index 0000000..493063e
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/ArrayListFactory.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search;
+
+import java.util.ArrayList;
+
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IObjectFactory;
+
+public class ArrayListFactory<T> implements IObjectFactory<ArrayList<T>>{
+    @Override
+    public ArrayList<T> create() {
+        return new ArrayList<T>();
+    }
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/InvertedListCursorFactory.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/InvertedListCursorFactory.java
new file mode 100644
index 0000000..b4b3c43
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/InvertedListCursorFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search;
+
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IObjectFactory;
+
+public class InvertedListCursorFactory implements IObjectFactory<IInvertedListCursor> {
+
+    private final IInvertedIndex invIndex;
+
+    public InvertedListCursorFactory(IInvertedIndex invIndex) {
+        this.invIndex = invIndex;
+    }
+
+    @Override
+    public IInvertedListCursor create() {
+        return invIndex.createInvertedListCursor();
+    }
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/PartitionedTOccurrenceSearcher.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/PartitionedTOccurrenceSearcher.java
new file mode 100644
index 0000000..db5bcff
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/PartitionedTOccurrenceSearcher.java
@@ -0,0 +1,607 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleReference;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexOperationContext;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.common.tuples.ConcatenatingTupleReference;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexSearchModifier;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexSearcher;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IObjectFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.exceptions.OccurrenceThresholdPanicException;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeFrameTupleAccessor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeFrameTupleAppender;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeTupleReference;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndexSearchCursor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.PartitionedOnDiskInvertedIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.PartitionedOnDiskInvertedIndex.InvertedListPartitions;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizer;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IToken;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.ObjectCache;
+
+// TODO: The search procedure is rather confusing regarding cursor positions, hasNext() calls etc.
+// Needs an overhaul some time.
+public class PartitionedTOccurrenceSearcher implements IInvertedIndexSearcher {
+
+    protected final IHyracksCommonContext ctx;
+    protected final FixedSizeFrameTupleAppender resultFrameTupleApp;
+    protected final FixedSizeFrameTupleAccessor resultFrameTupleAcc;
+    protected final FixedSizeTupleReference resultTuple;
+    protected final int invListKeyLength;
+    protected int currentNumResults;
+
+    protected List<ByteBuffer> newResultBuffers = new ArrayList<ByteBuffer>();
+    protected List<ByteBuffer> prevResultBuffers = new ArrayList<ByteBuffer>();
+    protected List<ByteBuffer> swap = null;
+    protected int maxResultBufIdx = 0;
+
+    protected RecordDescriptor queryTokenRecDesc = new RecordDescriptor(
+            new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
+    protected ArrayTupleBuilder queryTokenBuilder = new ArrayTupleBuilder(queryTokenRecDesc.getFieldCount());
+    protected DataOutput queryTokenDos = queryTokenBuilder.getDataOutput();
+    protected FrameTupleAppender queryTokenAppender;
+    protected ByteBuffer queryTokenFrame;
+    protected final FrameTupleReference searchKey = new FrameTupleReference();
+
+    protected final IInvertedIndex invIndex;
+    protected final MultiComparator invListCmp;
+    protected final ITypeTraits[] invListFieldsWithCount;
+    protected int occurrenceThreshold;
+
+    protected final int cursorCacheSize = 10;
+    //protected List<IInvertedListCursor> invListCursorCache = new ArrayList<IInvertedListCursor>(cursorCacheSize);
+    protected List<IInvertedListCursor> invListCursors = new ArrayList<IInvertedListCursor>(cursorCacheSize);
+
+    protected final ArrayTupleBuilder lowerBoundTupleBuilder = new ArrayTupleBuilder(1);
+    protected final ArrayTupleReference lowerBoundTuple = new ArrayTupleReference();
+    protected final ArrayTupleBuilder upperBoundTupleBuilder = new ArrayTupleBuilder(1);
+    protected final ArrayTupleReference upperBoundTuple = new ArrayTupleReference();
+    protected final ConcatenatingTupleReference partLowSearchKey = new ConcatenatingTupleReference(2);
+    protected final ConcatenatingTupleReference partHighSearchKey = new ConcatenatingTupleReference(2);
+    
+    protected final IObjectFactory<IInvertedListCursor> invListCursorFactory;
+    protected final IObjectFactory<ArrayList<IInvertedListCursor>> arrayListFactory;
+    protected final ObjectCache<IInvertedListCursor> invListCursorCache;
+    protected final ObjectCache<ArrayList<IInvertedListCursor>> arrayListCache;
+    
+    protected final InvertedListPartitions partitions;
+    
+    public PartitionedTOccurrenceSearcher(IHyracksCommonContext ctx, IInvertedIndex invIndex) {
+        this.ctx = ctx;
+        this.invIndex = invIndex;
+        this.invListCmp = MultiComparator.create(invIndex.getInvListCmpFactories());
+
+        ITypeTraits[] invListFields = invIndex.getInvListTypeTraits();
+        invListFieldsWithCount = new ITypeTraits[invListFields.length + 1];
+        int tmp = 0;
+        for (int i = 0; i < invListFields.length; i++) {
+            invListFieldsWithCount[i] = invListFields[i];
+            tmp += invListFields[i].getFixedLength();
+        }
+        // using an integer for counting occurrences
+        invListFieldsWithCount[invListFields.length] = IntegerPointable.TYPE_TRAITS;
+        invListKeyLength = tmp;
+
+        resultFrameTupleApp = new FixedSizeFrameTupleAppender(ctx.getFrameSize(), invListFieldsWithCount);
+        resultFrameTupleAcc = new FixedSizeFrameTupleAccessor(ctx.getFrameSize(), invListFieldsWithCount);
+        resultTuple = new FixedSizeTupleReference(invListFieldsWithCount);
+        newResultBuffers.add(ctx.allocateFrame());
+        prevResultBuffers.add(ctx.allocateFrame());
+
+        queryTokenAppender = new FrameTupleAppender(ctx.getFrameSize());
+        queryTokenFrame = ctx.allocateFrame();
+
+        invListCursorFactory = new InvertedListCursorFactory(invIndex);
+        arrayListFactory = new ArrayListFactory<IInvertedListCursor>();        
+        invListCursorCache = new ObjectCache<IInvertedListCursor>(invListCursorFactory, 10, 10);
+        arrayListCache = new ObjectCache<ArrayList<IInvertedListCursor>>(arrayListFactory, 10, 10);
+        
+        PartitionedOnDiskInvertedIndex partInvIndex = (PartitionedOnDiskInvertedIndex) invIndex;
+        partitions = partInvIndex.new InvertedListPartitions(invListCursorCache, arrayListCache);
+        
+        currentNumResults = 0;
+    }
+
+    public void reset() {
+        for (ByteBuffer b : newResultBuffers) {
+            resultFrameTupleApp.reset(b, true);
+        }
+        for (ByteBuffer b : prevResultBuffers) {
+            resultFrameTupleApp.reset(b, true);
+        }
+        currentNumResults = 0;
+    }
+
+    public void search(OnDiskInvertedIndexSearchCursor resultCursor, InvertedIndexSearchPredicate searchPred,
+            IIndexOperationContext ictx) throws HyracksDataException, IndexException {
+        ITupleReference queryTuple = searchPred.getQueryTuple();
+        int queryFieldIndex = searchPred.getQueryFieldIndex();
+        IInvertedIndexSearchModifier searchModifier = searchPred.getSearchModifier();
+        IBinaryTokenizer queryTokenizer = searchPred.getQueryTokenizer();
+
+        queryTokenAppender.reset(queryTokenFrame, true);
+        queryTokenizer.reset(queryTuple.getFieldData(queryFieldIndex), queryTuple.getFieldStart(queryFieldIndex),
+                queryTuple.getFieldLength(queryFieldIndex));
+
+        while (queryTokenizer.hasNext()) {
+            queryTokenizer.next();
+            queryTokenBuilder.reset();
+            try {
+                IToken token = queryTokenizer.getToken();
+                token.serializeToken(queryTokenDos);
+                queryTokenBuilder.addFieldEndOffset();
+                // WARNING: assuming one frame is big enough to hold all tokens
+                queryTokenAppender.append(queryTokenBuilder.getFieldEndOffsets(), queryTokenBuilder.getByteArray(), 0,
+                        queryTokenBuilder.getSize());
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+
+        FrameTupleAccessor queryTokenAccessor = new FrameTupleAccessor(ctx.getFrameSize(), queryTokenRecDesc);
+        queryTokenAccessor.reset(queryTokenFrame);
+        int numQueryTokens = queryTokenAccessor.getTupleCount();
+        
+        // ALEX NEW CODE STARTS HERE
+        int numTokensLowerBound = searchModifier.getNumTokensLowerBound(numQueryTokens);
+        int numTokensUpperBound = searchModifier.getNumTokensLowerBound(numQueryTokens);
+        ITupleReference lowSearchKey = null;
+        ITupleReference highSearchKey = null;
+        try {
+            if (numTokensLowerBound >= 0) {
+                lowerBoundTupleBuilder.reset();
+                lowerBoundTupleBuilder.getDataOutput().writeInt(numTokensLowerBound);
+                lowerBoundTupleBuilder.addFieldEndOffset();
+                lowerBoundTuple.reset(lowerBoundTupleBuilder.getFieldEndOffsets(),
+                        lowerBoundTupleBuilder.getByteArray());
+                partLowSearchKey.reset();
+                partLowSearchKey.addTuple(searchKey);
+                partLowSearchKey.addTuple(lowerBoundTuple);
+                lowSearchKey = partLowSearchKey;
+            } else {
+                lowSearchKey = searchKey;
+            }
+            if (numTokensUpperBound >= 0) {
+                upperBoundTupleBuilder.reset();
+                upperBoundTupleBuilder.getDataOutput().writeInt(numTokensUpperBound);
+                upperBoundTupleBuilder.addFieldEndOffset();
+                upperBoundTuple.reset(upperBoundTupleBuilder.getFieldEndOffsets(),
+                        upperBoundTupleBuilder.getByteArray());
+                partHighSearchKey.reset();
+                partHighSearchKey.addTuple(searchKey);
+                partHighSearchKey.addTuple(upperBoundTuple);
+                highSearchKey = partHighSearchKey;
+            } else {
+                highSearchKey = searchKey;
+            }
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+        occurrenceThreshold = searchModifier.getOccurrenceThreshold(numQueryTokens);
+        // TODO: deal with panic cases properly
+        if (occurrenceThreshold <= 0) {
+            throw new OccurrenceThresholdPanicException("Merge Threshold is <= 0. Failing Search.");
+        }
+
+        PartitionedOnDiskInvertedIndex partInvIndex = (PartitionedOnDiskInvertedIndex) invIndex;
+        partitions.reset(numTokensLowerBound, numTokensUpperBound);
+        for (int i = 0; i < numQueryTokens; i++) {
+            searchKey.reset(queryTokenAccessor, i);
+            partInvIndex.openInvertedListPartitionCursors(partitions, lowSearchKey, highSearchKey, ictx);
+        }
+
+        // Process the partitions one-by-one.
+        ArrayList<IInvertedListCursor>[] partitionCursors = partitions.getPartitions();
+        int start = (numTokensLowerBound >= 0) ? numTokensLowerBound : 0;
+        int end = (numTokensUpperBound >= 0) ? numTokensUpperBound : partitionCursors.length - 1;
+        for (int i = start; i <= end; i++) {
+            if (partitionCursors[i] == null) {
+                continue;
+            }
+            // Prune partition because no element in it can satisfy the occurrence threshold.
+            if (partitionCursors[i].size() < occurrenceThreshold) {
+                continue;
+            }
+            
+            // Process partition.
+            Collections.sort(partitionCursors[i]);
+            // TODO: Continue here.
+            
+        }
+        
+        
+        
+        
+        int numPrefixLists = searchModifier.getNumPrefixLists(invListCursors.size());
+        maxResultBufIdx = mergePrefixLists(numPrefixLists, numQueryTokens);
+        maxResultBufIdx = mergeSuffixLists(numPrefixLists, numQueryTokens, maxResultBufIdx);
+
+        resultCursor.open(null, searchPred);
+    }
+
+    protected int mergePrefixLists(int numPrefixTokens, int numQueryTokens) throws HyracksDataException, IndexException {
+        int maxPrevBufIdx = 0;
+        for (int i = 0; i < numPrefixTokens; i++) {
+            swap = prevResultBuffers;
+            prevResultBuffers = newResultBuffers;
+            newResultBuffers = swap;
+            currentNumResults = 0;
+
+            invListCursors.get(i).pinPages();
+            maxPrevBufIdx = mergePrefixList(invListCursors.get(i), prevResultBuffers, maxPrevBufIdx, newResultBuffers);
+            invListCursors.get(i).unpinPages();
+        }
+        return maxPrevBufIdx;
+    }
+
+    protected int mergeSuffixLists(int numPrefixTokens, int numQueryTokens, int maxPrevBufIdx)
+            throws HyracksDataException, IndexException {
+        for (int i = numPrefixTokens; i < numQueryTokens; i++) {
+            swap = prevResultBuffers;
+            prevResultBuffers = newResultBuffers;
+            newResultBuffers = swap;
+
+            invListCursors.get(i).pinPages();
+            int numInvListElements = invListCursors.get(i).size();
+            // should we binary search the next list or should we sort-merge it?
+            if (currentNumResults * Math.log(numInvListElements) < currentNumResults + numInvListElements) {
+                maxPrevBufIdx = mergeSuffixListProbe(invListCursors.get(i), prevResultBuffers, maxPrevBufIdx,
+                        newResultBuffers, i, numQueryTokens);
+            } else {
+                maxPrevBufIdx = mergeSuffixListScan(invListCursors.get(i), prevResultBuffers, maxPrevBufIdx,
+                        newResultBuffers, i, numQueryTokens);
+            }
+            invListCursors.get(i).unpinPages();
+        }
+        return maxPrevBufIdx;
+    }
+
+    protected int mergeSuffixListProbe(IInvertedListCursor invListCursor, List<ByteBuffer> prevResultBuffers,
+            int maxPrevBufIdx, List<ByteBuffer> newResultBuffers, int invListIx, int numQueryTokens) throws HyracksDataException, IndexException {
+
+        int newBufIdx = 0;
+        ByteBuffer newCurrentBuffer = newResultBuffers.get(0);
+
+        int prevBufIdx = 0;
+        ByteBuffer prevCurrentBuffer = prevResultBuffers.get(0);
+
+        int resultTidx = 0;
+
+        currentNumResults = 0;
+
+        resultFrameTupleAcc.reset(prevCurrentBuffer);
+        resultFrameTupleApp.reset(newCurrentBuffer, true);
+
+        while (resultTidx < resultFrameTupleAcc.getTupleCount()) {
+
+            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
+            int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
+                    resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
+
+            if (invListCursor.containsKey(resultTuple, invListCmp)) {
+                count++;
+                newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
+            } else {
+                if (count + numQueryTokens - invListIx > occurrenceThreshold) {
+                    newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
+                }
+            }
+
+            resultTidx++;
+            if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
+                prevBufIdx++;
+                if (prevBufIdx <= maxPrevBufIdx) {
+                    prevCurrentBuffer = prevResultBuffers.get(prevBufIdx);
+                    resultFrameTupleAcc.reset(prevCurrentBuffer);
+                    resultTidx = 0;
+                }
+            }
+        }
+
+        return newBufIdx;
+    }
+
+    protected int mergeSuffixListScan(IInvertedListCursor invListCursor, List<ByteBuffer> prevResultBuffers,
+            int maxPrevBufIdx, List<ByteBuffer> newResultBuffers, int invListIx, int numQueryTokens)
+            throws HyracksDataException, IndexException {
+        
+        int newBufIdx = 0;
+        ByteBuffer newCurrentBuffer = newResultBuffers.get(0);
+
+        int prevBufIdx = 0;
+        ByteBuffer prevCurrentBuffer = prevResultBuffers.get(0);
+
+        boolean advanceCursor = true;
+        boolean advancePrevResult = false;
+        int resultTidx = 0;
+
+        currentNumResults = 0;
+        
+        resultFrameTupleAcc.reset(prevCurrentBuffer);
+        resultFrameTupleApp.reset(newCurrentBuffer, true);
+
+        int invListTidx = 0;
+        int invListNumTuples = invListCursor.size();
+
+        if (invListCursor.hasNext())
+            invListCursor.next();
+
+        while (invListTidx < invListNumTuples && resultTidx < resultFrameTupleAcc.getTupleCount()) {
+
+            ITupleReference invListTuple = invListCursor.getTuple();
+
+            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
+
+            int cmp = invListCmp.compare(invListTuple, resultTuple);
+            if (cmp == 0) {
+                int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
+                        resultTuple.getFieldStart(resultTuple.getFieldCount() - 1)) + 1;
+                newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
+                advanceCursor = true;
+                advancePrevResult = true;
+            } else {
+                if (cmp < 0) {
+                    advanceCursor = true;
+                    advancePrevResult = false;
+                } else {
+                    int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
+                            resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
+                    if (count + numQueryTokens - invListIx > occurrenceThreshold) {
+                        newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
+                    }
+                    advanceCursor = false;
+                    advancePrevResult = true;
+                }
+            }
+
+            if (advancePrevResult) {
+                resultTidx++;
+                if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
+                    prevBufIdx++;
+                    if (prevBufIdx <= maxPrevBufIdx) {
+                        prevCurrentBuffer = prevResultBuffers.get(prevBufIdx);
+                        resultFrameTupleAcc.reset(prevCurrentBuffer);
+                        resultTidx = 0;
+                    }
+                }
+            }
+
+            if (advanceCursor) {
+                invListTidx++;
+                if (invListCursor.hasNext()) {
+                    invListCursor.next();
+                }
+            }
+        }
+
+        // append remaining elements from previous result set
+        while (resultTidx < resultFrameTupleAcc.getTupleCount()) {
+
+            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
+
+            int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
+                    resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
+            if (count + numQueryTokens - invListIx > occurrenceThreshold) {
+                newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
+            }
+
+            resultTidx++;
+            if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
+                prevBufIdx++;
+                if (prevBufIdx <= maxPrevBufIdx) {
+                    prevCurrentBuffer = prevResultBuffers.get(prevBufIdx);
+                    resultFrameTupleAcc.reset(prevCurrentBuffer);
+                    resultTidx = 0;
+                }
+            }
+        }
+
+        return newBufIdx;
+    }
+
+    protected int mergePrefixList(IInvertedListCursor invListCursor, List<ByteBuffer> prevResultBuffers,
+            int maxPrevBufIdx, List<ByteBuffer> newResultBuffers) throws HyracksDataException, IndexException {
+        
+        int newBufIdx = 0;
+        ByteBuffer newCurrentBuffer = newResultBuffers.get(0);
+
+        int prevBufIdx = 0;
+        ByteBuffer prevCurrentBuffer = prevResultBuffers.get(0);
+
+        boolean advanceCursor = true;
+        boolean advancePrevResult = false;
+        int resultTidx = 0;
+
+        resultFrameTupleAcc.reset(prevCurrentBuffer);
+        resultFrameTupleApp.reset(newCurrentBuffer, true);
+
+        int invListTidx = 0;
+        int invListNumTuples = invListCursor.size();
+
+        if (invListCursor.hasNext())
+            invListCursor.next();
+
+        while (invListTidx < invListNumTuples && resultTidx < resultFrameTupleAcc.getTupleCount()) {
+
+            ITupleReference invListTuple = invListCursor.getTuple();
+            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
+
+            int cmp = invListCmp.compare(invListTuple, resultTuple);
+            if (cmp == 0) {
+                int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
+                        resultTuple.getFieldStart(resultTuple.getFieldCount() - 1)) + 1;
+                newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
+                advanceCursor = true;
+                advancePrevResult = true;
+            } else {
+                if (cmp < 0) {
+                    int count = 1;
+                    newBufIdx = appendTupleToNewResults(invListTuple, count, newBufIdx);
+                    advanceCursor = true;
+                    advancePrevResult = false;
+                } else {
+                    int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
+                            resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
+                    newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
+                    advanceCursor = false;
+                    advancePrevResult = true;
+                }
+            }
+
+            if (advancePrevResult) {
+                resultTidx++;
+                if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
+                    prevBufIdx++;
+                    if (prevBufIdx <= maxPrevBufIdx) {
+                        prevCurrentBuffer = prevResultBuffers.get(prevBufIdx);
+                        resultFrameTupleAcc.reset(prevCurrentBuffer);
+                        resultTidx = 0;
+                    }
+                }
+            }
+
+            if (advanceCursor) {
+                invListTidx++;
+                if (invListCursor.hasNext()) {
+                    invListCursor.next();
+                }
+            }
+        }
+
+        // append remaining new elements from inverted list
+        while (invListTidx < invListNumTuples) {
+            ITupleReference invListTuple = invListCursor.getTuple();
+            newBufIdx = appendTupleToNewResults(invListTuple, 1, newBufIdx);
+            invListTidx++;
+            if (invListCursor.hasNext()) {
+                invListCursor.next();
+            }
+        }
+
+        // append remaining elements from previous result set
+        while (resultTidx < resultFrameTupleAcc.getTupleCount()) {
+
+            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
+
+            int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
+                    resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
+            newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
+
+            resultTidx++;
+            if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
+                prevBufIdx++;
+                if (prevBufIdx <= maxPrevBufIdx) {
+                    prevCurrentBuffer = prevResultBuffers.get(prevBufIdx);
+                    resultFrameTupleAcc.reset(prevCurrentBuffer);
+                    resultTidx = 0;
+                }
+            }
+        }
+
+        return newBufIdx;
+    }
+
+    protected int appendTupleToNewResults(ITupleReference tuple, int newCount, int newBufIdx) {
+        ByteBuffer newCurrentBuffer = newResultBuffers.get(newBufIdx);
+
+        if (!resultFrameTupleApp.hasSpace()) {
+            newBufIdx++;
+            if (newBufIdx >= newResultBuffers.size()) {
+                newResultBuffers.add(ctx.allocateFrame());
+            }
+            newCurrentBuffer = newResultBuffers.get(newBufIdx);
+            resultFrameTupleApp.reset(newCurrentBuffer, true);
+        }
+
+        // append key
+        if (!resultFrameTupleApp.append(tuple.getFieldData(0), tuple.getFieldStart(0), invListKeyLength)) {
+            throw new IllegalStateException();
+        }
+
+        // append new count
+        if (!resultFrameTupleApp.append(newCount)) {
+            throw new IllegalStateException();
+        }
+
+        resultFrameTupleApp.incrementTupleCount(1);
+
+        currentNumResults++;
+
+        return newBufIdx;
+    }
+
+    public IFrameTupleAccessor createResultFrameTupleAccessor() {
+        return new FixedSizeFrameTupleAccessor(ctx.getFrameSize(), invListFieldsWithCount);
+    }
+
+    public ITupleReference createResultFrameTupleReference() {
+        return new FixedSizeTupleReference(invListFieldsWithCount);
+    }
+
+    @Override
+    public List<ByteBuffer> getResultBuffers() {
+        return newResultBuffers;
+    }
+
+    @Override
+    public int getNumValidResultBuffers() {
+        return maxResultBufIdx + 1;
+    }
+
+    public int getOccurrenceThreshold() {
+        return occurrenceThreshold;
+    }
+
+    public void printNewResults(int maxResultBufIdx, List<ByteBuffer> buffer) {
+        StringBuffer strBuffer = new StringBuffer();
+        for (int i = 0; i <= maxResultBufIdx; i++) {
+            ByteBuffer testBuf = buffer.get(i);
+            resultFrameTupleAcc.reset(testBuf);
+            for (int j = 0; j < resultFrameTupleAcc.getTupleCount(); j++) {
+                strBuffer.append(IntegerSerializerDeserializer.getInt(resultFrameTupleAcc.getBuffer().array(),
+                        resultFrameTupleAcc.getFieldStartOffset(j, 0)) + ",");
+                strBuffer.append(IntegerSerializerDeserializer.getInt(resultFrameTupleAcc.getBuffer().array(),
+                        resultFrameTupleAcc.getFieldStartOffset(j, 1)) + " ");
+            }
+        }
+        System.out.println(strBuffer.toString());
+    }
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/SearchResult.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/SearchResult.java
new file mode 100644
index 0000000..def477d
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/SearchResult.java
@@ -0,0 +1,139 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+
+import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeFrameTupleAccessor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeFrameTupleAppender;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeTupleReference;
+
+/**
+ * Byte-buffer backed storage for intermediate and final results of inverted-index searches.
+ */
+// TODO: Rename members.
+public class SearchResult {
+    protected final ArrayList<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
+    protected final IHyracksCommonContext ctx;
+    protected final FixedSizeFrameTupleAppender resultFrameTupleApp;
+    protected final FixedSizeFrameTupleAccessor resultFrameTupleAcc;
+    protected final FixedSizeTupleReference resultTuple;
+    protected final ITypeTraits[] typeTraits;
+    protected final int invListElementSize;
+    
+    protected int currBufIdx;
+    protected int numResults;
+
+    public SearchResult(ITypeTraits[] invListFields, IHyracksCommonContext ctx) {
+        typeTraits = new ITypeTraits[invListFields.length + 1];
+        int tmp = 0;
+        for (int i = 0; i < invListFields.length; i++) {
+            typeTraits[i] = invListFields[i];
+            tmp += invListFields[i].getFixedLength();
+        }
+        invListElementSize = tmp;
+        // Integer for counting occurrences.
+        typeTraits[invListFields.length] = IntegerPointable.TYPE_TRAITS;
+        this.ctx = ctx;
+        resultFrameTupleApp = new FixedSizeFrameTupleAppender(ctx.getFrameSize(), typeTraits);
+        resultFrameTupleAcc = new FixedSizeFrameTupleAccessor(ctx.getFrameSize(), typeTraits);
+        resultTuple = new FixedSizeTupleReference(typeTraits);
+        buffers.add(ctx.allocateFrame());
+    }
+
+    /**
+     * Initialize from other search-result object to share member instances except for result buffers.
+     */
+    public SearchResult(SearchResult other) {
+        this.ctx = other.ctx;
+        this.resultFrameTupleApp = other.resultFrameTupleApp;
+        this.resultFrameTupleAcc = other.resultFrameTupleAcc;
+        this.resultTuple = other.resultTuple;
+        this.typeTraits = other.typeTraits;
+        this.invListElementSize = other.invListElementSize;
+        buffers.add(ctx.allocateFrame());
+    }
+
+    public FixedSizeFrameTupleAccessor getAccessor() {
+        return resultFrameTupleAcc;
+    }
+    
+    public FixedSizeFrameTupleAppender getAppender() {
+        return resultFrameTupleApp;
+    }
+    
+    public FixedSizeTupleReference getTuple() {
+        return resultTuple;
+    }
+    
+    public ArrayList<ByteBuffer> getBuffers() {
+        return buffers;
+    }
+    
+    public void reset() {
+        currBufIdx = 0;
+        numResults = 0;
+        resultFrameTupleApp.reset(buffers.get(0), true);
+    }
+    
+    public void clear() {
+        currBufIdx = 0;
+        numResults = 0;
+        for (ByteBuffer buffer : buffers) {
+            resultFrameTupleApp.reset(buffer, true);
+        }
+    }
+
+    public void append(ITupleReference invListElement, int count) {
+        ByteBuffer currentBuffer = buffers.get(currBufIdx);
+        if (!resultFrameTupleApp.hasSpace()) {
+            currBufIdx++;
+            if (currBufIdx >= buffers.size()) {
+                buffers.add(ctx.allocateFrame());
+            }
+            currentBuffer = buffers.get(currBufIdx);
+            resultFrameTupleApp.reset(currentBuffer, true);
+        }
+        // Append inverted-list element.
+        if (!resultFrameTupleApp.append(invListElement.getFieldData(0), invListElement.getFieldStart(0),
+                invListElementSize)) {
+            throw new IllegalStateException();
+        }
+        // Append count.
+        if (!resultFrameTupleApp.append(count)) {
+            throw new IllegalStateException();
+        }
+        resultFrameTupleApp.incrementTupleCount(1);
+        numResults++;
+    }
+
+    public int getCurrentBufferIndex() {
+        return currBufIdx;
+    }
+
+    public ITypeTraits[] getTypeTraits() {
+        return typeTraits;
+    }
+    
+    public int getNumResults() {
+        return numResults;
+    }
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/TOccurrenceSearcher.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/TOccurrenceSearcher.java
index 419d5aa..b642d1b 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/TOccurrenceSearcher.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/TOccurrenceSearcher.java
@@ -25,10 +25,8 @@
 import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -45,7 +43,6 @@
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.exceptions.OccurrenceThresholdPanicException;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeFrameTupleAccessor;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeFrameTupleAppender;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeTupleReference;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndexSearchCursor;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizer;
@@ -56,16 +53,9 @@
 public class TOccurrenceSearcher implements IInvertedIndexSearcher {
 
     protected final IHyracksCommonContext ctx;
-    protected final FixedSizeFrameTupleAppender resultFrameTupleApp;
-    protected final FixedSizeFrameTupleAccessor resultFrameTupleAcc;
-    protected final FixedSizeTupleReference resultTuple;
-    protected final int invListKeyLength;
-    protected int currentNumResults;
-
-    protected List<ByteBuffer> newResultBuffers = new ArrayList<ByteBuffer>();
-    protected List<ByteBuffer> prevResultBuffers = new ArrayList<ByteBuffer>();
-    protected List<ByteBuffer> swap = null;
-    protected int maxResultBufIdx = 0;
+    
+    protected SearchResult newSearchResult;
+    protected SearchResult prevSearchResult;    
 
     protected RecordDescriptor queryTokenRecDesc = new RecordDescriptor(
             new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
@@ -77,34 +67,19 @@
 
     protected final IInvertedIndex invIndex;
     protected final MultiComparator invListCmp;
-    protected final ITypeTraits[] invListFieldsWithCount;
     protected int occurrenceThreshold;
 
     protected final int cursorCacheSize = 10;
     protected List<IInvertedListCursor> invListCursorCache = new ArrayList<IInvertedListCursor>(cursorCacheSize);
     protected List<IInvertedListCursor> invListCursors = new ArrayList<IInvertedListCursor>(cursorCacheSize);
-
+    
     public TOccurrenceSearcher(IHyracksCommonContext ctx, IInvertedIndex invIndex) {
         this.ctx = ctx;
         this.invIndex = invIndex;
         this.invListCmp = MultiComparator.create(invIndex.getInvListCmpFactories());
 
-        ITypeTraits[] invListFields = invIndex.getInvListTypeTraits();
-        invListFieldsWithCount = new ITypeTraits[invListFields.length + 1];
-        int tmp = 0;
-        for (int i = 0; i < invListFields.length; i++) {
-            invListFieldsWithCount[i] = invListFields[i];
-            tmp += invListFields[i].getFixedLength();
-        }
-        // using an integer for counting occurrences
-        invListFieldsWithCount[invListFields.length] = IntegerPointable.TYPE_TRAITS;
-        invListKeyLength = tmp;
-
-        resultFrameTupleApp = new FixedSizeFrameTupleAppender(ctx.getFrameSize(), invListFieldsWithCount);
-        resultFrameTupleAcc = new FixedSizeFrameTupleAccessor(ctx.getFrameSize(), invListFieldsWithCount);
-        resultTuple = new FixedSizeTupleReference(invListFieldsWithCount);
-        newResultBuffers.add(ctx.allocateFrame());
-        prevResultBuffers.add(ctx.allocateFrame());
+        this.prevSearchResult = new SearchResult(invIndex.getInvListTypeTraits(), ctx);
+        this.newSearchResult = new SearchResult(prevSearchResult);
 
         // Pre-create cursor objects.
         for (int i = 0; i < cursorCacheSize; i++) {
@@ -113,18 +88,11 @@
 
         queryTokenAppender = new FrameTupleAppender(ctx.getFrameSize());
         queryTokenFrame = ctx.allocateFrame();
-
-        currentNumResults = 0;
     }
 
     public void reset() {
-        for (ByteBuffer b : newResultBuffers) {
-            resultFrameTupleApp.reset(b, true);
-        }
-        for (ByteBuffer b : prevResultBuffers) {
-            resultFrameTupleApp.reset(b, true);
-        }
-        currentNumResults = 0;
+        prevSearchResult.clear();
+        newSearchResult.clear();
     }
 
     public void search(OnDiskInvertedIndexSearchCursor resultCursor, InvertedIndexSearchPredicate searchPred,
@@ -180,64 +148,60 @@
         }
         
         int numPrefixLists = searchModifier.getNumPrefixLists(invListCursors.size());
-        maxResultBufIdx = mergePrefixLists(numPrefixLists, numQueryTokens);
-        maxResultBufIdx = mergeSuffixLists(numPrefixLists, numQueryTokens, maxResultBufIdx);
+        mergePrefixLists(numPrefixLists, numQueryTokens);
+        mergeSuffixLists(numPrefixLists, numQueryTokens);
 
         resultCursor.open(null, searchPred);
     }
 
-    protected int mergePrefixLists(int numPrefixTokens, int numQueryTokens) throws HyracksDataException, IndexException {
-        int maxPrevBufIdx = 0;
+    protected void mergePrefixLists(int numPrefixTokens, int numQueryTokens) throws HyracksDataException,
+            IndexException {
         for (int i = 0; i < numPrefixTokens; i++) {
-            swap = prevResultBuffers;
-            prevResultBuffers = newResultBuffers;
-            newResultBuffers = swap;
-            currentNumResults = 0;
+            SearchResult swapTemp = prevSearchResult;
+            prevSearchResult = newSearchResult;
+            newSearchResult = swapTemp;
+            newSearchResult.reset();
 
             invListCursors.get(i).pinPages();
-            maxPrevBufIdx = mergePrefixList(invListCursors.get(i), prevResultBuffers, maxPrevBufIdx, newResultBuffers);
+            mergePrefixList(invListCursors.get(i), prevSearchResult, newSearchResult);
             invListCursors.get(i).unpinPages();
         }
-        return maxPrevBufIdx;
     }
 
-    protected int mergeSuffixLists(int numPrefixTokens, int numQueryTokens, int maxPrevBufIdx)
-            throws HyracksDataException, IndexException {
+    protected void mergeSuffixLists(int numPrefixTokens, int numQueryTokens) throws HyracksDataException,
+            IndexException {
         for (int i = numPrefixTokens; i < numQueryTokens; i++) {
-            swap = prevResultBuffers;
-            prevResultBuffers = newResultBuffers;
-            newResultBuffers = swap;
+            SearchResult swapTemp = prevSearchResult;
+            prevSearchResult = newSearchResult;
+            newSearchResult = swapTemp;
+            newSearchResult.reset();
 
             invListCursors.get(i).pinPages();
             int numInvListElements = invListCursors.get(i).size();
-            // should we binary search the next list or should we sort-merge it?
+            int currentNumResults = prevSearchResult.getNumResults();
+            // Should we binary search the next list or should we sort-merge it?
             if (currentNumResults * Math.log(numInvListElements) < currentNumResults + numInvListElements) {
-                maxPrevBufIdx = mergeSuffixListProbe(invListCursors.get(i), prevResultBuffers, maxPrevBufIdx,
-                        newResultBuffers, i, numQueryTokens);
+                mergeSuffixListProbe(invListCursors.get(i), prevSearchResult, newSearchResult, i, numQueryTokens);
             } else {
-                maxPrevBufIdx = mergeSuffixListScan(invListCursors.get(i), prevResultBuffers, maxPrevBufIdx,
-                        newResultBuffers, i, numQueryTokens);
+                mergeSuffixListScan(invListCursors.get(i), prevSearchResult, newSearchResult, i, numQueryTokens);
             }
             invListCursors.get(i).unpinPages();
         }
-        return maxPrevBufIdx;
     }
 
-    protected int mergeSuffixListProbe(IInvertedListCursor invListCursor, List<ByteBuffer> prevResultBuffers,
-            int maxPrevBufIdx, List<ByteBuffer> newResultBuffers, int invListIx, int numQueryTokens) throws HyracksDataException, IndexException {
-
-        int newBufIdx = 0;
-        ByteBuffer newCurrentBuffer = newResultBuffers.get(0);
+    protected void mergeSuffixListProbe(IInvertedListCursor invListCursor, SearchResult prevSearchResult,
+            SearchResult newSearchResult, int invListIx, int numQueryTokens) throws HyracksDataException, IndexException {
 
         int prevBufIdx = 0;
-        ByteBuffer prevCurrentBuffer = prevResultBuffers.get(0);
+        int maxPrevBufIdx = prevSearchResult.getCurrentBufferIndex();
+        ByteBuffer prevCurrentBuffer = prevSearchResult.getBuffers().get(0);
+
+        FixedSizeFrameTupleAccessor resultFrameTupleAcc = prevSearchResult.getAccessor();
+        FixedSizeTupleReference resultTuple = prevSearchResult.getTuple();
 
         int resultTidx = 0;
 
-        currentNumResults = 0;
-
         resultFrameTupleAcc.reset(prevCurrentBuffer);
-        resultFrameTupleApp.reset(newCurrentBuffer, true);
 
         while (resultTidx < resultFrameTupleAcc.getTupleCount()) {
 
@@ -247,10 +211,10 @@
 
             if (invListCursor.containsKey(resultTuple, invListCmp)) {
                 count++;
-                newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
+                newSearchResult.append(resultTuple, count);
             } else {
                 if (count + numQueryTokens - invListIx > occurrenceThreshold) {
-                    newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
+                    newSearchResult.append(resultTuple, count);
                 }
             }
 
@@ -258,34 +222,30 @@
             if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
                 prevBufIdx++;
                 if (prevBufIdx <= maxPrevBufIdx) {
-                    prevCurrentBuffer = prevResultBuffers.get(prevBufIdx);
+                    prevCurrentBuffer = prevSearchResult.getBuffers().get(prevBufIdx);
                     resultFrameTupleAcc.reset(prevCurrentBuffer);
                     resultTidx = 0;
                 }
             }
         }
-
-        return newBufIdx;
     }
 
-    protected int mergeSuffixListScan(IInvertedListCursor invListCursor, List<ByteBuffer> prevResultBuffers,
-            int maxPrevBufIdx, List<ByteBuffer> newResultBuffers, int invListIx, int numQueryTokens)
+    protected void mergeSuffixListScan(IInvertedListCursor invListCursor, SearchResult prevSearchResult,
+            SearchResult newSearchResult, int invListIx, int numQueryTokens)
             throws HyracksDataException, IndexException {
         
-        int newBufIdx = 0;
-        ByteBuffer newCurrentBuffer = newResultBuffers.get(0);
-
         int prevBufIdx = 0;
-        ByteBuffer prevCurrentBuffer = prevResultBuffers.get(0);
+        int maxPrevBufIdx = prevSearchResult.getCurrentBufferIndex();
+        ByteBuffer prevCurrentBuffer = prevSearchResult.getBuffers().get(0);
 
+        FixedSizeFrameTupleAccessor resultFrameTupleAcc = prevSearchResult.getAccessor();
+        FixedSizeTupleReference resultTuple = prevSearchResult.getTuple();
+        
         boolean advanceCursor = true;
         boolean advancePrevResult = false;
         int resultTidx = 0;
 
-        currentNumResults = 0;
-        
         resultFrameTupleAcc.reset(prevCurrentBuffer);
-        resultFrameTupleApp.reset(newCurrentBuffer, true);
 
         int invListTidx = 0;
         int invListNumTuples = invListCursor.size();
@@ -303,7 +263,7 @@
             if (cmp == 0) {
                 int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
                         resultTuple.getFieldStart(resultTuple.getFieldCount() - 1)) + 1;
-                newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
+                newSearchResult.append(resultTuple, count);
                 advanceCursor = true;
                 advancePrevResult = true;
             } else {
@@ -314,7 +274,7 @@
                     int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
                             resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
                     if (count + numQueryTokens - invListIx > occurrenceThreshold) {
-                        newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
+                        newSearchResult.append(resultTuple, count);
                     }
                     advanceCursor = false;
                     advancePrevResult = true;
@@ -326,7 +286,7 @@
                 if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
                     prevBufIdx++;
                     if (prevBufIdx <= maxPrevBufIdx) {
-                        prevCurrentBuffer = prevResultBuffers.get(prevBufIdx);
+                        prevCurrentBuffer = prevSearchResult.getBuffers().get(prevBufIdx);
                         resultFrameTupleAcc.reset(prevCurrentBuffer);
                         resultTidx = 0;
                     }
@@ -349,38 +309,36 @@
             int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
                     resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
             if (count + numQueryTokens - invListIx > occurrenceThreshold) {
-                newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
+                newSearchResult.append(resultTuple, count);
             }
 
             resultTidx++;
             if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
                 prevBufIdx++;
                 if (prevBufIdx <= maxPrevBufIdx) {
-                    prevCurrentBuffer = prevResultBuffers.get(prevBufIdx);
+                    prevCurrentBuffer = prevSearchResult.getBuffers().get(prevBufIdx);
                     resultFrameTupleAcc.reset(prevCurrentBuffer);
                     resultTidx = 0;
                 }
             }
         }
-
-        return newBufIdx;
     }
 
-    protected int mergePrefixList(IInvertedListCursor invListCursor, List<ByteBuffer> prevResultBuffers,
-            int maxPrevBufIdx, List<ByteBuffer> newResultBuffers) throws HyracksDataException, IndexException {
+    protected void mergePrefixList(IInvertedListCursor invListCursor, SearchResult prevSearchResult,
+            SearchResult newSearchResult) throws HyracksDataException, IndexException {
         
-        int newBufIdx = 0;
-        ByteBuffer newCurrentBuffer = newResultBuffers.get(0);
-
         int prevBufIdx = 0;
-        ByteBuffer prevCurrentBuffer = prevResultBuffers.get(0);
+        int maxPrevBufIdx = prevSearchResult.getCurrentBufferIndex();
+        ByteBuffer prevCurrentBuffer = prevSearchResult.getBuffers().get(0);
 
+        FixedSizeFrameTupleAccessor resultFrameTupleAcc = prevSearchResult.getAccessor();
+        FixedSizeTupleReference resultTuple = prevSearchResult.getTuple();
+        
         boolean advanceCursor = true;
         boolean advancePrevResult = false;
         int resultTidx = 0;
 
         resultFrameTupleAcc.reset(prevCurrentBuffer);
-        resultFrameTupleApp.reset(newCurrentBuffer, true);
 
         int invListTidx = 0;
         int invListNumTuples = invListCursor.size();
@@ -397,19 +355,19 @@
             if (cmp == 0) {
                 int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
                         resultTuple.getFieldStart(resultTuple.getFieldCount() - 1)) + 1;
-                newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
+                newSearchResult.append(resultTuple, count);
                 advanceCursor = true;
                 advancePrevResult = true;
             } else {
                 if (cmp < 0) {
                     int count = 1;
-                    newBufIdx = appendTupleToNewResults(invListTuple, count, newBufIdx);
+                    newSearchResult.append(invListTuple, count);
                     advanceCursor = true;
                     advancePrevResult = false;
                 } else {
                     int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
                             resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
-                    newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
+                    newSearchResult.append(resultTuple, count);
                     advanceCursor = false;
                     advancePrevResult = true;
                 }
@@ -420,7 +378,7 @@
                 if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
                     prevBufIdx++;
                     if (prevBufIdx <= maxPrevBufIdx) {
-                        prevCurrentBuffer = prevResultBuffers.get(prevBufIdx);
+                        prevCurrentBuffer = prevSearchResult.getBuffers().get(prevBufIdx);
                         resultFrameTupleAcc.reset(prevCurrentBuffer);
                         resultTidx = 0;
                     }
@@ -438,7 +396,7 @@
         // append remaining new elements from inverted list
         while (invListTidx < invListNumTuples) {
             ITupleReference invListTuple = invListCursor.getTuple();
-            newBufIdx = appendTupleToNewResults(invListTuple, 1, newBufIdx);
+            newSearchResult.append(invListTuple, 1);
             invListTidx++;
             if (invListCursor.hasNext()) {
                 invListCursor.next();
@@ -452,67 +410,36 @@
 
             int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
                     resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
-            newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
+            newSearchResult.append(resultTuple, count);
 
             resultTidx++;
             if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
                 prevBufIdx++;
                 if (prevBufIdx <= maxPrevBufIdx) {
-                    prevCurrentBuffer = prevResultBuffers.get(prevBufIdx);
+                    prevCurrentBuffer = prevSearchResult.getBuffers().get(prevBufIdx);
                     resultFrameTupleAcc.reset(prevCurrentBuffer);
                     resultTidx = 0;
                 }
             }
         }
-
-        return newBufIdx;
-    }
-
-    protected int appendTupleToNewResults(ITupleReference tuple, int newCount, int newBufIdx) {
-        ByteBuffer newCurrentBuffer = newResultBuffers.get(newBufIdx);
-
-        if (!resultFrameTupleApp.hasSpace()) {
-            newBufIdx++;
-            if (newBufIdx >= newResultBuffers.size()) {
-                newResultBuffers.add(ctx.allocateFrame());
-            }
-            newCurrentBuffer = newResultBuffers.get(newBufIdx);
-            resultFrameTupleApp.reset(newCurrentBuffer, true);
-        }
-
-        // append key
-        if (!resultFrameTupleApp.append(tuple.getFieldData(0), tuple.getFieldStart(0), invListKeyLength)) {
-            throw new IllegalStateException();
-        }
-
-        // append new count
-        if (!resultFrameTupleApp.append(newCount)) {
-            throw new IllegalStateException();
-        }
-
-        resultFrameTupleApp.incrementTupleCount(1);
-
-        currentNumResults++;
-
-        return newBufIdx;
     }
 
     public IFrameTupleAccessor createResultFrameTupleAccessor() {
-        return new FixedSizeFrameTupleAccessor(ctx.getFrameSize(), invListFieldsWithCount);
+        return new FixedSizeFrameTupleAccessor(ctx.getFrameSize(), newSearchResult.getTypeTraits());
     }
 
     public ITupleReference createResultFrameTupleReference() {
-        return new FixedSizeTupleReference(invListFieldsWithCount);
+        return new FixedSizeTupleReference(newSearchResult.getTypeTraits());
     }
 
     @Override
     public List<ByteBuffer> getResultBuffers() {
-        return newResultBuffers;
+        return newSearchResult.getBuffers();
     }
 
     @Override
     public int getNumValidResultBuffers() {
-        return maxResultBufIdx + 1;
+        return newSearchResult.getCurrentBufferIndex() + 1;
     }
 
     public int getOccurrenceThreshold() {
@@ -521,6 +448,7 @@
 
     public void printNewResults(int maxResultBufIdx, List<ByteBuffer> buffer) {
         StringBuffer strBuffer = new StringBuffer();
+        FixedSizeFrameTupleAccessor resultFrameTupleAcc = prevSearchResult.getAccessor();
         for (int i = 0; i <= maxResultBufIdx; i++) {
             ByteBuffer testBuf = buffer.get(i);
             resultFrameTupleAcc.reset(testBuf);
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/TOccurrenceSearcherSuffixProbeOnly.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/TOccurrenceSearcherSuffixProbeOnly.java
deleted file mode 100644
index 630b810..0000000
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/TOccurrenceSearcherSuffixProbeOnly.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
-import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndex;
-
-public class TOccurrenceSearcherSuffixProbeOnly extends TOccurrenceSearcher {
-
-	protected final MultiComparator invListCmp;
-	
-    public TOccurrenceSearcherSuffixProbeOnly(IHyracksTaskContext ctx, OnDiskInvertedIndex invIndex) {
-        super(ctx, invIndex);
-        this.invListCmp = MultiComparator.create(invIndex.getInvListCmpFactories());
-    }
-
-    protected int mergeSuffixLists(int numPrefixTokens, int numQueryTokens, int maxPrevBufIdx) throws HyracksDataException, IndexException {
-        for (int i = numPrefixTokens; i < numQueryTokens; i++) {
-            swap = prevResultBuffers;
-            prevResultBuffers = newResultBuffers;
-            newResultBuffers = swap;
-            currentNumResults = 0;
-
-            invListCursors.get(i).pinPages();
-            maxPrevBufIdx = mergeSuffixListProbe(invListCursors.get(i), prevResultBuffers, maxPrevBufIdx,
-                    newResultBuffers, i, numQueryTokens);
-            invListCursors.get(i).unpinPages();
-        }
-        return maxPrevBufIdx;
-    }
-
-    protected int mergeSuffixListProbe(IInvertedListCursor invListCursor, List<ByteBuffer> prevResultBuffers,
-            int maxPrevBufIdx, List<ByteBuffer> newResultBuffers, int invListIx, int numQueryTokens) throws HyracksDataException, IndexException {
-
-        int newBufIdx = 0;
-        ByteBuffer newCurrentBuffer = newResultBuffers.get(0);
-
-        int prevBufIdx = 0;
-        ByteBuffer prevCurrentBuffer = prevResultBuffers.get(0);
-
-        int resultTidx = 0;
-
-        resultFrameTupleAcc.reset(prevCurrentBuffer);
-        resultFrameTupleApp.reset(newCurrentBuffer, true);
-
-        while (resultTidx < resultFrameTupleAcc.getTupleCount()) {
-
-            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
-            int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
-                    resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
-
-            if (invListCursor.containsKey(resultTuple, invListCmp)) {
-                count++;
-                newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
-            } else {
-                if (count + numQueryTokens - invListIx > occurrenceThreshold) {
-                    newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
-                }
-            }
-
-            resultTidx++;
-            if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
-                prevBufIdx++;
-                if (prevBufIdx <= maxPrevBufIdx) {
-                    prevCurrentBuffer = prevResultBuffers.get(prevBufIdx);
-                    resultFrameTupleAcc.reset(prevCurrentBuffer);
-                    resultTidx = 0;
-                }
-            }
-        }
-
-        return newBufIdx;
-    }
-}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/TOccurrenceSearcherSuffixScanOnly.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/TOccurrenceSearcherSuffixScanOnly.java
deleted file mode 100644
index 3640511..0000000
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/TOccurrenceSearcherSuffixScanOnly.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
-import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndex;
-
-public class TOccurrenceSearcherSuffixScanOnly extends TOccurrenceSearcher {
-
-	protected final MultiComparator invListCmp;
-	
-    public TOccurrenceSearcherSuffixScanOnly(IHyracksTaskContext ctx, OnDiskInvertedIndex invIndex) {
-        super(ctx, invIndex);
-        this.invListCmp = MultiComparator.create(invIndex.getInvListCmpFactories());
-    }
-
-    protected int mergeSuffixLists(int numPrefixTokens, int numQueryTokens, int maxPrevBufIdx) throws HyracksDataException, IndexException {
-        for (int i = numPrefixTokens; i < numQueryTokens; i++) {
-            swap = prevResultBuffers;
-            prevResultBuffers = newResultBuffers;
-            newResultBuffers = swap;
-            currentNumResults = 0;
-
-            invListCursors.get(i).pinPages();
-            maxPrevBufIdx = mergeSuffixListScan(invListCursors.get(i), prevResultBuffers, maxPrevBufIdx,
-                    newResultBuffers, i, numQueryTokens);
-            invListCursors.get(i).unpinPages();
-        }
-        return maxPrevBufIdx;
-    }
-
-    protected int mergeSuffixListScan(IInvertedListCursor invListCursor, List<ByteBuffer> prevResultBuffers,
-            int maxPrevBufIdx, List<ByteBuffer> newResultBuffers, int invListIx, int numQueryTokens) throws HyracksDataException, IndexException {
-
-        int newBufIdx = 0;
-        ByteBuffer newCurrentBuffer = newResultBuffers.get(0);
-
-        int prevBufIdx = 0;
-        ByteBuffer prevCurrentBuffer = prevResultBuffers.get(0);
-
-        boolean advanceCursor = true;
-        boolean advancePrevResult = false;
-        int resultTidx = 0;
-
-        resultFrameTupleAcc.reset(prevCurrentBuffer);
-        resultFrameTupleApp.reset(newCurrentBuffer, true);
-
-        while (invListCursor.hasNext() && resultTidx < resultFrameTupleAcc.getTupleCount()) {
-
-            if (advanceCursor)
-                invListCursor.next();
-
-            ITupleReference invListTuple = invListCursor.getTuple();
-
-            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
-
-            int cmp = invListCmp.compare(invListTuple, resultTuple);
-            if (cmp == 0) {
-                int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
-                        resultTuple.getFieldStart(resultTuple.getFieldCount() - 1)) + 1;
-                newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
-                advanceCursor = true;
-                advancePrevResult = true;
-            } else {
-                if (cmp < 0) {
-                    advanceCursor = true;
-                    advancePrevResult = false;
-                } else {
-                    int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
-                            resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
-                    if (count + numQueryTokens - invListIx > occurrenceThreshold) {
-                        newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
-                    }
-                    advanceCursor = false;
-                    advancePrevResult = true;
-                }
-            }
-
-            if (advancePrevResult) {
-                resultTidx++;
-                if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
-                    prevBufIdx++;
-                    if (prevBufIdx <= maxPrevBufIdx) {
-                        prevCurrentBuffer = prevResultBuffers.get(prevBufIdx);
-                        resultFrameTupleAcc.reset(prevCurrentBuffer);
-                        resultTidx = 0;
-                    }
-                }
-            }
-        }
-
-        // append remaining elements from previous result set
-        while (resultTidx < resultFrameTupleAcc.getTupleCount()) {
-
-            int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
-                    resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
-            newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
-
-            resultTidx++;
-            if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
-                prevBufIdx++;
-                if (prevBufIdx <= maxPrevBufIdx) {
-                    prevCurrentBuffer = prevResultBuffers.get(prevBufIdx);
-                    resultFrameTupleAcc.reset(prevCurrentBuffer);
-                    resultTidx = 0;
-                }
-            }
-        }
-
-        return newBufIdx;
-    }
-}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/InvertedIndexUtils.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/InvertedIndexUtils.java
index 7da9416..3ffea85 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/InvertedIndexUtils.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/InvertedIndexUtils.java
@@ -48,6 +48,7 @@
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeElementInvertedListBuilderFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndex;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndexFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.PartitionedOnDiskInvertedIndex;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
 import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
 import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
@@ -72,6 +73,16 @@
         return new OnDiskInvertedIndex(bufferCache, fileMapProvider, builder, invListTypeTraits, invListCmpFactories,
                 tokenTypeTraits, tokenCmpFactories, btreeFile, invListsFile);
     }
+    
+    public static PartitionedOnDiskInvertedIndex createPartitionedOnDiskInvertedIndex(IBufferCache bufferCache,
+            IFileMapProvider fileMapProvider, ITypeTraits[] invListTypeTraits,
+            IBinaryComparatorFactory[] invListCmpFactories, ITypeTraits[] tokenTypeTraits,
+            IBinaryComparatorFactory[] tokenCmpFactories, FileReference invListsFile) throws IndexException {
+        IInvertedListBuilder builder = new FixedSizeElementInvertedListBuilder(invListTypeTraits);
+        FileReference btreeFile = getBTreeFile(invListsFile);
+        return new PartitionedOnDiskInvertedIndex(bufferCache, fileMapProvider, builder, invListTypeTraits,
+                invListCmpFactories, tokenTypeTraits, tokenCmpFactories, btreeFile, invListsFile);
+    }
 
     public static FileReference getBTreeFile(FileReference invListsFile) {
         return new FileReference(new File(invListsFile.getFile().getPath() + "_btree"));
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/ObjectCache.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/ObjectCache.java
new file mode 100644
index 0000000..b073f20
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/ObjectCache.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util;
+
+import java.util.ArrayList;
+
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IObjectFactory;
+
+public class ObjectCache<T> {
+    protected final int expandSize;
+    protected final IObjectFactory<T> objFactory;
+    protected final ArrayList<T> cache;
+    protected int lastReturned = 0;
+
+    public ObjectCache(IObjectFactory<T> objFactory, int initialSize, int expandSize) {
+        this.objFactory = objFactory;
+        this.cache = new ArrayList<T>(initialSize);
+        this.expandSize = expandSize;
+        expand(initialSize);
+    }
+
+    private void expand(int expandSize) {
+        for (int i = 0; i < expandSize; i++) {
+            cache.add(objFactory.create());
+        }
+    }
+
+    public void reset() {
+        lastReturned = 0;
+    }
+
+    public T getNext() {
+        if (lastReturned >= cache.size()) {
+            expand(expandSize);
+        }
+        return cache.get(lastReturned++);
+    }
+}
diff --git a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestContext.java b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestContext.java
index ce87bd7..eb75a8a 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestContext.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestContext.java
@@ -124,13 +124,18 @@
                         tokenCmpFactories, tokenizerFactory);
                 break;
             }
-            case ONDISK:
-            case PARTITIONED_ONDISK: {
+            case ONDISK: {
                 invIndex = InvertedIndexUtils.createOnDiskInvertedIndex(harness.getDiskBufferCache(),
                         harness.getDiskFileMapProvider(), invListTypeTraits, invListCmpFactories, tokenTypeTraits,
                         tokenCmpFactories, harness.getInvListsFileRef());
                 break;
             }
+            case PARTITIONED_ONDISK: {
+                invIndex = InvertedIndexUtils.createPartitionedOnDiskInvertedIndex(harness.getDiskBufferCache(),
+                        harness.getDiskFileMapProvider(), invListTypeTraits, invListCmpFactories, tokenTypeTraits,
+                        tokenCmpFactories, harness.getInvListsFileRef());
+                break;
+            }
             case LSM:
             case PARTITIONED_LSM: {
                 invIndex = InvertedIndexUtils.createLSMInvertedIndex(harness.getMemBufferCache(),
diff --git a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestUtils.java b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestUtils.java
index 8044821..e518816 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestUtils.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestUtils.java
@@ -414,18 +414,22 @@
             Comparable tokenObj = (Comparable) tokenSerde.deserialize(dataIn);
             CheckTuple lowKey;
             if (numTokensLowerBound < 0) {
+                // Index is not partitioned, or no length filtering is possible for this search modifier.
                 lowKey = new CheckTuple(1, 1);
                 lowKey.appendField(tokenObj);
             } else {
+                // Index is length partitioned, and search modifier supports length filtering.
                 lowKey = new CheckTuple(2, 2);
                 lowKey.appendField(tokenObj);
                 lowKey.appendField(Integer.valueOf(numTokensLowerBound));
             }
             CheckTuple highKey;
             if (numTokensUpperBound < 0) {
+                // Index is not partitioned, or no length filtering is possible for this search modifier.
                 highKey = new CheckTuple(1, 1);
                 highKey.appendField(tokenObj);
             } else {
+                // Index is length partitioned, and search modifier supports length filtering.
                 highKey = new CheckTuple(2, 2);
                 highKey.appendField(tokenObj);
                 highKey.appendField(Integer.valueOf(numTokensUpperBound));
@@ -440,7 +444,6 @@
                 Integer element = (Integer) checkTuple.getField(invListElementField);
                 scanCountArray[element]++;
             }
-           
         }
         
         // Run through scan count array, and see whether elements satisfy the given occurrence threshold.