Moved files.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_inverted_index_updates_new@1815 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/InvertedIndexComponentFinalizer.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/InvertedIndexComponentFinalizer.java
new file mode 100644
index 0000000..8c00d90
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/InvertedIndexComponentFinalizer.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrame;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponentFinalizer;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndex;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
+import edu.uci.ics.hyracks.storage.common.file.BufferedFileHandle;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+
+public class InvertedIndexComponentFinalizer implements ILSMComponentFinalizer {
+    
+    protected final IFileMapProvider fileMapProvider;
+    
+    public InvertedIndexComponentFinalizer(IFileMapProvider fileMapProvider) {
+        this.fileMapProvider = fileMapProvider;
+    }
+    
+    @Override
+    public boolean isValid(Object lsmComponent) throws HyracksDataException {
+        OnDiskInvertedIndex index = (OnDiskInvertedIndex) lsmComponent;
+        ITreeIndex treeIndex = index.getBTree();
+        IBufferCache bufferCache = treeIndex.getBufferCache();
+        FileReference fileRef = new FileReference(file);
+        bufferCache.createFile(fileRef);
+        int fileId = fileMapProvider.lookupFileId(fileRef);
+        bufferCache.openFile(fileId);
+        treeIndex.open(fileId);
+        try {
+            int metadataPage = treeIndex.getFreePageManager().getFirstMetadataPage();
+            ITreeIndexMetaDataFrame metadataFrame = treeIndex.getFreePageManager().getMetaDataFrameFactory().createFrame();
+            ICachedPage page = bufferCache.pin(BufferedFileHandle.getDiskPageId(treeIndex.getFileId(), metadataPage), false);
+            page.acquireReadLatch();
+            try {
+                metadataFrame.setPage(page);
+                return metadataFrame.isValid();
+            } finally {
+                page.releaseReadLatch();
+                bufferCache.unpin(page);
+            }
+        } finally {
+            treeIndex.close();
+            bufferCache.closeFile(fileId);
+            bufferCache.deleteFile(fileId, false);
+        }
+    }
+
+    @Override
+    public void finalize(Object lsmComponent) throws HyracksDataException {
+        OnDiskInvertedIndex index = (OnDiskInvertedIndex) lsmComponent;
+        ITreeIndex treeIndex = index.getBTree();
+        int fileId = treeIndex.getFileId();
+        IBufferCache bufferCache = treeIndex.getBufferCache();
+        // Flush all dirty pages of the tree. 
+        // By default, metadata and data are flushed async in the buffercache.
+        // This means that the flush issues writes to the OS, but the data may still lie in filesystem buffers.
+        ITreeIndexMetaDataFrame metadataFrame = treeIndex.getFreePageManager().getMetaDataFrameFactory().createFrame();        
+        int startPage = 0;
+        int maxPage = treeIndex.getFreePageManager().getMaxPage(metadataFrame);
+        for (int i = startPage; i <= maxPage; i++) {
+            ICachedPage page = bufferCache.tryPin(BufferedFileHandle.getDiskPageId(fileId, i));
+            // If tryPin returns null, it means the page is not cached, and therefore cannot be dirty.
+            if (page == null) {
+                continue;
+            }
+            try {
+                bufferCache.flushDirtyPage(page);
+            } finally {
+                bufferCache.unpin(page);
+            }
+        }
+        // Forces all pages of given file to disk. This guarantees the data makes it to disk.
+        bufferCache.force(fileId, true);
+        int metadataPageId = treeIndex.getFreePageManager().getFirstMetadataPage();
+        ICachedPage metadataPage = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, metadataPageId), false);
+        metadataPage.acquireWriteLatch();
+        try {
+            metadataFrame.setPage(metadataPage);
+            metadataFrame.setValid(true);
+            // Flush the single modified page to disk.
+            bufferCache.flushDirtyPage(metadataPage);
+            // Force modified metadata page to disk.
+            bufferCache.force(fileId, true);
+        } finally {
+            metadataPage.releaseWriteLatch();
+            bufferCache.unpin(metadataPage);
+        }
+    }
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
new file mode 100644
index 0000000..784aa35
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
@@ -0,0 +1,583 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
+import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoadContext;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoader;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexOpContext;
+import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexType;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponentFinalizer;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFileManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushController;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMHarness;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndexFileManager.LSMInvertedFileNameComponent;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.inmemory.InMemoryInvertedIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndexFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndex;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+
+public class LSMInvertedIndex implements ILSMIndex, IIndex {
+	private final Logger LOGGER = Logger.getLogger(LSMInvertedIndex.class.getName());
+	
+	public class LSMInvertedIndexComponent {
+        private final IIndex invIndex;
+        private final BTree deleteKeysBTree;
+
+        LSMInvertedIndexComponent(IIndex invIndex, BTree deleteKeysBTree) {
+            this.invIndex = invIndex;
+            this.deleteKeysBTree = deleteKeysBTree;
+        }
+
+        public IIndex getInvIndex() {
+            return invIndex;
+        }
+
+        public BTree getDeletedKeysBTree() {
+            return deleteKeysBTree;
+        }
+    }
+	
+    protected final LSMHarness lsmHarness;
+    
+    // In-memory components.
+    protected final LSMInvertedIndexComponent memComponent;
+    protected final IBufferCache memBufferCache;
+    protected final InMemoryFreePageManager memFreePageManager;
+
+    // On-disk components.
+    protected final ILSMFileManager fileManager;
+    // For creating inverted indexes in flush and merge.
+    protected final OnDiskInvertedIndexFactory diskInvIndexFactory;
+    protected final IBufferCache diskBufferCache;
+    protected final IFileMapProvider diskFileMapProvider;
+    // List of LSMInvertedIndexComponent instances. Using Object for better sharing via
+    // ILSMIndex + LSMHarness.
+    protected final LinkedList<Object> diskComponents = new LinkedList<Object>();
+    // Helps to guarantees physical consistency of LSM components.
+    protected final ILSMComponentFinalizer componentFinalizer;
+    
+    // Type traits and comparators for tokens and inverted-list elements.
+    protected final ITypeTraits[] invListTypeTraits;
+    protected final IBinaryComparatorFactory[] invListCmpFactories;
+    protected final ITypeTraits[] tokenTypeTraits;
+    protected final IBinaryComparatorFactory[] tokenCmpFactories;
+    
+    private boolean isActivated = false;
+
+    public LSMInvertedIndex(IBufferCache memBufferCache, InMemoryFreePageManager memFreePageManager, OnDiskInvertedIndexFactory diskInvIndexFactory,
+            ILSMFileManager fileManager, IFileMapProvider diskFileMapProvider, ITypeTraits[] invListTypeTraits,
+            IBinaryComparatorFactory[] invListCmpFactories, ITypeTraits[] tokenTypeTraits,
+            IBinaryComparatorFactory[] tokenCmpFactories, ILSMFlushController flushController,
+            ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, ILSMIOOperationScheduler ioScheduler) {
+        // TODO: Finish this one properly.
+        InMemoryInvertedIndex memInvIndex = null;
+        BTree deleteKeysBTree = null;
+        memComponent = new LSMInvertedIndexComponent(memInvIndex, deleteKeysBTree);
+        this.memBufferCache = memBufferCache;
+        this.memFreePageManager = memFreePageManager;
+        this.fileManager = fileManager;
+        this.diskInvIndexFactory = diskInvIndexFactory;        
+        this.diskBufferCache = diskInvIndexFactory.getBufferCache();
+        this.diskFileMapProvider = diskFileMapProvider;
+        this.invListTypeTraits = invListTypeTraits;
+        this.invListCmpFactories = invListCmpFactories;
+        this.tokenTypeTraits = tokenTypeTraits;
+        this.tokenCmpFactories = tokenCmpFactories;
+        
+        this.lsmHarness = new LSMHarness(this, flushController, mergePolicy, opTracker, ioScheduler);
+        this.componentFinalizer = new InvertedIndexComponentFinalizer(diskFileMapProvider);
+    }
+
+    @Override
+    public synchronized void create(int indexFileId) throws HyracksDataException {
+        if (isActivated) {
+            return;
+        }
+        
+        // TODO: What else is needed here?
+        memoryInvertedIndex.create(indexFileId);
+        fileManager.createDirs();
+    }
+
+    @Override
+    public void open(int indexFileId) throws HyracksDataException {
+        synchronized (this) {
+            if (isOpen)
+                return;
+
+            isOpen = true;
+            memoryInvertedIndex.open(indexFileId);
+            // TODO: What else is needed here?
+            // ...
+        }
+
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        synchronized (this) {
+            if (!isOpen) {
+                return;
+            }
+            // TODO: What else is needed here?
+            // ...
+            memoryInvertedIndex.close();
+            isOpen = false;
+        }
+    }
+
+    public IIndexAccessor createAccessor() {
+        return new LSMInvertedIndexAccessor(lsmHarness, createOpContext());
+    }
+
+    private LSMInvertedIndexOpContext createOpContext() {
+        return new LSMInvertedIndexOpContext(memoryInvertedIndex);
+    }
+
+    @Override
+    public IIndexBulkLoadContext beginBulkLoad(float fillFactor) throws IndexException, HyracksDataException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public void bulkLoadAddTuple(ITupleReference tuple, IIndexBulkLoadContext ictx) throws HyracksDataException {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public void endBulkLoad(IIndexBulkLoadContext ictx) throws HyracksDataException {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public IBufferCache getBufferCache() {
+        return diskBufferCache;
+    }
+
+    @Override
+    public IndexType getIndexType() {
+        return IndexType.INVERTED;
+    }
+
+    public boolean insertUpdateOrDelete(ITupleReference tuple, IIndexOpContext ictx) throws HyracksDataException,
+            IndexException {
+        // TODO: Only insert is supported for now. Will need the context for later when update and delete 
+        // are also supported.
+        LSMInvertedIndexOpContext ctx = (LSMInvertedIndexOpContext) ictx;
+        memAccessor.insert(tuple);
+
+        return true;
+    }
+
+    @Override
+    public void search(IIndexCursor cursor, List<Object> diskComponents, ISearchPredicate pred, IIndexOpContext ictx,
+            boolean includeMemComponent, AtomicInteger searcherRefCount) throws HyracksDataException, IndexException {
+        IIndexAccessor componentAccessor;
+
+        // Over-provision by 1 if includeMemComponent == false, but that's okay!
+        ArrayList<IIndexAccessor> indexAccessors = new ArrayList<IIndexAccessor>(diskComponents.size() + 1);
+
+        if (includeMemComponent) {
+            componentAccessor = memoryInvertedIndex.createAccessor();
+            indexAccessors.add(componentAccessor);
+        }
+
+        for (int i = 0; i < diskComponents.size(); i++) {
+            componentAccessor = ((IInvertedIndex) diskComponents.get(i)).createAccessor();
+            indexAccessors.add(componentAccessor);
+        }
+
+        LSMInvertedIndexCursorInitialState initState = new LSMInvertedIndexCursorInitialState(indexAccessors, ictx,
+                includeMemComponent, searcherRefCount, lsmHarness);
+        LSMInvertedIndexSearchCursor lsmCursor = (LSMInvertedIndexSearchCursor) cursor;
+        lsmCursor.open(initState, pred);
+    }
+
+    public void mergeSearch(IIndexCursor cursor, List<Object> diskComponents, ISearchPredicate pred,
+            IIndexOpContext ictx, boolean includeMemComponent, AtomicInteger searcherRefCount)
+            throws HyracksDataException, IndexException {
+        IIndexAccessor componentAccessor;
+
+        // Over-provision by 1 if includeMemComponent == false, but that's okay!
+        ArrayList<IIndexAccessor> indexAccessors = new ArrayList<IIndexAccessor>(diskComponents.size() + 1);
+
+        if (includeMemComponent) {
+            componentAccessor = memoryInvertedIndex.createAccessor();
+            indexAccessors.add(componentAccessor);
+        }
+
+        for (int i = 0; i < diskComponents.size(); i++) {
+            componentAccessor = ((IInvertedIndex) diskComponents.get(i)).createAccessor();
+            indexAccessors.add(componentAccessor);
+        }
+
+        LSMInvertedIndexCursorInitialState initState = new LSMInvertedIndexCursorInitialState(indexAccessors, ictx,
+                includeMemComponent, searcherRefCount, lsmHarness);
+        LSMInvertedIndexRangeSearchCursor rangeSearchCursor = (LSMInvertedIndexRangeSearchCursor) cursor;
+        rangeSearchCursor.open(initState, pred);
+    }
+
+    @Override
+    public Object merge(List<Object> mergedComponents) throws HyracksDataException, IndexException {
+        LSMInvertedIndexOpContext ctx = createOpContext();
+
+        IIndexCursor cursor = new LSMInvertedIndexRangeSearchCursor();
+        RangePredicate mergePred = new RangePredicate(null, null, true, true, null, null);
+
+        //Scan diskInvertedIndexes ignoring the memoryInvertedIndex.
+        List<Object> mergingComponents = lsmHarness.mergeSearch(cursor, mergePred, ctx, false);
+        mergedComponents.addAll(mergingComponents);
+
+        // Nothing to merge.
+        if (mergedComponents.size() <= 1) {
+            cursor.close();
+            return null;
+        }
+
+        // Bulk load the tuples from all diskInvertedIndexes into the new diskInvertedIndex.
+        LSMInvertedFileNameComponent fNameComponent = getMergeTargetFileName(mergedComponents);
+        BTree diskBTree = createDiskBTree(fileManager.createMergeFile(fNameComponent.getBTreeFileName()), true);
+        //    - Create an InvertedIndex instance
+        OnDiskInvertedIndex mergedDiskInvertedIndex = createDiskInvertedIndex(
+                fileManager.createMergeFile(fNameComponent.getInvertedFileName()), true, diskBTree);
+
+        IIndexBulkLoadContext bulkLoadCtx = mergedDiskInvertedIndex.beginBulkLoad(1.0f);
+        try {
+            while (cursor.hasNext()) {
+                cursor.next();
+                ITupleReference tuple = cursor.getTuple();
+                mergedDiskInvertedIndex.bulkLoadAddTuple(tuple, bulkLoadCtx);
+            }
+        } finally {
+            cursor.close();
+        }
+        mergedDiskInvertedIndex.endBulkLoad(bulkLoadCtx);
+
+        return mergedDiskInvertedIndex;
+    }
+
+    private LSMInvertedFileNameComponent getMergeTargetFileName(List<Object> mergingDiskTrees)
+            throws HyracksDataException {
+        BTree firstTree = ((OnDiskInvertedIndex) mergingDiskTrees.get(0)).getBTree();
+        BTree lastTree = ((OnDiskInvertedIndex) mergingDiskTrees.get(mergingDiskTrees.size() - 1)).getBTree();
+        FileReference firstFile = diskFileMapProvider.lookupFileName(firstTree.getFileId());
+        FileReference lastFile = diskFileMapProvider.lookupFileName(lastTree.getFileId());
+        LSMInvertedFileNameComponent component = (LSMInvertedFileNameComponent) ((LSMInvertedIndexFileManager) fileManager)
+                .getRelMergeFileName(firstFile.getFile().getName(), lastFile.getFile().getName());
+        return component;
+    }
+
+    @Override
+    public void addMergedComponent(Object newComponent, List<Object> mergedComponents) {
+        diskInvertedIndexList.removeAll(mergedComponents);
+        diskInvertedIndexList.addLast(newComponent);
+    }
+
+    @Override
+    public void cleanUpAfterMerge(List<Object> mergedComponents) throws HyracksDataException {
+        for (Object o : mergedComponents) {
+            OnDiskInvertedIndex oldInvertedIndex = (OnDiskInvertedIndex) o;
+            BTree oldBTree = oldInvertedIndex.getBTree();
+
+            //delete a diskBTree file.
+            FileReference fileRef = diskFileMapProvider.lookupFileName(oldBTree.getFileId());
+            diskBufferCache.closeFile(oldBTree.getFileId());
+            diskBufferCache.deleteFile(oldBTree.getFileId(), false);
+            oldBTree.close();
+            fileRef.getFile().delete();
+
+            //delete a diskInvertedIndex file.
+            fileRef = diskFileMapProvider.lookupFileName(oldInvertedIndex.getFileId());
+            diskBufferCache.closeFile(oldInvertedIndex.getFileId());
+            diskBufferCache.deleteFile(oldInvertedIndex.getFileId(), false);
+            oldInvertedIndex.close();
+            fileRef.getFile().delete();
+        }
+    }
+
+    @Override
+    public Object flush() throws HyracksDataException, IndexException {
+
+        // ---------------------------------------------------
+        // [Flow]
+        // #. Create a scanCursor for the BTree of the memoryInvertedIndex to iterate all keys in it.
+        // #. Create an diskInvertedIndex where all keys of memoryInvertedIndex will be bulkloaded.
+        //    - Create a BTree instance for diskBTree 
+        //    - Create an InvertedIndex instance
+        // #. Begin the bulkload of the diskInvertedIndex.
+        // #. While iterating the scanCursor, add each key into the diskInvertedIndex in the bulkload mode.
+        // #. End the bulkload.
+        // #. Return the newly created diskInvertedIndex.
+        // ---------------------------------------------------
+
+        // #. Create a scanCursor of memoryInvertedIndex to iterate all keys in it.
+        BTree inMemBtree = ((InMemoryInvertedIndex) memoryInvertedIndex).getBTree();
+        IIndexAccessor btreeAccessor = inMemBtree.createAccessor();
+        MultiComparator btreeMultiComparator = MultiComparator.create(inMemBtree.getComparatorFactories());
+        RangePredicate scanPred = new RangePredicate(null, null, true, true, btreeMultiComparator, btreeMultiComparator);
+
+        IIndexCursor scanCursor = btreeAccessor.createSearchCursor();
+        btreeAccessor.search(scanCursor, scanPred);
+
+        // #. Create a diskInvertedIndex where all keys of memoryInvertedIndex will be bulkloaded.
+        //    - Create a BTree instance for diskBTree
+        LSMInvertedFileNameComponent fNameComponent = (LSMInvertedFileNameComponent) fileManager.getRelFlushFileName();
+        BTree diskBTree = createDiskBTree(fileManager.createFlushFile(fNameComponent.getBTreeFileName()), true);
+        //    - Create an InvertedIndex instance
+        OnDiskInvertedIndex diskInvertedIndex = createDiskInvertedIndex(
+                fileManager.createFlushFile(fNameComponent.getInvertedFileName()), true, diskBTree);
+
+        // #. Begin the bulkload of the diskInvertedIndex.
+        IIndexBulkLoadContext bulkLoadCtx = diskInvertedIndex.beginBulkLoad(1.0f);
+        try {
+            while (scanCursor.hasNext()) {
+                scanCursor.next();
+                diskInvertedIndex.bulkLoadAddTuple(scanCursor.getTuple(), bulkLoadCtx);
+            }
+        } finally {
+            scanCursor.close();
+        }
+        diskInvertedIndex.endBulkLoad(bulkLoadCtx);
+
+        return diskInvertedIndex;
+    }
+
+    private BTree createBTreeFlushTarget() throws HyracksDataException {
+        LSMInvertedFileNameComponent fNameComponent = (LSMInvertedFileNameComponent) fileManager.getRelFlushFileName();
+        FileReference fileRef = fileManager.createFlushFile(fNameComponent.getBTreeFileName());
+        return createDiskBTree(fileRef, true);
+    }
+
+    private BTree createDiskBTree(FileReference fileRef, boolean createBTree) throws HyracksDataException {
+        // File will be deleted during cleanup of merge().
+        diskBufferCache.createFile(fileRef);
+        int diskBTreeFileId = diskFileMapProvider.lookupFileId(fileRef);
+        // File will be closed during cleanup of merge().
+        diskBufferCache.openFile(diskBTreeFileId);
+        // Create new BTree instance.
+        BTree diskBTree = diskBTreeFactory.createIndexInstance();
+        if (createBTree) {
+            diskBTree.create(diskBTreeFileId);
+        }
+        // BTree will be closed during cleanup of merge().
+        diskBTree.open(diskBTreeFileId);
+        return diskBTree;
+    }
+
+    private OnDiskInvertedIndex createInvertedIndexFlushTarget(BTree diskBTree) throws HyracksDataException {
+        FileReference fileRef = fileManager.createFlushFile((String) fileManager.getRelFlushFileName());
+        return createDiskInvertedIndex(fileRef, true, diskBTree);
+    }
+
+    private OnDiskInvertedIndex createDiskInvertedIndex(FileReference fileRef, boolean createInvertedIndex, BTree diskBTree)
+            throws HyracksDataException {
+        // File will be deleted during cleanup of merge().
+        diskBufferCache.createFile(fileRef);
+        int diskInvertedIndexFileId = diskFileMapProvider.lookupFileId(fileRef);
+        // File will be closed during cleanup of merge().
+        diskBufferCache.openFile(diskInvertedIndexFileId);
+        // Create new InvertedIndex instance.
+        OnDiskInvertedIndex diskInvertedIndex = (OnDiskInvertedIndex) diskInvertedIndexFactory.createIndexInstance(diskBTree);
+        if (createInvertedIndex) {
+            diskInvertedIndex.create(diskInvertedIndexFileId);
+        }
+        // InvertedIndex will be closed during cleanup of merge().
+        diskInvertedIndex.open(diskInvertedIndexFileId);
+        return diskInvertedIndex;
+    }
+
+    public void addFlushedComponent(Object index) {
+        diskInvertedIndexList.addFirst(index);
+    }
+
+    @Override
+    public InMemoryFreePageManager getInMemoryFreePageManager() {
+        // TODO This code should be changed more generally if IInMemoryInvertedIndex interface is defined and
+        //      InMemoryBtreeInvertedIndex implements IInMemoryInvertedIndex
+        InMemoryInvertedIndex memoryBTreeInvertedIndex = (InMemoryInvertedIndex) memoryInvertedIndex;
+        return (InMemoryFreePageManager) memoryBTreeInvertedIndex.getBTree().getFreePageManager();
+    }
+
+    @Override
+    public void resetInMemoryComponent() throws HyracksDataException {
+        // TODO This code should be changed more generally if IInMemoryInvertedIndex interface is defined and
+        //      InMemoryBtreeInvertedIndex implements IInMemoryInvertedIndex
+        InMemoryInvertedIndex memoryBTreeInvertedIndex = (InMemoryInvertedIndex) memoryInvertedIndex;
+        BTree memBTree = memoryBTreeInvertedIndex.getBTree();
+        InMemoryFreePageManager memFreePageManager = (InMemoryFreePageManager) memBTree.getFreePageManager();
+        memFreePageManager.reset();
+        memBTree.create(memBTree.getFileId());
+    }
+
+    @Override
+    public List<Object> getDiskComponents() {
+        return diskInvertedIndexList;
+    }
+
+    @Override
+    public ILSMComponentFinalizer getComponentFinalizer() {
+        return componentFinalizer;
+    }
+
+    @Override
+    public IInvertedListCursor createInvertedListCursor() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public void openInvertedListCursor(IInvertedListCursor listCursor, ITupleReference tupleReference)
+            throws HyracksDataException, IndexException {
+        // TODO Auto-generated method stub
+        
+    }
+
+    @Override
+    public IBinaryComparatorFactory[] getInvListElementCmpFactories() {
+        return memoryInvertedIndex.getInvListCmpFactories();
+    }
+
+    @Override
+    public ITypeTraits[] getTypeTraits() {
+        return memoryInvertedIndex.getInvListTypeTraits();
+    }
+
+    @Override
+    public void create() throws HyracksDataException {
+        // TODO Auto-generated method stub
+        
+    }
+
+    @Override
+    public void activate() throws HyracksDataException {
+        // TODO Auto-generated method stub
+        
+    }
+
+    @Override
+    public void clear() throws HyracksDataException {
+        // TODO Auto-generated method stub
+        
+    }
+
+    @Override
+    public void deactivate() throws HyracksDataException {
+        // TODO Auto-generated method stub
+        
+    }
+
+    @Override
+    public void destroy() throws HyracksDataException {
+        // TODO Auto-generated method stub
+        
+    }
+
+    @Override
+    public IIndexAccessor createAccessor(IModificationOperationCallback modificationCallback,
+            ISearchOperationCallback searchCallback) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public void validate() throws HyracksDataException {
+        // TODO Auto-generated method stub
+        
+    }
+
+    @Override
+    public long getInMemorySize() {
+        // TODO Auto-generated method stub
+        return 0;
+    }
+
+    @Override
+    public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput) throws IndexException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public ILSMIOOperation createMergeOperation(ILSMIOOperationCallback callback) throws HyracksDataException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public Object merge(List<Object> mergedComponents, ILSMIOOperation operation) throws HyracksDataException,
+            IndexException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public Object flush(ILSMIOOperation operation) throws HyracksDataException, IndexException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public ILSMFlushController getFlushController() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public ILSMOperationTracker getOperationTracker() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public ILSMIOOperationScheduler getIOScheduler() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
new file mode 100644
index 0000000..e042892
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexOpContext;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMHarness;
+
+public class LSMInvertedIndexAccessor implements ILSMIndexAccessor {
+
+    protected LSMHarness lsmHarness;
+    protected IIndexOpContext ctx;
+
+    public LSMInvertedIndexAccessor(LSMHarness lsmHarness, IIndexOpContext ctx) {
+        this.lsmHarness = lsmHarness;
+        this.ctx = ctx;
+    }
+
+    public void insert(ITupleReference tuple) throws HyracksDataException, IndexException {
+        ctx.reset(IndexOp.INSERT);
+        lsmHarness.insertUpdateOrDelete(tuple, ctx);
+    }
+
+    public void update(ITupleReference tuple) throws HyracksDataException, IndexException {
+        //not supported yet
+    }
+
+    public void delete(ITupleReference tuple) throws HyracksDataException, IndexException {
+        //not supported yet
+    }
+
+    public IIndexCursor createSearchCursor() {
+        return new LSMInvertedIndexSearchCursor(); 
+    }
+
+    public void search(IIndexCursor cursor, ISearchPredicate searchPred) throws HyracksDataException, IndexException {
+        ctx.reset(IndexOp.SEARCH);
+        //search include in-memory components
+        lsmHarness.search(cursor, searchPred, ctx, true);
+    }
+
+    public void flush() throws HyracksDataException, IndexException {
+        lsmHarness.flush();
+    }
+
+    public void merge() throws HyracksDataException, IndexException {
+        lsmHarness.merge();
+    }
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexCursorInitialState.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexCursorInitialState.java
new file mode 100644
index 0000000..4b859ac
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexCursorInitialState.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import edu.uci.ics.hyracks.storage.am.common.api.ICursorInitialState;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexOpContext;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMHarness;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
+
+public class LSMInvertedIndexCursorInitialState implements ICursorInitialState {
+
+    private final boolean includeMemComponent;
+    private final AtomicInteger searcherfRefCount;
+    private final LSMHarness lsmHarness;
+    private final List<IIndexAccessor> indexAccessors;
+    private final IIndexOpContext opContext;
+
+    public LSMInvertedIndexCursorInitialState(List<IIndexAccessor> indexAccessors, IIndexOpContext ctx,
+            boolean includeMemComponent, AtomicInteger searcherfRefCount, LSMHarness lsmHarness) {
+        this.indexAccessors = indexAccessors;
+        this.includeMemComponent = includeMemComponent;
+        this.searcherfRefCount = searcherfRefCount;
+        this.lsmHarness = lsmHarness;
+        this.opContext = ctx;
+    }
+
+    @Override
+    public ICachedPage getPage() {
+        return null;
+    }
+
+    @Override
+    public void setPage(ICachedPage page) {
+    }
+
+    public List<IIndexAccessor> getIndexAccessors() {
+        return indexAccessors;
+    }
+
+    public AtomicInteger getSearcherRefCount() {
+        return searcherfRefCount;
+    }
+
+    public boolean getIncludeMemComponent() {
+        return includeMemComponent;
+    }
+
+    public LSMHarness getLSMHarness() {
+        return lsmHarness;
+    }
+
+    public IIndexOpContext getOpContext() {
+        return opContext;
+    }
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java
new file mode 100644
index 0000000..4c893a9
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java
@@ -0,0 +1,181 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls;
+
+import java.io.File;
+import java.io.FilenameFilter;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.control.nc.io.IOManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMTreeFileManager;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+
+public class LSMInvertedIndexFileManager extends LSMTreeFileManager {
+
+    private static final String INVERTED_STRING = "i";
+    private static final String BTREE_STRING = "b";
+
+    private static FilenameFilter btreeFilter = new FilenameFilter() {
+        public boolean accept(File dir, String name) {
+            return !name.startsWith(".") && name.endsWith(BTREE_STRING);
+        }
+    };
+    
+    private static FilenameFilter invertedFilter = new FilenameFilter() {
+        public boolean accept(File dir, String name) {
+            return !name.startsWith(".") && name.endsWith(INVERTED_STRING);
+        }
+    };
+    
+    public LSMInvertedIndexFileManager(IOManager ioManager, IFileMapProvider fileMapProvider, String baseDir) {
+        super(ioManager, fileMapProvider, baseDir);
+    }
+
+    @Override
+    public Object getRelFlushFileName() {
+        String baseName = (String) super.getRelFlushFileName();
+        return new LSMInvertedFileNameComponent(baseName + SPLIT_STRING + INVERTED_STRING, baseName + SPLIT_STRING
+                + BTREE_STRING);
+
+    }
+
+    @Override
+    public Object getRelMergeFileName(String firstFileName, String lastFileName) throws HyracksDataException {
+        String baseName = (String) super.getRelMergeFileName(firstFileName, lastFileName);
+        return new LSMInvertedFileNameComponent(baseName + SPLIT_STRING + INVERTED_STRING, baseName + SPLIT_STRING
+                + BTREE_STRING);
+    }
+
+//    @Override
+//    public List<Object> cleanupAndGetValidFiles(Object lsmComponent, ILSMComponentFinalizer componentFinalizer) throws HyracksDataException {
+//        List<Object> validFiles = new ArrayList<Object>();
+//        ArrayList<ComparableFileName> allInvertedFiles = new ArrayList<ComparableFileName>();
+//        ArrayList<ComparableFileName> allBTreeFiles = new ArrayList<ComparableFileName>();
+//        LSMInvertedComponent component = (LSMInvertedComponent) lsmComponent;
+//        
+//        // Gather files from all IODeviceHandles.
+//        for (IODeviceHandle dev : ioManager.getIODevices()) {            
+//            getValidFiles(dev, btreeFilter, component.getBTree(), componentFinalizer, allBTreeFiles);
+//            HashSet<String> btreeFilesSet = new HashSet<String>();
+//            for (ComparableFileName cmpFileName : allBTreeFiles) {
+//                int index = cmpFileName.fileName.lastIndexOf(SPLIT_STRING);
+//                btreeFilesSet.add(cmpFileName.fileName.substring(0, index));
+//            }
+//            // List of valid Inverted files that may or may not have a BTree buddy. Will check for buddies below.
+//            ArrayList<ComparableFileName> tmpAllInvertedFiles = new ArrayList<ComparableFileName>();
+//            getValidFiles(dev, invertedFilter, component.getInverted(), componentFinalizer, tmpAllInvertedFiles);
+//            // Look for buddy BTrees for all valid Inverteds. 
+//            // If no buddy is found, delete the file, otherwise add the Inverted to allInvertedFiles. 
+//            for (ComparableFileName cmpFileName : tmpAllInvertedFiles) {
+//                int index = cmpFileName.fileName.lastIndexOf(SPLIT_STRING);
+//                String file = cmpFileName.fileName.substring(0, index);
+//                if (btreeFilesSet.contains(file)) {
+//                    allInvertedFiles.add(cmpFileName);
+//                } else {
+//                    // Couldn't find the corresponding BTree file; thus, delete
+//                    // the Inverted file.
+//                    File invalidInvertedFile = new File(cmpFileName.fullPath);
+//                    invalidInvertedFile.delete();
+//                }
+//            }
+//        }
+//        // Sanity check.
+//        if (allInvertedFiles.size() != allBTreeFiles.size()) {
+//            throw new HyracksDataException("Unequal number of valid Inverted and BTree files found. Aborting cleanup.");
+//        }
+//        
+//        // Trivial cases.
+//        if (allInvertedFiles.isEmpty() || allBTreeFiles.isEmpty()) {
+//            return validFiles;
+//        }
+//
+//        if (allInvertedFiles.size() == 1 && allBTreeFiles.size() == 1) {
+//            validFiles.add(new LSMInvertedFileNameComponent(allInvertedFiles.get(0).fullPath, allBTreeFiles.get(0).fullPath));
+//            return validFiles;
+//        }
+//
+//        // Sorts files names from earliest to latest timestamp.
+//        Collections.sort(allInvertedFiles);
+//        Collections.sort(allBTreeFiles);
+//
+//        List<ComparableFileName> validComparableInvertedFiles = new ArrayList<ComparableFileName>();
+//        ComparableFileName lastInverted = allInvertedFiles.get(0);
+//        validComparableInvertedFiles.add(lastInverted);
+//
+//        List<ComparableFileName> validComparableBTreeFiles = new ArrayList<ComparableFileName>();
+//        ComparableFileName lastBTree = allBTreeFiles.get(0);
+//        validComparableBTreeFiles.add(lastBTree);
+//
+//        for (int i = 1; i < allInvertedFiles.size(); i++) {
+//            ComparableFileName currentInverted = allInvertedFiles.get(i);
+//            ComparableFileName currentBTree = allBTreeFiles.get(i);
+//            // Current start timestamp is greater than last stop timestamp.
+//            if (currentInverted.interval[0].compareTo(lastInverted.interval[1]) > 0
+//                    && currentBTree.interval[0].compareTo(lastBTree.interval[1]) > 0) {
+//                validComparableInvertedFiles.add(currentInverted);
+//                validComparableBTreeFiles.add(currentBTree);
+//                lastInverted = currentInverted;
+//                lastBTree = currentBTree;
+//            } else if (currentInverted.interval[0].compareTo(lastInverted.interval[0]) >= 0
+//                    && currentInverted.interval[1].compareTo(lastInverted.interval[1]) <= 0
+//                    && currentBTree.interval[0].compareTo(lastBTree.interval[0]) >= 0
+//                    && currentBTree.interval[1].compareTo(lastBTree.interval[1]) <= 0) {
+//                // Invalid files are completely contained in last interval.
+//                File invalidInvertedFile = new File(currentInverted.fullPath);
+//                invalidInvertedFile.delete();
+//                File invalidBTreeFile = new File(currentBTree.fullPath);
+//                invalidBTreeFile.delete();
+//            } else {
+//                // This scenario should not be possible.
+//                throw new HyracksDataException("Found LSM files with overlapping but not contained timetamp intervals.");
+//            }
+//        }
+//
+//        // Sort valid files in reverse lexicographical order, such that newer
+//        // files come first.
+//        Collections.sort(validComparableInvertedFiles, recencyCmp);
+//        Collections.sort(validComparableBTreeFiles, recencyCmp);
+//
+//        Iterator<ComparableFileName> invertedFileIter = validComparableInvertedFiles.iterator();
+//        Iterator<ComparableFileName> btreeFileIter = validComparableBTreeFiles.iterator();
+//        while (invertedFileIter.hasNext() && btreeFileIter.hasNext()) {
+//            ComparableFileName cmpInvertedFileName = invertedFileIter.next();
+//            ComparableFileName cmpBTreeFileName = btreeFileIter.next();
+//            validFiles.add(new LSMInvertedFileNameComponent(cmpInvertedFileName.fullPath, cmpBTreeFileName.fullPath));
+//        }
+//
+//        return validFiles;
+//    }
+
+    public class LSMInvertedFileNameComponent {
+        private final String invertedFileName;
+        private final String btreeFileName;
+
+        LSMInvertedFileNameComponent(String invertedFileName, String btreeFileName) {
+            this.invertedFileName = invertedFileName;
+            this.btreeFileName = btreeFileName;
+        }
+
+        public String getInvertedFileName() {
+            return invertedFileName;
+        }
+
+        public String getBTreeFileName() {
+            return btreeFileName;
+        }
+    }
+
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java
new file mode 100644
index 0000000..3ea50e0
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls;
+
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexOpContext;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.inmemory.InMemoryInvertedIndex;
+
+public class LSMInvertedIndexOpContext implements IIndexOpContext {
+    
+    private IndexOp op;
+    private final MultiComparator cmp;
+    private final int invListFieldCount;
+    private final int tokenFieldCount;
+    
+    public LSMInvertedIndexOpContext(IInvertedIndex memoryInvertedIndex) {
+    	InMemoryInvertedIndex memoryBTreeInvertedIndex = (InMemoryInvertedIndex)memoryInvertedIndex;
+    	BTree btree = memoryBTreeInvertedIndex.getBTree();
+    	this.cmp = MultiComparator.create(btree.getComparatorFactories());
+    	this.invListFieldCount = memoryBTreeInvertedIndex.getInvListCmpFactories().length;
+    	this.tokenFieldCount = cmp.getKeyFieldCount() - invListFieldCount;
+    }
+    
+    @Override
+    public void reset() {
+        // TODO Auto-generated method stub
+    }
+
+    @Override
+    public void reset(IndexOp newOp) {
+        op = newOp;
+    }
+    
+    public int getInvListFieldCount() {
+    	return invListFieldCount;
+    }
+    
+    public int getTokenFieldCount() {
+    	return tokenFieldCount;
+    }
+    
+    public MultiComparator getComparator() {
+    	return cmp;
+    }
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursor.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursor.java
new file mode 100644
index 0000000..4bce429
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursor.java
@@ -0,0 +1,268 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.util.TupleUtils;
+import edu.uci.ics.hyracks.storage.am.common.api.ICursorInitialState;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMHarness;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.InvertedIndexAccessor;
+
+public class LSMInvertedIndexRangeSearchCursor implements IIndexCursor {
+
+    private LSMHarness harness;
+    private boolean includeMemComponent;
+    private AtomicInteger searcherRefCount;
+    private List<IIndexAccessor> indexAccessors;
+    private List<IIndexCursor> indexCursors;
+    private boolean flagEOF = false;
+    private boolean flagFirstNextCall = true;
+
+    private PriorityQueue<PriorityQueueElement> outputPriorityQueue;
+    private MultiComparator memoryInvertedIndexComparator;
+    private PriorityQueueComparator pqCmp;
+    private int tokenFieldCount;
+    private int invListFieldCount;
+    private ITupleReference resultTuple;
+    private BitSet closedCursors;
+
+    @Override
+    public void open(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException {
+        LSMInvertedIndexCursorInitialState lsmInitialState = (LSMInvertedIndexCursorInitialState) initialState;
+        this.harness = lsmInitialState.getLSMHarness();
+        this.includeMemComponent = lsmInitialState.getIncludeMemComponent();
+        this.searcherRefCount = lsmInitialState.getSearcherRefCount();
+        this.indexAccessors = lsmInitialState.getIndexAccessors();
+        this.indexCursors = new ArrayList<IIndexCursor>(indexAccessors.size());
+        LSMInvertedIndexOpContext opContext = (LSMInvertedIndexOpContext) lsmInitialState.getOpContext();
+        this.memoryInvertedIndexComparator = opContext.getComparator();
+        this.tokenFieldCount = opContext.getTokenFieldCount();
+        this.invListFieldCount = opContext.getInvListFieldCount();
+        closedCursors = new BitSet(indexAccessors.size());
+
+        //create all cursors
+        IIndexCursor cursor;
+        for (IIndexAccessor accessor : indexAccessors) {
+            InvertedIndexAccessor invIndexAccessor = (InvertedIndexAccessor) accessor;
+            cursor = invIndexAccessor.createRangeSearchCursor();
+            try {
+                invIndexAccessor.rangeSearch(cursor, searchPred);
+            } catch (IndexException e) {
+                throw new HyracksDataException(e);
+            }
+            indexCursors.add(cursor);
+        }
+    }
+
+    @Override
+    public boolean hasNext() throws HyracksDataException {
+
+        if (flagEOF) {
+            return false;
+        }
+
+        if (flagFirstNextCall) {
+            for (IIndexCursor c : indexCursors) {
+                if (c.hasNext()) {
+                    return true;
+                }
+            }
+        } else {
+            if (outputPriorityQueue.size() > 0) {
+                return true;
+            }
+        }
+
+        flagEOF = true;
+        return false;
+    }
+
+    @Override
+    public void next() throws HyracksDataException {
+
+        PriorityQueueElement pqElement;
+        IIndexCursor cursor;
+        int cursorIndex;
+
+        if (flagEOF) {
+            return;
+        }
+
+        //When the next() is called for the first time, initialize priority queue.
+        if (flagFirstNextCall) {
+            flagFirstNextCall = false;
+
+            //create and initialize PriorityQueue
+            pqCmp = new PriorityQueueComparator(memoryInvertedIndexComparator);
+            outputPriorityQueue = new PriorityQueue<PriorityQueueElement>(indexCursors.size(), pqCmp);
+
+            //read the first tuple from each cursor and insert into outputPriorityQueue
+
+            for (int i = 0; i < indexCursors.size(); i++) {
+                cursor = indexCursors.get(i);
+                if (cursor.hasNext()) {
+                    cursor.next();
+                    pqElement = new PriorityQueueElement(cursor.getTuple(), i);
+                    outputPriorityQueue.offer(pqElement);
+                }
+                //else {
+                //    //do nothing for the cursor who reached EOF.
+                //}
+            }
+        }
+
+        //If you reach here, priority queue is set up to provide the smallest <tokenFields, invListFields>
+        //Get the smallest element from priority queue. 
+        //This element will be the result tuple which will be served to the caller when getTuple() is called. 
+        //Then, insert new element from the cursor where the smallest element came from.
+        pqElement = outputPriorityQueue.poll();
+        if (pqElement != null) {
+            resultTuple = pqElement.getTuple();
+            cursorIndex = pqElement.getCursorIndex();
+            cursor = indexCursors.get(cursorIndex);
+            if (cursor.hasNext()) {
+                cursor.next();
+                pqElement = new PriorityQueueElement(cursor.getTuple(), cursorIndex);
+                outputPriorityQueue.offer(pqElement);
+            } else {
+                cursor.close();
+                closedCursors.set(cursorIndex, true);
+
+//                 If the current cursor reached EOF, read a tuple from another cursor and insert into the priority queue.
+                for (int i = 0; i < indexCursors.size(); i++) {
+                    if (closedCursors.get(i))
+                        continue;
+
+                    cursor = indexCursors.get(i);
+                    if (cursor.hasNext()) {
+                        cursor.next();
+                        pqElement = new PriorityQueueElement(cursor.getTuple(), i);
+                        outputPriorityQueue.offer(pqElement);
+                        break;
+                    } else {
+                        cursor.close();
+                        closedCursors.set(i, true);
+                    }
+                }
+                //if (i == indexCursors.size()) {
+                //    all cursors reached EOF and the only tuples that you have are in the priority queue.
+                //    do nothing here!.
+                //}
+            }
+        }
+        //else {
+        // do nothing!!
+        // which means when the getTuple() is called, the pre-existing result tuple or null will be returned to the caller.
+        //}
+
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        try {
+            for (int i = 0; i < indexCursors.size(); i++) {
+                if (closedCursors.get(i)) {
+                    continue;
+                }
+                indexCursors.get(i).close();
+                closedCursors.set(i, true);
+            }
+        } finally {
+            harness.closeSearchCursor(searcherRefCount, includeMemComponent);
+        }
+    }
+
+    @Override
+    public void reset() {
+        // TODO Auto-generated method stub
+    }
+
+    @Override
+    public ITupleReference getTuple() {
+        return resultTuple;
+    }
+
+    public class PriorityQueueComparator implements Comparator<PriorityQueueElement> {
+
+        private final MultiComparator cmp;
+
+        public PriorityQueueComparator(MultiComparator cmp) {
+            this.cmp = cmp;
+        }
+
+        @Override
+        public int compare(PriorityQueueElement elementA, PriorityQueueElement elementB) {
+            int result = cmp.compare(elementA.getTuple(), elementB.getTuple());
+            if (result != 0) {
+                return result;
+            }
+            if (elementA.getCursorIndex() > elementB.getCursorIndex()) {
+                return 1;
+            } else {
+                return -1;
+            }
+        }
+
+        public MultiComparator getMultiComparator() {
+            return cmp;
+        }
+    }
+
+    public class PriorityQueueElement {
+        private ITupleReference tuple;
+        private int cursorIndex;
+
+        public PriorityQueueElement(ITupleReference tuple, int cursorIndex) {
+//            reset(tuple, cursorIndex);
+            try {
+                reset(TupleUtils.copyTuple(tuple), cursorIndex);
+            } catch (HyracksDataException e) {
+                // TODO Auto-generated catch block
+                e.printStackTrace();
+            }
+        }
+
+        public ITupleReference getTuple() {
+            return tuple;
+        }
+
+        public int getCursorIndex() {
+            return cursorIndex;
+        }
+
+        public void reset(ITupleReference tuple, int cursorIndex) {
+            this.tuple = tuple;
+            this.cursorIndex = cursorIndex;
+        }
+    }
+
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursor.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursor.java
new file mode 100644
index 0000000..c5c8645
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursor.java
@@ -0,0 +1,174 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.ICursorInitialState;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMHarness;
+
+public class LSMInvertedIndexSearchCursor implements IIndexCursor {
+
+    private int cursorIndex = -1;
+    private LSMHarness harness;
+    private boolean includeMemComponent;
+    private AtomicInteger searcherRefCount;
+    private List<IIndexAccessor> indexAccessors;
+    private List<IIndexCursor> indexCursors;// = new ArrayList<IIndexCursor>();
+    private ISearchPredicate searchPred;
+    
+    public LSMInvertedIndexSearchCursor() {
+        indexCursors = new ArrayList<IIndexCursor>();
+    }
+    
+    @Override
+    public void open(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException {
+        LSMInvertedIndexCursorInitialState lsmInitialState = (LSMInvertedIndexCursorInitialState) initialState;
+        harness = lsmInitialState.getLSMHarness();
+        includeMemComponent = lsmInitialState.getIncludeMemComponent();
+        searcherRefCount = lsmInitialState.getSearcherRefCount();
+        indexAccessors = lsmInitialState.getIndexAccessors();
+        indexCursors.clear();
+//        indexCursors = new ArrayList<IIndexCursor>(indexAccessors.size());
+        cursorIndex = 0;
+        this.searchPred = searchPred;
+
+        IIndexAccessor currentAccessor;
+        IIndexCursor currentCursor;
+        while (cursorIndex < indexAccessors.size()) {
+            // Open cursors and perform search lazily as each component is passed over
+            currentAccessor = indexAccessors.get(cursorIndex);
+            currentCursor = currentAccessor.createSearchCursor();
+            try {
+                currentAccessor.search(currentCursor, searchPred);
+            } catch (IndexException e) {
+                throw new HyracksDataException(e);
+            }
+            indexCursors.add(currentCursor);
+
+            if (currentCursor.hasNext()) {
+                break;
+            }
+
+            // Close as we go to release any resources
+            currentCursor.close();
+            cursorIndex++;
+        }
+
+    }
+
+    @Override
+    public boolean hasNext() throws HyracksDataException {
+        IIndexAccessor currentAccessor;
+        IIndexCursor currentCursor;
+        
+        if(cursorIndex >= indexAccessors.size()) {
+            return false;
+        }
+        
+        currentCursor = indexCursors.get(cursorIndex);
+        if (currentCursor.hasNext()) {
+            return true;
+        } else {
+            currentCursor.close();
+            cursorIndex++;
+            while (cursorIndex < indexAccessors.size()) {
+                currentAccessor = indexAccessors.get(cursorIndex);
+                currentCursor = currentAccessor.createSearchCursor();
+                try {
+                    currentAccessor.search(currentCursor, searchPred);
+                } catch (IndexException e) {
+                    throw new HyracksDataException(e);
+                }
+                indexCursors.add(currentCursor);
+
+                if (currentCursor.hasNext()) {
+                    return true;
+                } else {
+                    currentCursor.close();
+                    cursorIndex++;
+                }
+            }
+        }
+        
+        return false;
+    }
+
+    @Override
+    public void next() throws HyracksDataException {
+        IIndexAccessor currentAccessor;
+        IIndexCursor currentCursor = indexCursors.get(cursorIndex);
+
+        if (currentCursor.hasNext()) {
+            currentCursor.next();
+        } else {
+            currentCursor.close();
+            cursorIndex++;
+            while (cursorIndex < indexAccessors.size()) {
+                currentAccessor = indexAccessors.get(cursorIndex);
+                currentCursor = currentAccessor.createSearchCursor();
+                try {
+                    currentAccessor.search(currentCursor, searchPred);
+                } catch (IndexException e) {
+                    throw new HyracksDataException(e);
+                }
+                indexCursors.add(currentCursor);
+
+                if (currentCursor.hasNext()) {
+                    currentCursor.next();
+                    break;
+                } else {
+                    cursorIndex++;
+                }
+            }
+        }
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        cursorIndex = -1;
+        for (int i = 0; i < indexCursors.size(); i++) {
+            indexCursors.get(i).close();
+        }
+        indexCursors.clear();
+        harness.closeSearchCursor(searcherRefCount, includeMemComponent);
+    }
+
+    @Override
+    public void reset() throws HyracksDataException {
+        cursorIndex = 0;
+        for (int i = 0; i < indexCursors.size(); i++) {
+            indexCursors.get(i).reset();
+        }
+    }
+
+    @Override
+    public ITupleReference getTuple() throws HyracksDataException {
+        if (cursorIndex < indexCursors.size()) {
+            return indexCursors.get(cursorIndex).getTuple();
+        } else {
+            return null;
+        }
+    }
+
+}