Reintegrated hyracks_lsm_length_filter.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@2489 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/marshalling/ShortSerializerDeserializer.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/marshalling/ShortSerializerDeserializer.java
new file mode 100644
index 0000000..15384b3
--- /dev/null
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/marshalling/ShortSerializerDeserializer.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.dataflow.common.data.marshalling;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class ShortSerializerDeserializer implements ISerializerDeserializer<Short> {
+    private static final long serialVersionUID = 1L;
+
+    public static final ShortSerializerDeserializer INSTANCE = new ShortSerializerDeserializer();
+
+    private ShortSerializerDeserializer() {
+    }
+
+    @Override
+    public Short deserialize(DataInput in) throws HyracksDataException {
+        try {
+            return in.readShort();
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    @Override
+    public void serialize(Short instance, DataOutput out) throws HyracksDataException {
+        try {
+            out.writeShort(instance.intValue());
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    public static short getShort(byte[] bytes, int offset) {
+        return (short) (((bytes[offset] & 0xff) << 8) + ((bytes[offset + 1] & 0xff)));
+    }
+
+    public static void putShort(int val, byte[] bytes, int offset) {
+        bytes[offset] = (byte) ((val >>> 8) & 0xFF);
+        bytes[offset + 1] = (byte) ((val >>> 0) & 0xFF);
+    }
+}
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/util/SerdeUtils.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/util/SerdeUtils.java
index 00575f4..9dafa83 100644
--- a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/util/SerdeUtils.java
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/util/SerdeUtils.java
@@ -25,43 +25,45 @@
 import edu.uci.ics.hyracks.data.std.primitive.FloatPointable;
 import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
 import edu.uci.ics.hyracks.data.std.primitive.LongPointable;
+import edu.uci.ics.hyracks.data.std.primitive.ShortPointable;
 import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.BooleanSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.DoubleSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.FloatSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.ShortSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 
 @SuppressWarnings("rawtypes")
 public class SerdeUtils {
-	public static class PayloadTypeTraits implements ITypeTraits {
-		private static final long serialVersionUID = 1L;
-		final int payloadSize;
-		
-		public PayloadTypeTraits(int payloadSize) {
-			this.payloadSize = payloadSize;
-		}
-		
-		@Override
-		public boolean isFixedLength() {
-			return true;
-		}
+    public static class PayloadTypeTraits implements ITypeTraits {
+        private static final long serialVersionUID = 1L;
+        final int payloadSize;
 
-		@Override
-		public int getFixedLength() {
-			return payloadSize;
-		}
-	}
-	
-	public static ITypeTraits[] serdesToTypeTraits(ISerializerDeserializer[] serdes) {
+        public PayloadTypeTraits(int payloadSize) {
+            this.payloadSize = payloadSize;
+        }
+
+        @Override
+        public boolean isFixedLength() {
+            return true;
+        }
+
+        @Override
+        public int getFixedLength() {
+            return payloadSize;
+        }
+    }
+
+    public static ITypeTraits[] serdesToTypeTraits(ISerializerDeserializer[] serdes) {
         ITypeTraits[] typeTraits = new ITypeTraits[serdes.length];
         for (int i = 0; i < serdes.length; i++) {
             typeTraits[i] = serdeToTypeTrait(serdes[i]);
         }
         return typeTraits;
     }
-    
+
     public static ITypeTraits[] serdesToTypeTraits(ISerializerDeserializer[] serdes, int payloadSize) {
         ITypeTraits[] typeTraits = new ITypeTraits[serdes.length + 1];
         for (int i = 0; i < serdes.length; i++) {
@@ -72,6 +74,9 @@
     }
 
     public static ITypeTraits serdeToTypeTrait(ISerializerDeserializer serde) {
+        if (serde instanceof ShortSerializerDeserializer) {
+            return ShortPointable.TYPE_TRAITS;
+        }
         if (serde instanceof IntegerSerializerDeserializer) {
             return IntegerPointable.TYPE_TRAITS;
         }
@@ -112,6 +117,9 @@
     }
 
     public static IBinaryComparatorFactory serdeToComparatorFactory(ISerializerDeserializer serde) {
+        if (serde instanceof ShortSerializerDeserializer) {
+            return PointableBinaryComparatorFactory.of(ShortPointable.FACTORY);
+        }
         if (serde instanceof IntegerSerializerDeserializer) {
             return PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY);
         }
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/AbstractfWordInvertedIndexTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/AbstractfWordInvertedIndexTest.java
new file mode 100644
index 0000000..05b7a26
--- /dev/null
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/AbstractfWordInvertedIndexTest.java
@@ -0,0 +1,346 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.tests.am.invertedindex;
+
+import java.io.DataOutput;
+import java.io.File;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexSearchModifierFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexBulkLoadOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexCreateOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexSearchOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search.ConjunctiveSearchModifierFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.DelimitedUTF8StringBinaryTokenizerFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.ITokenFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.UTF8WordTokenFactory;
+import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.hyracks.storage.common.file.ILocalResourceFactoryProvider;
+import edu.uci.ics.hyracks.storage.common.file.TransientLocalResourceFactoryProvider;
+import edu.uci.ics.hyracks.test.support.TestIndexLifecycleManagerProvider;
+import edu.uci.ics.hyracks.test.support.TestStorageManagerComponentHolder;
+import edu.uci.ics.hyracks.test.support.TestStorageManagerInterface;
+import edu.uci.ics.hyracks.tests.integration.AbstractIntegrationTest;
+
+@SuppressWarnings("rawtypes")
+public abstract class AbstractfWordInvertedIndexTest extends AbstractIntegrationTest {
+    static {
+        TestStorageManagerComponentHolder.init(8192, 20, 20);
+    }
+
+    protected static final int MERGE_THRESHOLD = 3;
+
+    protected IStorageManagerInterface storageManager = new TestStorageManagerInterface();
+    protected IIndexLifecycleManagerProvider lcManagerProvider = new TestIndexLifecycleManagerProvider();
+    protected IIndexDataflowHelperFactory btreeDataflowHelperFactory = new BTreeDataflowHelperFactory();
+    protected IIndexDataflowHelperFactory invertedIndexDataflowHelperFactory;
+
+    protected final static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("ddMMyy-hhmmssSS");
+    protected final static String sep = System.getProperty("file.separator");
+    protected final String dateString = simpleDateFormat.format(new Date());
+    protected final String primaryFileName = System.getProperty("java.io.tmpdir") + sep + "primaryBtree" + dateString;
+    protected final String btreeFileName = System.getProperty("java.io.tmpdir") + sep + "invIndexBtree" + dateString;
+
+    protected IFileSplitProvider primaryFileSplitProvider = new ConstantFileSplitProvider(
+            new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(primaryFileName))) });
+    protected IFileSplitProvider btreeFileSplitProvider = new ConstantFileSplitProvider(
+            new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(btreeFileName))) });
+
+    // Primary BTree index.
+    protected int primaryFieldCount = 2;
+    protected ITypeTraits[] primaryTypeTraits = new ITypeTraits[primaryFieldCount];
+    protected int primaryKeyFieldCount = 1;
+    protected IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[primaryKeyFieldCount];
+    protected RecordDescriptor primaryRecDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+            IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+    // Inverted index BTree dictionary.
+    protected ITypeTraits[] tokenTypeTraits;
+    protected IBinaryComparatorFactory[] tokenComparatorFactories;
+
+    // Inverted index stuff.
+    protected int invListElementFieldCount = 1;
+    protected ITypeTraits[] invListsTypeTraits = new ITypeTraits[invListElementFieldCount];
+    protected IBinaryComparatorFactory[] invListsComparatorFactories = new IBinaryComparatorFactory[invListElementFieldCount];
+    protected RecordDescriptor invListsRecDesc = new RecordDescriptor(
+            new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE });
+    protected RecordDescriptor tokenizerRecDesc;
+
+    // Tokenizer stuff.
+    protected ITokenFactory tokenFactory = new UTF8WordTokenFactory();
+    protected IBinaryTokenizerFactory tokenizerFactory = new DelimitedUTF8StringBinaryTokenizerFactory(true, false,
+            tokenFactory);
+
+    // Sorting stuff.
+    IBinaryComparatorFactory[] sortComparatorFactories;
+
+    @Before
+    public void setup() throws Exception {
+        prepare();
+
+        // Field declarations and comparators for primary BTree index.
+        primaryTypeTraits[0] = IntegerPointable.TYPE_TRAITS;
+        primaryTypeTraits[1] = UTF8StringPointable.TYPE_TRAITS;
+        primaryComparatorFactories[0] = PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY);
+
+        // Field declarations and comparators for inverted lists.
+        invListsTypeTraits[0] = IntegerPointable.TYPE_TRAITS;
+        invListsComparatorFactories[0] = PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY);
+
+        createPrimaryIndex();
+        loadPrimaryIndex();
+        printPrimaryIndex();
+        createInvertedIndex();
+        loadInvertedIndex();
+    }
+
+    protected abstract void prepare();
+
+    protected abstract boolean addNumTokensKey();
+
+    public void createPrimaryIndex() throws Exception {
+        JobSpecification spec = new JobSpecification();
+        TransientLocalResourceFactoryProvider localResourceFactoryProvider = new TransientLocalResourceFactoryProvider();
+        TreeIndexCreateOperatorDescriptor primaryCreateOp = new TreeIndexCreateOperatorDescriptor(spec, storageManager,
+                lcManagerProvider, primaryFileSplitProvider, primaryTypeTraits, primaryComparatorFactories,
+                btreeDataflowHelperFactory, localResourceFactoryProvider, NoOpOperationCallbackFactory.INSTANCE);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryCreateOp, NC1_ID);
+        spec.addRoot(primaryCreateOp);
+        runTest(spec);
+    }
+
+    public void createInvertedIndex() throws Exception {
+        JobSpecification spec = new JobSpecification();
+        ILocalResourceFactoryProvider localResourceFactoryProvider = new TransientLocalResourceFactoryProvider();
+        LSMInvertedIndexCreateOperatorDescriptor invIndexCreateOp = new LSMInvertedIndexCreateOperatorDescriptor(spec,
+                storageManager, btreeFileSplitProvider, lcManagerProvider, tokenTypeTraits, tokenComparatorFactories,
+                invListsTypeTraits, invListsComparatorFactories, tokenizerFactory, invertedIndexDataflowHelperFactory,
+                localResourceFactoryProvider, NoOpOperationCallbackFactory.INSTANCE);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, invIndexCreateOp, NC1_ID);
+        spec.addRoot(invIndexCreateOp);
+        runTest(spec);
+    }
+
+    @Test
+    public void testConjunctiveSearcher() throws Exception {
+        IInvertedIndexSearchModifierFactory conjunctiveSearchModifierFactory = new ConjunctiveSearchModifierFactory();
+        searchInvertedIndex("of", conjunctiveSearchModifierFactory);
+        searchInvertedIndex("3d", conjunctiveSearchModifierFactory);
+        searchInvertedIndex("of the human", conjunctiveSearchModifierFactory);
+    }
+
+    private IOperatorDescriptor createFileScanOp(JobSpecification spec) {
+        FileSplit[] dblpTitleFileSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
+                "data/cleanednumbereddblptitles.txt"))) };
+        IFileSplitProvider dblpTitleSplitProvider = new ConstantFileSplitProvider(dblpTitleFileSplits);
+        RecordDescriptor dblpTitleRecDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+        FileScanOperatorDescriptor dblpTitleScanner = new FileScanOperatorDescriptor(spec, dblpTitleSplitProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { IntegerParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE }, '|'), dblpTitleRecDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, dblpTitleScanner, NC1_ID);
+        return dblpTitleScanner;
+    }
+
+    private IOperatorDescriptor createPrimaryBulkLoadOp(JobSpecification spec) {
+        int[] fieldPermutation = { 0, 1 };
+        TreeIndexBulkLoadOperatorDescriptor primaryBtreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
+                storageManager, lcManagerProvider, primaryFileSplitProvider, primaryTypeTraits,
+                primaryComparatorFactories, fieldPermutation, 0.7f, true, btreeDataflowHelperFactory,
+                NoOpOperationCallbackFactory.INSTANCE);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeBulkLoad, NC1_ID);
+        return primaryBtreeBulkLoad;
+    }
+
+    private IOperatorDescriptor createScanKeyProviderOp(JobSpecification spec) throws HyracksDataException {
+        // build dummy tuple containing nothing
+        ArrayTupleBuilder tb = new ArrayTupleBuilder(primaryKeyFieldCount * 2);
+        DataOutput dos = tb.getDataOutput();
+        tb.reset();
+        UTF8StringSerializerDeserializer.INSTANCE.serialize("0", dos);
+        tb.addFieldEndOffset();
+        ISerializerDeserializer[] keyRecDescSers = { UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE };
+        RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
+        ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(spec,
+                keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, keyProviderOp, NC1_ID);
+        return keyProviderOp;
+    }
+
+    private IOperatorDescriptor createPrimaryScanOp(JobSpecification spec) throws HyracksDataException {
+        int[] lowKeyFields = null; // - infinity
+        int[] highKeyFields = null; // + infinity
+        BTreeSearchOperatorDescriptor primaryBtreeSearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
+                storageManager, lcManagerProvider, primaryFileSplitProvider, primaryTypeTraits,
+                primaryComparatorFactories, lowKeyFields, highKeyFields, true, true, btreeDataflowHelperFactory, false,
+                NoOpOperationCallbackFactory.INSTANCE);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeSearchOp, NC1_ID);
+        return primaryBtreeSearchOp;
+    }
+
+    private void loadPrimaryIndex() throws Exception {
+        JobSpecification spec = new JobSpecification();
+        // Assuming that the data is pre-sorted on the key. No need to sort
+        // before bulk load.
+        IOperatorDescriptor fileScanOp = createFileScanOp(spec);
+        IOperatorDescriptor primaryBulkLoad = createPrimaryBulkLoadOp(spec);
+        spec.connect(new OneToOneConnectorDescriptor(spec), fileScanOp, 0, primaryBulkLoad, 0);
+        spec.addRoot(primaryBulkLoad);
+        runTest(spec);
+    }
+
+    private void printPrimaryIndex() throws Exception {
+        JobSpecification spec = new JobSpecification();
+        IOperatorDescriptor keyProviderOp = createScanKeyProviderOp(spec);
+        IOperatorDescriptor primaryScanOp = createPrimaryScanOp(spec);
+        IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
+                createTempFile().getAbsolutePath()) });
+        IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
+        spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, printer, 0);
+        spec.addRoot(printer);
+        runTest(spec);
+    }
+
+    private IOperatorDescriptor createExternalSortOp(JobSpecification spec, int[] sortFields,
+            RecordDescriptor outputRecDesc) {
+        ExternalSortOperatorDescriptor externalSortOp = new ExternalSortOperatorDescriptor(spec, 1000, sortFields,
+                sortComparatorFactories, outputRecDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, externalSortOp, NC1_ID);
+        return externalSortOp;
+    }
+
+    private IOperatorDescriptor createBinaryTokenizerOp(JobSpecification spec, int docField, int[] keyFields) {
+        BinaryTokenizerOperatorDescriptor binaryTokenizer = new BinaryTokenizerOperatorDescriptor(spec,
+                tokenizerRecDesc, tokenizerFactory, docField, keyFields, addNumTokensKey());
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, binaryTokenizer, NC1_ID);
+        return binaryTokenizer;
+    }
+
+    private IOperatorDescriptor createInvertedIndexBulkLoadOp(JobSpecification spec, int[] fieldPermutation) {
+        LSMInvertedIndexBulkLoadOperatorDescriptor invIndexBulkLoadOp = new LSMInvertedIndexBulkLoadOperatorDescriptor(
+                spec, fieldPermutation, true, storageManager, btreeFileSplitProvider, lcManagerProvider,
+                tokenTypeTraits, tokenComparatorFactories, invListsTypeTraits, invListsComparatorFactories,
+                tokenizerFactory, invertedIndexDataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, invIndexBulkLoadOp, NC1_ID);
+        return invIndexBulkLoadOp;
+    }
+
+    public void loadInvertedIndex() throws Exception {
+        JobSpecification spec = new JobSpecification();
+        IOperatorDescriptor keyProviderOp = createScanKeyProviderOp(spec);
+        IOperatorDescriptor primaryScanOp = createPrimaryScanOp(spec);
+        int docField = 1;
+        int[] keyFields = { 0 };
+        IOperatorDescriptor binaryTokenizerOp = createBinaryTokenizerOp(spec, docField, keyFields);
+        int[] sortFields = new int[sortComparatorFactories.length];
+        int[] fieldPermutation = new int[sortComparatorFactories.length];
+        for (int i = 0; i < sortFields.length; i++) {
+            sortFields[i] = i;
+            fieldPermutation[i] = i;
+        }
+        IOperatorDescriptor externalSortOp = createExternalSortOp(spec, sortFields, tokenizerRecDesc);
+        IOperatorDescriptor invIndexBulkLoadOp = createInvertedIndexBulkLoadOp(spec, fieldPermutation);
+        spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, binaryTokenizerOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), binaryTokenizerOp, 0, externalSortOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), externalSortOp, 0, invIndexBulkLoadOp, 0);
+        spec.addRoot(invIndexBulkLoadOp);
+        runTest(spec);
+    }
+
+    private IOperatorDescriptor createQueryProviderOp(JobSpecification spec, String queryString)
+            throws HyracksDataException {
+        // Build tuple with exactly one field, which is the query,
+        ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
+        DataOutput dos = tb.getDataOutput();
+        tb.reset();
+        UTF8StringSerializerDeserializer.INSTANCE.serialize(queryString, dos);
+        tb.addFieldEndOffset();
+        ISerializerDeserializer[] querySerde = { UTF8StringSerializerDeserializer.INSTANCE };
+        RecordDescriptor queryRecDesc = new RecordDescriptor(querySerde);
+        ConstantTupleSourceOperatorDescriptor queryProviderOp = new ConstantTupleSourceOperatorDescriptor(spec,
+                queryRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, queryProviderOp, NC1_ID);
+        return queryProviderOp;
+    }
+
+    private IOperatorDescriptor createInvertedIndexSearchOp(JobSpecification spec,
+            IInvertedIndexSearchModifierFactory searchModifierFactory) {
+        LSMInvertedIndexSearchOperatorDescriptor invIndexSearchOp = new LSMInvertedIndexSearchOperatorDescriptor(spec,
+                0, storageManager, btreeFileSplitProvider, lcManagerProvider, tokenTypeTraits,
+                tokenComparatorFactories, invListsTypeTraits, invListsComparatorFactories,
+                invertedIndexDataflowHelperFactory, tokenizerFactory, searchModifierFactory, invListsRecDesc, false,
+                NoOpOperationCallbackFactory.INSTANCE);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, invIndexSearchOp, NC1_ID);
+        return invIndexSearchOp;
+    }
+
+    public void searchInvertedIndex(String queryString, IInvertedIndexSearchModifierFactory searchModifierFactory)
+            throws Exception {
+        JobSpecification spec = new JobSpecification();
+        IOperatorDescriptor queryProviderOp = createQueryProviderOp(spec, queryString);
+        IOperatorDescriptor invIndexSearchOp = createInvertedIndexSearchOp(spec, searchModifierFactory);
+        IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
+                createTempFile().getAbsolutePath()) });
+        IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
+        spec.connect(new OneToOneConnectorDescriptor(spec), queryProviderOp, 0, invIndexSearchOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), invIndexSearchOp, 0, printer, 0);
+        spec.addRoot(printer);
+        runTest(spec);
+    }
+}
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/BinaryTokenizerOperatorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/BinaryTokenizerOperatorTest.java
index 76155ae..47480da 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/BinaryTokenizerOperatorTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/BinaryTokenizerOperatorTest.java
@@ -11,6 +11,7 @@
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.ShortSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
@@ -33,6 +34,15 @@
 
     @Test
     public void tokenizerTest() throws Exception {
+        test(false);
+    }
+
+    @Test
+    public void tokenizerWithNumTokensTest() throws Exception {
+        test(true);
+    }
+
+    private void test(boolean addNumTokensKey) throws Exception {
         JobSpecification spec = new JobSpecification();
 
         FileSplit[] dblpTitleFileSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
@@ -46,16 +56,22 @@
                         UTF8StringParserFactory.INSTANCE }, '|'), dblpTitleRecDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, dblpTitleScanner, NC1_ID);
 
-        RecordDescriptor tokenizerRecDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+        RecordDescriptor tokenizerRecDesc;
+        if (!addNumTokensKey) {
+            tokenizerRecDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                    UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+        } else {
+            tokenizerRecDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                    UTF8StringSerializerDeserializer.INSTANCE, ShortSerializerDeserializer.INSTANCE,
+                    IntegerSerializerDeserializer.INSTANCE });
+        }
 
         ITokenFactory tokenFactory = new UTF8WordTokenFactory();
         IBinaryTokenizerFactory tokenizerFactory = new DelimitedUTF8StringBinaryTokenizerFactory(true, false,
                 tokenFactory);
-        int[] tokenFields = { 1 };
         int[] keyFields = { 0 };
         BinaryTokenizerOperatorDescriptor binaryTokenizer = new BinaryTokenizerOperatorDescriptor(spec,
-                tokenizerRecDesc, tokenizerFactory, tokenFields, keyFields);
+                tokenizerRecDesc, tokenizerFactory, 1, keyFields, addNumTokensKey);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, binaryTokenizer, NC1_ID);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/InvertedIndexOperatorsTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/InvertedIndexOperatorsTest.java
deleted file mode 100644
index d108289..0000000
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/InvertedIndexOperatorsTest.java
+++ /dev/null
@@ -1,73 +0,0 @@
-package edu.uci.ics.hyracks.tests.am.invertedindex;
-
-import java.io.File;
-
-import org.junit.Test;
-
-import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
-import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
-import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
-import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.DelimitedUTF8StringBinaryTokenizerFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.ITokenFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.UTF8WordTokenFactory;
-import edu.uci.ics.hyracks.tests.integration.AbstractIntegrationTest;
-
-public class InvertedIndexOperatorsTest extends AbstractIntegrationTest {
-
-    @Test
-    public void tokenizerTest() throws Exception {
-        JobSpecification spec = new JobSpecification();
-
-        FileSplit[] dblpTitleFileSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
-                "data/cleanednumbereddblptitles.txt"))) };
-        IFileSplitProvider dblpTitleSplitProvider = new ConstantFileSplitProvider(dblpTitleFileSplits);
-        RecordDescriptor dblpTitleRecDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
-
-        FileScanOperatorDescriptor dblpTitleScanner = new FileScanOperatorDescriptor(spec, dblpTitleSplitProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { IntegerParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE }, '|'), dblpTitleRecDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, dblpTitleScanner, NC1_ID);
-
-        RecordDescriptor tokenizerRecDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
-
-        ITokenFactory tokenFactory = new UTF8WordTokenFactory();
-        IBinaryTokenizerFactory tokenizerFactory = new DelimitedUTF8StringBinaryTokenizerFactory(true, false,
-                tokenFactory);
-        int[] tokenFields = { 1 };
-        int[] projFields = { 0 };
-        BinaryTokenizerOperatorDescriptor binaryTokenizer = new BinaryTokenizerOperatorDescriptor(spec,
-                tokenizerRecDesc, tokenizerFactory, tokenFields, projFields);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, binaryTokenizer, NC1_ID);
-
-        IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
-                createTempFile().getAbsolutePath()) });
-        IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
-
-        spec.connect(new OneToOneConnectorDescriptor(spec), dblpTitleScanner, 0, binaryTokenizer, 0);
-
-        spec.connect(new OneToOneConnectorDescriptor(spec), binaryTokenizer, 0, printer, 0);
-
-        spec.addRoot(printer);
-        runTest(spec);
-    }
-}
\ No newline at end of file
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/PartitionedWordInvertedIndexTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/PartitionedWordInvertedIndexTest.java
new file mode 100644
index 0000000..1bd6878
--- /dev/null
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/PartitionedWordInvertedIndexTest.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.tests.am.invertedindex;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
+import edu.uci.ics.hyracks.data.std.primitive.ShortPointable;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.ShortSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.FlushControllerProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ImmediateSchedulerProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.RefCountingOperationTrackerFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.PartitionedLSMInvertedIndexDataflowHelperFactory;
+
+public class PartitionedWordInvertedIndexTest extends AbstractfWordInvertedIndexTest {
+
+    @Override
+    protected void prepare() {
+        // Field declarations and comparators for tokens.
+        tokenTypeTraits = new ITypeTraits[] { UTF8StringPointable.TYPE_TRAITS, ShortPointable.TYPE_TRAITS };
+        tokenComparatorFactories = new IBinaryComparatorFactory[] {
+                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+                PointableBinaryComparatorFactory.of(ShortPointable.FACTORY) };
+
+        tokenizerRecDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, ShortSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE });
+
+        sortComparatorFactories = new IBinaryComparatorFactory[] {
+                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+                PointableBinaryComparatorFactory.of(ShortPointable.FACTORY),
+                PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) };
+
+        invertedIndexDataflowHelperFactory = new PartitionedLSMInvertedIndexDataflowHelperFactory(
+                new FlushControllerProvider(), new ConstantMergePolicyProvider(ImmediateSchedulerProvider.INSTANCE,
+                        MERGE_THRESHOLD), RefCountingOperationTrackerFactory.INSTANCE,
+                ImmediateSchedulerProvider.INSTANCE);
+    }
+
+    @Override
+    protected boolean addNumTokensKey() {
+        return true;
+    }
+}
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/WordInvertedIndexTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/WordInvertedIndexTest.java
index a63cc07..69c7c90 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/WordInvertedIndexTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/WordInvertedIndexTest.java
@@ -15,332 +15,44 @@
 
 package edu.uci.ics.hyracks.tests.am.invertedindex;
 
