Refactored code for better sharing. Added new test for on-disk component of length-partitioned inverted index.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_length_filter@2465 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
index bb43028..d1aa854 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
@@ -272,7 +272,7 @@
         }
     }
 
-    protected void resetInvertedListCursor(ITupleReference btreeTuple, IInvertedListCursor listCursor) {
+    public void resetInvertedListCursor(ITupleReference btreeTuple, IInvertedListCursor listCursor) {
         int startPageId = IntegerSerializerDeserializer.getInt(btreeTuple.getFieldData(invListStartPageIdField),
                 btreeTuple.getFieldStart(invListStartPageIdField));
         int endPageId = IntegerSerializerDeserializer.getInt(btreeTuple.getFieldData(invListEndPageIdField),
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndex.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndex.java
index b7063fb..6614be0 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndex.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndex.java
@@ -15,31 +15,26 @@
 
 package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexOperationContext;
 import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListBuilder;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search.InvertedListPartitions;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search.PartitionedTOccurrenceSearcher;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.ObjectCache;
 import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
 import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
 
 public class PartitionedOnDiskInvertedIndex extends OnDiskInvertedIndex {
 
     protected final int PARTITIONING_NUM_TOKENS_FIELD = 1;
-    
+
     public PartitionedOnDiskInvertedIndex(IBufferCache bufferCache, IFileMapProvider fileMapProvider,
             IInvertedListBuilder invListBuilder, ITypeTraits[] invListTypeTraits,
             IBinaryComparatorFactory[] invListCmpFactories, ITypeTraits[] tokenTypeTraits,
@@ -60,85 +55,10 @@
             ISearchOperationCallback searchCallback) {
         return new PartitionedOnDiskInvertedIndexAccessor(this);
     }
-    
-    public class InvertedListPartitions {
-        private final int DEFAULT_NUM_PARTITIONS = 10;
-        private final int PARTITIONS_SLACK_SIZE = 10;
-        private final ObjectCache<IInvertedListCursor> invListCursorCache;
-        private final ObjectCache<ArrayList<IInvertedListCursor>> arrayListCache;
-        private ArrayList<IInvertedListCursor>[] partitions;
-        private int minValidPartitionIndex;
-        private int maxValidPartitionIndex;
 
-        public InvertedListPartitions(ObjectCache<IInvertedListCursor> invListCursorCache,
-                ObjectCache<ArrayList<IInvertedListCursor>> arrayListCache) {
-            this.invListCursorCache = invListCursorCache;
-            this.arrayListCache = arrayListCache;
-        }
-
-        @SuppressWarnings("unchecked")
-        public void reset(int numTokensLowerBound, int numTokensUpperBound) {
-            if (partitions == null) {
-                int initialSize;
-                if (numTokensUpperBound < 0) {
-                    initialSize = DEFAULT_NUM_PARTITIONS;
-                } else {
-                    initialSize = numTokensUpperBound + 1;
-                }
-                partitions = (ArrayList<IInvertedListCursor>[]) new ArrayList[initialSize];
-            } else {
-                if (numTokensUpperBound + 1 >= partitions.length) {
-                    partitions = Arrays.copyOf(partitions, numTokensUpperBound + 1);
-                }
-                Arrays.fill(partitions, null);
-            }
-            invListCursorCache.reset();
-            arrayListCache.reset();
-            minValidPartitionIndex = Integer.MAX_VALUE;
-            maxValidPartitionIndex = Integer.MIN_VALUE;
-        }
-
-        public void addInvertedListCursor(ITupleReference btreeTuple) {
-            IInvertedListCursor listCursor = invListCursorCache.getNext();
-            resetInvertedListCursor(btreeTuple, listCursor);
-            int numTokens = IntegerSerializerDeserializer.getInt(
-                    btreeTuple.getFieldData(PARTITIONING_NUM_TOKENS_FIELD),
-                    btreeTuple.getFieldStart(PARTITIONING_NUM_TOKENS_FIELD));
-            if (numTokens + 1 >= partitions.length) {
-                partitions = Arrays.copyOf(partitions, numTokens + PARTITIONS_SLACK_SIZE);
-            }
-            ArrayList<IInvertedListCursor> partitionCursors = partitions[numTokens];
-            if (partitionCursors == null) {
-                partitionCursors = arrayListCache.getNext();
-                partitionCursors.clear();
-                partitions[numTokens] = partitionCursors;
-                // Update range of valid partitions.
-                if (numTokens < minValidPartitionIndex) {
-                    minValidPartitionIndex = numTokens;
-                }
-                if (numTokens > maxValidPartitionIndex) {
-                    maxValidPartitionIndex = numTokens;
-                }
-            }
-            partitionCursors.add(listCursor);
-        }
-
-        public ArrayList<IInvertedListCursor>[] getPartitions() {
-            return partitions;
-        }
-        
-        public int getMinValidPartitionIndex() {
-            return minValidPartitionIndex;
-        }
-
-        public int getMaxValidPartitionIndex() {
-            return maxValidPartitionIndex;
-        }
-    }
-    
-    public void openInvertedListPartitionCursors(InvertedListPartitions invListPartitions, ITupleReference lowSearchKey,
-            ITupleReference highSearchKey,
-            IIndexOperationContext ictx) throws HyracksDataException, IndexException {
+    public void openInvertedListPartitionCursors(InvertedListPartitions invListPartitions,
+            ITupleReference lowSearchKey, ITupleReference highSearchKey, IIndexOperationContext ictx)
+            throws HyracksDataException, IndexException {
         OnDiskInvertedIndexOpContext ctx = (OnDiskInvertedIndexOpContext) ictx;
         if (lowSearchKey.getFieldCount() == 1) {
             ctx.btreePred.setLowKeyComparator(ctx.prefixSearchCmp);
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/AbstractTOccurrenceSearcher.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/AbstractTOccurrenceSearcher.java
new file mode 100644
index 0000000..af597bc
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/AbstractTOccurrenceSearcher.java
@@ -0,0 +1,154 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexSearcher;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IObjectFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.exceptions.OccurrenceThresholdPanicException;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeFrameTupleAccessor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeTupleReference;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizer;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IToken;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.ObjectCache;
+
+public abstract class AbstractTOccurrenceSearcher implements IInvertedIndexSearcher {
+    protected static final RecordDescriptor QUERY_TOKEN_REC_DESC = new RecordDescriptor(
+            new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
+
+    protected final int OBJECT_CACHE_INIT_SIZE = 10;
+    protected final int OBJECT_CACHE_EXPAND_SIZE = 10;
+
+    protected final IHyracksCommonContext ctx;
+
+    protected final InvertedListMerger invListMerger;
+    protected final SearchResult searchResult;
+    protected final IInvertedIndex invIndex;
+    protected final MultiComparator invListCmp;
+
+    protected final ArrayTupleBuilder queryTokenBuilder = new ArrayTupleBuilder(QUERY_TOKEN_REC_DESC.getFieldCount());
+    protected final ByteBuffer queryTokenFrame;
+    protected final FrameTupleAppender queryTokenAppender;
+    protected final FrameTupleAccessor queryTokenAccessor;
+    protected final FrameTupleReference searchKey = new FrameTupleReference();
+
+    protected int occurrenceThreshold;
+
+    protected final IObjectFactory<IInvertedListCursor> invListCursorFactory;
+    protected final ObjectCache<IInvertedListCursor> invListCursorCache;
+
+    public AbstractTOccurrenceSearcher(IHyracksCommonContext ctx, IInvertedIndex invIndex) {
+        this.ctx = ctx;
+        this.invListMerger = new InvertedListMerger(ctx, invIndex);
+        this.searchResult = new SearchResult(invIndex.getInvListTypeTraits(), ctx);
+        this.invIndex = invIndex;
+        this.invListCmp = MultiComparator.create(invIndex.getInvListCmpFactories());
+        this.invListCursorFactory = new InvertedListCursorFactory(invIndex);
+        this.invListCursorCache = new ObjectCache<IInvertedListCursor>(invListCursorFactory, OBJECT_CACHE_INIT_SIZE,
+                OBJECT_CACHE_EXPAND_SIZE);
+        this.queryTokenFrame = ctx.allocateFrame();
+        this.queryTokenAppender = new FrameTupleAppender(ctx.getFrameSize());
+        this.queryTokenAccessor = new FrameTupleAccessor(ctx.getFrameSize(), QUERY_TOKEN_REC_DESC);
+        this.queryTokenAccessor.reset(queryTokenFrame);
+    }
+
+    public void reset() {
+        searchResult.clear();
+        invListMerger.reset();
+    }
+
+    protected void tokenizeQuery(InvertedIndexSearchPredicate searchPred) throws HyracksDataException,
+            OccurrenceThresholdPanicException {
+        ITupleReference queryTuple = searchPred.getQueryTuple();
+        int queryFieldIndex = searchPred.getQueryFieldIndex();
+        IBinaryTokenizer queryTokenizer = searchPred.getQueryTokenizer();
+
+        queryTokenAppender.reset(queryTokenFrame, true);
+        queryTokenizer.reset(queryTuple.getFieldData(queryFieldIndex), queryTuple.getFieldStart(queryFieldIndex),
+                queryTuple.getFieldLength(queryFieldIndex));
+
+        while (queryTokenizer.hasNext()) {
+            queryTokenizer.next();
+            queryTokenBuilder.reset();
+            try {
+                IToken token = queryTokenizer.getToken();
+                token.serializeToken(queryTokenBuilder.getDataOutput());
+                queryTokenBuilder.addFieldEndOffset();
+                // WARNING: assuming one frame is big enough to hold all tokens
+                queryTokenAppender.append(queryTokenBuilder.getFieldEndOffsets(), queryTokenBuilder.getByteArray(), 0,
+                        queryTokenBuilder.getSize());
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+    }
+
+    public IFrameTupleAccessor createResultFrameTupleAccessor() {
+        return new FixedSizeFrameTupleAccessor(ctx.getFrameSize(), searchResult.getTypeTraits());
+    }
+
+    public ITupleReference createResultFrameTupleReference() {
+        return new FixedSizeTupleReference(searchResult.getTypeTraits());
+    }
+
+    @Override
+    public List<ByteBuffer> getResultBuffers() {
+        return searchResult.getBuffers();
+    }
+
+    @Override
+    public int getNumValidResultBuffers() {
+        return searchResult.getCurrentBufferIndex() + 1;
+    }
+
+    public int getOccurrenceThreshold() {
+        return occurrenceThreshold;
+    }
+
+    public void printNewResults(int maxResultBufIdx, List<ByteBuffer> buffer) {
+        StringBuffer strBuffer = new StringBuffer();
+        FixedSizeFrameTupleAccessor resultFrameTupleAcc = searchResult.getAccessor();
+        for (int i = 0; i <= maxResultBufIdx; i++) {
+            ByteBuffer testBuf = buffer.get(i);
+            resultFrameTupleAcc.reset(testBuf);
+            for (int j = 0; j < resultFrameTupleAcc.getTupleCount(); j++) {
+                strBuffer.append(IntegerSerializerDeserializer.getInt(resultFrameTupleAcc.getBuffer().array(),
+                        resultFrameTupleAcc.getFieldStartOffset(j, 0)) + ",");
+                strBuffer.append(IntegerSerializerDeserializer.getInt(resultFrameTupleAcc.getBuffer().array(),
+                        resultFrameTupleAcc.getFieldStartOffset(j, 1)) + " ");
+            }
+        }
+        System.out.println(strBuffer.toString());
+    }
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/InvertedListPartitions.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/InvertedListPartitions.java
new file mode 100644
index 0000000..70071a1
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/InvertedListPartitions.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.ObjectCache;
+
+public class InvertedListPartitions {
+    private final int PARTITIONING_NUM_TOKENS_FIELD = 1;
+    private final int DEFAULT_NUM_PARTITIONS = 10;
+    private final int PARTITIONS_SLACK_SIZE = 10;    
+    private final OnDiskInvertedIndex invIndex;    
+    private final ObjectCache<IInvertedListCursor> invListCursorCache;
+    private final ObjectCache<ArrayList<IInvertedListCursor>> arrayListCache;
+    private ArrayList<IInvertedListCursor>[] partitions;
+    private int minValidPartitionIndex;
+    private int maxValidPartitionIndex;
+
+    public InvertedListPartitions(OnDiskInvertedIndex invIndex, ObjectCache<IInvertedListCursor> invListCursorCache,
+            ObjectCache<ArrayList<IInvertedListCursor>> arrayListCache) {
+        this.invIndex = invIndex;
+        this.invListCursorCache = invListCursorCache;
+        this.arrayListCache = arrayListCache;
+    }
+
+    @SuppressWarnings("unchecked")
+    public void reset(int numTokensLowerBound, int numTokensUpperBound) {
+        if (partitions == null) {
+            int initialSize;
+            if (numTokensUpperBound < 0) {
+                initialSize = DEFAULT_NUM_PARTITIONS;
+            } else {
+                initialSize = numTokensUpperBound + 1;
+            }
+            partitions = (ArrayList<IInvertedListCursor>[]) new ArrayList[initialSize];
+        } else {
+            if (numTokensUpperBound + 1 >= partitions.length) {
+                partitions = Arrays.copyOf(partitions, numTokensUpperBound + 1);
+            }
+            Arrays.fill(partitions, null);
+        }
+        invListCursorCache.reset();
+        arrayListCache.reset();
+        minValidPartitionIndex = Integer.MAX_VALUE;
+        maxValidPartitionIndex = Integer.MIN_VALUE;
+    }
+
+    public void addInvertedListCursor(ITupleReference btreeTuple) {
+        IInvertedListCursor listCursor = invListCursorCache.getNext();
+        invIndex.resetInvertedListCursor(btreeTuple, listCursor);
+        int numTokens = IntegerSerializerDeserializer.getInt(btreeTuple.getFieldData(PARTITIONING_NUM_TOKENS_FIELD),
+                btreeTuple.getFieldStart(PARTITIONING_NUM_TOKENS_FIELD));
+        if (numTokens + 1 >= partitions.length) {
+            partitions = Arrays.copyOf(partitions, numTokens + PARTITIONS_SLACK_SIZE);
+        }
+        ArrayList<IInvertedListCursor> partitionCursors = partitions[numTokens];
+        if (partitionCursors == null) {
+            partitionCursors = arrayListCache.getNext();
+            partitionCursors.clear();
+            partitions[numTokens] = partitionCursors;
+            // Update range of valid partitions.
+            if (numTokens < minValidPartitionIndex) {
+                minValidPartitionIndex = numTokens;
+            }
+            if (numTokens > maxValidPartitionIndex) {
+                maxValidPartitionIndex = numTokens;
+            }
+        }
+        partitionCursors.add(listCursor);
+    }
+
+    public ArrayList<IInvertedListCursor>[] getPartitions() {
+        return partitions;
+    }
+
+    public int getMinValidPartitionIndex() {
+        return minValidPartitionIndex;
+    }
+
+    public int getMaxValidPartitionIndex() {
+        return maxValidPartitionIndex;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/PartitionedTOccurrenceSearcher.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/PartitionedTOccurrenceSearcher.java
index 439f1ad..522500f 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/PartitionedTOccurrenceSearcher.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/PartitionedTOccurrenceSearcher.java
@@ -15,66 +15,28 @@
 
 package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search;
 
-import java.io.DataOutput;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.List;
 
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleReference;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexOperationContext;
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
-import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
 import edu.uci.ics.hyracks.storage.am.common.tuples.ConcatenatingTupleReference;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndex;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexSearchModifier;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexSearcher;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IObjectFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.exceptions.OccurrenceThresholdPanicException;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeFrameTupleAccessor;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeTupleReference;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndex;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndexSearchCursor;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.PartitionedOnDiskInvertedIndex;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.PartitionedOnDiskInvertedIndex.InvertedListPartitions;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizer;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IToken;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.ObjectCache;
 
-public class PartitionedTOccurrenceSearcher implements IInvertedIndexSearcher {
-
-    protected final IHyracksCommonContext ctx;
-
-    protected final InvertedListMerger invListMerger;
-    protected final SearchResult searchResult;
-
-    protected RecordDescriptor queryTokenRecDesc = new RecordDescriptor(
-            new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
-    protected ArrayTupleBuilder queryTokenBuilder = new ArrayTupleBuilder(queryTokenRecDesc.getFieldCount());
-    protected DataOutput queryTokenDos = queryTokenBuilder.getDataOutput();
-    protected FrameTupleAppender queryTokenAppender;
-    protected ByteBuffer queryTokenFrame;
-    protected final FrameTupleReference searchKey = new FrameTupleReference();
-
-    protected final IInvertedIndex invIndex;
-    protected final MultiComparator invListCmp;
-    protected int occurrenceThreshold;
-
-    protected final int cursorCacheSize = 10;
-    //protected ArrayList<IInvertedListCursor> invListCursorCache = new ArrayList<IInvertedListCursor>(cursorCacheSize);
-    protected ArrayList<IInvertedListCursor> invListCursors = new ArrayList<IInvertedListCursor>(cursorCacheSize);
+public class PartitionedTOccurrenceSearcher extends AbstractTOccurrenceSearcher {
 
     protected final ArrayTupleBuilder lowerBoundTupleBuilder = new ArrayTupleBuilder(1);
     protected final ArrayTupleReference lowerBoundTuple = new ArrayTupleReference();
@@ -83,68 +45,25 @@
     protected final ConcatenatingTupleReference partLowSearchKey = new ConcatenatingTupleReference(2);
     protected final ConcatenatingTupleReference partHighSearchKey = new ConcatenatingTupleReference(2);
 
-    protected final IObjectFactory<IInvertedListCursor> invListCursorFactory;
     protected final IObjectFactory<ArrayList<IInvertedListCursor>> arrayListFactory;
-    protected final ObjectCache<IInvertedListCursor> invListCursorCache;
     protected final ObjectCache<ArrayList<IInvertedListCursor>> arrayListCache;
 
     protected final InvertedListPartitions partitions;
 
     public PartitionedTOccurrenceSearcher(IHyracksCommonContext ctx, IInvertedIndex invIndex) {
-        this.ctx = ctx;
-        this.invListMerger = new InvertedListMerger(ctx, invIndex);
-        this.searchResult = new SearchResult(invIndex.getInvListTypeTraits(), ctx);
-        this.invIndex = invIndex;
-        this.invListCmp = MultiComparator.create(invIndex.getInvListCmpFactories());
-
-        queryTokenAppender = new FrameTupleAppender(ctx.getFrameSize());
-        queryTokenFrame = ctx.allocateFrame();
-
-        invListCursorFactory = new InvertedListCursorFactory(invIndex);
-        arrayListFactory = new ArrayListFactory<IInvertedListCursor>();
-        invListCursorCache = new ObjectCache<IInvertedListCursor>(invListCursorFactory, 10, 10);
-        arrayListCache = new ObjectCache<ArrayList<IInvertedListCursor>>(arrayListFactory, 10, 10);
-
-        PartitionedOnDiskInvertedIndex partInvIndex = (PartitionedOnDiskInvertedIndex) invIndex;
-        partitions = partInvIndex.new InvertedListPartitions(invListCursorCache, arrayListCache);
-    }
-
-    public void reset() {
-        searchResult.clear();
-        invListMerger.reset();
+        super(ctx, invIndex);
+        this.arrayListFactory = new ArrayListFactory<IInvertedListCursor>();
+        this.arrayListCache = new ObjectCache<ArrayList<IInvertedListCursor>>(arrayListFactory, 10, 10);
+        OnDiskInvertedIndex partInvIndex = (OnDiskInvertedIndex) invIndex;
+        this.partitions = new InvertedListPartitions(partInvIndex, invListCursorCache, arrayListCache);
     }
 
     public void search(OnDiskInvertedIndexSearchCursor resultCursor, InvertedIndexSearchPredicate searchPred,
             IIndexOperationContext ictx) throws HyracksDataException, IndexException {
-        ITupleReference queryTuple = searchPred.getQueryTuple();
-        int queryFieldIndex = searchPred.getQueryFieldIndex();
-        IInvertedIndexSearchModifier searchModifier = searchPred.getSearchModifier();
-        IBinaryTokenizer queryTokenizer = searchPred.getQueryTokenizer();
-
-        queryTokenAppender.reset(queryTokenFrame, true);
-        queryTokenizer.reset(queryTuple.getFieldData(queryFieldIndex), queryTuple.getFieldStart(queryFieldIndex),
-                queryTuple.getFieldLength(queryFieldIndex));
-
-        while (queryTokenizer.hasNext()) {
-            queryTokenizer.next();
-            queryTokenBuilder.reset();
-            try {
-                IToken token = queryTokenizer.getToken();
-                token.serializeToken(queryTokenDos);
-                queryTokenBuilder.addFieldEndOffset();
-                // WARNING: assuming one frame is big enough to hold all tokens
-                queryTokenAppender.append(queryTokenBuilder.getFieldEndOffsets(), queryTokenBuilder.getByteArray(), 0,
-                        queryTokenBuilder.getSize());
-            } catch (IOException e) {
-                throw new HyracksDataException(e);
-            }
-        }
-
-        FrameTupleAccessor queryTokenAccessor = new FrameTupleAccessor(ctx.getFrameSize(), queryTokenRecDesc);
-        queryTokenAccessor.reset(queryTokenFrame);
+        tokenizeQuery(searchPred);
         int numQueryTokens = queryTokenAccessor.getTupleCount();
 
-        // ALEX NEW CODE STARTS HERE
+        IInvertedIndexSearchModifier searchModifier = searchPred.getSearchModifier();
         int numTokensLowerBound = searchModifier.getNumTokensLowerBound(numQueryTokens);
         int numTokensUpperBound = searchModifier.getNumTokensUpperBound(numQueryTokens);
         ITupleReference lowSearchKey = null;
@@ -192,7 +111,6 @@
         }
 
         occurrenceThreshold = searchModifier.getOccurrenceThreshold(numQueryTokens);
-        // TODO: deal with panic cases properly
         if (occurrenceThreshold <= 0) {
             throw new OccurrenceThresholdPanicException("Merge Threshold is <= 0. Failing Search.");
         }
@@ -218,42 +136,4 @@
 
         resultCursor.open(null, searchPred);
     }
-
-    public IFrameTupleAccessor createResultFrameTupleAccessor() {
-        return new FixedSizeFrameTupleAccessor(ctx.getFrameSize(), searchResult.getTypeTraits());
-    }
-
-    public ITupleReference createResultFrameTupleReference() {
-        return new FixedSizeTupleReference(searchResult.getTypeTraits());
-    }
-
-    @Override
-    public List<ByteBuffer> getResultBuffers() {
-        return searchResult.getBuffers();
-    }
-
-    @Override
-    public int getNumValidResultBuffers() {
-        return searchResult.getCurrentBufferIndex() + 1;
-    }
-
-    public int getOccurrenceThreshold() {
-        return occurrenceThreshold;
-    }
-
-    public void printNewResults(int maxResultBufIdx, List<ByteBuffer> buffer) {
-        StringBuffer strBuffer = new StringBuffer();
-        FixedSizeFrameTupleAccessor resultFrameTupleAcc = searchResult.getAccessor();
-        for (int i = 0; i <= maxResultBufIdx; i++) {
-            ByteBuffer testBuf = buffer.get(i);
-            resultFrameTupleAcc.reset(testBuf);
-            for (int j = 0; j < resultFrameTupleAcc.getTupleCount(); j++) {
-                strBuffer.append(IntegerSerializerDeserializer.getInt(resultFrameTupleAcc.getBuffer().array(),
-                        resultFrameTupleAcc.getFieldStartOffset(j, 0)) + ",");
-                strBuffer.append(IntegerSerializerDeserializer.getInt(resultFrameTupleAcc.getBuffer().array(),
-                        resultFrameTupleAcc.getFieldStartOffset(j, 1)) + " ");
-            }
-        }
-        System.out.println(strBuffer.toString());
-    }
 }
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/TOccurrenceSearcher.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/TOccurrenceSearcher.java
index e0119a9..4513540 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/TOccurrenceSearcher.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/TOccurrenceSearcher.java
@@ -15,127 +15,41 @@
 
 package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search;
 
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.List;
 
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexOperationContext;
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
-import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndex;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexSearchModifier;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexSearcher;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.exceptions.OccurrenceThresholdPanicException;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeFrameTupleAccessor;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeTupleReference;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndexSearchCursor;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizer;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IToken;
 
-public class TOccurrenceSearcher implements IInvertedIndexSearcher {
+public class TOccurrenceSearcher extends AbstractTOccurrenceSearcher {
 
-    protected final IHyracksCommonContext ctx;
-
-    protected final InvertedListMerger invListMerger;
-    protected final SearchResult searchResult;
-
-    protected RecordDescriptor queryTokenRecDesc = new RecordDescriptor(
-            new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
-    protected ArrayTupleBuilder queryTokenBuilder = new ArrayTupleBuilder(queryTokenRecDesc.getFieldCount());
-    protected DataOutput queryTokenDos = queryTokenBuilder.getDataOutput();
-    protected FrameTupleAppender queryTokenAppender;
-    protected ByteBuffer queryTokenFrame;
-    protected final FrameTupleReference searchKey = new FrameTupleReference();
-
-    protected final IInvertedIndex invIndex;
-    protected final MultiComparator invListCmp;
-    protected int occurrenceThreshold;
-
-    protected final int cursorCacheSize = 10;
-    protected ArrayList<IInvertedListCursor> invListCursorCache = new ArrayList<IInvertedListCursor>(cursorCacheSize);
-    protected ArrayList<IInvertedListCursor> invListCursors = new ArrayList<IInvertedListCursor>(cursorCacheSize);
+    protected final ArrayList<IInvertedListCursor> invListCursors = new ArrayList<IInvertedListCursor>();
 
     public TOccurrenceSearcher(IHyracksCommonContext ctx, IInvertedIndex invIndex) {
-        this.ctx = ctx;
-        this.invListMerger = new InvertedListMerger(ctx, invIndex);
-        this.searchResult = new SearchResult(invIndex.getInvListTypeTraits(), ctx);
-        this.invIndex = invIndex;
-        this.invListCmp = MultiComparator.create(invIndex.getInvListCmpFactories());
-
-        // Pre-create cursor objects.
-        for (int i = 0; i < cursorCacheSize; i++) {
-            invListCursorCache.add(invIndex.createInvertedListCursor());
-        }
-
-        queryTokenAppender = new FrameTupleAppender(ctx.getFrameSize());
-        queryTokenFrame = ctx.allocateFrame();
-    }
-
-    public void reset() {
-        searchResult.clear();
-        invListMerger.reset();
+        super(ctx, invIndex);
     }
 
     public void search(OnDiskInvertedIndexSearchCursor resultCursor, InvertedIndexSearchPredicate searchPred,
             IIndexOperationContext ictx) throws HyracksDataException, IndexException {
-        ITupleReference queryTuple = searchPred.getQueryTuple();
-        int queryFieldIndex = searchPred.getQueryFieldIndex();
-        IInvertedIndexSearchModifier searchModifier = searchPred.getSearchModifier();
-        IBinaryTokenizer queryTokenizer = searchPred.getQueryTokenizer();
-
-        queryTokenAppender.reset(queryTokenFrame, true);
-        queryTokenizer.reset(queryTuple.getFieldData(queryFieldIndex), queryTuple.getFieldStart(queryFieldIndex),
-                queryTuple.getFieldLength(queryFieldIndex));
-
-        while (queryTokenizer.hasNext()) {
-            queryTokenizer.next();
-            queryTokenBuilder.reset();
-            try {
-                IToken token = queryTokenizer.getToken();
-                token.serializeToken(queryTokenDos);
-                queryTokenBuilder.addFieldEndOffset();
-                // WARNING: assuming one frame is big enough to hold all tokens
-                queryTokenAppender.append(queryTokenBuilder.getFieldEndOffsets(), queryTokenBuilder.getByteArray(), 0,
-                        queryTokenBuilder.getSize());
-            } catch (IOException e) {
-                throw new HyracksDataException(e);
-            }
-        }
-
-        FrameTupleAccessor queryTokenAccessor = new FrameTupleAccessor(ctx.getFrameSize(), queryTokenRecDesc);
-        queryTokenAccessor.reset(queryTokenFrame);
+        tokenizeQuery(searchPred);
         int numQueryTokens = queryTokenAccessor.getTupleCount();
 
-        // Expand cursor cache if necessary.
-        if (numQueryTokens > invListCursorCache.size()) {
-            int diff = numQueryTokens - invListCursorCache.size();
-            for (int i = 0; i < diff; i++) {
-                invListCursorCache.add(invIndex.createInvertedListCursor());
-            }
-        }
-
         invListCursors.clear();
+        invListCursorCache.reset();
         for (int i = 0; i < numQueryTokens; i++) {
             searchKey.reset(queryTokenAccessor, i);
-            invIndex.openInvertedListCursor(invListCursorCache.get(i), searchKey, ictx);
-            invListCursors.add(invListCursorCache.get(i));
+            IInvertedListCursor invListCursor = invListCursorCache.getNext();
+            invIndex.openInvertedListCursor(invListCursor, searchKey, ictx);
+            invListCursors.add(invListCursor);
         }
 
+        IInvertedIndexSearchModifier searchModifier = searchPred.getSearchModifier();
         occurrenceThreshold = searchModifier.getOccurrenceThreshold(numQueryTokens);
         if (occurrenceThreshold <= 0) {
             throw new OccurrenceThresholdPanicException("Merge threshold is <= 0. Failing Search.");
@@ -146,42 +60,4 @@
         invListMerger.merge(invListCursors, occurrenceThreshold, numPrefixLists, searchResult);
         resultCursor.open(null, searchPred);
     }
-
-    public IFrameTupleAccessor createResultFrameTupleAccessor() {
-        return new FixedSizeFrameTupleAccessor(ctx.getFrameSize(), searchResult.getTypeTraits());
-    }
-
-    public ITupleReference createResultFrameTupleReference() {
-        return new FixedSizeTupleReference(searchResult.getTypeTraits());
-    }
-
-    @Override
-    public List<ByteBuffer> getResultBuffers() {
-        return searchResult.getBuffers();
-    }
-
-    @Override
-    public int getNumValidResultBuffers() {
-        return searchResult.getCurrentBufferIndex() + 1;
-    }
-
-    public int getOccurrenceThreshold() {
-        return occurrenceThreshold;
-    }
-
-    public void printNewResults(int maxResultBufIdx, List<ByteBuffer> buffer) {
-        StringBuffer strBuffer = new StringBuffer();
-        FixedSizeFrameTupleAccessor resultFrameTupleAcc = searchResult.getAccessor();
-        for (int i = 0; i <= maxResultBufIdx; i++) {
-            ByteBuffer testBuf = buffer.get(i);
-            resultFrameTupleAcc.reset(testBuf);
-            for (int j = 0; j < resultFrameTupleAcc.getTupleCount(); j++) {
-                strBuffer.append(IntegerSerializerDeserializer.getInt(resultFrameTupleAcc.getBuffer().array(),
-                        resultFrameTupleAcc.getFieldStartOffset(j, 0)) + ",");
-                strBuffer.append(IntegerSerializerDeserializer.getInt(resultFrameTupleAcc.getBuffer().array(),
-                        resultFrameTupleAcc.getFieldStartOffset(j, 1)) + " ");
-            }
-        }
-        System.out.println(strBuffer.toString());
-    }
 }
diff --git a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndexSearchTest.java b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndexSearchTest.java
new file mode 100644
index 0000000..4fa25ed
--- /dev/null
+++ b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndexSearchTest.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk;
+
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.common.AbstractInvertedIndexSearchTest;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestContext.InvertedIndexType;
+
+public class PartitionedOnDiskInvertedIndexSearchTest extends AbstractInvertedIndexSearchTest {
+
+    public PartitionedOnDiskInvertedIndexSearchTest() {
+        super(InvertedIndexType.PARTITIONED_ONDISK, true);
+    }
+}