-import java.io.DataOutput;
-import java.io.File;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
 import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
 import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
-import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
-import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
-import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyProvider;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.FlushControllerProvider;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ImmediateSchedulerProvider;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.RefCountingOperationTrackerFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexSearchModifierFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexBulkLoadOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexCreateOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexSearchOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search.ConjunctiveSearchModifierFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.DelimitedUTF8StringBinaryTokenizerFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.ITokenFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.UTF8WordTokenFactory;
-import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
-import edu.uci.ics.hyracks.storage.common.file.ILocalResourceFactoryProvider;
-import edu.uci.ics.hyracks.storage.common.file.TransientLocalResourceFactoryProvider;
-import edu.uci.ics.hyracks.test.support.TestIndexLifecycleManagerProvider;
-import edu.uci.ics.hyracks.test.support.TestStorageManagerComponentHolder;
-import edu.uci.ics.hyracks.test.support.TestStorageManagerInterface;
-import edu.uci.ics.hyracks.tests.integration.AbstractIntegrationTest;
 
-@SuppressWarnings("rawtypes")
-public class WordInvertedIndexTest extends AbstractIntegrationTest {
-    static {
-        TestStorageManagerComponentHolder.init(8192, 20, 20);
-    }
+public class WordInvertedIndexTest extends AbstractfWordInvertedIndexTest {
 
-    private static final int MERGE_THRESHOLD = 3;
-
-    private IStorageManagerInterface storageManager = new TestStorageManagerInterface();
-    private IIndexLifecycleManagerProvider lcManagerProvider = new TestIndexLifecycleManagerProvider();
-    private IIndexDataflowHelperFactory btreeDataflowHelperFactory = new BTreeDataflowHelperFactory();
-    private IIndexDataflowHelperFactory invertedIndexDataflowHelperFactory = new LSMInvertedIndexDataflowHelperFactory(
-            new FlushControllerProvider(), new ConstantMergePolicyProvider(ImmediateSchedulerProvider.INSTANCE,
-                    MERGE_THRESHOLD), RefCountingOperationTrackerFactory.INSTANCE, ImmediateSchedulerProvider.INSTANCE);
-
-    private final static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("ddMMyy-hhmmssSS");
-    private final static String sep = System.getProperty("file.separator");
-    private final String dateString = simpleDateFormat.format(new Date());
-    private final String primaryFileName = System.getProperty("java.io.tmpdir") + sep + "primaryBtree" + dateString;
-    private final String btreeFileName = System.getProperty("java.io.tmpdir") + sep + "invIndexBtree" + dateString;
-
-    private IFileSplitProvider primaryFileSplitProvider = new ConstantFileSplitProvider(
-            new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(primaryFileName))) });
-    private IFileSplitProvider btreeFileSplitProvider = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(
-            NC1_ID, new FileReference(new File(btreeFileName))) });
-
-    // Primary BTree index.
-    private int primaryFieldCount = 2;
-    private ITypeTraits[] primaryTypeTraits = new ITypeTraits[primaryFieldCount];
-    private int primaryKeyFieldCount = 1;
-    private IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[primaryKeyFieldCount];
-    private RecordDescriptor primaryRecDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-            IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
-
-    // Inverted index BTree dictionary.
-    private ITypeTraits[] tokenTypeTraits = new ITypeTraits[1];
-    private IBinaryComparatorFactory[] tokenComparatorFactories = new IBinaryComparatorFactory[1];
-
-    // Inverted index stuff.
-    private int invListElementFieldCount = 1;
-    private ITypeTraits[] invListsTypeTraits = new ITypeTraits[invListElementFieldCount];
-    private IBinaryComparatorFactory[] invListsComparatorFactories = new IBinaryComparatorFactory[invListElementFieldCount];
-    private RecordDescriptor tokenizerRecDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-            UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
-    private RecordDescriptor invListsRecDesc = new RecordDescriptor(
-            new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE });
-
-    // Tokenizer stuff.
-    private ITokenFactory tokenFactory = new UTF8WordTokenFactory();
-    private IBinaryTokenizerFactory tokenizerFactory = new DelimitedUTF8StringBinaryTokenizerFactory(true, false,
-            tokenFactory);
-
-    @Before
-    public void setup() throws Exception {
-        // Field declarations and comparators for primary BTree index.
-        primaryTypeTraits[0] = IntegerPointable.TYPE_TRAITS;
-        primaryTypeTraits[1] = UTF8StringPointable.TYPE_TRAITS;
-        primaryComparatorFactories[0] = PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY);
-
+    @Override
+    protected void prepare() {
         // Field declarations and comparators for tokens.
-        tokenTypeTraits[0] = UTF8StringPointable.TYPE_TRAITS;
-        tokenComparatorFactories[0] = PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY);
+        tokenTypeTraits = new ITypeTraits[] { UTF8StringPointable.TYPE_TRAITS };
+        tokenComparatorFactories = new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+                .of(UTF8StringPointable.FACTORY) };
 
-        // Field declarations and comparators for inverted lists.
-        invListsTypeTraits[0] = IntegerPointable.TYPE_TRAITS;
-        invListsComparatorFactories[0] = PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY);
+        tokenizerRecDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
 
-        createPrimaryIndex();
-        loadPrimaryIndex();
-        printPrimaryIndex();
-        createInvertedIndex();
-        loadInvertedIndex();
+        sortComparatorFactories = new IBinaryComparatorFactory[] {
+                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+                PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) };
+
+        invertedIndexDataflowHelperFactory = new LSMInvertedIndexDataflowHelperFactory(new FlushControllerProvider(),
+                new ConstantMergePolicyProvider(ImmediateSchedulerProvider.INSTANCE, MERGE_THRESHOLD),
+                RefCountingOperationTrackerFactory.INSTANCE, ImmediateSchedulerProvider.INSTANCE);
     }
 
-    public void createPrimaryIndex() throws Exception {
-        JobSpecification spec = new JobSpecification();
-        TransientLocalResourceFactoryProvider localResourceFactoryProvider = new TransientLocalResourceFactoryProvider();
-        TreeIndexCreateOperatorDescriptor primaryCreateOp = new TreeIndexCreateOperatorDescriptor(spec, storageManager,
-                lcManagerProvider, primaryFileSplitProvider, primaryTypeTraits, primaryComparatorFactories,
-                btreeDataflowHelperFactory, localResourceFactoryProvider, NoOpOperationCallbackFactory.INSTANCE);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryCreateOp, NC1_ID);
-        spec.addRoot(primaryCreateOp);
-        runTest(spec);
-    }
-
-    public void createInvertedIndex() throws Exception {
-        JobSpecification spec = new JobSpecification();
-        ILocalResourceFactoryProvider localResourceFactoryProvider = new TransientLocalResourceFactoryProvider();
-        LSMInvertedIndexCreateOperatorDescriptor invIndexCreateOp = new LSMInvertedIndexCreateOperatorDescriptor(spec,
-                storageManager, btreeFileSplitProvider, lcManagerProvider, tokenTypeTraits, tokenComparatorFactories,
-                invListsTypeTraits, invListsComparatorFactories, tokenizerFactory, invertedIndexDataflowHelperFactory,
-                localResourceFactoryProvider, NoOpOperationCallbackFactory.INSTANCE);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, invIndexCreateOp, NC1_ID);
-        spec.addRoot(invIndexCreateOp);
-        runTest(spec);
-    }
-
-    @Test
-    public void testConjunctiveSearcher() throws Exception {
-        IInvertedIndexSearchModifierFactory conjunctiveSearchModifierFactory = new ConjunctiveSearchModifierFactory();
-        searchInvertedIndex("of", conjunctiveSearchModifierFactory);
-        searchInvertedIndex("3d", conjunctiveSearchModifierFactory);
-        searchInvertedIndex("of the human", conjunctiveSearchModifierFactory);
-    }
-
-    private IOperatorDescriptor createFileScanOp(JobSpecification spec) {
-        FileSplit[] dblpTitleFileSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
-                "data/cleanednumbereddblptitles.txt"))) };
-        IFileSplitProvider dblpTitleSplitProvider = new ConstantFileSplitProvider(dblpTitleFileSplits);
-        RecordDescriptor dblpTitleRecDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
-        FileScanOperatorDescriptor dblpTitleScanner = new FileScanOperatorDescriptor(spec, dblpTitleSplitProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { IntegerParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE }, '|'), dblpTitleRecDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, dblpTitleScanner, NC1_ID);
-        return dblpTitleScanner;
-    }
-
-    private IOperatorDescriptor createPrimaryBulkLoadOp(JobSpecification spec) {
-        int[] fieldPermutation = { 0, 1 };
-        TreeIndexBulkLoadOperatorDescriptor primaryBtreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
-                storageManager, lcManagerProvider, primaryFileSplitProvider, primaryTypeTraits,
-                primaryComparatorFactories, fieldPermutation, 0.7f, true, btreeDataflowHelperFactory,
-                NoOpOperationCallbackFactory.INSTANCE);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeBulkLoad, NC1_ID);
-        return primaryBtreeBulkLoad;
-    }
-
-    private IOperatorDescriptor createScanKeyProviderOp(JobSpecification spec) throws HyracksDataException {
-        // build dummy tuple containing nothing
-        ArrayTupleBuilder tb = new ArrayTupleBuilder(primaryKeyFieldCount * 2);
-        DataOutput dos = tb.getDataOutput();
-        tb.reset();
-        UTF8StringSerializerDeserializer.INSTANCE.serialize("0", dos);
-        tb.addFieldEndOffset();
-        ISerializerDeserializer[] keyRecDescSers = { UTF8StringSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE };
-        RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
-        ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(spec,
-                keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, keyProviderOp, NC1_ID);
-        return keyProviderOp;
-    }
-
-    private IOperatorDescriptor createPrimaryScanOp(JobSpecification spec) throws HyracksDataException {
-        int[] lowKeyFields = null; // - infinity
-        int[] highKeyFields = null; // + infinity
-        BTreeSearchOperatorDescriptor primaryBtreeSearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
-                storageManager, lcManagerProvider, primaryFileSplitProvider, primaryTypeTraits,
-                primaryComparatorFactories, lowKeyFields, highKeyFields, true, true, btreeDataflowHelperFactory, false,
-                NoOpOperationCallbackFactory.INSTANCE);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeSearchOp, NC1_ID);
-        return primaryBtreeSearchOp;
-    }
-
-    private void loadPrimaryIndex() throws Exception {
-        JobSpecification spec = new JobSpecification();
-        // Assuming that the data is pre-sorted on the key. No need to sort
-        // before bulk load.
-        IOperatorDescriptor fileScanOp = createFileScanOp(spec);
-        IOperatorDescriptor primaryBulkLoad = createPrimaryBulkLoadOp(spec);
-        spec.connect(new OneToOneConnectorDescriptor(spec), fileScanOp, 0, primaryBulkLoad, 0);
-        spec.addRoot(primaryBulkLoad);
-        runTest(spec);
-    }
-
-    private void printPrimaryIndex() throws Exception {
-        JobSpecification spec = new JobSpecification();
-        IOperatorDescriptor keyProviderOp = createScanKeyProviderOp(spec);
-        IOperatorDescriptor primaryScanOp = createPrimaryScanOp(spec);
-        IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
-                createTempFile().getAbsolutePath()) });
-        IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
-        spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, printer, 0);
-        spec.addRoot(printer);
-        runTest(spec);
-    }
-
-    private IOperatorDescriptor createExternalSortOp(JobSpecification spec, int[] sortFields,
-            RecordDescriptor outputRecDesc) {
-        ExternalSortOperatorDescriptor externalSortOp = new ExternalSortOperatorDescriptor(spec, 1000, sortFields,
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
-                        PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) }, outputRecDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, externalSortOp, NC1_ID);
-        return externalSortOp;
-    }
-
-    private IOperatorDescriptor createBinaryTokenizerOp(JobSpecification spec, int[] tokenFields, int[] keyFields) {
-        BinaryTokenizerOperatorDescriptor binaryTokenizer = new BinaryTokenizerOperatorDescriptor(spec,
-                tokenizerRecDesc, tokenizerFactory, tokenFields, keyFields);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, binaryTokenizer, NC1_ID);
-        return binaryTokenizer;
-    }
-
-    private IOperatorDescriptor createInvertedIndexBulkLoadOp(JobSpecification spec, int[] fieldPermutation) {
-        LSMInvertedIndexBulkLoadOperatorDescriptor invIndexBulkLoadOp = new LSMInvertedIndexBulkLoadOperatorDescriptor(
-                spec, fieldPermutation, true, storageManager, btreeFileSplitProvider, lcManagerProvider,
-                tokenTypeTraits, tokenComparatorFactories, invListsTypeTraits, invListsComparatorFactories,
-                tokenizerFactory, invertedIndexDataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, invIndexBulkLoadOp, NC1_ID);
-        return invIndexBulkLoadOp;
-    }
-
-    public void loadInvertedIndex() throws Exception {
-        JobSpecification spec = new JobSpecification();
-        IOperatorDescriptor keyProviderOp = createScanKeyProviderOp(spec);
-        IOperatorDescriptor primaryScanOp = createPrimaryScanOp(spec);
-        int[] tokenFields = { 1 };
-        int[] keyFields = { 0 };
-        IOperatorDescriptor binaryTokenizerOp = createBinaryTokenizerOp(spec, tokenFields, keyFields);
-        int[] sortFields = { 0, 1 };
-        IOperatorDescriptor externalSortOp = createExternalSortOp(spec, sortFields, tokenizerRecDesc);
-        int[] fieldPermutation = { 0, 1 };
-        IOperatorDescriptor invIndexBulkLoadOp = createInvertedIndexBulkLoadOp(spec, fieldPermutation);
-        spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, binaryTokenizerOp, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), binaryTokenizerOp, 0, externalSortOp, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), externalSortOp, 0, invIndexBulkLoadOp, 0);
-        spec.addRoot(invIndexBulkLoadOp);
-        runTest(spec);
-    }
-
-    private IOperatorDescriptor createQueryProviderOp(JobSpecification spec, String queryString)
-            throws HyracksDataException {
-        // Build tuple with exactly one field, which is the query,
-        ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
-        DataOutput dos = tb.getDataOutput();
-        tb.reset();
-        UTF8StringSerializerDeserializer.INSTANCE.serialize(queryString, dos);
-        tb.addFieldEndOffset();
-        ISerializerDeserializer[] querySerde = { UTF8StringSerializerDeserializer.INSTANCE };
-        RecordDescriptor queryRecDesc = new RecordDescriptor(querySerde);
-        ConstantTupleSourceOperatorDescriptor queryProviderOp = new ConstantTupleSourceOperatorDescriptor(spec,
-                queryRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, queryProviderOp, NC1_ID);
-        return queryProviderOp;
-    }
-
-    private IOperatorDescriptor createInvertedIndexSearchOp(JobSpecification spec,
-            IInvertedIndexSearchModifierFactory searchModifierFactory) {
-        LSMInvertedIndexSearchOperatorDescriptor invIndexSearchOp = new LSMInvertedIndexSearchOperatorDescriptor(spec,
-                0, storageManager, btreeFileSplitProvider, lcManagerProvider, tokenTypeTraits,
-                tokenComparatorFactories, invListsTypeTraits, invListsComparatorFactories,
-                invertedIndexDataflowHelperFactory, tokenizerFactory, searchModifierFactory, invListsRecDesc, false,
-                NoOpOperationCallbackFactory.INSTANCE);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, invIndexSearchOp, NC1_ID);
-        return invIndexSearchOp;
-    }
-
-    public void searchInvertedIndex(String queryString, IInvertedIndexSearchModifierFactory searchModifierFactory)
-            throws Exception {
-        JobSpecification spec = new JobSpecification();
-        IOperatorDescriptor queryProviderOp = createQueryProviderOp(spec, queryString);
-        IOperatorDescriptor invIndexSearchOp = createInvertedIndexSearchOp(spec, searchModifierFactory);
-        IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
-                createTempFile().getAbsolutePath()) });
-        IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
-        spec.connect(new OneToOneConnectorDescriptor(spec), queryProviderOp, 0, invIndexSearchOp, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), invIndexSearchOp, 0, printer, 0);
-        spec.addRoot(printer);
-        runTest(spec);
+    @Override
+    protected boolean addNumTokensKey() {
+        return false;
     }
 }
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTreeCountingSearchCursor.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTreeCountingSearchCursor.java
index 4f667b2..c1cab0a 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTreeCountingSearchCursor.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTreeCountingSearchCursor.java
@@ -204,6 +204,7 @@
             }
             bufferCache.unpin(page);
         }
+        tupleBuilder.reset();
         tupleIndex = 0;
         page = null;
         pred = null;
@@ -216,7 +217,7 @@
             close();
         } catch (Exception e) {
             e.printStackTrace();
-        }
+        }        
     }
 
     @Override
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/MultiComparator.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/MultiComparator.java
index 01c6cc8..280bb41 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/MultiComparator.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/MultiComparator.java
@@ -96,6 +96,14 @@
         return new MultiComparator(cmps);
     }
     
+    public static MultiComparator create(IBinaryComparatorFactory[] cmpFactories, int startIndex, int numCmps) {
+        IBinaryComparator[] cmps = new IBinaryComparator[numCmps];
+        for (int i = startIndex; i < startIndex + numCmps; i++) {
+            cmps[i] = cmpFactories[i].createBinaryComparator();
+        }
+        return new MultiComparator(cmps);
+    }
+    
     public static MultiComparator create(IBinaryComparatorFactory[]... cmpFactories) {
         int size = 0;
         for (int i = 0; i < cmpFactories.length; i++) {
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/api/IInvertedIndexSearchModifier.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/api/IInvertedIndexSearchModifier.java
index 619937f..0d0d936 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/api/IInvertedIndexSearchModifier.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/api/IInvertedIndexSearchModifier.java
@@ -15,9 +15,12 @@
 
 package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api;
 
-
 public interface IInvertedIndexSearchModifier {
     public int getOccurrenceThreshold(int numQueryTokens);
 
-    public int getNumPrefixLists(int numQueryTokens);
+    public int getNumPrefixLists(int occurrenceThreshold, int numInvLists);
+
+    public short getNumTokensLowerBound(short numQueryTokens);
+
+    public short getNumTokensUpperBound(short numQueryTokens);
 }
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/api/IInvertedListCursor.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/api/IInvertedListCursor.java
index c62cc57..de703ac 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/api/IInvertedListCursor.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/api/IInvertedListCursor.java
@@ -44,7 +44,7 @@
     public int getStartOff();
 
     public boolean containsKey(ITupleReference searchTuple, MultiComparator invListCmp) throws HyracksDataException, IndexException;
-
+    
     // for debugging
     @SuppressWarnings("rawtypes")
     public String printInvList(ISerializerDeserializer[] serdes) throws HyracksDataException, IndexException;
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/api/IObjectFactory.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/api/IObjectFactory.java
new file mode 100644
index 0000000..9068a2b
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/api/IObjectFactory.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api;
+
+public interface IObjectFactory<T> {
+    public T create();
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/api/IPartitionedInvertedIndex.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/api/IPartitionedInvertedIndex.java
new file mode 100644
index 0000000..7db972c
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/api/IPartitionedInvertedIndex.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexOperationContext;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search.InvertedListPartitions;
+
+public interface IPartitionedInvertedIndex {
+    public void openInvertedListPartitionCursors(IInvertedIndexSearcher searcher, IIndexOperationContext ictx,
+            short numTokensLowerBound, short numTokensUpperBound, InvertedListPartitions invListPartitions)
+            throws HyracksDataException, IndexException;
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorDescriptor.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorDescriptor.java
index 4706eed..84152d5 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorDescriptor.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorDescriptor.java
@@ -29,19 +29,23 @@
     private static final long serialVersionUID = 1L;
 
     private final IBinaryTokenizerFactory tokenizerFactory;
-    // Fields that will be tokenized
-    private final int[] tokenFields;
+    // Field that will be tokenized.
+    private final int docField;
     // operator will append these key fields to each token, e.g., as
     // payload for an inverted list
     // WARNING: too many key fields can cause significant data blowup.
     private final int[] keyFields;
+    // Indicates whether the first key field should be the number of tokens in the tokenized set of the document.
+    // This value is used in partitioned inverted indexes, for example.
+    private final boolean addNumTokensKey;
 
     public BinaryTokenizerOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor recDesc,
-            IBinaryTokenizerFactory tokenizerFactory, int[] tokenFields, int[] keyFields) {
+            IBinaryTokenizerFactory tokenizerFactory, int docField, int[] keyFields, boolean addNumTokensKey) {
         super(spec, 1, 1);
         this.tokenizerFactory = tokenizerFactory;
-        this.tokenFields = tokenFields;
+        this.docField = docField;
         this.keyFields = keyFields;
+        this.addNumTokensKey = addNumTokensKey;
         recordDescriptors[0] = recDesc;
     }
 
@@ -49,6 +53,7 @@
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
         return new BinaryTokenizerOperatorNodePushable(ctx, recordDescProvider.getInputRecordDescriptor(
-                getActivityId(), 0), recordDescriptors[0], tokenizerFactory.createTokenizer(), tokenFields, keyFields);
+                getActivityId(), 0), recordDescriptors[0], tokenizerFactory.createTokenizer(), docField, keyFields,
+                addNumTokensKey);
     }
 }
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
index 8f2b155..9495516 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2009-2010 by The Regents of the University of California
+ * Copyright 2009-2012 by The Regents of the University of California
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
@@ -34,8 +34,9 @@
 
     private final IHyracksTaskContext ctx;
     private final IBinaryTokenizer tokenizer;
-    private final int[] tokenFields;
-    private final int[] projFields;
+    private final int docField;
+    private final int[] keyFields;
+    private final boolean addNumTokensKey;
     private final RecordDescriptor inputRecDesc;
     private final RecordDescriptor outputRecDesc;
 
@@ -46,11 +47,13 @@
     private ByteBuffer writeBuffer;
 
     public BinaryTokenizerOperatorNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc,
-            RecordDescriptor outputRecDesc, IBinaryTokenizer tokenizer, int[] tokenFields, int[] projFields) {
+            RecordDescriptor outputRecDesc, IBinaryTokenizer tokenizer, int docField, int[] keyFields,
+            boolean addNumTokensKey) {
         this.ctx = ctx;
         this.tokenizer = tokenizer;
-        this.tokenFields = tokenFields;
-        this.projFields = projFields;
+        this.docField = docField;
+        this.keyFields = keyFields;
+        this.addNumTokensKey = addNumTokensKey;
         this.inputRecDesc = inputRecDesc;
         this.outputRecDesc = outputRecDesc;
     }
@@ -69,41 +72,51 @@
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         accessor.reset(buffer);
-
         int tupleCount = accessor.getTupleCount();
         for (int i = 0; i < tupleCount; i++) {
-
-            for (int j = 0; j < tokenFields.length; j++) {
-
+            short numTokens = 0;
+            if (addNumTokensKey) {
+                // Run through the tokens to get the total number of tokens.
                 tokenizer.reset(
                         accessor.getBuffer().array(),
                         accessor.getTupleStartOffset(i) + accessor.getFieldSlotsLength()
-                                + accessor.getFieldStartOffset(i, tokenFields[j]),
-                        accessor.getFieldLength(i, tokenFields[j]));
-
+                                + accessor.getFieldStartOffset(i, docField), accessor.getFieldLength(i, docField));
                 while (tokenizer.hasNext()) {
                     tokenizer.next();
+                    numTokens++;
+                }
+            }
 
-                    builder.reset();
-                    try {
-                        IToken token = tokenizer.getToken();
-                        token.serializeToken(builderDos);
+            tokenizer.reset(
+                    accessor.getBuffer().array(),
+                    accessor.getTupleStartOffset(i) + accessor.getFieldSlotsLength()
+                            + accessor.getFieldStartOffset(i, docField), accessor.getFieldLength(i, docField));
+            while (tokenizer.hasNext()) {
+                tokenizer.next();
+
+                builder.reset();
+                try {
+                    IToken token = tokenizer.getToken();
+                    token.serializeToken(builderDos);
+                    builder.addFieldEndOffset();
+                    // Add number of tokens if requested.
+                    if (addNumTokensKey) {
+                        builder.getDataOutput().writeShort(numTokens);
                         builder.addFieldEndOffset();
-                    } catch (IOException e) {
-                        throw new HyracksDataException(e.getMessage());
                     }
+                } catch (IOException e) {
+                    throw new HyracksDataException(e.getMessage());
+                }
 
-                    for (int k = 0; k < projFields.length; k++) {
-                        builder.addField(accessor, i, projFields[k]);
-                    }
+                for (int k = 0; k < keyFields.length; k++) {
+                    builder.addField(accessor, i, keyFields[k]);
+                }
 
+                if (!appender.append(builder.getFieldEndOffsets(), builder.getByteArray(), 0, builder.getSize())) {
+                    FrameUtils.flushFrame(writeBuffer, writer);
+                    appender.reset(writeBuffer, true);
                     if (!appender.append(builder.getFieldEndOffsets(), builder.getByteArray(), 0, builder.getSize())) {
-                        FrameUtils.flushFrame(writeBuffer, writer);
-                        appender.reset(writeBuffer, true);
-                        if (!appender
-                                .append(builder.getFieldEndOffsets(), builder.getByteArray(), 0, builder.getSize())) {
-                            throw new IllegalStateException();
-                        }
+                        throw new IllegalStateException();
                     }
                 }
             }
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/PartitionedLSMInvertedIndexDataflowHelper.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/PartitionedLSMInvertedIndexDataflowHelper.java
new file mode 100644
index 0000000..ea07b24
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/PartitionedLSMInvertedIndexDataflowHelper.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.common.api.IInMemoryFreePageManager;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.frames.LIFOMetaDataFrameFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.IInMemoryBufferCache;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushController;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.AbstractLSMIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.DualIndexInMemoryBufferCache;
+import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.DualIndexInMemoryFreePageManager;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls.PartitionedLSMInvertedIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.InvertedIndexUtils;
+import edu.uci.ics.hyracks.storage.common.buffercache.HeapBufferAllocator;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+
+public final class PartitionedLSMInvertedIndexDataflowHelper extends AbstractLSMIndexDataflowHelper {
+
+    public PartitionedLSMInvertedIndexDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
+            int partition, ILSMFlushController flushController, ILSMMergePolicy mergePolicy,
+            ILSMOperationTrackerFactory opTrackerFactory, ILSMIOOperationScheduler ioScheduler) {
+        this(opDesc, ctx, partition, DEFAULT_MEM_PAGE_SIZE, DEFAULT_MEM_NUM_PAGES, flushController, mergePolicy,
+                opTrackerFactory, ioScheduler);
+    }
+
+    public PartitionedLSMInvertedIndexDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
+            int partition, int memPageSize, int memNumPages, ILSMFlushController flushController,
+            ILSMMergePolicy mergePolicy, ILSMOperationTrackerFactory opTrackerFactory,
+            ILSMIOOperationScheduler ioScheduler) {
+        super(opDesc, ctx, partition, memPageSize, memNumPages, flushController, mergePolicy, opTrackerFactory,
+                ioScheduler);
+    }
+
+    @Override
+    public IIndex createIndexInstance() throws HyracksDataException {
+        IInvertedIndexOperatorDescriptor invIndexOpDesc = (IInvertedIndexOperatorDescriptor) opDesc;
+        try {
+            ITreeIndexMetaDataFrameFactory metaDataFrameFactory = new LIFOMetaDataFrameFactory();
+            IInMemoryBufferCache memBufferCache = new DualIndexInMemoryBufferCache(new HeapBufferAllocator(),
+                    memPageSize, memNumPages);
+            IInMemoryFreePageManager memFreePageManager = new DualIndexInMemoryFreePageManager(memNumPages,
+                    metaDataFrameFactory);
+            IBufferCache diskBufferCache = opDesc.getStorageManager().getBufferCache(ctx);
+            IFileMapProvider diskFileMapProvider = opDesc.getStorageManager().getFileMapProvider(ctx);
+            PartitionedLSMInvertedIndex invIndex = InvertedIndexUtils.createPartitionedLSMInvertedIndex(memBufferCache,
+                    memFreePageManager, diskFileMapProvider, invIndexOpDesc.getInvListsTypeTraits(),
+                    invIndexOpDesc.getInvListsComparatorFactories(), invIndexOpDesc.getTokenTypeTraits(),
+                    invIndexOpDesc.getTokenComparatorFactories(), invIndexOpDesc.getTokenizerFactory(),
+                    diskBufferCache, ctx.getIOManager(), file.getFile().getPath(), flushController, mergePolicy,
+                    opTrackerFactory, ioScheduler);
+            return invIndex;
+        } catch (IndexException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/PartitionedLSMInvertedIndexDataflowHelperFactory.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/PartitionedLSMInvertedIndexDataflowHelperFactory.java
new file mode 100644
index 0000000..34367db
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/PartitionedLSMInvertedIndexDataflowHelperFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushControllerProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.AbstractLSMIndexDataflowHelperFactory;
+
+public class PartitionedLSMInvertedIndexDataflowHelperFactory extends AbstractLSMIndexDataflowHelperFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    public PartitionedLSMInvertedIndexDataflowHelperFactory(ILSMFlushControllerProvider flushControllerProvider,
+            ILSMMergePolicyProvider mergePolicyProvider, ILSMOperationTrackerFactory opTrackerProvider,
+            ILSMIOOperationSchedulerProvider ioSchedulerProvider) {
+        super(flushControllerProvider, mergePolicyProvider, opTrackerProvider, ioSchedulerProvider);
+    }
+
+    @Override
+    public IndexDataflowHelper createIndexDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
+            int partition) {
+        return new PartitionedLSMInvertedIndexDataflowHelper(opDesc, ctx, partition,
+                flushControllerProvider.getFlushController(ctx), mergePolicyProvider.getMergePolicy(ctx),
+                opTrackerFactory, ioSchedulerProvider.getIOScheduler(ctx));
+    }
+
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
index 851451e..3becf5c 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
@@ -96,33 +96,33 @@
         }
     }
 
-    private final ILSMHarness lsmHarness;
+    protected final ILSMHarness lsmHarness;
 
     // In-memory components.
-    private final LSMInvertedIndexComponent memComponent;
-    private final IInMemoryFreePageManager memFreePageManager;
-    private final IBinaryTokenizerFactory tokenizerFactory;
+    protected final LSMInvertedIndexComponent memComponent;
+    protected final IInMemoryFreePageManager memFreePageManager;
+    protected final IBinaryTokenizerFactory tokenizerFactory;
 
     // On-disk components.
-    private final ILSMIndexFileManager fileManager;
+    protected final ILSMIndexFileManager fileManager;
     // For creating inverted indexes in flush and merge.
-    private final OnDiskInvertedIndexFactory diskInvIndexFactory;
+    protected final OnDiskInvertedIndexFactory diskInvIndexFactory;
     // For creating deleted-keys BTrees in flush and merge.
-    private final BTreeFactory deletedKeysBTreeFactory;
-    private final IBufferCache diskBufferCache;
+    protected final BTreeFactory deletedKeysBTreeFactory;
+    protected final IBufferCache diskBufferCache;
     // List of LSMInvertedIndexComponent instances. Using Object for better sharing via
     // ILSMIndex + LSMHarness.
-    private final LinkedList<Object> diskComponents;
+    protected final LinkedList<Object> diskComponents;
     // Helps to guarantees physical consistency of LSM components.
-    private final ILSMComponentFinalizer componentFinalizer;
+    protected final ILSMComponentFinalizer componentFinalizer;
 
     // Type traits and comparators for tokens and inverted-list elements.
-    private final ITypeTraits[] invListTypeTraits;
-    private final IBinaryComparatorFactory[] invListCmpFactories;
-    private final ITypeTraits[] tokenTypeTraits;
-    private final IBinaryComparatorFactory[] tokenCmpFactories;
+    protected final ITypeTraits[] invListTypeTraits;
+    protected final IBinaryComparatorFactory[] invListCmpFactories;
+    protected final ITypeTraits[] tokenTypeTraits;
+    protected final IBinaryComparatorFactory[] tokenCmpFactories;
 
-    private boolean isActivated;
+    protected boolean isActivated;
 
     public LSMInvertedIndex(IInMemoryBufferCache memBufferCache, IInMemoryFreePageManager memFreePageManager,
             OnDiskInvertedIndexFactory diskInvIndexFactory, BTreeFactory deletedKeysBTreeFactory,
@@ -131,13 +131,6 @@
             IBinaryComparatorFactory[] tokenCmpFactories, IBinaryTokenizerFactory tokenizerFactory,
             ILSMFlushController flushController, ILSMMergePolicy mergePolicy, ILSMOperationTrackerFactory opTrackerFactory,
             ILSMIOOperationScheduler ioScheduler) throws IndexException {
-        InMemoryInvertedIndex memInvIndex = InvertedIndexUtils.createInMemoryBTreeInvertedindex(memBufferCache,
-                memFreePageManager, invListTypeTraits, invListCmpFactories, tokenTypeTraits, tokenCmpFactories,
-                tokenizerFactory);
-        BTree deleteKeysBTree = BTreeUtils.createBTree(memBufferCache, memFreePageManager,
-                ((InMemoryBufferCache) memBufferCache).getFileMapProvider(), invListTypeTraits, invListCmpFactories,
-                BTreeLeafFrameType.REGULAR_NSM, new FileReference(new File("membtree")));
-        memComponent = new LSMInvertedIndexComponent(memInvIndex, deleteKeysBTree);
         this.memFreePageManager = memFreePageManager;
         this.tokenizerFactory = tokenizerFactory;
         this.fileManager = fileManager;
@@ -148,11 +141,24 @@
         this.invListCmpFactories = invListCmpFactories;
         this.tokenTypeTraits = tokenTypeTraits;
         this.tokenCmpFactories = tokenCmpFactories;
+        diskComponents = new LinkedList<Object>();
+        isActivated = false;
+        // Create in-memory component.
+        InMemoryInvertedIndex memInvIndex = createInMemoryInvertedIndex(memBufferCache);
+        BTree deleteKeysBTree = BTreeUtils.createBTree(memBufferCache, memFreePageManager,
+                ((InMemoryBufferCache) memBufferCache).getFileMapProvider(), invListTypeTraits, invListCmpFactories,
+                BTreeLeafFrameType.REGULAR_NSM, new FileReference(new File("membtree")));
+        memComponent = new LSMInvertedIndexComponent(memInvIndex, deleteKeysBTree);
+        // The operation tracker may need to have the in-memory component created.
         ILSMOperationTracker opTracker = opTrackerFactory.createOperationTracker(this);
         this.lsmHarness = new LSMHarness(this, flushController, mergePolicy, opTracker, ioScheduler);
         this.componentFinalizer = new LSMInvertedIndexComponentFinalizer(diskFileMapProvider);
-        diskComponents = new LinkedList<Object>();
-        isActivated = false;
+    }
+    
+    protected InMemoryInvertedIndex createInMemoryInvertedIndex(IInMemoryBufferCache memBufferCache)
+            throws IndexException {
+        return InvertedIndexUtils.createInMemoryBTreeInvertedindex(memBufferCache, memFreePageManager,
+                invListTypeTraits, invListCmpFactories, tokenTypeTraits, tokenCmpFactories, tokenizerFactory);
     }
 
     @Override
@@ -312,7 +318,7 @@
     /**
      * The keys in the in-memory deleted-keys BTree only refer to on-disk components.
      * We delete documents from the in-memory inverted index by deleting its entries directly,
-     * and do NOT add the deleted key to the deleted-keys BTree.
+     * while still adding the deleted key to the deleted-keys BTree.
      * Otherwise, inserts would have to remove keys from the in-memory deleted-keys BTree which 
      * may cause incorrect behavior (lost deletes) in the following pathological case:
      * Insert doc 1, flush, delete doc 1, insert doc 1
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java
index b3dee89..625ecd4 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java
@@ -28,6 +28,8 @@
 
 public class LSMInvertedIndexOpContext implements ILSMIndexOperationContext {
 
+    private static final int NUM_DOCUMENT_FIELDS = 1;
+    
     private IndexOperation op;
     private final IInvertedIndex memInvIndex;
     private final IIndex memDeletedKeysBTree;
@@ -68,11 +70,10 @@
                     deletedKeysBTreeAccessor = memDeletedKeysBTree.createAccessor(NoOpOperationCallback.INSTANCE,
                             NoOpOperationCallback.INSTANCE);
                     // Project away the document fields, leaving only the key fields.
-                    int numTokenFields = memInvIndex.getTokenTypeTraits().length;
                     int numKeyFields = memInvIndex.getInvListTypeTraits().length;
-                    int[] keyFieldPermutation = new int[memInvIndex.getInvListTypeTraits().length];
+                    int[] keyFieldPermutation = new int[numKeyFields];
                     for (int i = 0; i < numKeyFields; i++) {
-                        keyFieldPermutation[i] = numTokenFields + i;
+                        keyFieldPermutation[i] = NUM_DOCUMENT_FIELDS + i;
                     }
                     keysOnlyTuple = new PermutingTupleReference(keyFieldPermutation);
                 }
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/PartitionedLSMInvertedIndex.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/PartitionedLSMInvertedIndex.java
new file mode 100644
index 0000000..d963dbc
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/PartitionedLSMInvertedIndex.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.storage.am.common.api.IInMemoryFreePageManager;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.IInMemoryBufferCache;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushController;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexFileManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BTreeFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.inmemory.InMemoryInvertedIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndexFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.InvertedIndexUtils;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+
+public class PartitionedLSMInvertedIndex extends LSMInvertedIndex {
+
+    public PartitionedLSMInvertedIndex(IInMemoryBufferCache memBufferCache,
+            IInMemoryFreePageManager memFreePageManager, OnDiskInvertedIndexFactory diskInvIndexFactory,
+            BTreeFactory deletedKeysBTreeFactory, ILSMIndexFileManager fileManager,
+            IFileMapProvider diskFileMapProvider, ITypeTraits[] invListTypeTraits,
+            IBinaryComparatorFactory[] invListCmpFactories, ITypeTraits[] tokenTypeTraits,
+            IBinaryComparatorFactory[] tokenCmpFactories, IBinaryTokenizerFactory tokenizerFactory,
+            ILSMFlushController flushController, ILSMMergePolicy mergePolicy,
+            ILSMOperationTrackerFactory opTrackerFactory, ILSMIOOperationScheduler ioScheduler) throws IndexException {
+        super(memBufferCache, memFreePageManager, diskInvIndexFactory, deletedKeysBTreeFactory, fileManager,
+                diskFileMapProvider, invListTypeTraits, invListCmpFactories, tokenTypeTraits, tokenCmpFactories,
+                tokenizerFactory, flushController, mergePolicy, opTrackerFactory, ioScheduler);
+    }
+
+    protected InMemoryInvertedIndex createInMemoryInvertedIndex(IInMemoryBufferCache memBufferCache)
+            throws IndexException {
+        return InvertedIndexUtils.createPartitionedInMemoryBTreeInvertedindex(memBufferCache, memFreePageManager,
+                invListTypeTraits, invListCmpFactories, tokenTypeTraits, tokenCmpFactories, tokenizerFactory);
+    }
+
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java
index f0af06a..668250c 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java
@@ -44,16 +44,16 @@
 
 public class InMemoryInvertedIndex implements IInvertedIndex {
 
-    private final BTree btree;
-    private final FileReference memBTreeFile = new FileReference(new File("memBTree"));
-    private final ITypeTraits[] tokenTypeTraits;
-    private final IBinaryComparatorFactory[] tokenCmpFactories;
-    private final ITypeTraits[] invListTypeTraits;
-    private final IBinaryComparatorFactory[] invListCmpFactories;
-    private final IBinaryTokenizerFactory tokenizerFactory;
+    protected final BTree btree;
+    protected final FileReference memBTreeFile = new FileReference(new File("memBTree"));
+    protected final ITypeTraits[] tokenTypeTraits;
+    protected final IBinaryComparatorFactory[] tokenCmpFactories;
+    protected final ITypeTraits[] invListTypeTraits;
+    protected final IBinaryComparatorFactory[] invListCmpFactories;
+    protected final IBinaryTokenizerFactory tokenizerFactory;
 
-    private final ITypeTraits[] btreeTypeTraits;
-    private final IBinaryComparatorFactory[] btreeCmpFactories;
+    protected final ITypeTraits[] btreeTypeTraits;
+    protected final IBinaryComparatorFactory[] btreeCmpFactories;
 
     public InMemoryInvertedIndex(IBufferCache memBufferCache, IFreePageManager memFreePageManager,
             ITypeTraits[] invListTypeTraits, IBinaryComparatorFactory[] invListCmpFactories,
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndexAccessor.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndexAccessor.java
index 5a352c3..a62aaf1 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndexAccessor.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndexAccessor.java
@@ -46,7 +46,7 @@
     public InMemoryInvertedIndexAccessor(InMemoryInvertedIndex index, IIndexOperationContext opCtx) {
         this.opCtx = opCtx;
         this.index = index;
-        this.searcher = new TOccurrenceSearcher(hyracksCtx, index);
+        this.searcher = createSearcher();
         this.btreeAccessor = (BTreeAccessor) index.getBTree().createAccessor(NoOpOperationCallback.INSTANCE,
                 NoOpOperationCallback.INSTANCE);
     }
@@ -62,7 +62,7 @@
         opCtx.setOperation(IndexOperation.DELETE);
         index.delete(tuple, btreeAccessor, opCtx);
     }
-    
+
     @Override
     public IIndexCursor createSearchCursor() {
         return new OnDiskInvertedIndexSearchCursor(searcher, index.getInvListTypeTraits().length);
@@ -89,13 +89,13 @@
         IBTreeLeafFrame leafFrame = (IBTreeLeafFrame) index.getBTree().getLeafFrameFactory().createFrame();
         return new BTreeRangeSearchCursor(leafFrame, false);
     }
-    
+
     @Override
     public void rangeSearch(IIndexCursor cursor, ISearchPredicate searchPred) throws IndexException,
             HyracksDataException {
         btreeAccessor.search(cursor, searchPred);
     }
-    
+
     public BTreeAccessor getBTreeAccessor() {
         return btreeAccessor;
     }
@@ -109,4 +109,8 @@
     public void upsert(ITupleReference tuple) throws HyracksDataException, IndexException {
         throw new UnsupportedOperationException("Upsert not supported by in-memory inverted index.");
     }
+
+    protected IInvertedIndexSearcher createSearcher() {
+        return new TOccurrenceSearcher(hyracksCtx, index);
+    }
 }
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndexOpContext.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndexOpContext.java
index 4a77e52..c109cfd 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndexOpContext.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndexOpContext.java
@@ -39,7 +39,7 @@
     public MultiComparator tokenFieldsCmp;
 
     // To generate in-memory BTree tuples for insertions.
-    private final IBinaryTokenizerFactory tokenizerFactory;
+    protected final IBinaryTokenizerFactory tokenizerFactory;
     public InvertedIndexTokenizingTupleIterator tupleIter;
 
     public InMemoryInvertedIndexOpContext(BTree btree, IBinaryComparatorFactory[] tokenCmpFactories,
@@ -52,19 +52,16 @@
     @Override
     public void setOperation(IndexOperation newOp) {
         switch (newOp) {
-            case INSERT: 
+            case INSERT:
             case DELETE: {
                 if (tupleIter == null) {
-                    IBinaryTokenizer tokenizer = tokenizerFactory.createTokenizer();
-                    tupleIter = new InvertedIndexTokenizingTupleIterator(tokenCmpFactories.length,
-                            btree.getFieldCount() - tokenCmpFactories.length, tokenizer);
+                    setTokenizingTupleIterator();
                 }
                 break;
             }
             case SEARCH: {
                 if (btreePred == null) {
                     btreePred = new RangePredicate(null, null, true, true, null, null);
-                    // TODO: Ignore opcallbacks for now.
                     btreeAccessor = (BTreeAccessor) btree.createAccessor(NoOpOperationCallback.INSTANCE,
                             NoOpOperationCallback.INSTANCE);
                     btreeCmp = MultiComparator.create(btree.getComparatorFactories());
@@ -88,4 +85,10 @@
     public IndexOperation getOperation() {
         return op;
     }
+
+    protected void setTokenizingTupleIterator() {
+        IBinaryTokenizer tokenizer = tokenizerFactory.createTokenizer();
+        tupleIter = new InvertedIndexTokenizingTupleIterator(tokenCmpFactories.length, btree.getFieldCount()
+                - tokenCmpFactories.length, tokenizer);
+    }
 }
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedListCursor.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedListCursor.java
index 21e2bf7..6af3bd2 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedListCursor.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedListCursor.java
@@ -44,13 +44,13 @@
     private MultiComparator tokenFieldsCmp;
     private MultiComparator btreeCmp;
     private final PermutingTupleReference resultTuple;
-    private final ConcatenatingTupleReference btreeSearchTuple;    
-    
+    private final ConcatenatingTupleReference btreeSearchTuple;
+
     private final ArrayTupleBuilder tokenTupleBuilder;
     private final ArrayTupleReference tokenTuple = new ArrayTupleReference();
-    
+
     private int numElements = -1;
-    
+
     public InMemoryInvertedListCursor(int invListFieldCount, int tokenFieldCount) {
         int[] fieldPermutation = new int[invListFieldCount];
         for (int i = 0; i < invListFieldCount; i++) {
@@ -61,19 +61,22 @@
         btreeSearchTuple = new ConcatenatingTupleReference(2);
         tokenTupleBuilder = new ArrayTupleBuilder(tokenFieldCount);
     }
-    
+
     public void prepare(BTreeAccessor btreeAccessor, RangePredicate btreePred, MultiComparator tokenFieldsCmp,
-            MultiComparator btreeCmp) {
-        this.btreeAccessor = btreeAccessor;
-        this.btreeCursor = btreeAccessor.createSearchCursor();
-        this.countingCursor = btreeAccessor.createCountingSearchCursor();
-        this.btreePred = btreePred;
-        this.btreePred.setLowKeyComparator(tokenFieldsCmp);
-        this.btreePred.setHighKeyComparator(tokenFieldsCmp);
-        this.tokenFieldsCmp = tokenFieldsCmp;
-        this.btreeCmp = btreeCmp;
+            MultiComparator btreeCmp) throws HyracksDataException, IndexException {
+        // Avoid object creation if this.btreeAccessor == btreeAccessor.
+        if (this.btreeAccessor != btreeAccessor) {
+            this.btreeAccessor = btreeAccessor;
+            this.btreeCursor = btreeAccessor.createSearchCursor();
+            this.countingCursor = btreeAccessor.createCountingSearchCursor();
+            this.btreePred = btreePred;
+            this.btreePred.setLowKeyComparator(tokenFieldsCmp);
+            this.btreePred.setHighKeyComparator(tokenFieldsCmp);
+            this.tokenFieldsCmp = tokenFieldsCmp;
+            this.btreeCmp = btreeCmp;
+        }
     }
-    
+
     @Override
     public int compareTo(IInvertedListCursor cursor) {
         return size() - cursor.size();
@@ -136,14 +139,15 @@
             btreePred.setHighKeyComparator(tokenFieldsCmp);
             btreePred.setLowKey(tokenTuple, true);
             btreePred.setHighKey(tokenTuple, true);
-            
+
             // Perform the count.
             try {
                 btreeAccessor.search(countingCursor, btreePred);
                 while (countingCursor.hasNext()) {
                     countingCursor.next();
                     ITupleReference countTuple = countingCursor.getTuple();
-                    numElements = IntegerSerializerDeserializer.getInt(countTuple.getFieldData(0), countTuple.getFieldStart(0));
+                    numElements = IntegerSerializerDeserializer.getInt(countTuple.getFieldData(0),
+                            countTuple.getFieldStart(0));
                 }
             } catch (HyracksDataException e) {
                 e.printStackTrace();
@@ -176,7 +180,8 @@
     }
 
     @Override
-    public boolean containsKey(ITupleReference searchTuple, MultiComparator invListCmp) throws HyracksDataException, IndexException {
+    public boolean containsKey(ITupleReference searchTuple, MultiComparator invListCmp) throws HyracksDataException,
+            IndexException {
         // Close cursor if necessary.
         unpinPages();
         btreeSearchTuple.addTuple(searchTuple);
@@ -197,7 +202,7 @@
             btreeCursor.close();
             btreeCursor.reset();
             btreeSearchTuple.removeLastTuple();
-        }        
+        }
         return containsKey;
     }
 
@@ -214,10 +219,10 @@
                 DataInput dataIn = new DataInputStream(inStream);
                 Object o = serdes[0].deserialize(dataIn);
                 strBuilder.append(o.toString() + " ");
-            }            
+            }
         } finally {
             btreeCursor.close();
-            btreeCursor.reset();            
+            btreeCursor.reset();
         }
         try {
             btreeAccessor.search(btreeCursor, btreePred);
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndex.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndex.java
new file mode 100644
index 0000000..53def29
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndex.java
@@ -0,0 +1,131 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.inmemory;
+
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeException;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree.BTreeAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IFreePageManager;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexOperationContext;
+import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexSearcher;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IPartitionedInvertedIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search.InvertedListPartitions;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search.PartitionedTOccurrenceSearcher;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.PartitionedInvertedIndexTokenizingTupleIterator;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+
+public class PartitionedInMemoryInvertedIndex extends InMemoryInvertedIndex implements IPartitionedInvertedIndex {
+
+    protected final ReentrantReadWriteLock partitionIndexLock = new ReentrantReadWriteLock(true);
+    protected short minPartitionIndex = Short.MAX_VALUE;
+    protected short maxPartitionIndex = Short.MIN_VALUE;
+
+    public PartitionedInMemoryInvertedIndex(IBufferCache memBufferCache, IFreePageManager memFreePageManager,
+            ITypeTraits[] invListTypeTraits, IBinaryComparatorFactory[] invListCmpFactories,
+            ITypeTraits[] tokenTypeTraits, IBinaryComparatorFactory[] tokenCmpFactories,
+            IBinaryTokenizerFactory tokenizerFactory) throws BTreeException {
+        super(memBufferCache, memFreePageManager, invListTypeTraits, invListCmpFactories, tokenTypeTraits,
+                tokenCmpFactories, tokenizerFactory);
+    }
+
+    @Override
+    public void insert(ITupleReference tuple, BTreeAccessor btreeAccessor, IIndexOperationContext ictx)
+            throws HyracksDataException, IndexException {
+        super.insert(tuple, btreeAccessor, ictx);
+        PartitionedInMemoryInvertedIndexOpContext ctx = (PartitionedInMemoryInvertedIndexOpContext) ictx;
+        PartitionedInvertedIndexTokenizingTupleIterator tupleIter = (PartitionedInvertedIndexTokenizingTupleIterator) ctx.tupleIter;
+        updatePartitionIndexes(tupleIter.getNumTokens());
+    }
+
+    @Override
+    public void clear() throws HyracksDataException {
+        super.clear();
+        minPartitionIndex = Short.MAX_VALUE;
+        maxPartitionIndex = Short.MIN_VALUE;
+    }
+
+    public void updatePartitionIndexes(short numTokens) {
+        partitionIndexLock.writeLock().lock();
+        if (numTokens < minPartitionIndex) {
+            minPartitionIndex = numTokens;
+        }
+        if (numTokens > maxPartitionIndex) {
+            maxPartitionIndex = numTokens;
+        }
+        partitionIndexLock.writeLock().unlock();
+    }
+
+    @Override
+    public IIndexAccessor createAccessor(IModificationOperationCallback modificationCallback,
+            ISearchOperationCallback searchCallback) {
+        return new PartitionedInMemoryInvertedIndexAccessor(this, new PartitionedInMemoryInvertedIndexOpContext(btree,
+                tokenCmpFactories, tokenizerFactory));
+    }
+
+    @Override
+    public void openInvertedListPartitionCursors(IInvertedIndexSearcher searcher, IIndexOperationContext ictx,
+            short numTokensLowerBound, short numTokensUpperBound, InvertedListPartitions invListPartitions)
+            throws HyracksDataException, IndexException {
+        short minPartitionIndex;
+        short maxPartitionIndex;
+        partitionIndexLock.readLock().lock();
+        minPartitionIndex = this.minPartitionIndex;
+        maxPartitionIndex = this.maxPartitionIndex;
+        partitionIndexLock.readLock().unlock();
+
+        if (minPartitionIndex == Short.MAX_VALUE || maxPartitionIndex == Short.MIN_VALUE) {
+            // Index must be empty.
+            return;
+        }
+        short partitionStartIndex = minPartitionIndex;
+        short partitionEndIndex = maxPartitionIndex;
+        if (numTokensLowerBound >= 0) {
+            partitionStartIndex = (short) Math.max(minPartitionIndex, numTokensLowerBound);
+        }
+        if (numTokensUpperBound >= 0) {
+            partitionEndIndex = (short) Math.min(maxPartitionIndex, numTokensUpperBound);
+        }
+
+        PartitionedTOccurrenceSearcher partSearcher = (PartitionedTOccurrenceSearcher) searcher;
+        PartitionedInMemoryInvertedIndexOpContext ctx = (PartitionedInMemoryInvertedIndexOpContext) ictx;
+        ctx.setOperation(IndexOperation.SEARCH);
+        // We can pick either of the full low or high search key, since they should be identical here.
+        ITupleReference searchKey = partSearcher.getFullLowSearchKey();
+        ctx.btreePred.setLowKey(searchKey, true);
+        ctx.btreePred.setHighKey(searchKey, true);
+        // Go through all possibly partitions and see if the token matches.
+        // TODO: This procedure could be made more efficient by determining the next partition to search
+        // using the last existing partition and re-searching the BTree with an open interval as low key.
+        for (short i = partitionStartIndex; i <= partitionEndIndex; i++) {
+            partSearcher.setNumTokensBoundsInSearchKeys(i, i);
+            InMemoryInvertedListCursor inMemListCursor = (InMemoryInvertedListCursor) partSearcher
+                    .getCachedInvertedListCursor();
+            inMemListCursor.prepare(ctx.btreeAccessor, ctx.btreePred, ctx.tokenFieldsCmp, ctx.btreeCmp);
+            inMemListCursor.reset(searchKey);
+            invListPartitions.addInvertedListCursor(inMemListCursor, i);
+        }
+    }
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndexAccessor.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndexAccessor.java
new file mode 100644
index 0000000..813961c
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndexAccessor.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.inmemory;
+
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexOperationContext;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexSearcher;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search.PartitionedTOccurrenceSearcher;
+
+public class PartitionedInMemoryInvertedIndexAccessor extends InMemoryInvertedIndexAccessor {
+
+    public PartitionedInMemoryInvertedIndexAccessor(InMemoryInvertedIndex index, IIndexOperationContext opCtx) {
+        super(index, opCtx);
+    }
+
+    protected IInvertedIndexSearcher createSearcher() {
+        return new PartitionedTOccurrenceSearcher(hyracksCtx, index);
+    }
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndexOpContext.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndexOpContext.java
new file mode 100644
index 0000000..f0e5046
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndexOpContext.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.inmemory;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizer;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.PartitionedInvertedIndexTokenizingTupleIterator;
+
+public class PartitionedInMemoryInvertedIndexOpContext extends InMemoryInvertedIndexOpContext {
+
+    public PartitionedInMemoryInvertedIndexOpContext(BTree btree, IBinaryComparatorFactory[] tokenCmpFactories,
+            IBinaryTokenizerFactory tokenizerFactory) {
+        super(btree, tokenCmpFactories, tokenizerFactory);
+    }
+
+    protected void setTokenizingTupleIterator() {
+        IBinaryTokenizer tokenizer = tokenizerFactory.createTokenizer();
+        tupleIter = new PartitionedInvertedIndexTokenizingTupleIterator(tokenCmpFactories.length, btree.getFieldCount()
+                - tokenCmpFactories.length, tokenizer);
+    }
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
index 69e57be..d1aa854 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
@@ -15,6 +15,8 @@
 
 package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk;
 
+import java.io.DataOutput;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
@@ -67,17 +69,16 @@
  * cannot exceed the size of a Hyracks frame.
  */
 public class OnDiskInvertedIndex implements IInvertedIndex {
-    private final IHyracksCommonContext ctx = new DefaultHyracksCommonContext();
+    protected final IHyracksCommonContext ctx = new DefaultHyracksCommonContext();
 
-    // Schema of BTree tuples.
-    public final int TOKEN_FIELD = 0;
-    public final int INVLIST_START_PAGE_ID_FIELD = 1;
-    public final int INVLIST_END_PAGE_ID_FIELD = 2;
-    public final int INVLIST_START_OFF_FIELD = 3;
-    public final int INVLIST_NUM_ELEMENTS_FIELD = 4;
-
+    // Schema of BTree tuples, set in constructor.    
+    protected final int invListStartPageIdField;
+    protected final int invListEndPageIdField;
+    protected final int invListStartOffField;
+    protected final int invListNumElementsField;
+    
     // Type traits to be appended to the token type trait which finally form the BTree field type traits.
-    private static final ITypeTraits[] btreeValueTypeTraits = new ITypeTraits[4];
+    protected static final ITypeTraits[] btreeValueTypeTraits = new ITypeTraits[4];
     static {
         // startPageId
         btreeValueTypeTraits[0] = IntegerPointable.TYPE_TRAITS;
@@ -89,23 +90,22 @@
         btreeValueTypeTraits[3] = IntegerPointable.TYPE_TRAITS;
     }
 
-    private BTree btree;
-    private int rootPageId = 0;
-    private IBufferCache bufferCache;
-    private IFileMapProvider fileMapProvider;
-    private int fileId = -1;
-    private final ITypeTraits[] invListTypeTraits;
-    private final IBinaryComparatorFactory[] invListCmpFactories;
-    private final ITypeTraits[] tokenTypeTraits;
-    private final IBinaryComparatorFactory[] tokenCmpFactories;
-    private final IInvertedListBuilder invListBuilder;
-    private final int numTokenFields;
-    private final int numInvListKeys;
-    private final FileReference invListsFile;
+    protected BTree btree;
+    protected int rootPageId = 0;
+    protected IBufferCache bufferCache;
+    protected IFileMapProvider fileMapProvider;
+    protected int fileId = -1;
+    protected final ITypeTraits[] invListTypeTraits;
+    protected final IBinaryComparatorFactory[] invListCmpFactories;
+    protected final ITypeTraits[] tokenTypeTraits;
+    protected final IBinaryComparatorFactory[] tokenCmpFactories;
+    protected final IInvertedListBuilder invListBuilder;
+    protected final int numTokenFields;
+    protected final int numInvListKeys;
+    protected final FileReference invListsFile;
     // Last page id of inverted-lists file (inclusive). Set during bulk load.
-    private int invListsMaxPageId = -1;
-
-    private boolean isOpen = false;
+    protected int invListsMaxPageId = -1;
+    protected boolean isOpen = false;
 
     public OnDiskInvertedIndex(IBufferCache bufferCache, IFileMapProvider fileMapProvider,
             IInvertedListBuilder invListBuilder, ITypeTraits[] invListTypeTraits,
@@ -124,6 +124,10 @@
         this.numTokenFields = btree.getComparatorFactories().length;
         this.numInvListKeys = invListCmpFactories.length;
         this.invListsFile = invListsFile;
+        this.invListStartPageIdField = numTokenFields;
+        this.invListEndPageIdField = numTokenFields + 1;
+        this.invListStartOffField = numTokenFields + 2;
+        this.invListNumElementsField = numTokenFields + 3;
     }
 
     @Override
@@ -258,19 +262,7 @@
         try {
             if (ctx.btreeCursor.hasNext()) {
                 ctx.btreeCursor.next();
-                ITupleReference frameTuple = ctx.btreeCursor.getTuple();
-                int startPageId = IntegerSerializerDeserializer.getInt(
-                        frameTuple.getFieldData(INVLIST_START_PAGE_ID_FIELD),
-                        frameTuple.getFieldStart(INVLIST_START_PAGE_ID_FIELD));
-                int endPageId = IntegerSerializerDeserializer.getInt(
-                        frameTuple.getFieldData(INVLIST_END_PAGE_ID_FIELD),
-                        frameTuple.getFieldStart(INVLIST_END_PAGE_ID_FIELD));
-                int startOff = IntegerSerializerDeserializer.getInt(frameTuple.getFieldData(INVLIST_START_OFF_FIELD),
-                        frameTuple.getFieldStart(INVLIST_START_OFF_FIELD));
-                int numElements = IntegerSerializerDeserializer.getInt(
-                        frameTuple.getFieldData(INVLIST_NUM_ELEMENTS_FIELD),
-                        frameTuple.getFieldStart(INVLIST_NUM_ELEMENTS_FIELD));
-                listCursor.reset(startPageId, endPageId, startOff, numElements);
+                resetInvertedListCursor(ctx.btreeCursor.getTuple(), listCursor);
             } else {
                 listCursor.reset(0, 0, 0, 0);
             }
@@ -280,6 +272,18 @@
         }
     }
 
+    public void resetInvertedListCursor(ITupleReference btreeTuple, IInvertedListCursor listCursor) {
+        int startPageId = IntegerSerializerDeserializer.getInt(btreeTuple.getFieldData(invListStartPageIdField),
+                btreeTuple.getFieldStart(invListStartPageIdField));
+        int endPageId = IntegerSerializerDeserializer.getInt(btreeTuple.getFieldData(invListEndPageIdField),
+                btreeTuple.getFieldStart(invListEndPageIdField));
+        int startOff = IntegerSerializerDeserializer.getInt(btreeTuple.getFieldData(invListStartOffField),
+                btreeTuple.getFieldStart(invListStartOffField));
+        int numElements = IntegerSerializerDeserializer.getInt(btreeTuple.getFieldData(invListNumElementsField),
+                btreeTuple.getFieldStart(invListNumElementsField));
+        listCursor.reset(startPageId, endPageId, startOff, numElements);
+    }
+    
     public final class OnDiskInvertedIndexBulkLoader implements IIndexBulkLoader {
         private final ArrayTupleBuilder btreeTupleBuilder;
         private final ArrayTupleReference btreeTupleReference;
@@ -330,14 +334,26 @@
         private void createAndInsertBTreeTuple() throws IndexException, HyracksDataException {
             // Build tuple.        
             btreeTupleBuilder.reset();
-            btreeTupleBuilder.addField(lastTuple.getFieldData(0), lastTuple.getFieldStart(0),
-                    lastTuple.getFieldLength(0));
-            // TODO: Boxing integers here. Fix it.
-            btreeTupleBuilder.addField(IntegerSerializerDeserializer.INSTANCE, currentInvListStartPageId);
-            btreeTupleBuilder.addField(IntegerSerializerDeserializer.INSTANCE, currentPageId);
-            btreeTupleBuilder.addField(IntegerSerializerDeserializer.INSTANCE, currentInvListStartOffset);
-            btreeTupleBuilder.addField(IntegerSerializerDeserializer.INSTANCE, invListBuilder.getListSize());
-            // Reset tuple reference and add it.
+            DataOutput output = btreeTupleBuilder.getDataOutput();
+            // Add key fields.
+            for (int i = 0; i < numTokenFields; i++) {
+                btreeTupleBuilder.addField(lastTuple.getFieldData(i), lastTuple.getFieldStart(i),
+                        lastTuple.getFieldLength(i));
+            }
+            // Add inverted-list 'pointer' value fields.
+            try {
+                output.writeInt(currentInvListStartPageId);
+                btreeTupleBuilder.addFieldEndOffset();
+                output.writeInt(currentPageId);
+                btreeTupleBuilder.addFieldEndOffset();
+                output.writeInt(currentInvListStartOffset);
+                btreeTupleBuilder.addFieldEndOffset();
+                output.writeInt(invListBuilder.getListSize());
+                btreeTupleBuilder.addFieldEndOffset();
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+            // Reset tuple reference and add it into the BTree load.
             btreeTupleReference.reset(btreeTupleBuilder.getFieldEndOffsets(), btreeTupleBuilder.getByteArray());
             btreeBulkloader.add(btreeTupleReference);
         }
@@ -456,6 +472,12 @@
             this.searcher = new TOccurrenceSearcher(ctx, index);
         }
 
+        // Let subclasses initialize.
+        protected OnDiskInvertedIndexAccessor(OnDiskInvertedIndex index, IInvertedIndexSearcher searcher) {
+            this.index = index;
+            this.searcher = searcher;
+        }
+        
         @Override
         public IIndexCursor createSearchCursor() {
             return new OnDiskInvertedIndexSearchCursor(searcher, index.getInvListTypeTraits().length);
@@ -615,7 +637,7 @@
         return 0;
     }
 
-    private static ITypeTraits[] getBTreeTypeTraits(ITypeTraits[] tokenTypeTraits) {
+    protected static ITypeTraits[] getBTreeTypeTraits(ITypeTraits[] tokenTypeTraits) {
         ITypeTraits[] btreeTypeTraits = new ITypeTraits[tokenTypeTraits.length + btreeValueTypeTraits.length];
         // Set key type traits.
         for (int i = 0; i < tokenTypeTraits.length; i++) {
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexOpContext.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexOpContext.java
index fd784f3..8ed4cd1 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexOpContext.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndexOpContext.java
@@ -30,12 +30,17 @@
     public IIndexAccessor btreeAccessor;
     public IIndexCursor btreeCursor;
     public MultiComparator searchCmp;
+    // For prefix search on partitioned indexes.
+    public MultiComparator prefixSearchCmp;
 
     public OnDiskInvertedIndexOpContext(BTree btree) {
         // TODO: Ignore opcallbacks for now.
         btreeAccessor = btree.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
         btreeCursor = btreeAccessor.createSearchCursor();
         searchCmp = MultiComparator.create(btree.getComparatorFactories());
+        if (btree.getComparatorFactories().length > 1) {
+            prefixSearchCmp = MultiComparator.create(btree.getComparatorFactories(), 0, 1);
+        }
     }
 
     @Override
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndex.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndex.java
new file mode 100644
index 0000000..5b2ec60
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndex.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.ShortSerializerDeserializer;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexOperationContext;
+import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexSearcher;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListBuilder;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IPartitionedInvertedIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search.InvertedListPartitions;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search.PartitionedTOccurrenceSearcher;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+
+public class PartitionedOnDiskInvertedIndex extends OnDiskInvertedIndex implements IPartitionedInvertedIndex {
+
+    protected final int PARTITIONING_NUM_TOKENS_FIELD = 1;
+
+    public PartitionedOnDiskInvertedIndex(IBufferCache bufferCache, IFileMapProvider fileMapProvider,
+            IInvertedListBuilder invListBuilder, ITypeTraits[] invListTypeTraits,
+            IBinaryComparatorFactory[] invListCmpFactories, ITypeTraits[] tokenTypeTraits,
+            IBinaryComparatorFactory[] tokenCmpFactories, FileReference btreeFile, FileReference invListsFile)
+            throws IndexException {
+        super(bufferCache, fileMapProvider, invListBuilder, invListTypeTraits, invListCmpFactories, tokenTypeTraits,
+                tokenCmpFactories, btreeFile, invListsFile);
+    }
+
+    public class PartitionedOnDiskInvertedIndexAccessor extends OnDiskInvertedIndexAccessor {
+        public PartitionedOnDiskInvertedIndexAccessor(OnDiskInvertedIndex index) {
+            super(index, new PartitionedTOccurrenceSearcher(ctx, index));
+        }
+    }
+
+    @Override
+    public IIndexAccessor createAccessor(IModificationOperationCallback modificationCallback,
+            ISearchOperationCallback searchCallback) {
+        return new PartitionedOnDiskInvertedIndexAccessor(this);
+    }
+
+    @Override
+    public void openInvertedListPartitionCursors(IInvertedIndexSearcher searcher, IIndexOperationContext ictx,
+            short numTokensLowerBound, short numTokensUpperBound, InvertedListPartitions invListPartitions)
+            throws HyracksDataException, IndexException {
+        PartitionedTOccurrenceSearcher partSearcher = (PartitionedTOccurrenceSearcher) searcher;
+        OnDiskInvertedIndexOpContext ctx = (OnDiskInvertedIndexOpContext) ictx;
+        ITupleReference lowSearchKey = null;
+        ITupleReference highSearchKey = null;
+        partSearcher.setNumTokensBoundsInSearchKeys(numTokensLowerBound, numTokensUpperBound);
+        if (numTokensLowerBound < 0) {
+            ctx.btreePred.setLowKeyComparator(ctx.prefixSearchCmp);
+            lowSearchKey = partSearcher.getPrefixSearchKey();
+        } else {
+            ctx.btreePred.setLowKeyComparator(ctx.searchCmp);
+            lowSearchKey = partSearcher.getFullLowSearchKey();
+        }
+        if (numTokensUpperBound < 0) {
+            ctx.btreePred.setHighKeyComparator(ctx.prefixSearchCmp);
+            highSearchKey = partSearcher.getPrefixSearchKey();
+        } else {
+            ctx.btreePred.setHighKeyComparator(ctx.searchCmp);
+            highSearchKey = partSearcher.getFullHighSearchKey();
+        }
+        ctx.btreePred.setLowKey(lowSearchKey, true);
+        ctx.btreePred.setHighKey(highSearchKey, true);
+        ctx.btreeAccessor.search(ctx.btreeCursor, ctx.btreePred);
+        try {
+            while (ctx.btreeCursor.hasNext()) {
+                ctx.btreeCursor.next();
+                ITupleReference btreeTuple = ctx.btreeCursor.getTuple();
+                short numTokens = ShortSerializerDeserializer.getShort(
+                        btreeTuple.getFieldData(PARTITIONING_NUM_TOKENS_FIELD),
+                        btreeTuple.getFieldStart(PARTITIONING_NUM_TOKENS_FIELD));
+                IInvertedListCursor invListCursor = partSearcher.getCachedInvertedListCursor();
+                resetInvertedListCursor(btreeTuple, invListCursor);
+                invListPartitions.addInvertedListCursor(invListCursor, numTokens);
+            }
+        } finally {
+            ctx.btreeCursor.close();
+            ctx.btreeCursor.reset();
+        }
+    }
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndexFactory.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndexFactory.java
new file mode 100644
index 0000000..854a30f
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndexFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk;
+
+import java.io.File;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexFileNameMapper;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListBuilder;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListBuilderFactory;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+
+public class PartitionedOnDiskInvertedIndexFactory extends OnDiskInvertedIndexFactory {
+    
+    public PartitionedOnDiskInvertedIndexFactory(IBufferCache bufferCache, IFileMapProvider fileMapProvider,
+            IInvertedListBuilderFactory invListBuilderFactory, ITypeTraits[] invListTypeTraits,
+            IBinaryComparatorFactory[] invListCmpFactories, ITypeTraits[] tokenTypeTraits,
+            IBinaryComparatorFactory[] tokenCmpFactories, IInvertedIndexFileNameMapper fileNameMapper) {
+        super(bufferCache, fileMapProvider, invListBuilderFactory, invListTypeTraits, invListCmpFactories, tokenTypeTraits,
+                tokenCmpFactories, fileNameMapper);
+    }
+
+    @Override
+    public IInvertedIndex createIndexInstance(FileReference dictBTreeFile) throws IndexException {
+        String invListsFilePath = fileNameMapper.getInvListsFilePath(dictBTreeFile.getFile().getPath());
+        FileReference invListsFile = new FileReference(new File(invListsFilePath));
+        IInvertedListBuilder invListBuilder = invListBuilderFactory.create();
+        return new PartitionedOnDiskInvertedIndex(bufferCache, fileMapProvider, invListBuilder, invListTypeTraits,
+                invListCmpFactories, tokenTypeTraits, tokenCmpFactories, dictBTreeFile, invListsFile);
+    }
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/AbstractTOccurrenceSearcher.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/AbstractTOccurrenceSearcher.java
new file mode 100644
index 0000000..af597bc
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/AbstractTOccurrenceSearcher.java
@@ -0,0 +1,154 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
+import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexSearcher;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IObjectFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.exceptions.OccurrenceThresholdPanicException;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeFrameTupleAccessor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeTupleReference;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizer;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IToken;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.ObjectCache;
+
+public abstract class AbstractTOccurrenceSearcher implements IInvertedIndexSearcher {
+    protected static final RecordDescriptor QUERY_TOKEN_REC_DESC = new RecordDescriptor(
+            new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
+
+    protected final int OBJECT_CACHE_INIT_SIZE = 10;
+    protected final int OBJECT_CACHE_EXPAND_SIZE = 10;
+
+    protected final IHyracksCommonContext ctx;
+
+    protected final InvertedListMerger invListMerger;
+    protected final SearchResult searchResult;
+    protected final IInvertedIndex invIndex;
+    protected final MultiComparator invListCmp;
+
+    protected final ArrayTupleBuilder queryTokenBuilder = new ArrayTupleBuilder(QUERY_TOKEN_REC_DESC.getFieldCount());
+    protected final ByteBuffer queryTokenFrame;
+    protected final FrameTupleAppender queryTokenAppender;
+    protected final FrameTupleAccessor queryTokenAccessor;
+    protected final FrameTupleReference searchKey = new FrameTupleReference();
+
+    protected int occurrenceThreshold;
+
+    protected final IObjectFactory<IInvertedListCursor> invListCursorFactory;
+    protected final ObjectCache<IInvertedListCursor> invListCursorCache;
+
+    public AbstractTOccurrenceSearcher(IHyracksCommonContext ctx, IInvertedIndex invIndex) {
+        this.ctx = ctx;
+        this.invListMerger = new InvertedListMerger(ctx, invIndex);
+        this.searchResult = new SearchResult(invIndex.getInvListTypeTraits(), ctx);
+        this.invIndex = invIndex;
+        this.invListCmp = MultiComparator.create(invIndex.getInvListCmpFactories());
+        this.invListCursorFactory = new InvertedListCursorFactory(invIndex);
+        this.invListCursorCache = new ObjectCache<IInvertedListCursor>(invListCursorFactory, OBJECT_CACHE_INIT_SIZE,
+                OBJECT_CACHE_EXPAND_SIZE);
+        this.queryTokenFrame = ctx.allocateFrame();
+        this.queryTokenAppender = new FrameTupleAppender(ctx.getFrameSize());
+        this.queryTokenAccessor = new FrameTupleAccessor(ctx.getFrameSize(), QUERY_TOKEN_REC_DESC);
+        this.queryTokenAccessor.reset(queryTokenFrame);
+    }
+
+    public void reset() {
+        searchResult.clear();
+        invListMerger.reset();
+    }
+
+    protected void tokenizeQuery(InvertedIndexSearchPredicate searchPred) throws HyracksDataException,
+            OccurrenceThresholdPanicException {
+        ITupleReference queryTuple = searchPred.getQueryTuple();
+        int queryFieldIndex = searchPred.getQueryFieldIndex();
+        IBinaryTokenizer queryTokenizer = searchPred.getQueryTokenizer();
+
+        queryTokenAppender.reset(queryTokenFrame, true);
+        queryTokenizer.reset(queryTuple.getFieldData(queryFieldIndex), queryTuple.getFieldStart(queryFieldIndex),
+                queryTuple.getFieldLength(queryFieldIndex));
+
+        while (queryTokenizer.hasNext()) {
+            queryTokenizer.next();
+            queryTokenBuilder.reset();
+            try {
+                IToken token = queryTokenizer.getToken();
+                token.serializeToken(queryTokenBuilder.getDataOutput());
+                queryTokenBuilder.addFieldEndOffset();
+                // WARNING: assuming one frame is big enough to hold all tokens
+                queryTokenAppender.append(queryTokenBuilder.getFieldEndOffsets(), queryTokenBuilder.getByteArray(), 0,
+                        queryTokenBuilder.getSize());
+            } catch (IOException e) {
+                throw new HyracksDataException(e);
+            }
+        }
+    }
+
+    public IFrameTupleAccessor createResultFrameTupleAccessor() {
+        return new FixedSizeFrameTupleAccessor(ctx.getFrameSize(), searchResult.getTypeTraits());
+    }
+
+    public ITupleReference createResultFrameTupleReference() {
+        return new FixedSizeTupleReference(searchResult.getTypeTraits());
+    }
+
+    @Override
+    public List<ByteBuffer> getResultBuffers() {
+        return searchResult.getBuffers();
+    }
+
+    @Override
+    public int getNumValidResultBuffers() {
+        return searchResult.getCurrentBufferIndex() + 1;
+    }
+
+    public int getOccurrenceThreshold() {
+        return occurrenceThreshold;
+    }
+
+    public void printNewResults(int maxResultBufIdx, List<ByteBuffer> buffer) {
+        StringBuffer strBuffer = new StringBuffer();
+        FixedSizeFrameTupleAccessor resultFrameTupleAcc = searchResult.getAccessor();
+        for (int i = 0; i <= maxResultBufIdx; i++) {
+            ByteBuffer testBuf = buffer.get(i);
+            resultFrameTupleAcc.reset(testBuf);
+            for (int j = 0; j < resultFrameTupleAcc.getTupleCount(); j++) {
+                strBuffer.append(IntegerSerializerDeserializer.getInt(resultFrameTupleAcc.getBuffer().array(),
+                        resultFrameTupleAcc.getFieldStartOffset(j, 0)) + ",");
+                strBuffer.append(IntegerSerializerDeserializer.getInt(resultFrameTupleAcc.getBuffer().array(),
+                        resultFrameTupleAcc.getFieldStartOffset(j, 1)) + " ");
+            }
+        }
+        System.out.println(strBuffer.toString());
+    }
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/ArrayListFactory.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/ArrayListFactory.java
new file mode 100644
index 0000000..493063e
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/ArrayListFactory.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search;
+
+import java.util.ArrayList;
+
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IObjectFactory;
+
+public class ArrayListFactory<T> implements IObjectFactory<ArrayList<T>>{
+    @Override
+    public ArrayList<T> create() {
+        return new ArrayList<T>();
+    }
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/ConjunctiveSearchModifier.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/ConjunctiveSearchModifier.java
index 1d260f0..318f1e1 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/ConjunctiveSearchModifier.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/ConjunctiveSearchModifier.java
@@ -25,7 +25,7 @@
     }
 
     @Override
-    public int getNumPrefixLists(int numQueryTokens) {
+    public int getNumPrefixLists(int occurrenceThreshold, int numInvLists) {
         return 1;
     }
     
@@ -33,4 +33,14 @@
     public String toString() {
         return "Conjunctive Search Modifier";
     }
+
+    @Override
+    public short getNumTokensLowerBound(short numQueryTokens) {
+        return -1;
+    }
+
+    @Override
+    public short getNumTokensUpperBound(short numQueryTokens) {
+        return -1;
+    }
 }
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/EditDistanceSearchModifier.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/EditDistanceSearchModifier.java
index 0580319..9c06f4d 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/EditDistanceSearchModifier.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/EditDistanceSearchModifier.java
@@ -33,8 +33,18 @@
     }
 
     @Override
-    public int getNumPrefixLists(int numQueryTokens) {
-        return numQueryTokens - getOccurrenceThreshold(numQueryTokens) + 1;
+    public int getNumPrefixLists(int occurrenceThreshold, int numInvLists) {
+        return numInvLists - occurrenceThreshold + 1;
+    }
+
+    @Override
+    public short getNumTokensLowerBound(short numQueryTokens) {
+        return (short) (numQueryTokens - edThresh);
+    }
+
+    @Override
+    public short getNumTokensUpperBound(short numQueryTokens) {
+        return (short) (numQueryTokens + edThresh);
     }
 
     public int getGramLength() {
@@ -52,7 +62,7 @@
     public void setEdThresh(int edThresh) {
         this.edThresh = edThresh;
     }
-    
+
     @Override
     public String toString() {
         return "Edit Distance Search Modifier, GramLen: " + gramLength + ", Threshold: " + edThresh;
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/InvertedListCursorFactory.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/InvertedListCursorFactory.java
new file mode 100644
index 0000000..b4b3c43
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/InvertedListCursorFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search;
+
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IObjectFactory;
+
+public class InvertedListCursorFactory implements IObjectFactory<IInvertedListCursor> {
+
+    private final IInvertedIndex invIndex;
+
+    public InvertedListCursorFactory(IInvertedIndex invIndex) {
+        this.invIndex = invIndex;
+    }
+
+    @Override
+    public IInvertedListCursor create() {
+        return invIndex.createInvertedListCursor();
+    }
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/InvertedListMerger.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/InvertedListMerger.java
new file mode 100644
index 0000000..18ed73c
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/InvertedListMerger.java
@@ -0,0 +1,330 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+
+import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeFrameTupleAccessor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeTupleReference;
+
+// TODO: The merge procedure is rather confusing regarding cursor positions, hasNext() calls etc.
+// Needs an overhaul some time.
+public class InvertedListMerger {
+
+    protected final MultiComparator invListCmp;
+    protected SearchResult prevSearchResult;
+    protected SearchResult newSearchResult;
+
+    public InvertedListMerger(IHyracksCommonContext ctx, IInvertedIndex invIndex) {
+        this.invListCmp = MultiComparator.create(invIndex.getInvListCmpFactories());
+        this.prevSearchResult = new SearchResult(invIndex.getInvListTypeTraits(), ctx);
+        this.newSearchResult = new SearchResult(prevSearchResult);
+    }
+
+    public void merge(ArrayList<IInvertedListCursor> invListCursors, int occurrenceThreshold, int numPrefixLists,
+            SearchResult searchResult) throws HyracksDataException, IndexException {
+        Collections.sort(invListCursors);
+        int numInvLists = invListCursors.size();
+        SearchResult result = null;
+        for (int i = 0; i < numInvLists; i++) {
+            SearchResult swapTemp = prevSearchResult;
+            prevSearchResult = newSearchResult;
+            newSearchResult = swapTemp;
+            newSearchResult.reset();
+            if (i + 1 != numInvLists) {
+                // Use temporary search results when not merging last list.
+                result = newSearchResult;
+            } else {
+                // When merging the last list, append results to the final search result.
+                result = searchResult;
+            }
+            IInvertedListCursor invListCursor = invListCursors.get(i);
+            invListCursor.pinPages();
+            if (i < numPrefixLists) {
+                // Merge prefix list.
+                mergePrefixList(invListCursor, prevSearchResult, result);
+            } else {
+                // Merge suffix list.
+                int numInvListElements = invListCursor.size();
+                int currentNumResults = prevSearchResult.getNumResults();
+                // Should we binary search the next list or should we sort-merge it?
+                if (currentNumResults * Math.log(numInvListElements) < currentNumResults + numInvListElements) {
+                    mergeSuffixListProbe(invListCursor, prevSearchResult, result, i, numInvLists,
+                            occurrenceThreshold);
+                } else {
+                    mergeSuffixListScan(invListCursor, prevSearchResult, result, i, numInvLists,
+                            occurrenceThreshold);
+                }
+            }
+            invListCursor.unpinPages();
+        }
+    }
+
+    protected void mergeSuffixListProbe(IInvertedListCursor invListCursor, SearchResult prevSearchResult,
+            SearchResult newSearchResult, int invListIx, int numInvLists, int occurrenceThreshold)
+            throws HyracksDataException, IndexException {
+
+        int prevBufIdx = 0;
+        int maxPrevBufIdx = prevSearchResult.getCurrentBufferIndex();
+        ByteBuffer prevCurrentBuffer = prevSearchResult.getBuffers().get(0);
+
+        FixedSizeFrameTupleAccessor resultFrameTupleAcc = prevSearchResult.getAccessor();
+        FixedSizeTupleReference resultTuple = prevSearchResult.getTuple();
+
+        int resultTidx = 0;
+
+        resultFrameTupleAcc.reset(prevCurrentBuffer);
+
+        while (resultTidx < resultFrameTupleAcc.getTupleCount()) {
+
+            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
+            int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
+                    resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
+
+            if (invListCursor.containsKey(resultTuple, invListCmp)) {
+                count++;
+                newSearchResult.append(resultTuple, count);
+            } else {
+                if (count + numInvLists - invListIx > occurrenceThreshold) {
+                    newSearchResult.append(resultTuple, count);
+                }
+            }
+
+            resultTidx++;
+            if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
+                prevBufIdx++;
+                if (prevBufIdx <= maxPrevBufIdx) {
+                    prevCurrentBuffer = prevSearchResult.getBuffers().get(prevBufIdx);
+                    resultFrameTupleAcc.reset(prevCurrentBuffer);
+                    resultTidx = 0;
+                }
+            }
+        }
+    }
+
+    protected void mergeSuffixListScan(IInvertedListCursor invListCursor, SearchResult prevSearchResult,
+            SearchResult newSearchResult, int invListIx, int numInvLists, int occurrenceThreshold)
+            throws HyracksDataException, IndexException {
+
+        int prevBufIdx = 0;
+        int maxPrevBufIdx = prevSearchResult.getCurrentBufferIndex();
+        ByteBuffer prevCurrentBuffer = prevSearchResult.getBuffers().get(0);
+
+        FixedSizeFrameTupleAccessor resultFrameTupleAcc = prevSearchResult.getAccessor();
+        FixedSizeTupleReference resultTuple = prevSearchResult.getTuple();
+
+        boolean advanceCursor = true;
+        boolean advancePrevResult = false;
+        int resultTidx = 0;
+
+        resultFrameTupleAcc.reset(prevCurrentBuffer);
+
+        int invListTidx = 0;
+        int invListNumTuples = invListCursor.size();
+
+        if (invListCursor.hasNext())
+            invListCursor.next();
+
+        while (invListTidx < invListNumTuples && resultTidx < resultFrameTupleAcc.getTupleCount()) {
+
+            ITupleReference invListTuple = invListCursor.getTuple();
+
+            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
+
+            int cmp = invListCmp.compare(invListTuple, resultTuple);
+            if (cmp == 0) {
+                int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
+                        resultTuple.getFieldStart(resultTuple.getFieldCount() - 1)) + 1;
+                newSearchResult.append(resultTuple, count);
+                advanceCursor = true;
+                advancePrevResult = true;
+            } else {
+                if (cmp < 0) {
+                    advanceCursor = true;
+                    advancePrevResult = false;
+                } else {
+                    int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
+                            resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
+                    if (count + numInvLists - invListIx > occurrenceThreshold) {
+                        newSearchResult.append(resultTuple, count);
+                    }
+                    advanceCursor = false;
+                    advancePrevResult = true;
+                }
+            }
+
+            if (advancePrevResult) {
+                resultTidx++;
+                if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
+                    prevBufIdx++;
+                    if (prevBufIdx <= maxPrevBufIdx) {
+                        prevCurrentBuffer = prevSearchResult.getBuffers().get(prevBufIdx);
+                        resultFrameTupleAcc.reset(prevCurrentBuffer);
+                        resultTidx = 0;
+                    }
+                }
+            }
+
+            if (advanceCursor) {
+                invListTidx++;
+                if (invListCursor.hasNext()) {
+                    invListCursor.next();
+                }
+            }
+        }
+
+        // append remaining elements from previous result set
+        while (resultTidx < resultFrameTupleAcc.getTupleCount()) {
+
+            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
+
+            int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
+                    resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
+            if (count + numInvLists - invListIx > occurrenceThreshold) {
+                newSearchResult.append(resultTuple, count);
+            }
+
+            resultTidx++;
+            if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
+                prevBufIdx++;
+                if (prevBufIdx <= maxPrevBufIdx) {
+                    prevCurrentBuffer = prevSearchResult.getBuffers().get(prevBufIdx);
+                    resultFrameTupleAcc.reset(prevCurrentBuffer);
+                    resultTidx = 0;
+                }
+            }
+        }
+    }
+
+    protected void mergePrefixList(IInvertedListCursor invListCursor, SearchResult prevSearchResult,
+            SearchResult newSearchResult) throws HyracksDataException, IndexException {
+
+        int prevBufIdx = 0;
+        int maxPrevBufIdx = prevSearchResult.getCurrentBufferIndex();
+        ByteBuffer prevCurrentBuffer = prevSearchResult.getBuffers().get(0);
+
+        FixedSizeFrameTupleAccessor resultFrameTupleAcc = prevSearchResult.getAccessor();
+        FixedSizeTupleReference resultTuple = prevSearchResult.getTuple();
+
+        boolean advanceCursor = true;
+        boolean advancePrevResult = false;
+        int resultTidx = 0;
+
+        resultFrameTupleAcc.reset(prevCurrentBuffer);
+
+        int invListTidx = 0;
+        int invListNumTuples = invListCursor.size();
+
+        if (invListCursor.hasNext())
+            invListCursor.next();
+
+        while (invListTidx < invListNumTuples && resultTidx < resultFrameTupleAcc.getTupleCount()) {
+
+            ITupleReference invListTuple = invListCursor.getTuple();
+            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
+
+            int cmp = invListCmp.compare(invListTuple, resultTuple);
+            if (cmp == 0) {
+                int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
+                        resultTuple.getFieldStart(resultTuple.getFieldCount() - 1)) + 1;
+                newSearchResult.append(resultTuple, count);
+                advanceCursor = true;
+                advancePrevResult = true;
+            } else {
+                if (cmp < 0) {
+                    int count = 1;
+                    newSearchResult.append(invListTuple, count);
+                    advanceCursor = true;
+                    advancePrevResult = false;
+                } else {
+                    int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
+                            resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
+                    newSearchResult.append(resultTuple, count);
+                    advanceCursor = false;
+                    advancePrevResult = true;
+                }
+            }
+
+            if (advancePrevResult) {
+                resultTidx++;
+                if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
+                    prevBufIdx++;
+                    if (prevBufIdx <= maxPrevBufIdx) {
+                        prevCurrentBuffer = prevSearchResult.getBuffers().get(prevBufIdx);
+                        resultFrameTupleAcc.reset(prevCurrentBuffer);
+                        resultTidx = 0;
+                    }
+                }
+            }
+
+            if (advanceCursor) {
+                invListTidx++;
+                if (invListCursor.hasNext()) {
+                    invListCursor.next();
+                }
+            }
+        }
+
+        // append remaining new elements from inverted list
+        while (invListTidx < invListNumTuples) {
+            ITupleReference invListTuple = invListCursor.getTuple();
+            newSearchResult.append(invListTuple, 1);
+            invListTidx++;
+            if (invListCursor.hasNext()) {
+                invListCursor.next();
+            }
+        }
+
+        // append remaining elements from previous result set
+        while (resultTidx < resultFrameTupleAcc.getTupleCount()) {
+
+            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
+
+            int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
+                    resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
+            newSearchResult.append(resultTuple, count);
+
+            resultTidx++;
+            if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
+                prevBufIdx++;
+                if (prevBufIdx <= maxPrevBufIdx) {
+                    prevCurrentBuffer = prevSearchResult.getBuffers().get(prevBufIdx);
+                    resultFrameTupleAcc.reset(prevCurrentBuffer);
+                    resultTidx = 0;
+                }
+            }
+        }
+    }
+
+    public SearchResult createSearchResult() {
+        return new SearchResult(prevSearchResult);
+    }
+
+    public void reset() {
+        prevSearchResult.clear();
+        newSearchResult.clear();
+    }
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/InvertedListPartitions.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/InvertedListPartitions.java
new file mode 100644
index 0000000..2f720fb
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/InvertedListPartitions.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IObjectFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.ObjectCache;
+
+public class InvertedListPartitions {
+    private final int DEFAULT_NUM_PARTITIONS = 10;
+    private final int PARTITIONS_SLACK_SIZE = 10;
+    private final int OBJECT_CACHE_INIT_SIZE = 10;
+    private final int OBJECT_CACHE_EXPAND_SIZE = 10;
+    private final IObjectFactory<ArrayList<IInvertedListCursor>> arrayListFactory;
+    private final ObjectCache<ArrayList<IInvertedListCursor>> arrayListCache;
+    private ArrayList<IInvertedListCursor>[] partitions;
+    private int minValidPartitionIndex;
+    private int maxValidPartitionIndex;
+
+    public InvertedListPartitions() {
+        this.arrayListFactory = new ArrayListFactory<IInvertedListCursor>();
+        this.arrayListCache = new ObjectCache<ArrayList<IInvertedListCursor>>(arrayListFactory, OBJECT_CACHE_INIT_SIZE,
+                OBJECT_CACHE_EXPAND_SIZE);
+    }
+
+    @SuppressWarnings("unchecked")
+    public void reset(short numTokensLowerBound, short numTokensUpperBound) {
+        if (partitions == null) {
+            int initialSize;
+            if (numTokensUpperBound < 0) {
+                initialSize = DEFAULT_NUM_PARTITIONS;
+            } else {
+                initialSize = numTokensUpperBound + 1;
+            }
+            partitions = (ArrayList<IInvertedListCursor>[]) new ArrayList[initialSize];
+        } else {
+            if (numTokensUpperBound + 1 >= partitions.length) {
+                partitions = Arrays.copyOf(partitions, numTokensUpperBound + 1);
+            }
+            Arrays.fill(partitions, null);
+        }
+        arrayListCache.reset();
+        minValidPartitionIndex = Integer.MAX_VALUE;
+        maxValidPartitionIndex = Integer.MIN_VALUE;
+    }
+
+    public void addInvertedListCursor(IInvertedListCursor listCursor, int numTokens) {
+        if (numTokens + 1 >= partitions.length) {
+            partitions = Arrays.copyOf(partitions, numTokens + PARTITIONS_SLACK_SIZE);
+        }
+        ArrayList<IInvertedListCursor> partitionCursors = partitions[numTokens];
+        if (partitionCursors == null) {
+            partitionCursors = arrayListCache.getNext();
+            partitionCursors.clear();
+            partitions[numTokens] = partitionCursors;
+            // Update range of valid partitions.
+            if (numTokens < minValidPartitionIndex) {
+                minValidPartitionIndex = numTokens;
+            }
+            if (numTokens > maxValidPartitionIndex) {
+                maxValidPartitionIndex = numTokens;
+            }
+        }
+        partitionCursors.add(listCursor);
+    }
+
+    public ArrayList<IInvertedListCursor>[] getPartitions() {
+        return partitions;
+    }
+
+    public int getMinValidPartitionIndex() {
+        return minValidPartitionIndex;
+    }
+
+    public int getMaxValidPartitionIndex() {
+        return maxValidPartitionIndex;
+    }
+}
\ No newline at end of file
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/JaccardSearchModifier.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/JaccardSearchModifier.java
index 4cf7e40..ede6041 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/JaccardSearchModifier.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/JaccardSearchModifier.java
@@ -31,11 +31,21 @@
     }
 
     @Override
-    public int getNumPrefixLists(int numQueryTokens) {
-        if (numQueryTokens == 0) {
+    public int getNumPrefixLists(int occurrenceThreshold, int numInvLists) {
+        if (numInvLists == 0) {
             return 0;
         }
-        return numQueryTokens - getOccurrenceThreshold(numQueryTokens) + 1;
+        return numInvLists - occurrenceThreshold + 1;
+    }
+
+    @Override
+    public short getNumTokensLowerBound(short numQueryTokens) {
+        return (short) Math.floor(numQueryTokens * jaccThresh);
+    }
+
+    @Override
+    public short getNumTokensUpperBound(short numQueryTokens) {
+        return (short) Math.ceil(numQueryTokens / jaccThresh);
     }
 
     public float getJaccThresh() {
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/PartitionedTOccurrenceSearcher.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/PartitionedTOccurrenceSearcher.java
new file mode 100644
index 0000000..6b482ba
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/PartitionedTOccurrenceSearcher.java
@@ -0,0 +1,148 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.ShortSerializerDeserializer;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexOperationContext;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.tuples.ConcatenatingTupleReference;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexSearchModifier;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IPartitionedInvertedIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.exceptions.OccurrenceThresholdPanicException;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndexSearchCursor;
+
+public class PartitionedTOccurrenceSearcher extends AbstractTOccurrenceSearcher {
+
+    protected final ArrayTupleBuilder lowerBoundTupleBuilder = new ArrayTupleBuilder(1);
+    protected final ArrayTupleReference lowerBoundTuple = new ArrayTupleReference();
+    protected final ArrayTupleBuilder upperBoundTupleBuilder = new ArrayTupleBuilder(1);
+    protected final ArrayTupleReference upperBoundTuple = new ArrayTupleReference();
+    protected final ConcatenatingTupleReference fullLowSearchKey = new ConcatenatingTupleReference(2);
+    protected final ConcatenatingTupleReference fullHighSearchKey = new ConcatenatingTupleReference(2);
+
+    protected final InvertedListPartitions partitions = new InvertedListPartitions();
+
+    public PartitionedTOccurrenceSearcher(IHyracksCommonContext ctx, IInvertedIndex invIndex) {
+        super(ctx, invIndex);
+        initHelperTuples();
+    }
+
+    private void initHelperTuples() {
+        try {
+            lowerBoundTupleBuilder.reset();
+            // Write dummy value.
+            lowerBoundTupleBuilder.getDataOutput().writeShort(Short.MIN_VALUE);
+            lowerBoundTupleBuilder.addFieldEndOffset();
+            lowerBoundTuple.reset(lowerBoundTupleBuilder.getFieldEndOffsets(), lowerBoundTupleBuilder.getByteArray());
+            // Only needed for setting the number of fields in searchKey.
+            searchKey.reset(queryTokenAccessor, 0);
+            fullLowSearchKey.reset();
+            fullLowSearchKey.addTuple(searchKey);
+            fullLowSearchKey.addTuple(lowerBoundTuple);
+
+            upperBoundTupleBuilder.reset();
+            // Write dummy value.
+            upperBoundTupleBuilder.getDataOutput().writeShort(Short.MAX_VALUE);
+            upperBoundTupleBuilder.addFieldEndOffset();
+            upperBoundTuple.reset(upperBoundTupleBuilder.getFieldEndOffsets(), upperBoundTupleBuilder.getByteArray());
+            // Only needed for setting the number of fields in searchKey.
+            searchKey.reset(queryTokenAccessor, 0);
+            fullHighSearchKey.reset();
+            fullHighSearchKey.addTuple(searchKey);
+            fullHighSearchKey.addTuple(upperBoundTuple);
+        } catch (IOException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    public void search(OnDiskInvertedIndexSearchCursor resultCursor, InvertedIndexSearchPredicate searchPred,
+            IIndexOperationContext ictx) throws HyracksDataException, IndexException {
+        tokenizeQuery(searchPred);
+        short numQueryTokens = (short) queryTokenAccessor.getTupleCount();
+
+        IInvertedIndexSearchModifier searchModifier = searchPred.getSearchModifier();
+        short numTokensLowerBound = searchModifier.getNumTokensLowerBound(numQueryTokens);
+        short numTokensUpperBound = searchModifier.getNumTokensUpperBound(numQueryTokens);
+
+        IPartitionedInvertedIndex partInvIndex = (IPartitionedInvertedIndex) invIndex;
+        invListCursorCache.reset();
+        partitions.reset(numTokensLowerBound, numTokensUpperBound);
+        for (int i = 0; i < numQueryTokens; i++) {
+            searchKey.reset(queryTokenAccessor, i);
+            partInvIndex.openInvertedListPartitionCursors(this, ictx, numTokensLowerBound, numTokensUpperBound,
+                    partitions);
+        }
+
+        occurrenceThreshold = searchModifier.getOccurrenceThreshold(numQueryTokens);
+        if (occurrenceThreshold <= 0) {
+            throw new OccurrenceThresholdPanicException("Merge Threshold is <= 0. Failing Search.");
+        }
+
+        // Process the partitions one-by-one.
+        ArrayList<IInvertedListCursor>[] partitionCursors = partitions.getPartitions();
+        int start = partitions.getMinValidPartitionIndex();
+        int end = partitions.getMaxValidPartitionIndex();
+        searchResult.reset();
+        for (int i = start; i <= end; i++) {
+            if (partitionCursors[i] == null) {
+                continue;
+            }
+            // Prune partition because no element in it can satisfy the occurrence threshold.
+            if (partitionCursors[i].size() < occurrenceThreshold) {
+                continue;
+            }
+            // Merge inverted lists of current partition.
+            int numPrefixLists = searchModifier.getNumPrefixLists(occurrenceThreshold, partitionCursors[i].size());
+            invListMerger.reset();
+            invListMerger.merge(partitionCursors[i], occurrenceThreshold, numPrefixLists, searchResult);
+        }
+
+        resultCursor.open(null, searchPred);
+    }
+
+    public void setNumTokensBoundsInSearchKeys(short numTokensLowerBound, short numTokensUpperBound) {
+        ShortSerializerDeserializer.putShort(numTokensLowerBound, lowerBoundTuple.getFieldData(0),
+                lowerBoundTuple.getFieldStart(0));
+        ShortSerializerDeserializer.putShort(numTokensUpperBound, upperBoundTuple.getFieldData(0),
+                upperBoundTuple.getFieldStart(0));
+    }
+
+    public ITupleReference getPrefixSearchKey() {
+        return searchKey;
+    }
+
+    public ITupleReference getFullLowSearchKey() {
+        return fullLowSearchKey;
+    }
+
+    public ITupleReference getFullHighSearchKey() {
+        return fullHighSearchKey;
+    }
+
+    public IInvertedListCursor getCachedInvertedListCursor() {
+        return invListCursorCache.getNext();
+    }
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/SearchResult.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/SearchResult.java
new file mode 100644
index 0000000..aa0d3f2
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/SearchResult.java
@@ -0,0 +1,182 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+
+import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeFrameTupleAccessor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeFrameTupleAppender;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeTupleReference;
+
+/**
+ * Byte-buffer backed storage for intermediate and final results of inverted-index searches.
+ */
+// TODO: Rename members.
+public class SearchResult {
+    protected final ArrayList<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
+    protected final IHyracksCommonContext ctx;
+    protected final FixedSizeFrameTupleAppender appender;
+    protected final FixedSizeFrameTupleAccessor accessor;
+    protected final FixedSizeTupleReference tuple;
+    protected final ITypeTraits[] typeTraits;
+    protected final int invListElementSize;
+
+    protected int currBufIdx;
+    protected int numResults;
+
+    public SearchResult(ITypeTraits[] invListFields, IHyracksCommonContext ctx) {
+        typeTraits = new ITypeTraits[invListFields.length + 1];
+        int tmp = 0;
+        for (int i = 0; i < invListFields.length; i++) {
+            typeTraits[i] = invListFields[i];
+            tmp += invListFields[i].getFixedLength();
+        }
+        invListElementSize = tmp;
+        // Integer for counting occurrences.
+        typeTraits[invListFields.length] = IntegerPointable.TYPE_TRAITS;
+        this.ctx = ctx;
+        appender = new FixedSizeFrameTupleAppender(ctx.getFrameSize(), typeTraits);
+        accessor = new FixedSizeFrameTupleAccessor(ctx.getFrameSize(), typeTraits);
+        tuple = new FixedSizeTupleReference(typeTraits);
+        buffers.add(ctx.allocateFrame());
+    }
+
+    /**
+     * Initialize from other search-result object to share member instances except for result buffers.
+     */
+    public SearchResult(SearchResult other) {
+        this.ctx = other.ctx;
+        this.appender = other.appender;
+        this.accessor = other.accessor;
+        this.tuple = other.tuple;
+        this.typeTraits = other.typeTraits;
+        this.invListElementSize = other.invListElementSize;
+        buffers.add(ctx.allocateFrame());
+    }
+
+    public FixedSizeFrameTupleAccessor getAccessor() {
+        return accessor;
+    }
+
+    public FixedSizeFrameTupleAppender getAppender() {
+        return appender;
+    }
+
+    public FixedSizeTupleReference getTuple() {
+        return tuple;
+    }
+
+    public ArrayList<ByteBuffer> getBuffers() {
+        return buffers;
+    }
+
+    public void reset() {
+        currBufIdx = 0;
+        numResults = 0;
+        appender.reset(buffers.get(0), true);
+    }
+
+    public void clear() {
+        currBufIdx = 0;
+        numResults = 0;
+        for (ByteBuffer buffer : buffers) {
+            appender.reset(buffer, true);
+        }
+    }
+
+    public void append(ITupleReference invListElement, int count) {
+        ByteBuffer currentBuffer = buffers.get(currBufIdx);
+        if (!appender.hasSpace()) {
+            currBufIdx++;
+            if (currBufIdx >= buffers.size()) {
+                buffers.add(ctx.allocateFrame());
+            }
+            currentBuffer = buffers.get(currBufIdx);
+            appender.reset(currentBuffer, true);
+        }
+        // Append inverted-list element.
+        if (!appender.append(invListElement.getFieldData(0), invListElement.getFieldStart(0), invListElementSize)) {
+            throw new IllegalStateException();
+        }
+        // Append count.
+        if (!appender.append(count)) {
+            throw new IllegalStateException();
+        }
+        appender.incrementTupleCount(1);
+        numResults++;
+    }
+
+    public int getCurrentBufferIndex() {
+        return currBufIdx;
+    }
+
+    public ITypeTraits[] getTypeTraits() {
+        return typeTraits;
+    }
+
+    public int getNumResults() {
+        return numResults;
+    }
+
+    // TODO: This code may help to clean up the core list-merging algorithms.
+    /*
+    public SearchResultCursor getCursor() {
+        cursor.reset();
+        return cursor;
+    }
+    
+    public class SearchResultCursor {
+        private int bufferIndex;
+        private int resultIndex;
+        private int frameResultIndex;
+        private ByteBuffer currentBuffer;
+
+        public void reset() {
+            bufferIndex = 0;
+            resultIndex = 0;
+            frameResultIndex = 0;
+            currentBuffer = buffers.get(0);
+            resultFrameTupleAcc.reset(currentBuffer);
+        }
+
+        public boolean hasNext() {
+            return resultIndex < numResults;
+        }
+
+        public void next() {
+            resultTuple.reset(currentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(frameResultIndex));            
+            if (frameResultIndex < resultFrameTupleAcc.getTupleCount()) {
+                frameResultIndex++;
+            } else {
+                bufferIndex++;
+                currentBuffer = buffers.get(bufferIndex);
+                resultFrameTupleAcc.reset(currentBuffer);
+                frameResultIndex = 0;
+            }            
+            resultIndex++;
+        }
+
+        public ITupleReference getTuple() {
+            return resultTuple;
+        }
+    }
+    */
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/TOccurrenceSearcher.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/TOccurrenceSearcher.java
index 419d5aa..4513540 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/TOccurrenceSearcher.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/TOccurrenceSearcher.java
@@ -15,522 +15,49 @@
 
 package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search;
 
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
 
-import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
 import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexOperationContext;
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
-import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndex;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexSearchModifier;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexSearcher;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.exceptions.OccurrenceThresholdPanicException;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeFrameTupleAccessor;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeFrameTupleAppender;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeTupleReference;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndexSearchCursor;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizer;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IToken;
 
-// TODO: The search procedure is rather confusing regarding cursor positions, hasNext() calls etc.
-// Needs an overhaul some time.
-public class TOccurrenceSearcher implements IInvertedIndexSearcher {
+public class TOccurrenceSearcher extends AbstractTOccurrenceSearcher {
 
-    protected final IHyracksCommonContext ctx;
-    protected final FixedSizeFrameTupleAppender resultFrameTupleApp;
-    protected final FixedSizeFrameTupleAccessor resultFrameTupleAcc;
-    protected final FixedSizeTupleReference resultTuple;
-    protected final int invListKeyLength;
-    protected int currentNumResults;
-
-    protected List<ByteBuffer> newResultBuffers = new ArrayList<ByteBuffer>();
-    protected List<ByteBuffer> prevResultBuffers = new ArrayList<ByteBuffer>();
-    protected List<ByteBuffer> swap = null;
-    protected int maxResultBufIdx = 0;
-
-    protected RecordDescriptor queryTokenRecDesc = new RecordDescriptor(
-            new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
-    protected ArrayTupleBuilder queryTokenBuilder = new ArrayTupleBuilder(queryTokenRecDesc.getFieldCount());
-    protected DataOutput queryTokenDos = queryTokenBuilder.getDataOutput();
-    protected FrameTupleAppender queryTokenAppender;
-    protected ByteBuffer queryTokenFrame;
-    protected final FrameTupleReference searchKey = new FrameTupleReference();
-
-    protected final IInvertedIndex invIndex;
-    protected final MultiComparator invListCmp;
-    protected final ITypeTraits[] invListFieldsWithCount;
-    protected int occurrenceThreshold;
-
-    protected final int cursorCacheSize = 10;
-    protected List<IInvertedListCursor> invListCursorCache = new ArrayList<IInvertedListCursor>(cursorCacheSize);
-    protected List<IInvertedListCursor> invListCursors = new ArrayList<IInvertedListCursor>(cursorCacheSize);
+    protected final ArrayList<IInvertedListCursor> invListCursors = new ArrayList<IInvertedListCursor>();
 
     public TOccurrenceSearcher(IHyracksCommonContext ctx, IInvertedIndex invIndex) {
-        this.ctx = ctx;
-        this.invIndex = invIndex;
-        this.invListCmp = MultiComparator.create(invIndex.getInvListCmpFactories());
-
-        ITypeTraits[] invListFields = invIndex.getInvListTypeTraits();
-        invListFieldsWithCount = new ITypeTraits[invListFields.length + 1];
-        int tmp = 0;
-        for (int i = 0; i < invListFields.length; i++) {
-            invListFieldsWithCount[i] = invListFields[i];
-            tmp += invListFields[i].getFixedLength();
-        }
-        // using an integer for counting occurrences
-        invListFieldsWithCount[invListFields.length] = IntegerPointable.TYPE_TRAITS;
-        invListKeyLength = tmp;
-
-        resultFrameTupleApp = new FixedSizeFrameTupleAppender(ctx.getFrameSize(), invListFieldsWithCount);
-        resultFrameTupleAcc = new FixedSizeFrameTupleAccessor(ctx.getFrameSize(), invListFieldsWithCount);
-        resultTuple = new FixedSizeTupleReference(invListFieldsWithCount);
-        newResultBuffers.add(ctx.allocateFrame());
-        prevResultBuffers.add(ctx.allocateFrame());
-
-        // Pre-create cursor objects.
-        for (int i = 0; i < cursorCacheSize; i++) {
-            invListCursorCache.add(invIndex.createInvertedListCursor());
-        }
-
-        queryTokenAppender = new FrameTupleAppender(ctx.getFrameSize());
-        queryTokenFrame = ctx.allocateFrame();
-
-        currentNumResults = 0;
-    }
-
-    public void reset() {
-        for (ByteBuffer b : newResultBuffers) {
-            resultFrameTupleApp.reset(b, true);
-        }
-        for (ByteBuffer b : prevResultBuffers) {
-            resultFrameTupleApp.reset(b, true);
-        }
-        currentNumResults = 0;
+        super(ctx, invIndex);
     }
 
     public void search(OnDiskInvertedIndexSearchCursor resultCursor, InvertedIndexSearchPredicate searchPred,
             IIndexOperationContext ictx) throws HyracksDataException, IndexException {
-        ITupleReference queryTuple = searchPred.getQueryTuple();
-        int queryFieldIndex = searchPred.getQueryFieldIndex();
-        IInvertedIndexSearchModifier searchModifier = searchPred.getSearchModifier();
-        IBinaryTokenizer queryTokenizer = searchPred.getQueryTokenizer();
-
-        queryTokenAppender.reset(queryTokenFrame, true);
-        queryTokenizer.reset(queryTuple.getFieldData(queryFieldIndex), queryTuple.getFieldStart(queryFieldIndex),
-                queryTuple.getFieldLength(queryFieldIndex));
-
-        while (queryTokenizer.hasNext()) {
-            queryTokenizer.next();
-            queryTokenBuilder.reset();
-            try {
-                IToken token = queryTokenizer.getToken();
-                token.serializeToken(queryTokenDos);
-                queryTokenBuilder.addFieldEndOffset();
-                // WARNING: assuming one frame is big enough to hold all tokens
-                queryTokenAppender.append(queryTokenBuilder.getFieldEndOffsets(), queryTokenBuilder.getByteArray(), 0,
-                        queryTokenBuilder.getSize());
-            } catch (IOException e) {
-                throw new HyracksDataException(e);
-            }
-        }
-
-        FrameTupleAccessor queryTokenAccessor = new FrameTupleAccessor(ctx.getFrameSize(), queryTokenRecDesc);
-        queryTokenAccessor.reset(queryTokenFrame);
+        tokenizeQuery(searchPred);
         int numQueryTokens = queryTokenAccessor.getTupleCount();
 
-        // Expand cursor cache if necessary.
-        if (numQueryTokens > invListCursorCache.size()) {
-            int diff = numQueryTokens - invListCursorCache.size();
-            for (int i = 0; i < diff; i++) {
-                invListCursorCache.add(invIndex.createInvertedListCursor());
-            }
-        }
-
         invListCursors.clear();
+        invListCursorCache.reset();
         for (int i = 0; i < numQueryTokens; i++) {
             searchKey.reset(queryTokenAccessor, i);
-            invIndex.openInvertedListCursor(invListCursorCache.get(i), searchKey, ictx);
-            invListCursors.add(invListCursorCache.get(i));
+            IInvertedListCursor invListCursor = invListCursorCache.getNext();
+            invIndex.openInvertedListCursor(invListCursor, searchKey, ictx);
+            invListCursors.add(invListCursor);
         }
-        Collections.sort(invListCursors);
-        
-        occurrenceThreshold = searchModifier.getOccurrenceThreshold(invListCursors.size());
-        // TODO: deal with panic cases properly
+
+        IInvertedIndexSearchModifier searchModifier = searchPred.getSearchModifier();
+        occurrenceThreshold = searchModifier.getOccurrenceThreshold(numQueryTokens);
         if (occurrenceThreshold <= 0) {
-            throw new OccurrenceThresholdPanicException("Merge Threshold is <= 0. Failing Search.");
+            throw new OccurrenceThresholdPanicException("Merge threshold is <= 0. Failing Search.");
         }
-        
-        int numPrefixLists = searchModifier.getNumPrefixLists(invListCursors.size());
-        maxResultBufIdx = mergePrefixLists(numPrefixLists, numQueryTokens);
-        maxResultBufIdx = mergeSuffixLists(numPrefixLists, numQueryTokens, maxResultBufIdx);
+        int numPrefixLists = searchModifier.getNumPrefixLists(occurrenceThreshold, invListCursors.size());
 
+        searchResult.reset();
+        invListMerger.merge(invListCursors, occurrenceThreshold, numPrefixLists, searchResult);
         resultCursor.open(null, searchPred);
     }
-
-    protected int mergePrefixLists(int numPrefixTokens, int numQueryTokens) throws HyracksDataException, IndexException {
-        int maxPrevBufIdx = 0;
-        for (int i = 0; i < numPrefixTokens; i++) {
-            swap = prevResultBuffers;
-            prevResultBuffers = newResultBuffers;
-            newResultBuffers = swap;
-            currentNumResults = 0;
-
-            invListCursors.get(i).pinPages();
-            maxPrevBufIdx = mergePrefixList(invListCursors.get(i), prevResultBuffers, maxPrevBufIdx, newResultBuffers);
-            invListCursors.get(i).unpinPages();
-        }
-        return maxPrevBufIdx;
-    }
-
-    protected int mergeSuffixLists(int numPrefixTokens, int numQueryTokens, int maxPrevBufIdx)
-            throws HyracksDataException, IndexException {
-        for (int i = numPrefixTokens; i < numQueryTokens; i++) {
-            swap = prevResultBuffers;
-            prevResultBuffers = newResultBuffers;
-            newResultBuffers = swap;
-
-            invListCursors.get(i).pinPages();
-            int numInvListElements = invListCursors.get(i).size();
-            // should we binary search the next list or should we sort-merge it?
-            if (currentNumResults * Math.log(numInvListElements) < currentNumResults + numInvListElements) {
-                maxPrevBufIdx = mergeSuffixListProbe(invListCursors.get(i), prevResultBuffers, maxPrevBufIdx,
-                        newResultBuffers, i, numQueryTokens);
-            } else {
-                maxPrevBufIdx = mergeSuffixListScan(invListCursors.get(i), prevResultBuffers, maxPrevBufIdx,
-                        newResultBuffers, i, numQueryTokens);
-            }
-            invListCursors.get(i).unpinPages();
-        }
-        return maxPrevBufIdx;
-    }
-
-    protected int mergeSuffixListProbe(IInvertedListCursor invListCursor, List<ByteBuffer> prevResultBuffers,
-            int maxPrevBufIdx, List<ByteBuffer> newResultBuffers, int invListIx, int numQueryTokens) throws HyracksDataException, IndexException {
-
-        int newBufIdx = 0;
-        ByteBuffer newCurrentBuffer = newResultBuffers.get(0);
-
-        int prevBufIdx = 0;
-        ByteBuffer prevCurrentBuffer = prevResultBuffers.get(0);
-
-        int resultTidx = 0;
-
-        currentNumResults = 0;
-
-        resultFrameTupleAcc.reset(prevCurrentBuffer);
-        resultFrameTupleApp.reset(newCurrentBuffer, true);
-
-        while (resultTidx < resultFrameTupleAcc.getTupleCount()) {
-
-            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
-            int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
-                    resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
-
-            if (invListCursor.containsKey(resultTuple, invListCmp)) {
-                count++;
-                newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
-            } else {
-                if (count + numQueryTokens - invListIx > occurrenceThreshold) {
-                    newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
-                }
-            }
-
-            resultTidx++;
-            if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
-                prevBufIdx++;
-                if (prevBufIdx <= maxPrevBufIdx) {
-                    prevCurrentBuffer = prevResultBuffers.get(prevBufIdx);
-                    resultFrameTupleAcc.reset(prevCurrentBuffer);
-                    resultTidx = 0;
-                }
-            }
-        }
-
-        return newBufIdx;
-    }
-
-    protected int mergeSuffixListScan(IInvertedListCursor invListCursor, List<ByteBuffer> prevResultBuffers,
-            int maxPrevBufIdx, List<ByteBuffer> newResultBuffers, int invListIx, int numQueryTokens)
-            throws HyracksDataException, IndexException {
-        
-        int newBufIdx = 0;
-        ByteBuffer newCurrentBuffer = newResultBuffers.get(0);
-
-        int prevBufIdx = 0;
-        ByteBuffer prevCurrentBuffer = prevResultBuffers.get(0);
-
-        boolean advanceCursor = true;
-        boolean advancePrevResult = false;
-        int resultTidx = 0;
-
-        currentNumResults = 0;
-        
-        resultFrameTupleAcc.reset(prevCurrentBuffer);
-        resultFrameTupleApp.reset(newCurrentBuffer, true);
-
-        int invListTidx = 0;
-        int invListNumTuples = invListCursor.size();
-
-        if (invListCursor.hasNext())
-            invListCursor.next();
-
-        while (invListTidx < invListNumTuples && resultTidx < resultFrameTupleAcc.getTupleCount()) {
-
-            ITupleReference invListTuple = invListCursor.getTuple();
-
-            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
-
-            int cmp = invListCmp.compare(invListTuple, resultTuple);
-            if (cmp == 0) {
-                int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
-                        resultTuple.getFieldStart(resultTuple.getFieldCount() - 1)) + 1;
-                newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
-                advanceCursor = true;
-                advancePrevResult = true;
-            } else {
-                if (cmp < 0) {
-                    advanceCursor = true;
-                    advancePrevResult = false;
-                } else {
-                    int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
-                            resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
-                    if (count + numQueryTokens - invListIx > occurrenceThreshold) {
-                        newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
-                    }
-                    advanceCursor = false;
-                    advancePrevResult = true;
-                }
-            }
-
-            if (advancePrevResult) {
-                resultTidx++;
-                if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
-                    prevBufIdx++;
-                    if (prevBufIdx <= maxPrevBufIdx) {
-                        prevCurrentBuffer = prevResultBuffers.get(prevBufIdx);
-                        resultFrameTupleAcc.reset(prevCurrentBuffer);
-                        resultTidx = 0;
-                    }
-                }
-            }
-
-            if (advanceCursor) {
-                invListTidx++;
-                if (invListCursor.hasNext()) {
-                    invListCursor.next();
-                }
-            }
-        }
-
-        // append remaining elements from previous result set
-        while (resultTidx < resultFrameTupleAcc.getTupleCount()) {
-
-            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
-
-            int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
-                    resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
-            if (count + numQueryTokens - invListIx > occurrenceThreshold) {
-                newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
-            }
-
-            resultTidx++;
-            if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
-                prevBufIdx++;
-                if (prevBufIdx <= maxPrevBufIdx) {
-                    prevCurrentBuffer = prevResultBuffers.get(prevBufIdx);
-                    resultFrameTupleAcc.reset(prevCurrentBuffer);
-                    resultTidx = 0;
-                }
-            }
-        }
-
-        return newBufIdx;
-    }
-
-    protected int mergePrefixList(IInvertedListCursor invListCursor, List<ByteBuffer> prevResultBuffers,
-            int maxPrevBufIdx, List<ByteBuffer> newResultBuffers) throws HyracksDataException, IndexException {
-        
-        int newBufIdx = 0;
-        ByteBuffer newCurrentBuffer = newResultBuffers.get(0);
-
-        int prevBufIdx = 0;
-        ByteBuffer prevCurrentBuffer = prevResultBuffers.get(0);
-
-        boolean advanceCursor = true;
-        boolean advancePrevResult = false;
-        int resultTidx = 0;
-
-        resultFrameTupleAcc.reset(prevCurrentBuffer);
-        resultFrameTupleApp.reset(newCurrentBuffer, true);
-
-        int invListTidx = 0;
-        int invListNumTuples = invListCursor.size();
-
-        if (invListCursor.hasNext())
-            invListCursor.next();
-
-        while (invListTidx < invListNumTuples && resultTidx < resultFrameTupleAcc.getTupleCount()) {
-
-            ITupleReference invListTuple = invListCursor.getTuple();
-            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
-
-            int cmp = invListCmp.compare(invListTuple, resultTuple);
-            if (cmp == 0) {
-                int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
-                        resultTuple.getFieldStart(resultTuple.getFieldCount() - 1)) + 1;
-                newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
-                advanceCursor = true;
-                advancePrevResult = true;
-            } else {
-                if (cmp < 0) {
-                    int count = 1;
-                    newBufIdx = appendTupleToNewResults(invListTuple, count, newBufIdx);
-                    advanceCursor = true;
-                    advancePrevResult = false;
-                } else {
-                    int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
-                            resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
-                    newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
-                    advanceCursor = false;
-                    advancePrevResult = true;
-                }
-            }
-
-            if (advancePrevResult) {
-                resultTidx++;
-                if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
-                    prevBufIdx++;
-                    if (prevBufIdx <= maxPrevBufIdx) {
-                        prevCurrentBuffer = prevResultBuffers.get(prevBufIdx);
-                        resultFrameTupleAcc.reset(prevCurrentBuffer);
-                        resultTidx = 0;
-                    }
-                }
-            }
-
-            if (advanceCursor) {
-                invListTidx++;
-                if (invListCursor.hasNext()) {
-                    invListCursor.next();
-                }
-            }
-        }
-
-        // append remaining new elements from inverted list
-        while (invListTidx < invListNumTuples) {
-            ITupleReference invListTuple = invListCursor.getTuple();
-            newBufIdx = appendTupleToNewResults(invListTuple, 1, newBufIdx);
-            invListTidx++;
-            if (invListCursor.hasNext()) {
-                invListCursor.next();
-            }
-        }
-
-        // append remaining elements from previous result set
-        while (resultTidx < resultFrameTupleAcc.getTupleCount()) {
-
-            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
-
-            int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
-                    resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
-            newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
-
-            resultTidx++;
-            if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
-                prevBufIdx++;
-                if (prevBufIdx <= maxPrevBufIdx) {
-                    prevCurrentBuffer = prevResultBuffers.get(prevBufIdx);
-                    resultFrameTupleAcc.reset(prevCurrentBuffer);
-                    resultTidx = 0;
-                }
-            }
-        }
-
-        return newBufIdx;
-    }
-
-    protected int appendTupleToNewResults(ITupleReference tuple, int newCount, int newBufIdx) {
-        ByteBuffer newCurrentBuffer = newResultBuffers.get(newBufIdx);
-
-        if (!resultFrameTupleApp.hasSpace()) {
-            newBufIdx++;
-            if (newBufIdx >= newResultBuffers.size()) {
-                newResultBuffers.add(ctx.allocateFrame());
-            }
-            newCurrentBuffer = newResultBuffers.get(newBufIdx);
-            resultFrameTupleApp.reset(newCurrentBuffer, true);
-        }
-
-        // append key
-        if (!resultFrameTupleApp.append(tuple.getFieldData(0), tuple.getFieldStart(0), invListKeyLength)) {
-            throw new IllegalStateException();
-        }
-
-        // append new count
-        if (!resultFrameTupleApp.append(newCount)) {
-            throw new IllegalStateException();
-        }
-
-        resultFrameTupleApp.incrementTupleCount(1);
-
-        currentNumResults++;
-
-        return newBufIdx;
-    }
-
-    public IFrameTupleAccessor createResultFrameTupleAccessor() {
-        return new FixedSizeFrameTupleAccessor(ctx.getFrameSize(), invListFieldsWithCount);
-    }
-
-    public ITupleReference createResultFrameTupleReference() {
-        return new FixedSizeTupleReference(invListFieldsWithCount);
-    }
-
-    @Override
-    public List<ByteBuffer> getResultBuffers() {
-        return newResultBuffers;
-    }
-
-    @Override
-    public int getNumValidResultBuffers() {
-        return maxResultBufIdx + 1;
-    }
-
-    public int getOccurrenceThreshold() {
-        return occurrenceThreshold;
-    }
-
-    public void printNewResults(int maxResultBufIdx, List<ByteBuffer> buffer) {
-        StringBuffer strBuffer = new StringBuffer();
-        for (int i = 0; i <= maxResultBufIdx; i++) {
-            ByteBuffer testBuf = buffer.get(i);
-            resultFrameTupleAcc.reset(testBuf);
-            for (int j = 0; j < resultFrameTupleAcc.getTupleCount(); j++) {
-                strBuffer.append(IntegerSerializerDeserializer.getInt(resultFrameTupleAcc.getBuffer().array(),
-                        resultFrameTupleAcc.getFieldStartOffset(j, 0)) + ",");
-                strBuffer.append(IntegerSerializerDeserializer.getInt(resultFrameTupleAcc.getBuffer().array(),
-                        resultFrameTupleAcc.getFieldStartOffset(j, 1)) + " ");
-            }
-        }
-        System.out.println(strBuffer.toString());
-    }
 }
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/TOccurrenceSearcherSuffixProbeOnly.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/TOccurrenceSearcherSuffixProbeOnly.java
deleted file mode 100644
index 630b810..0000000
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/TOccurrenceSearcherSuffixProbeOnly.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
-import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndex;
-
-public class TOccurrenceSearcherSuffixProbeOnly extends TOccurrenceSearcher {
-
-	protected final MultiComparator invListCmp;
-	
-    public TOccurrenceSearcherSuffixProbeOnly(IHyracksTaskContext ctx, OnDiskInvertedIndex invIndex) {
-        super(ctx, invIndex);
-        this.invListCmp = MultiComparator.create(invIndex.getInvListCmpFactories());
-    }
-
-    protected int mergeSuffixLists(int numPrefixTokens, int numQueryTokens, int maxPrevBufIdx) throws HyracksDataException, IndexException {
-        for (int i = numPrefixTokens; i < numQueryTokens; i++) {
-            swap = prevResultBuffers;
-            prevResultBuffers = newResultBuffers;
-            newResultBuffers = swap;
-            currentNumResults = 0;
-
-            invListCursors.get(i).pinPages();
-            maxPrevBufIdx = mergeSuffixListProbe(invListCursors.get(i), prevResultBuffers, maxPrevBufIdx,
-                    newResultBuffers, i, numQueryTokens);
-            invListCursors.get(i).unpinPages();
-        }
-        return maxPrevBufIdx;
-    }
-
-    protected int mergeSuffixListProbe(IInvertedListCursor invListCursor, List<ByteBuffer> prevResultBuffers,
-            int maxPrevBufIdx, List<ByteBuffer> newResultBuffers, int invListIx, int numQueryTokens) throws HyracksDataException, IndexException {
-
-        int newBufIdx = 0;
-        ByteBuffer newCurrentBuffer = newResultBuffers.get(0);
-
-        int prevBufIdx = 0;
-        ByteBuffer prevCurrentBuffer = prevResultBuffers.get(0);
-
-        int resultTidx = 0;
-
-        resultFrameTupleAcc.reset(prevCurrentBuffer);
-        resultFrameTupleApp.reset(newCurrentBuffer, true);
-
-        while (resultTidx < resultFrameTupleAcc.getTupleCount()) {
-
-            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
-            int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
-                    resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
-
-            if (invListCursor.containsKey(resultTuple, invListCmp)) {
-                count++;
-                newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
-            } else {
-                if (count + numQueryTokens - invListIx > occurrenceThreshold) {
-                    newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
-                }
-            }
-
-            resultTidx++;
-            if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
-                prevBufIdx++;
-                if (prevBufIdx <= maxPrevBufIdx) {
-                    prevCurrentBuffer = prevResultBuffers.get(prevBufIdx);
-                    resultFrameTupleAcc.reset(prevCurrentBuffer);
-                    resultTidx = 0;
-                }
-            }
-        }
-
-        return newBufIdx;
-    }
-}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/TOccurrenceSearcherSuffixScanOnly.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/TOccurrenceSearcherSuffixScanOnly.java
deleted file mode 100644
index 3640511..0000000
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/search/TOccurrenceSearcherSuffixScanOnly.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
-import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndex;
-
-public class TOccurrenceSearcherSuffixScanOnly extends TOccurrenceSearcher {
-
-	protected final MultiComparator invListCmp;
-	
-    public TOccurrenceSearcherSuffixScanOnly(IHyracksTaskContext ctx, OnDiskInvertedIndex invIndex) {
-        super(ctx, invIndex);
-        this.invListCmp = MultiComparator.create(invIndex.getInvListCmpFactories());
-    }
-
-    protected int mergeSuffixLists(int numPrefixTokens, int numQueryTokens, int maxPrevBufIdx) throws HyracksDataException, IndexException {
-        for (int i = numPrefixTokens; i < numQueryTokens; i++) {
-            swap = prevResultBuffers;
-            prevResultBuffers = newResultBuffers;
-            newResultBuffers = swap;
-            currentNumResults = 0;
-
-            invListCursors.get(i).pinPages();
-            maxPrevBufIdx = mergeSuffixListScan(invListCursors.get(i), prevResultBuffers, maxPrevBufIdx,
-                    newResultBuffers, i, numQueryTokens);
-            invListCursors.get(i).unpinPages();
-        }
-        return maxPrevBufIdx;
-    }
-
-    protected int mergeSuffixListScan(IInvertedListCursor invListCursor, List<ByteBuffer> prevResultBuffers,
-            int maxPrevBufIdx, List<ByteBuffer> newResultBuffers, int invListIx, int numQueryTokens) throws HyracksDataException, IndexException {
-
-        int newBufIdx = 0;
-        ByteBuffer newCurrentBuffer = newResultBuffers.get(0);
-
-        int prevBufIdx = 0;
-        ByteBuffer prevCurrentBuffer = prevResultBuffers.get(0);
-
-        boolean advanceCursor = true;
-        boolean advancePrevResult = false;
-        int resultTidx = 0;
-
-        resultFrameTupleAcc.reset(prevCurrentBuffer);
-        resultFrameTupleApp.reset(newCurrentBuffer, true);
-
-        while (invListCursor.hasNext() && resultTidx < resultFrameTupleAcc.getTupleCount()) {
-
-            if (advanceCursor)
-                invListCursor.next();
-
-            ITupleReference invListTuple = invListCursor.getTuple();
-
-            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
-
-            int cmp = invListCmp.compare(invListTuple, resultTuple);
-            if (cmp == 0) {
-                int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
-                        resultTuple.getFieldStart(resultTuple.getFieldCount() - 1)) + 1;
-                newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
-                advanceCursor = true;
-                advancePrevResult = true;
-            } else {
-                if (cmp < 0) {
-                    advanceCursor = true;
-                    advancePrevResult = false;
-                } else {
-                    int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
-                            resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
-                    if (count + numQueryTokens - invListIx > occurrenceThreshold) {
-                        newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
-                    }
-                    advanceCursor = false;
-                    advancePrevResult = true;
-                }
-            }
-
-            if (advancePrevResult) {
-                resultTidx++;
-                if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
-                    prevBufIdx++;
-                    if (prevBufIdx <= maxPrevBufIdx) {
-                        prevCurrentBuffer = prevResultBuffers.get(prevBufIdx);
-                        resultFrameTupleAcc.reset(prevCurrentBuffer);
-                        resultTidx = 0;
-                    }
-                }
-            }
-        }
-
-        // append remaining elements from previous result set
-        while (resultTidx < resultFrameTupleAcc.getTupleCount()) {
-
-            int count = IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
-                    resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
-            newBufIdx = appendTupleToNewResults(resultTuple, count, newBufIdx);
-
-            resultTidx++;
-            if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
-                prevBufIdx++;
-                if (prevBufIdx <= maxPrevBufIdx) {
-                    prevCurrentBuffer = prevResultBuffers.get(prevBufIdx);
-                    resultFrameTupleAcc.reset(prevCurrentBuffer);
-                    resultTidx = 0;
-                }
-            }
-        }
-
-        return newBufIdx;
-    }
-}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/InvertedIndexTokenizingTupleIterator.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/InvertedIndexTokenizingTupleIterator.java
index 633213a..4f8f635 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/InvertedIndexTokenizingTupleIterator.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/InvertedIndexTokenizingTupleIterator.java
@@ -27,13 +27,13 @@
 // TODO: We can possibly avoid copying the data into a new tuple here.
 public class InvertedIndexTokenizingTupleIterator {
     // Field that is expected to be tokenized.
-    private final int DOC_FIELD_INDEX = 0;
+    protected final int DOC_FIELD_INDEX = 0;
 
-    private final int invListFieldCount;
-    private final ArrayTupleBuilder tupleBuilder;
-    private final ArrayTupleReference tupleReference;
-    private final IBinaryTokenizer tokenizer;
-    private ITupleReference inputTuple;
+    protected final int invListFieldCount;
+    protected final ArrayTupleBuilder tupleBuilder;
+    protected final ArrayTupleReference tupleReference;
+    protected final IBinaryTokenizer tokenizer;
+    protected ITupleReference inputTuple;
 
     public InvertedIndexTokenizingTupleIterator(int tokensFieldCount, int invListFieldCount, IBinaryTokenizer tokenizer) {
         this.invListFieldCount = invListFieldCount;
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/InvertedIndexUtils.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/InvertedIndexUtils.java
index 7da9416..953d72b 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/InvertedIndexUtils.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/InvertedIndexUtils.java
@@ -43,11 +43,15 @@
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListBuilderFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndex;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndexFileManager;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls.PartitionedLSMInvertedIndex;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.inmemory.InMemoryInvertedIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.inmemory.PartitionedInMemoryInvertedIndex;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeElementInvertedListBuilder;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeElementInvertedListBuilderFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndex;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndexFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.PartitionedOnDiskInvertedIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.PartitionedOnDiskInvertedIndexFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
 import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
 import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
@@ -63,6 +67,15 @@
                 tokenTypeTraits, tokenCmpFactories, tokenizerFactory);
     }
 
+    public static InMemoryInvertedIndex createPartitionedInMemoryBTreeInvertedindex(IBufferCache memBufferCache,
+            IFreePageManager memFreePageManager, ITypeTraits[] invListTypeTraits,
+            IBinaryComparatorFactory[] invListCmpFactories, ITypeTraits[] tokenTypeTraits,
+            IBinaryComparatorFactory[] tokenCmpFactories, IBinaryTokenizerFactory tokenizerFactory)
+            throws BTreeException {
+        return new PartitionedInMemoryInvertedIndex(memBufferCache, memFreePageManager, invListTypeTraits,
+                invListCmpFactories, tokenTypeTraits, tokenCmpFactories, tokenizerFactory);
+    }
+
     public static OnDiskInvertedIndex createOnDiskInvertedIndex(IBufferCache bufferCache,
             IFileMapProvider fileMapProvider, ITypeTraits[] invListTypeTraits,
             IBinaryComparatorFactory[] invListCmpFactories, ITypeTraits[] tokenTypeTraits,
@@ -73,18 +86,23 @@
                 tokenTypeTraits, tokenCmpFactories, btreeFile, invListsFile);
     }
 
+    public static PartitionedOnDiskInvertedIndex createPartitionedOnDiskInvertedIndex(IBufferCache bufferCache,
+            IFileMapProvider fileMapProvider, ITypeTraits[] invListTypeTraits,
+            IBinaryComparatorFactory[] invListCmpFactories, ITypeTraits[] tokenTypeTraits,
+            IBinaryComparatorFactory[] tokenCmpFactories, FileReference invListsFile) throws IndexException {
+        IInvertedListBuilder builder = new FixedSizeElementInvertedListBuilder(invListTypeTraits);
+        FileReference btreeFile = getBTreeFile(invListsFile);
+        return new PartitionedOnDiskInvertedIndex(bufferCache, fileMapProvider, builder, invListTypeTraits,
+                invListCmpFactories, tokenTypeTraits, tokenCmpFactories, btreeFile, invListsFile);
+    }
+
     public static FileReference getBTreeFile(FileReference invListsFile) {
         return new FileReference(new File(invListsFile.getFile().getPath() + "_btree"));
     }
 
-    public static LSMInvertedIndex createLSMInvertedIndex(IInMemoryBufferCache memBufferCache,
-            IInMemoryFreePageManager memFreePageManager, IFileMapProvider diskFileMapProvider,
+    public static BTreeFactory createDeletedKeysBTreeFactory(IFileMapProvider diskFileMapProvider,
             ITypeTraits[] invListTypeTraits, IBinaryComparatorFactory[] invListCmpFactories,
-            ITypeTraits[] tokenTypeTraits, IBinaryComparatorFactory[] tokenCmpFactories,
-            IBinaryTokenizerFactory tokenizerFactory, IBufferCache diskBufferCache, IIOManager ioManager,
-            String onDiskDir, ILSMFlushController flushController, ILSMMergePolicy mergePolicy,
-            ILSMOperationTrackerFactory opTrackerFactory, ILSMIOOperationScheduler ioScheduler) throws IndexException {
-
+            IBufferCache diskBufferCache) throws BTreeException {
         TypeAwareTupleWriterFactory tupleWriterFactory = new TypeAwareTupleWriterFactory(invListTypeTraits);
         ITreeIndexFrameFactory leafFrameFactory = BTreeUtils.getLeafFrameFactory(tupleWriterFactory,
                 BTreeLeafFrameType.REGULAR_NSM);
@@ -95,6 +113,19 @@
         BTreeFactory deletedKeysBTreeFactory = new BTreeFactory(diskBufferCache, diskFileMapProvider,
                 freePageManagerFactory, interiorFrameFactory, leafFrameFactory, invListCmpFactories,
                 invListCmpFactories.length);
+        return deletedKeysBTreeFactory;
+    }
+
+    public static LSMInvertedIndex createLSMInvertedIndex(IInMemoryBufferCache memBufferCache,
+            IInMemoryFreePageManager memFreePageManager, IFileMapProvider diskFileMapProvider,
+            ITypeTraits[] invListTypeTraits, IBinaryComparatorFactory[] invListCmpFactories,
+            ITypeTraits[] tokenTypeTraits, IBinaryComparatorFactory[] tokenCmpFactories,
+            IBinaryTokenizerFactory tokenizerFactory, IBufferCache diskBufferCache, IIOManager ioManager,
+            String onDiskDir, ILSMFlushController flushController, ILSMMergePolicy mergePolicy,
+            ILSMOperationTrackerFactory opTrackerFactory, ILSMIOOperationScheduler ioScheduler) throws IndexException {
+
+        BTreeFactory deletedKeysBTreeFactory = createDeletedKeysBTreeFactory(diskFileMapProvider, invListTypeTraits,
+                invListCmpFactories, diskBufferCache);
 
         FileReference onDiskDirFileRef = new FileReference(new File(onDiskDir));
         LSMInvertedIndexFileManager fileManager = new LSMInvertedIndexFileManager(ioManager, diskFileMapProvider,
@@ -112,4 +143,32 @@
                 ioScheduler);
         return invIndex;
     }
+
+    public static PartitionedLSMInvertedIndex createPartitionedLSMInvertedIndex(IInMemoryBufferCache memBufferCache,
+            IInMemoryFreePageManager memFreePageManager, IFileMapProvider diskFileMapProvider,
+            ITypeTraits[] invListTypeTraits, IBinaryComparatorFactory[] invListCmpFactories,
+            ITypeTraits[] tokenTypeTraits, IBinaryComparatorFactory[] tokenCmpFactories,
+            IBinaryTokenizerFactory tokenizerFactory, IBufferCache diskBufferCache, IIOManager ioManager,
+            String onDiskDir, ILSMFlushController flushController, ILSMMergePolicy mergePolicy,
+            ILSMOperationTrackerFactory opTrackerFactory, ILSMIOOperationScheduler ioScheduler) throws IndexException {
+
+        BTreeFactory deletedKeysBTreeFactory = createDeletedKeysBTreeFactory(diskFileMapProvider, invListTypeTraits,
+                invListCmpFactories, diskBufferCache);
+
+        FileReference onDiskDirFileRef = new FileReference(new File(onDiskDir));
+        LSMInvertedIndexFileManager fileManager = new LSMInvertedIndexFileManager(ioManager, diskFileMapProvider,
+                onDiskDirFileRef, deletedKeysBTreeFactory);
+
+        IInvertedListBuilderFactory invListBuilderFactory = new FixedSizeElementInvertedListBuilderFactory(
+                invListTypeTraits);
+        PartitionedOnDiskInvertedIndexFactory invIndexFactory = new PartitionedOnDiskInvertedIndexFactory(
+                diskBufferCache, diskFileMapProvider, invListBuilderFactory, invListTypeTraits, invListCmpFactories,
+                tokenTypeTraits, tokenCmpFactories, fileManager);
+
+        PartitionedLSMInvertedIndex invIndex = new PartitionedLSMInvertedIndex(memBufferCache, memFreePageManager,
+                invIndexFactory, deletedKeysBTreeFactory, fileManager, diskFileMapProvider, invListTypeTraits,
+                invListCmpFactories, tokenTypeTraits, tokenCmpFactories, tokenizerFactory, flushController,
+                mergePolicy, opTrackerFactory, ioScheduler);
+        return invIndex;
+    }
 }
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/ObjectCache.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/ObjectCache.java
new file mode 100644
index 0000000..b073f20
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/ObjectCache.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util;
+
+import java.util.ArrayList;
+
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IObjectFactory;
+
+public class ObjectCache<T> {
+    protected final int expandSize;
+    protected final IObjectFactory<T> objFactory;
+    protected final ArrayList<T> cache;
+    protected int lastReturned = 0;
+
+    public ObjectCache(IObjectFactory<T> objFactory, int initialSize, int expandSize) {
+        this.objFactory = objFactory;
+        this.cache = new ArrayList<T>(initialSize);
+        this.expandSize = expandSize;
+        expand(initialSize);
+    }
+
+    private void expand(int expandSize) {
+        for (int i = 0; i < expandSize; i++) {
+            cache.add(objFactory.create());
+        }
+    }
+
+    public void reset() {
+        lastReturned = 0;
+    }
+
+    public T getNext() {
+        if (lastReturned >= cache.size()) {
+            expand(expandSize);
+        }
+        return cache.get(lastReturned++);
+    }
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/PartitionedInvertedIndexTokenizingTupleIterator.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/PartitionedInvertedIndexTokenizingTupleIterator.java
new file mode 100644
index 0000000..a6a4bc1
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/PartitionedInvertedIndexTokenizingTupleIterator.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util;
+
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizer;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IToken;
+
+// TODO: We can possibly avoid copying the data into a new tuple here.
+public class PartitionedInvertedIndexTokenizingTupleIterator extends InvertedIndexTokenizingTupleIterator {
+
+    protected short numTokens = 0;
+
+    public PartitionedInvertedIndexTokenizingTupleIterator(int tokensFieldCount, int invListFieldCount,
+            IBinaryTokenizer tokenizer) {
+        super(tokensFieldCount, invListFieldCount, tokenizer);
+    }
+
+    public void reset(ITupleReference inputTuple) {
+        super.reset(inputTuple);
+        // Run through the tokenizer once to get the total number of tokens.
+        numTokens = 0;
+        while (tokenizer.hasNext()) {
+            tokenizer.next();
+            numTokens++;
+        }
+        super.reset(inputTuple);
+    }
+
+    public void next() throws HyracksDataException {
+        tokenizer.next();
+        IToken token = tokenizer.getToken();
+        tupleBuilder.reset();
+        try {
+            // Add token field.
+            token.serializeToken(tupleBuilder.getDataOutput());
+            tupleBuilder.addFieldEndOffset();
+            // Add field with number of tokens.
+            tupleBuilder.getDataOutput().writeShort(numTokens);
+            tupleBuilder.addFieldEndOffset();
+        } catch (IOException e) {
+            throw new HyracksDataException(e);
+        }
+        // Add inverted-list element fields.
+        for (int i = 0; i < invListFieldCount; i++) {
+            tupleBuilder.addField(inputTuple.getFieldData(i + 1), inputTuple.getFieldStart(i + 1),
+                    inputTuple.getFieldLength(i + 1));
+        }
+        // Reset tuple reference for insert operation.
+        tupleReference.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
+    }
+
+    public short getNumTokens() {
+        return numTokens;
+    }
+}
diff --git a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexBulkLoadTest.java b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexBulkLoadTest.java
new file mode 100644
index 0000000..f7a36f0
--- /dev/null
+++ b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexBulkLoadTest.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex;
+
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.common.AbstractInvertedIndexLoadTest;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestContext.InvertedIndexType;
+
+public class PartitionedLSMInvertedIndexBulkLoadTest extends AbstractInvertedIndexLoadTest {
+
+    public PartitionedLSMInvertedIndexBulkLoadTest() {
+        super(InvertedIndexType.PARTITIONED_LSM, true, 1);
+    }
+}
diff --git a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexDeleteTest.java b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexDeleteTest.java
new file mode 100644
index 0000000..4fd529b
--- /dev/null
+++ b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexDeleteTest.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex;
+
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.common.AbstractInvertedIndexDeleteTest;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestContext.InvertedIndexType;
+
+public class PartitionedLSMInvertedIndexDeleteTest extends AbstractInvertedIndexDeleteTest {
+
+    public PartitionedLSMInvertedIndexDeleteTest() {
+        super(InvertedIndexType.PARTITIONED_LSM, false);
+    }
+}
\ No newline at end of file
diff --git a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexInsertTest.java b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexInsertTest.java
new file mode 100644
index 0000000..4608f81
--- /dev/null
+++ b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexInsertTest.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex;
+
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.common.AbstractInvertedIndexLoadTest;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestContext.InvertedIndexType;
+
+public class PartitionedLSMInvertedIndexInsertTest extends AbstractInvertedIndexLoadTest {
+
+    public PartitionedLSMInvertedIndexInsertTest() {
+        super(InvertedIndexType.PARTITIONED_LSM, false, 1);
+    }
+}
diff --git a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexMergeTest.java b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexMergeTest.java
new file mode 100644
index 0000000..52bb3f8
--- /dev/null
+++ b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexMergeTest.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex;
+
+import java.io.IOException;
+
+import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.datagen.TupleGenerator;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.config.AccessMethodTestsConfig;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.common.AbstractInvertedIndexLoadTest;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestContext;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestContext.InvertedIndexType;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestUtils;
+
+public class PartitionedLSMInvertedIndexMergeTest extends AbstractInvertedIndexLoadTest {
+
+    private final int maxTreesToMerge = AccessMethodTestsConfig.LSM_INVINDEX_MAX_TREES_TO_MERGE;
+    
+    public PartitionedLSMInvertedIndexMergeTest() {
+        super(InvertedIndexType.PARTITIONED_LSM, true, 1);
+    }
+
+    @Override
+    protected void runTest(LSMInvertedIndexTestContext testCtx, TupleGenerator tupleGen) throws IOException,
+            IndexException {
+        IIndex invIndex = testCtx.getIndex();        
+        invIndex.create();
+        invIndex.activate();
+        ILSMIndexAccessor invIndexAccessor = (ILSMIndexAccessor) invIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+
+        for (int i = 0; i < maxTreesToMerge; i++) {
+            for (int j = 0; j < i; j++) {
+                if (bulkLoad) {
+                    LSMInvertedIndexTestUtils.bulkLoadInvIndex(testCtx, tupleGen, NUM_DOCS_TO_INSERT);
+                } else {
+                    LSMInvertedIndexTestUtils.insertIntoInvIndex(testCtx, tupleGen, NUM_DOCS_TO_INSERT);
+                }
+            }
+            // Perform merge.
+            ILSMIOOperation ioop = invIndexAccessor.createMergeOperation(NoOpIOOperationCallback.INSTANCE);
+            if (ioop != null) {
+                invIndexAccessor.merge(ioop);
+            }
+            validateAndCheckIndex(testCtx);
+            runTinySearchWorkload(testCtx, tupleGen);
+        }
+        
+        invIndex.deactivate();
+        invIndex.destroy();
+    }
+}
diff --git a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexMultiBulkLoadTest.java b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexMultiBulkLoadTest.java
new file mode 100644
index 0000000..80a3c0b
--- /dev/null
+++ b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexMultiBulkLoadTest.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex;
+
+import edu.uci.ics.hyracks.storage.am.config.AccessMethodTestsConfig;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.common.AbstractInvertedIndexLoadTest;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestContext.InvertedIndexType;
+
+public class PartitionedLSMInvertedIndexMultiBulkLoadTest extends AbstractInvertedIndexLoadTest {
+
+    public PartitionedLSMInvertedIndexMultiBulkLoadTest() {
+        super(InvertedIndexType.PARTITIONED_LSM, true, AccessMethodTestsConfig.LSM_INVINDEX_NUM_BULKLOAD_ROUNDS);
+    }
+}
diff --git a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexSearchTest.java b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexSearchTest.java
new file mode 100644
index 0000000..c8a7667
--- /dev/null
+++ b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/PartitionedLSMInvertedIndexSearchTest.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex;
+
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.common.AbstractInvertedIndexSearchTest;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestContext.InvertedIndexType;
+
+public class PartitionedLSMInvertedIndexSearchTest extends AbstractInvertedIndexSearchTest {
+
+    public PartitionedLSMInvertedIndexSearchTest() {
+        super(InvertedIndexType.PARTITIONED_LSM, false);
+    }
+}
diff --git a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndexDeleteTest.java b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndexDeleteTest.java
new file mode 100644
index 0000000..eac7765
--- /dev/null
+++ b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndexDeleteTest.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.inmemory;
+
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.common.AbstractInvertedIndexDeleteTest;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestContext.InvertedIndexType;
+
+public class PartitionedInMemoryInvertedIndexDeleteTest extends AbstractInvertedIndexDeleteTest {
+    
+    public PartitionedInMemoryInvertedIndexDeleteTest() {
+        super(InvertedIndexType.PARTITIONED_INMEMORY, false);
+    }
+}
diff --git a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndexInsertTest.java b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndexInsertTest.java
new file mode 100644
index 0000000..8342efd
--- /dev/null
+++ b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndexInsertTest.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.inmemory;
+
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.common.AbstractInvertedIndexLoadTest;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestContext.InvertedIndexType;
+
+public class PartitionedInMemoryInvertedIndexInsertTest extends AbstractInvertedIndexLoadTest {
+
+    public PartitionedInMemoryInvertedIndexInsertTest() {
+        super(InvertedIndexType.PARTITIONED_INMEMORY, false, 1);
+    }
+}
diff --git a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndexSearchTest.java b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndexSearchTest.java
new file mode 100644
index 0000000..385d65d
--- /dev/null
+++ b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndexSearchTest.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.inmemory;
+
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.common.AbstractInvertedIndexSearchTest;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestContext.InvertedIndexType;
+
+public class PartitionedInMemoryInvertedIndexSearchTest extends AbstractInvertedIndexSearchTest {
+
+    public PartitionedInMemoryInvertedIndexSearchTest() {
+        super(InvertedIndexType.PARTITIONED_INMEMORY, false);
+    }
+}
diff --git a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/multithread/LSMInvertedIndexMultiThreadTest.java b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/multithread/LSMInvertedIndexMultiThreadTest.java
index 8b01a40..bd48068 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/multithread/LSMInvertedIndexMultiThreadTest.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/multithread/LSMInvertedIndexMultiThreadTest.java
@@ -39,17 +39,17 @@
 public class LSMInvertedIndexMultiThreadTest {
 
     protected final Logger LOGGER = Logger.getLogger(LSMInvertedIndexMultiThreadTest.class.getName());
-    
+
     // Machine-specific number of threads to use for testing.
     protected final int REGULAR_NUM_THREADS = Runtime.getRuntime().availableProcessors();
     // Excessive number of threads for testing.
     protected final int EXCESSIVE_NUM_THREADS = Runtime.getRuntime().availableProcessors() * 4;
     protected final int NUM_OPERATIONS = AccessMethodTestsConfig.LSM_INVINDEX_MULTITHREAD_NUM_OPERATIONS;
-    
-    private final LSMInvertedIndexTestHarness harness = new LSMInvertedIndexTestHarness();
-    private final LSMInvertedIndexWorkerFactory workerFactory = new LSMInvertedIndexWorkerFactory();
-    private final ArrayList<TestWorkloadConf> workloadConfs = getTestWorkloadConf();
-    
+
+    protected final LSMInvertedIndexTestHarness harness = new LSMInvertedIndexTestHarness();
+    protected final LSMInvertedIndexWorkerFactory workerFactory = new LSMInvertedIndexWorkerFactory();
+    protected final ArrayList<TestWorkloadConf> workloadConfs = getTestWorkloadConf();
+
     protected void setUp() throws HyracksException {
         harness.setUp();
     }
@@ -58,8 +58,8 @@
         harness.tearDown();
     }
 
-    protected void runTest(LSMInvertedIndexTestContext testCtx, TupleGenerator tupleGen, int numThreads, TestWorkloadConf conf,
-            String dataMsg) throws InterruptedException, TreeIndexException, HyracksException {
+    protected void runTest(LSMInvertedIndexTestContext testCtx, TupleGenerator tupleGen, int numThreads,
+            TestWorkloadConf conf, String dataMsg) throws InterruptedException, TreeIndexException, HyracksException {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("LSMInvertedIndex MultiThread Test:\nData: " + dataMsg + "; Threads: " + numThreads
                     + "; Workload: " + conf.toString() + ".");
@@ -123,23 +123,23 @@
 
         return workloadConfs;
     }
-    
+
     @Test
     public void wordTokensInvIndexTest() throws IOException, IndexException, InterruptedException {
         String dataMsg = "Documents";
         int[] numThreads = new int[] { REGULAR_NUM_THREADS, EXCESSIVE_NUM_THREADS };
         for (int i = 0; i < numThreads.length; i++) {
-            for (TestWorkloadConf conf : workloadConfs) {                
+            for (TestWorkloadConf conf : workloadConfs) {
                 setUp();
                 LSMInvertedIndexTestContext testCtx = LSMInvertedIndexTestUtils.createWordInvIndexTestContext(harness,
-                        InvertedIndexType.LSM);
+                        getIndexType());
                 TupleGenerator tupleGen = LSMInvertedIndexTestUtils.createStringDocumentTupleGen(harness.getRandom());
                 runTest(testCtx, tupleGen, numThreads[i], conf, dataMsg);
                 tearDown();
             }
         }
     }
-    
+
     @Test
     public void hashedNGramTokensInvIndexTest() throws IOException, IndexException, InterruptedException {
         String dataMsg = "Person Names";
@@ -148,11 +148,15 @@
             for (TestWorkloadConf conf : workloadConfs) {
                 setUp();
                 LSMInvertedIndexTestContext testCtx = LSMInvertedIndexTestUtils.createHashedNGramInvIndexTestContext(
-                        harness, InvertedIndexType.LSM);
+                        harness, getIndexType());
                 TupleGenerator tupleGen = LSMInvertedIndexTestUtils.createPersonNamesTupleGen(harness.getRandom());
                 runTest(testCtx, tupleGen, numThreads[i], conf, dataMsg);
                 tearDown();
             }
         }
     }
+
+    protected InvertedIndexType getIndexType() {
+        return InvertedIndexType.LSM;
+    }
 }
diff --git a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/multithread/PartitionedLSMInvertedIndexMultiThreadTest.java b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/multithread/PartitionedLSMInvertedIndexMultiThreadTest.java
new file mode 100644
index 0000000..1adaf61
--- /dev/null
+++ b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/multithread/PartitionedLSMInvertedIndexMultiThreadTest.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.multithread;
+
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestContext.InvertedIndexType;
+
+public class PartitionedLSMInvertedIndexMultiThreadTest extends LSMInvertedIndexMultiThreadTest {
+
+    protected InvertedIndexType getIndexType() {
+        return InvertedIndexType.PARTITIONED_LSM;
+    }
+}
diff --git a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndexBulkLoadTest.java b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndexBulkLoadTest.java
new file mode 100644
index 0000000..f641630
--- /dev/null
+++ b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndexBulkLoadTest.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk;
+
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.common.AbstractInvertedIndexLoadTest;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestContext.InvertedIndexType;
+
+public class PartitionedOnDiskInvertedIndexBulkLoadTest extends AbstractInvertedIndexLoadTest {
+
+    public PartitionedOnDiskInvertedIndexBulkLoadTest() {
+        super(InvertedIndexType.PARTITIONED_ONDISK, true, 1);
+    }
+}
diff --git a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndexSearchTest.java b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndexSearchTest.java
new file mode 100644
index 0000000..4fa25ed
--- /dev/null
+++ b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/PartitionedOnDiskInvertedIndexSearchTest.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk;
+
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.common.AbstractInvertedIndexSearchTest;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.LSMInvertedIndexTestContext.InvertedIndexType;
+
+public class PartitionedOnDiskInvertedIndexSearchTest extends AbstractInvertedIndexSearchTest {
+
+    public PartitionedOnDiskInvertedIndexSearchTest() {
+        super(InvertedIndexType.PARTITIONED_ONDISK, true);
+    }
+}
diff --git a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestContext.java b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestContext.java
index 34b11c7..22c010d 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestContext.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestContext.java
@@ -45,23 +45,28 @@
     public static enum InvertedIndexType {
         INMEMORY,
         ONDISK,
-        LSM
+        LSM,
+        PARTITIONED_INMEMORY,
+        PARTITIONED_ONDISK,
+        PARTITIONED_LSM
     };
 
     protected IInvertedIndex invIndex;
     protected IBinaryComparatorFactory[] allCmpFactories;
     protected IBinaryTokenizerFactory tokenizerFactory;
+    protected InvertedIndexType invIndexType;
     protected InvertedIndexTokenizingTupleIterator indexTupleIter;
     protected HashSet<Comparable> allTokens = new HashSet<Comparable>();
     protected List<ITupleReference> documentCorpus = new ArrayList<ITupleReference>();
     
     public LSMInvertedIndexTestContext(ISerializerDeserializer[] fieldSerdes, IIndex index,
-            IBinaryTokenizerFactory tokenizerFactory) {
+            IBinaryTokenizerFactory tokenizerFactory, InvertedIndexType invIndexType,
+            InvertedIndexTokenizingTupleIterator indexTupleIter) {
         super(fieldSerdes, index);
-        this.tokenizerFactory = tokenizerFactory;
         invIndex = (IInvertedIndex) index;
-        indexTupleIter = new InvertedIndexTokenizingTupleIterator(invIndex.getTokenTypeTraits().length,
-                invIndex.getInvListTypeTraits().length, tokenizerFactory.createTokenizer());
+        this.tokenizerFactory = tokenizerFactory;
+        this.invIndexType = invIndexType;
+        this.indexTupleIter = indexTupleIter;
     }
 
     @Override
@@ -88,8 +93,9 @@
         return allCmpFactories;
     }
 
-    public static LSMInvertedIndexTestContext create(LSMInvertedIndexTestHarness harness, ISerializerDeserializer[] fieldSerdes,
-            int tokenFieldCount, IBinaryTokenizerFactory tokenizerFactory, InvertedIndexType invIndexType) throws IndexException {
+    public static LSMInvertedIndexTestContext create(LSMInvertedIndexTestHarness harness,
+            ISerializerDeserializer[] fieldSerdes, int tokenFieldCount, IBinaryTokenizerFactory tokenizerFactory,
+            InvertedIndexType invIndexType) throws IndexException {
         ITypeTraits[] allTypeTraits = SerdeUtils.serdesToTypeTraits(fieldSerdes);
         IBinaryComparatorFactory[] allCmpFactories = SerdeUtils.serdesToComparatorFactories(fieldSerdes,
                 fieldSerdes.length);
@@ -108,7 +114,7 @@
             invListTypeTraits[i] = allTypeTraits[i + tokenFieldCount];
             invListCmpFactories[i] = allCmpFactories[i + tokenFieldCount];
         }
-        // Create index and test context.
+        // Create index and test context.        
         IInvertedIndex invIndex;
         switch (invIndexType) {
             case INMEMORY: {
@@ -117,12 +123,24 @@
                         tokenCmpFactories, tokenizerFactory);
                 break;
             }
+            case PARTITIONED_INMEMORY: {
+                invIndex = InvertedIndexUtils.createPartitionedInMemoryBTreeInvertedindex(harness.getMemBufferCache(),
+                        harness.getMemFreePageManager(), invListTypeTraits, invListCmpFactories, tokenTypeTraits,
+                        tokenCmpFactories, tokenizerFactory);
+                break;
+            }
             case ONDISK: {
                 invIndex = InvertedIndexUtils.createOnDiskInvertedIndex(harness.getDiskBufferCache(),
                         harness.getDiskFileMapProvider(), invListTypeTraits, invListCmpFactories, tokenTypeTraits,
                         tokenCmpFactories, harness.getInvListsFileRef());
                 break;
             }
+            case PARTITIONED_ONDISK: {
+                invIndex = InvertedIndexUtils.createPartitionedOnDiskInvertedIndex(harness.getDiskBufferCache(),
+                        harness.getDiskFileMapProvider(), invListTypeTraits, invListCmpFactories, tokenTypeTraits,
+                        tokenCmpFactories, harness.getInvListsFileRef());
+                break;
+            }
             case LSM: {
                 invIndex = InvertedIndexUtils.createLSMInvertedIndex(harness.getMemBufferCache(),
                         harness.getMemFreePageManager(), harness.getDiskFileMapProvider(), invListTypeTraits,
@@ -132,11 +150,42 @@
                         harness.getIOScheduler());
                 break;
             }
+            case PARTITIONED_LSM: {
+                invIndex = InvertedIndexUtils.createPartitionedLSMInvertedIndex(harness.getMemBufferCache(),
+                        harness.getMemFreePageManager(), harness.getDiskFileMapProvider(), invListTypeTraits,
+                        invListCmpFactories, tokenTypeTraits, tokenCmpFactories, tokenizerFactory,
+                        harness.getDiskBufferCache(), harness.getIOManager(), harness.getOnDiskDir(),
+                        harness.getFlushController(), harness.getMergePolicy(), harness.getOperationTrackerFactory(),
+                        harness.getIOScheduler());
+                break;
+            }
             default: {
                 throw new InvertedIndexException("Unknow inverted-index type '" + invIndexType + "'.");
             }
         }
-        LSMInvertedIndexTestContext testCtx = new LSMInvertedIndexTestContext(fieldSerdes, invIndex, tokenizerFactory);
+        InvertedIndexTokenizingTupleIterator indexTupleIter = null;
+        switch (invIndexType) {
+            case INMEMORY:
+            case ONDISK:
+            case LSM: {
+                indexTupleIter = new InvertedIndexTokenizingTupleIterator(invIndex.getTokenTypeTraits().length,
+                        invIndex.getInvListTypeTraits().length, tokenizerFactory.createTokenizer());
+                break;
+            }
+            case PARTITIONED_INMEMORY:
+            case PARTITIONED_ONDISK:
+            case PARTITIONED_LSM: {
+                indexTupleIter = new PartitionedInvertedIndexTokenizingTupleIterator(
+                        invIndex.getTokenTypeTraits().length, invIndex.getInvListTypeTraits().length,
+                        tokenizerFactory.createTokenizer());
+                break;
+            }
+            default: {
+                throw new InvertedIndexException("Unknow inverted-index type '" + invIndexType + "'.");
+            }
+        }
+        LSMInvertedIndexTestContext testCtx = new LSMInvertedIndexTestContext(fieldSerdes, invIndex, tokenizerFactory,
+                invIndexType, indexTupleIter);
         return testCtx;
     }
 
@@ -192,4 +241,8 @@
     public List<ITupleReference> getDocumentCorpus() {
         return documentCorpus;
     }
+    
+    public InvertedIndexType getInvertedIndexType() {
+        return invIndexType;
+    }
 }
diff --git a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestUtils.java b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestUtils.java
index 68824f5..3bd8129 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestUtils.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/util/LSMInvertedIndexTestUtils.java
@@ -40,6 +40,7 @@
 import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleReference;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.ShortSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import edu.uci.ics.hyracks.storage.am.btree.OrderedIndexTestUtils;
 import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
@@ -99,57 +100,105 @@
         return tupleGen;
     }
 
+    private static ISerializerDeserializer[] getNonHashedIndexFieldSerdes(InvertedIndexType invIndexType)
+            throws IndexException {
+        ISerializerDeserializer[] fieldSerdes = null;
+        switch (invIndexType) {
+            case INMEMORY:
+            case ONDISK:
+            case LSM: {
+                fieldSerdes = new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE };
+                break;
+            }
+            case PARTITIONED_INMEMORY:
+            case PARTITIONED_ONDISK:
+            case PARTITIONED_LSM: {
+                // Such indexes also include the set-size for partitioning.
+                fieldSerdes = new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE,
+                        ShortSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE };
+                break;
+            }
+            default: {
+                throw new IndexException("Unhandled inverted index type '" + invIndexType + "'.");
+            }
+        }
+        return fieldSerdes;
+    }
+
+    private static ISerializerDeserializer[] getHashedIndexFieldSerdes(InvertedIndexType invIndexType)
+            throws IndexException {
+        ISerializerDeserializer[] fieldSerdes = null;
+        switch (invIndexType) {
+            case INMEMORY:
+            case ONDISK:
+            case LSM: {
+                fieldSerdes = new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE };
+                break;
+            }
+            case PARTITIONED_INMEMORY:
+            case PARTITIONED_ONDISK:
+            case PARTITIONED_LSM: {
+                // Such indexes also include the set-size for partitioning.
+                fieldSerdes = new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE,
+                        ShortSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE };
+                break;
+            }
+            default: {
+                throw new IndexException("Unhandled inverted index type '" + invIndexType + "'.");
+            }
+        }
+        return fieldSerdes;
+    }
+
     public static LSMInvertedIndexTestContext createWordInvIndexTestContext(LSMInvertedIndexTestHarness harness,
             InvertedIndexType invIndexType) throws IOException, IndexException {
-        ISerializerDeserializer[] fieldSerdes = new ISerializerDeserializer[] {
-                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE };
+        ISerializerDeserializer[] fieldSerdes = getNonHashedIndexFieldSerdes(invIndexType);
         ITokenFactory tokenFactory = new UTF8WordTokenFactory();
         IBinaryTokenizerFactory tokenizerFactory = new DelimitedUTF8StringBinaryTokenizerFactory(true, false,
                 tokenFactory);
-        LSMInvertedIndexTestContext testCtx = LSMInvertedIndexTestContext.create(harness, fieldSerdes, 1, tokenizerFactory,
-                invIndexType);
+        LSMInvertedIndexTestContext testCtx = LSMInvertedIndexTestContext.create(harness, fieldSerdes,
+                fieldSerdes.length - 1, tokenizerFactory, invIndexType);
         return testCtx;
     }
 
     public static LSMInvertedIndexTestContext createHashedWordInvIndexTestContext(LSMInvertedIndexTestHarness harness,
             InvertedIndexType invIndexType) throws IOException, IndexException {
-        ISerializerDeserializer[] fieldSerdes = new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE,
-                IntegerSerializerDeserializer.INSTANCE };
+        ISerializerDeserializer[] fieldSerdes = getHashedIndexFieldSerdes(invIndexType);
         ITokenFactory tokenFactory = new HashedUTF8WordTokenFactory();
         IBinaryTokenizerFactory tokenizerFactory = new DelimitedUTF8StringBinaryTokenizerFactory(true, false,
                 tokenFactory);
-        LSMInvertedIndexTestContext testCtx = LSMInvertedIndexTestContext.create(harness, fieldSerdes, 1, tokenizerFactory,
-                invIndexType);
+        LSMInvertedIndexTestContext testCtx = LSMInvertedIndexTestContext.create(harness, fieldSerdes,
+                fieldSerdes.length - 1, tokenizerFactory, invIndexType);
         return testCtx;
     }
 
     public static LSMInvertedIndexTestContext createNGramInvIndexTestContext(LSMInvertedIndexTestHarness harness,
             InvertedIndexType invIndexType) throws IOException, IndexException {
-        ISerializerDeserializer[] fieldSerdes = new ISerializerDeserializer[] {
-                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE };
+        ISerializerDeserializer[] fieldSerdes = getNonHashedIndexFieldSerdes(invIndexType);
         ITokenFactory tokenFactory = new UTF8NGramTokenFactory();
         IBinaryTokenizerFactory tokenizerFactory = new NGramUTF8StringBinaryTokenizerFactory(TEST_GRAM_LENGTH, true,
                 true, false, tokenFactory);
-        LSMInvertedIndexTestContext testCtx = LSMInvertedIndexTestContext.create(harness, fieldSerdes, 1, tokenizerFactory,
-                invIndexType);
+        LSMInvertedIndexTestContext testCtx = LSMInvertedIndexTestContext.create(harness, fieldSerdes,
+                fieldSerdes.length - 1, tokenizerFactory, invIndexType);
         return testCtx;
     }
 
     public static LSMInvertedIndexTestContext createHashedNGramInvIndexTestContext(LSMInvertedIndexTestHarness harness,
             InvertedIndexType invIndexType) throws IOException, IndexException {
-        ISerializerDeserializer[] fieldSerdes = new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE,
-                IntegerSerializerDeserializer.INSTANCE };
+        ISerializerDeserializer[] fieldSerdes = getHashedIndexFieldSerdes(invIndexType);
         ITokenFactory tokenFactory = new HashedUTF8NGramTokenFactory();
         IBinaryTokenizerFactory tokenizerFactory = new NGramUTF8StringBinaryTokenizerFactory(TEST_GRAM_LENGTH, true,
                 true, false, tokenFactory);
-        LSMInvertedIndexTestContext testCtx = LSMInvertedIndexTestContext.create(harness, fieldSerdes, 1, tokenizerFactory,
-                invIndexType);
+        LSMInvertedIndexTestContext testCtx = LSMInvertedIndexTestContext.create(harness, fieldSerdes,
+                fieldSerdes.length - 1, tokenizerFactory, invIndexType);
         return testCtx;
     }
 
     public static void bulkLoadInvIndex(LSMInvertedIndexTestContext testCtx, TupleGenerator tupleGen, int numDocs)
             throws IndexException, IOException {
-        SortedSet<CheckTuple> tmpMemIndex = new TreeSet<CheckTuple>();;
+        SortedSet<CheckTuple> tmpMemIndex = new TreeSet<CheckTuple>();
         // First generate the expected index by inserting the documents one-by-one.
         for (int i = 0; i < numDocs; i++) {
             ITupleReference tuple = tupleGen.next();
@@ -254,8 +303,8 @@
      * Compares actual and expected indexes by comparing their inverted-lists one by one. Exercises the openInvertedListCursor() method of the inverted-index accessor.
      */
     @SuppressWarnings("unchecked")
-    public static void compareActualAndExpectedIndexes(LSMInvertedIndexTestContext testCtx) throws HyracksDataException,
-            IndexException {
+    public static void compareActualAndExpectedIndexes(LSMInvertedIndexTestContext testCtx)
+            throws HyracksDataException, IndexException {
         IInvertedIndex invIndex = (IInvertedIndex) testCtx.getIndex();
         ISerializerDeserializer[] fieldSerdes = testCtx.getFieldSerdes();
         MultiComparator invListCmp = MultiComparator.create(invIndex.getInvListCmpFactories());
@@ -330,10 +379,34 @@
     /**
      * Determine the expected results with the simple ScanCount algorithm.
      */
+    public static void getExpectedResults(int[] scanCountArray, TreeSet<CheckTuple> checkTuples,
+            ITupleReference searchDocument, IBinaryTokenizer tokenizer, ISerializerDeserializer tokenSerde,
+            IInvertedIndexSearchModifier searchModifier, List<Integer> expectedResults, InvertedIndexType invIndexType)
+            throws IOException {
+        boolean isPartitioned = false;
+        switch (invIndexType) {
+            case INMEMORY:
+            case ONDISK:
+            case LSM: {
+                isPartitioned = false;
+                break;
+            }
+            case PARTITIONED_INMEMORY:
+            case PARTITIONED_ONDISK:
+            case PARTITIONED_LSM: {
+                isPartitioned = true;
+                break;
+            }
+        }
+        getExpectedResults(scanCountArray, checkTuples, searchDocument, tokenizer, tokenSerde, searchModifier,
+                expectedResults, isPartitioned);
+    }
+
     @SuppressWarnings("unchecked")
     public static void getExpectedResults(int[] scanCountArray, TreeSet<CheckTuple> checkTuples,
             ITupleReference searchDocument, IBinaryTokenizer tokenizer, ISerializerDeserializer tokenSerde,
-            IInvertedIndexSearchModifier searchModifier, List<Integer> expectedResults) throws IOException {
+            IInvertedIndexSearchModifier searchModifier, List<Integer> expectedResults, boolean isPartitioned)
+            throws IOException {
         // Reset scan count array.
         Arrays.fill(scanCountArray, 0);
         expectedResults.clear();
@@ -341,9 +414,25 @@
         ByteArrayAccessibleOutputStream baaos = new ByteArrayAccessibleOutputStream();
         tokenizer.reset(searchDocument.getFieldData(0), searchDocument.getFieldStart(0),
                 searchDocument.getFieldLength(0));
+        // Run though tokenizer to get number of tokens.
         int numQueryTokens = 0;
         while (tokenizer.hasNext()) {
             tokenizer.next();
+            numQueryTokens++;
+        }
+        short numTokensLowerBound = -1;
+        short numTokensUpperBound = -1;
+        int invListElementField = 1;
+        if (isPartitioned) {
+            numTokensLowerBound = searchModifier.getNumTokensLowerBound((short) numQueryTokens);
+            numTokensUpperBound = searchModifier.getNumTokensUpperBound((short) numQueryTokens);
+            invListElementField = 2;
+        }
+        int occurrenceThreshold = searchModifier.getOccurrenceThreshold(numQueryTokens);
+        tokenizer.reset(searchDocument.getFieldData(0), searchDocument.getFieldStart(0),
+                searchDocument.getFieldLength(0));
+        while (tokenizer.hasNext()) {
+            tokenizer.next();
             IToken token = tokenizer.getToken();
             baaos.reset();
             DataOutput out = new DataOutputStream(baaos);
@@ -351,10 +440,28 @@
             ByteArrayInputStream inStream = new ByteArrayInputStream(baaos.getByteArray(), 0, baaos.size());
             DataInput dataIn = new DataInputStream(inStream);
             Comparable tokenObj = (Comparable) tokenSerde.deserialize(dataIn);
-            CheckTuple lowKey = new CheckTuple(1, 1);
-            lowKey.appendField(tokenObj);
-            CheckTuple highKey = new CheckTuple(1, 1);
-            highKey.appendField(tokenObj);
+            CheckTuple lowKey;
+            if (numTokensLowerBound < 0) {
+                // Index is not partitioned, or no length filtering is possible for this search modifier.
+                lowKey = new CheckTuple(1, 1);
+                lowKey.appendField(tokenObj);
+            } else {
+                // Index is length partitioned, and search modifier supports length filtering.
+                lowKey = new CheckTuple(2, 2);
+                lowKey.appendField(tokenObj);
+                lowKey.appendField(Short.valueOf(numTokensLowerBound));
+            }
+            CheckTuple highKey;
+            if (numTokensUpperBound < 0) {
+                // Index is not partitioned, or no length filtering is possible for this search modifier.
+                highKey = new CheckTuple(1, 1);
+                highKey.appendField(tokenObj);
+            } else {
+                // Index is length partitioned, and search modifier supports length filtering.
+                highKey = new CheckTuple(2, 2);
+                highKey.appendField(tokenObj);
+                highKey.appendField(Short.valueOf(numTokensUpperBound));
+            }
 
             // Get view over check tuples containing inverted-list corresponding to token. 
             SortedSet<CheckTuple> invList = OrderedIndexTestUtils.getPrefixExpectedSubset(checkTuples, lowKey, highKey);
@@ -362,13 +469,11 @@
             // Iterate over inverted list and update scan count array.
             while (invListIter.hasNext()) {
                 CheckTuple checkTuple = invListIter.next();
-                Integer element = (Integer) checkTuple.getField(1);
+                Integer element = (Integer) checkTuple.getField(invListElementField);
                 scanCountArray[element]++;
             }
-            numQueryTokens++;
         }
 
-        int occurrenceThreshold = searchModifier.getOccurrenceThreshold(numQueryTokens);
         // Run through scan count array, and see whether elements satisfy the given occurrence threshold.
         expectedResults.clear();
         for (int i = 0; i < scanCountArray.length; i++) {
@@ -394,7 +499,7 @@
         IIndexCursor resultCursor = accessor.createSearchCursor();
         int numQueries = numDocQueries + numRandomQueries;
         for (int i = 0; i < numQueries; i++) {
-            // If number of documents in the corpus io less than numDocQueries, then replace the remaining ones with random queries.
+            // If number of documents in the corpus is less than numDocQueries, then replace the remaining ones with random queries.
             if (i >= numDocQueries || i >= documentCorpus.size()) {
                 // Generate a random query.
                 ITupleReference randomQuery = tupleGen.next();
@@ -438,8 +543,9 @@
 
                     // Get expected results.
                     List<Integer> expectedResults = new ArrayList<Integer>();
-                    LSMInvertedIndexTestUtils.getExpectedResults(scanCountArray, testCtx.getCheckTuples(), searchDocument,
-                            tokenizer, testCtx.getFieldSerdes()[0], searchModifier, expectedResults);
+                    LSMInvertedIndexTestUtils.getExpectedResults(scanCountArray, testCtx.getCheckTuples(),
+                            searchDocument, tokenizer, testCtx.getFieldSerdes()[0], searchModifier, expectedResults,
+                            testCtx.getInvertedIndexType());
 
                     Iterator<Integer> expectedIter = expectedResults.iterator();
                     Iterator<Integer> actualIter = actualResults.iterator();