Moved files.
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_inverted_index_updates_new@1815 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/InvertedIndexComponentFinalizer.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/InvertedIndexComponentFinalizer.java
new file mode 100644
index 0000000..8c00d90
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/InvertedIndexComponentFinalizer.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrame;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponentFinalizer;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndex;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
+import edu.uci.ics.hyracks.storage.common.file.BufferedFileHandle;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+
+public class InvertedIndexComponentFinalizer implements ILSMComponentFinalizer {
+
+ protected final IFileMapProvider fileMapProvider;
+
+ public InvertedIndexComponentFinalizer(IFileMapProvider fileMapProvider) {
+ this.fileMapProvider = fileMapProvider;
+ }
+
+ @Override
+ public boolean isValid(Object lsmComponent) throws HyracksDataException {
+ OnDiskInvertedIndex index = (OnDiskInvertedIndex) lsmComponent;
+ ITreeIndex treeIndex = index.getBTree();
+ IBufferCache bufferCache = treeIndex.getBufferCache();
+ FileReference fileRef = new FileReference(file);
+ bufferCache.createFile(fileRef);
+ int fileId = fileMapProvider.lookupFileId(fileRef);
+ bufferCache.openFile(fileId);
+ treeIndex.open(fileId);
+ try {
+ int metadataPage = treeIndex.getFreePageManager().getFirstMetadataPage();
+ ITreeIndexMetaDataFrame metadataFrame = treeIndex.getFreePageManager().getMetaDataFrameFactory().createFrame();
+ ICachedPage page = bufferCache.pin(BufferedFileHandle.getDiskPageId(treeIndex.getFileId(), metadataPage), false);
+ page.acquireReadLatch();
+ try {
+ metadataFrame.setPage(page);
+ return metadataFrame.isValid();
+ } finally {
+ page.releaseReadLatch();
+ bufferCache.unpin(page);
+ }
+ } finally {
+ treeIndex.close();
+ bufferCache.closeFile(fileId);
+ bufferCache.deleteFile(fileId, false);
+ }
+ }
+
+ @Override
+ public void finalize(Object lsmComponent) throws HyracksDataException {
+ OnDiskInvertedIndex index = (OnDiskInvertedIndex) lsmComponent;
+ ITreeIndex treeIndex = index.getBTree();
+ int fileId = treeIndex.getFileId();
+ IBufferCache bufferCache = treeIndex.getBufferCache();
+ // Flush all dirty pages of the tree.
+ // By default, metadata and data are flushed async in the buffercache.
+ // This means that the flush issues writes to the OS, but the data may still lie in filesystem buffers.
+ ITreeIndexMetaDataFrame metadataFrame = treeIndex.getFreePageManager().getMetaDataFrameFactory().createFrame();
+ int startPage = 0;
+ int maxPage = treeIndex.getFreePageManager().getMaxPage(metadataFrame);
+ for (int i = startPage; i <= maxPage; i++) {
+ ICachedPage page = bufferCache.tryPin(BufferedFileHandle.getDiskPageId(fileId, i));
+ // If tryPin returns null, it means the page is not cached, and therefore cannot be dirty.
+ if (page == null) {
+ continue;
+ }
+ try {
+ bufferCache.flushDirtyPage(page);
+ } finally {
+ bufferCache.unpin(page);
+ }
+ }
+ // Forces all pages of given file to disk. This guarantees the data makes it to disk.
+ bufferCache.force(fileId, true);
+ int metadataPageId = treeIndex.getFreePageManager().getFirstMetadataPage();
+ ICachedPage metadataPage = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, metadataPageId), false);
+ metadataPage.acquireWriteLatch();
+ try {
+ metadataFrame.setPage(metadataPage);
+ metadataFrame.setValid(true);
+ // Flush the single modified page to disk.
+ bufferCache.flushDirtyPage(metadataPage);
+ // Force modified metadata page to disk.
+ bufferCache.force(fileId, true);
+ } finally {
+ metadataPage.releaseWriteLatch();
+ bufferCache.unpin(metadataPage);
+ }
+ }
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
new file mode 100644
index 0000000..784aa35
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
@@ -0,0 +1,583 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
+import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoadContext;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoader;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexOpContext;
+import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexType;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponentFinalizer;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFileManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushController;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.InMemoryFreePageManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMHarness;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndexFileManager.LSMInvertedFileNameComponent;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.inmemory.InMemoryInvertedIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndexFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.OnDiskInvertedIndex;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+
+public class LSMInvertedIndex implements ILSMIndex, IIndex {
+ private final Logger LOGGER = Logger.getLogger(LSMInvertedIndex.class.getName());
+
+ public class LSMInvertedIndexComponent {
+ private final IIndex invIndex;
+ private final BTree deleteKeysBTree;
+
+ LSMInvertedIndexComponent(IIndex invIndex, BTree deleteKeysBTree) {
+ this.invIndex = invIndex;
+ this.deleteKeysBTree = deleteKeysBTree;
+ }
+
+ public IIndex getInvIndex() {
+ return invIndex;
+ }
+
+ public BTree getDeletedKeysBTree() {
+ return deleteKeysBTree;
+ }
+ }
+
+ protected final LSMHarness lsmHarness;
+
+ // In-memory components.
+ protected final LSMInvertedIndexComponent memComponent;
+ protected final IBufferCache memBufferCache;
+ protected final InMemoryFreePageManager memFreePageManager;
+
+ // On-disk components.
+ protected final ILSMFileManager fileManager;
+ // For creating inverted indexes in flush and merge.
+ protected final OnDiskInvertedIndexFactory diskInvIndexFactory;
+ protected final IBufferCache diskBufferCache;
+ protected final IFileMapProvider diskFileMapProvider;
+ // List of LSMInvertedIndexComponent instances. Using Object for better sharing via
+ // ILSMIndex + LSMHarness.
+ protected final LinkedList<Object> diskComponents = new LinkedList<Object>();
+ // Helps to guarantees physical consistency of LSM components.
+ protected final ILSMComponentFinalizer componentFinalizer;
+
+ // Type traits and comparators for tokens and inverted-list elements.
+ protected final ITypeTraits[] invListTypeTraits;
+ protected final IBinaryComparatorFactory[] invListCmpFactories;
+ protected final ITypeTraits[] tokenTypeTraits;
+ protected final IBinaryComparatorFactory[] tokenCmpFactories;
+
+ private boolean isActivated = false;
+
+ public LSMInvertedIndex(IBufferCache memBufferCache, InMemoryFreePageManager memFreePageManager, OnDiskInvertedIndexFactory diskInvIndexFactory,
+ ILSMFileManager fileManager, IFileMapProvider diskFileMapProvider, ITypeTraits[] invListTypeTraits,
+ IBinaryComparatorFactory[] invListCmpFactories, ITypeTraits[] tokenTypeTraits,
+ IBinaryComparatorFactory[] tokenCmpFactories, ILSMFlushController flushController,
+ ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, ILSMIOOperationScheduler ioScheduler) {
+ // TODO: Finish this one properly.
+ InMemoryInvertedIndex memInvIndex = null;
+ BTree deleteKeysBTree = null;
+ memComponent = new LSMInvertedIndexComponent(memInvIndex, deleteKeysBTree);
+ this.memBufferCache = memBufferCache;
+ this.memFreePageManager = memFreePageManager;
+ this.fileManager = fileManager;
+ this.diskInvIndexFactory = diskInvIndexFactory;
+ this.diskBufferCache = diskInvIndexFactory.getBufferCache();
+ this.diskFileMapProvider = diskFileMapProvider;
+ this.invListTypeTraits = invListTypeTraits;
+ this.invListCmpFactories = invListCmpFactories;
+ this.tokenTypeTraits = tokenTypeTraits;
+ this.tokenCmpFactories = tokenCmpFactories;
+
+ this.lsmHarness = new LSMHarness(this, flushController, mergePolicy, opTracker, ioScheduler);
+ this.componentFinalizer = new InvertedIndexComponentFinalizer(diskFileMapProvider);
+ }
+
+ @Override
+ public synchronized void create(int indexFileId) throws HyracksDataException {
+ if (isActivated) {
+ return;
+ }
+
+ // TODO: What else is needed here?
+ memoryInvertedIndex.create(indexFileId);
+ fileManager.createDirs();
+ }
+
+ @Override
+ public void open(int indexFileId) throws HyracksDataException {
+ synchronized (this) {
+ if (isOpen)
+ return;
+
+ isOpen = true;
+ memoryInvertedIndex.open(indexFileId);
+ // TODO: What else is needed here?
+ // ...
+ }
+
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ synchronized (this) {
+ if (!isOpen) {
+ return;
+ }
+ // TODO: What else is needed here?
+ // ...
+ memoryInvertedIndex.close();
+ isOpen = false;
+ }
+ }
+
+ public IIndexAccessor createAccessor() {
+ return new LSMInvertedIndexAccessor(lsmHarness, createOpContext());
+ }
+
+ private LSMInvertedIndexOpContext createOpContext() {
+ return new LSMInvertedIndexOpContext(memoryInvertedIndex);
+ }
+
+ @Override
+ public IIndexBulkLoadContext beginBulkLoad(float fillFactor) throws IndexException, HyracksDataException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public void bulkLoadAddTuple(ITupleReference tuple, IIndexBulkLoadContext ictx) throws HyracksDataException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void endBulkLoad(IIndexBulkLoadContext ictx) throws HyracksDataException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public IBufferCache getBufferCache() {
+ return diskBufferCache;
+ }
+
+ @Override
+ public IndexType getIndexType() {
+ return IndexType.INVERTED;
+ }
+
+ public boolean insertUpdateOrDelete(ITupleReference tuple, IIndexOpContext ictx) throws HyracksDataException,
+ IndexException {
+ // TODO: Only insert is supported for now. Will need the context for later when update and delete
+ // are also supported.
+ LSMInvertedIndexOpContext ctx = (LSMInvertedIndexOpContext) ictx;
+ memAccessor.insert(tuple);
+
+ return true;
+ }
+
+ @Override
+ public void search(IIndexCursor cursor, List<Object> diskComponents, ISearchPredicate pred, IIndexOpContext ictx,
+ boolean includeMemComponent, AtomicInteger searcherRefCount) throws HyracksDataException, IndexException {
+ IIndexAccessor componentAccessor;
+
+ // Over-provision by 1 if includeMemComponent == false, but that's okay!
+ ArrayList<IIndexAccessor> indexAccessors = new ArrayList<IIndexAccessor>(diskComponents.size() + 1);
+
+ if (includeMemComponent) {
+ componentAccessor = memoryInvertedIndex.createAccessor();
+ indexAccessors.add(componentAccessor);
+ }
+
+ for (int i = 0; i < diskComponents.size(); i++) {
+ componentAccessor = ((IInvertedIndex) diskComponents.get(i)).createAccessor();
+ indexAccessors.add(componentAccessor);
+ }
+
+ LSMInvertedIndexCursorInitialState initState = new LSMInvertedIndexCursorInitialState(indexAccessors, ictx,
+ includeMemComponent, searcherRefCount, lsmHarness);
+ LSMInvertedIndexSearchCursor lsmCursor = (LSMInvertedIndexSearchCursor) cursor;
+ lsmCursor.open(initState, pred);
+ }
+
+ public void mergeSearch(IIndexCursor cursor, List<Object> diskComponents, ISearchPredicate pred,
+ IIndexOpContext ictx, boolean includeMemComponent, AtomicInteger searcherRefCount)
+ throws HyracksDataException, IndexException {
+ IIndexAccessor componentAccessor;
+
+ // Over-provision by 1 if includeMemComponent == false, but that's okay!
+ ArrayList<IIndexAccessor> indexAccessors = new ArrayList<IIndexAccessor>(diskComponents.size() + 1);
+
+ if (includeMemComponent) {
+ componentAccessor = memoryInvertedIndex.createAccessor();
+ indexAccessors.add(componentAccessor);
+ }
+
+ for (int i = 0; i < diskComponents.size(); i++) {
+ componentAccessor = ((IInvertedIndex) diskComponents.get(i)).createAccessor();
+ indexAccessors.add(componentAccessor);
+ }
+
+ LSMInvertedIndexCursorInitialState initState = new LSMInvertedIndexCursorInitialState(indexAccessors, ictx,
+ includeMemComponent, searcherRefCount, lsmHarness);
+ LSMInvertedIndexRangeSearchCursor rangeSearchCursor = (LSMInvertedIndexRangeSearchCursor) cursor;
+ rangeSearchCursor.open(initState, pred);
+ }
+
+ @Override
+ public Object merge(List<Object> mergedComponents) throws HyracksDataException, IndexException {
+ LSMInvertedIndexOpContext ctx = createOpContext();
+
+ IIndexCursor cursor = new LSMInvertedIndexRangeSearchCursor();
+ RangePredicate mergePred = new RangePredicate(null, null, true, true, null, null);
+
+ //Scan diskInvertedIndexes ignoring the memoryInvertedIndex.
+ List<Object> mergingComponents = lsmHarness.mergeSearch(cursor, mergePred, ctx, false);
+ mergedComponents.addAll(mergingComponents);
+
+ // Nothing to merge.
+ if (mergedComponents.size() <= 1) {
+ cursor.close();
+ return null;
+ }
+
+ // Bulk load the tuples from all diskInvertedIndexes into the new diskInvertedIndex.
+ LSMInvertedFileNameComponent fNameComponent = getMergeTargetFileName(mergedComponents);
+ BTree diskBTree = createDiskBTree(fileManager.createMergeFile(fNameComponent.getBTreeFileName()), true);
+ // - Create an InvertedIndex instance
+ OnDiskInvertedIndex mergedDiskInvertedIndex = createDiskInvertedIndex(
+ fileManager.createMergeFile(fNameComponent.getInvertedFileName()), true, diskBTree);
+
+ IIndexBulkLoadContext bulkLoadCtx = mergedDiskInvertedIndex.beginBulkLoad(1.0f);
+ try {
+ while (cursor.hasNext()) {
+ cursor.next();
+ ITupleReference tuple = cursor.getTuple();
+ mergedDiskInvertedIndex.bulkLoadAddTuple(tuple, bulkLoadCtx);
+ }
+ } finally {
+ cursor.close();
+ }
+ mergedDiskInvertedIndex.endBulkLoad(bulkLoadCtx);
+
+ return mergedDiskInvertedIndex;
+ }
+
+ private LSMInvertedFileNameComponent getMergeTargetFileName(List<Object> mergingDiskTrees)
+ throws HyracksDataException {
+ BTree firstTree = ((OnDiskInvertedIndex) mergingDiskTrees.get(0)).getBTree();
+ BTree lastTree = ((OnDiskInvertedIndex) mergingDiskTrees.get(mergingDiskTrees.size() - 1)).getBTree();
+ FileReference firstFile = diskFileMapProvider.lookupFileName(firstTree.getFileId());
+ FileReference lastFile = diskFileMapProvider.lookupFileName(lastTree.getFileId());
+ LSMInvertedFileNameComponent component = (LSMInvertedFileNameComponent) ((LSMInvertedIndexFileManager) fileManager)
+ .getRelMergeFileName(firstFile.getFile().getName(), lastFile.getFile().getName());
+ return component;
+ }
+
+ @Override
+ public void addMergedComponent(Object newComponent, List<Object> mergedComponents) {
+ diskInvertedIndexList.removeAll(mergedComponents);
+ diskInvertedIndexList.addLast(newComponent);
+ }
+
+ @Override
+ public void cleanUpAfterMerge(List<Object> mergedComponents) throws HyracksDataException {
+ for (Object o : mergedComponents) {
+ OnDiskInvertedIndex oldInvertedIndex = (OnDiskInvertedIndex) o;
+ BTree oldBTree = oldInvertedIndex.getBTree();
+
+ //delete a diskBTree file.
+ FileReference fileRef = diskFileMapProvider.lookupFileName(oldBTree.getFileId());
+ diskBufferCache.closeFile(oldBTree.getFileId());
+ diskBufferCache.deleteFile(oldBTree.getFileId(), false);
+ oldBTree.close();
+ fileRef.getFile().delete();
+
+ //delete a diskInvertedIndex file.
+ fileRef = diskFileMapProvider.lookupFileName(oldInvertedIndex.getFileId());
+ diskBufferCache.closeFile(oldInvertedIndex.getFileId());
+ diskBufferCache.deleteFile(oldInvertedIndex.getFileId(), false);
+ oldInvertedIndex.close();
+ fileRef.getFile().delete();
+ }
+ }
+
+ @Override
+ public Object flush() throws HyracksDataException, IndexException {
+
+ // ---------------------------------------------------
+ // [Flow]
+ // #. Create a scanCursor for the BTree of the memoryInvertedIndex to iterate all keys in it.
+ // #. Create an diskInvertedIndex where all keys of memoryInvertedIndex will be bulkloaded.
+ // - Create a BTree instance for diskBTree
+ // - Create an InvertedIndex instance
+ // #. Begin the bulkload of the diskInvertedIndex.
+ // #. While iterating the scanCursor, add each key into the diskInvertedIndex in the bulkload mode.
+ // #. End the bulkload.
+ // #. Return the newly created diskInvertedIndex.
+ // ---------------------------------------------------
+
+ // #. Create a scanCursor of memoryInvertedIndex to iterate all keys in it.
+ BTree inMemBtree = ((InMemoryInvertedIndex) memoryInvertedIndex).getBTree();
+ IIndexAccessor btreeAccessor = inMemBtree.createAccessor();
+ MultiComparator btreeMultiComparator = MultiComparator.create(inMemBtree.getComparatorFactories());
+ RangePredicate scanPred = new RangePredicate(null, null, true, true, btreeMultiComparator, btreeMultiComparator);
+
+ IIndexCursor scanCursor = btreeAccessor.createSearchCursor();
+ btreeAccessor.search(scanCursor, scanPred);
+
+ // #. Create a diskInvertedIndex where all keys of memoryInvertedIndex will be bulkloaded.
+ // - Create a BTree instance for diskBTree
+ LSMInvertedFileNameComponent fNameComponent = (LSMInvertedFileNameComponent) fileManager.getRelFlushFileName();
+ BTree diskBTree = createDiskBTree(fileManager.createFlushFile(fNameComponent.getBTreeFileName()), true);
+ // - Create an InvertedIndex instance
+ OnDiskInvertedIndex diskInvertedIndex = createDiskInvertedIndex(
+ fileManager.createFlushFile(fNameComponent.getInvertedFileName()), true, diskBTree);
+
+ // #. Begin the bulkload of the diskInvertedIndex.
+ IIndexBulkLoadContext bulkLoadCtx = diskInvertedIndex.beginBulkLoad(1.0f);
+ try {
+ while (scanCursor.hasNext()) {
+ scanCursor.next();
+ diskInvertedIndex.bulkLoadAddTuple(scanCursor.getTuple(), bulkLoadCtx);
+ }
+ } finally {
+ scanCursor.close();
+ }
+ diskInvertedIndex.endBulkLoad(bulkLoadCtx);
+
+ return diskInvertedIndex;
+ }
+
+ private BTree createBTreeFlushTarget() throws HyracksDataException {
+ LSMInvertedFileNameComponent fNameComponent = (LSMInvertedFileNameComponent) fileManager.getRelFlushFileName();
+ FileReference fileRef = fileManager.createFlushFile(fNameComponent.getBTreeFileName());
+ return createDiskBTree(fileRef, true);
+ }
+
+ private BTree createDiskBTree(FileReference fileRef, boolean createBTree) throws HyracksDataException {
+ // File will be deleted during cleanup of merge().
+ diskBufferCache.createFile(fileRef);
+ int diskBTreeFileId = diskFileMapProvider.lookupFileId(fileRef);
+ // File will be closed during cleanup of merge().
+ diskBufferCache.openFile(diskBTreeFileId);
+ // Create new BTree instance.
+ BTree diskBTree = diskBTreeFactory.createIndexInstance();
+ if (createBTree) {
+ diskBTree.create(diskBTreeFileId);
+ }
+ // BTree will be closed during cleanup of merge().
+ diskBTree.open(diskBTreeFileId);
+ return diskBTree;
+ }
+
+ private OnDiskInvertedIndex createInvertedIndexFlushTarget(BTree diskBTree) throws HyracksDataException {
+ FileReference fileRef = fileManager.createFlushFile((String) fileManager.getRelFlushFileName());
+ return createDiskInvertedIndex(fileRef, true, diskBTree);
+ }
+
+ private OnDiskInvertedIndex createDiskInvertedIndex(FileReference fileRef, boolean createInvertedIndex, BTree diskBTree)
+ throws HyracksDataException {
+ // File will be deleted during cleanup of merge().
+ diskBufferCache.createFile(fileRef);
+ int diskInvertedIndexFileId = diskFileMapProvider.lookupFileId(fileRef);
+ // File will be closed during cleanup of merge().
+ diskBufferCache.openFile(diskInvertedIndexFileId);
+ // Create new InvertedIndex instance.
+ OnDiskInvertedIndex diskInvertedIndex = (OnDiskInvertedIndex) diskInvertedIndexFactory.createIndexInstance(diskBTree);
+ if (createInvertedIndex) {
+ diskInvertedIndex.create(diskInvertedIndexFileId);
+ }
+ // InvertedIndex will be closed during cleanup of merge().
+ diskInvertedIndex.open(diskInvertedIndexFileId);
+ return diskInvertedIndex;
+ }
+
+ public void addFlushedComponent(Object index) {
+ diskInvertedIndexList.addFirst(index);
+ }
+
+ @Override
+ public InMemoryFreePageManager getInMemoryFreePageManager() {
+ // TODO This code should be changed more generally if IInMemoryInvertedIndex interface is defined and
+ // InMemoryBtreeInvertedIndex implements IInMemoryInvertedIndex
+ InMemoryInvertedIndex memoryBTreeInvertedIndex = (InMemoryInvertedIndex) memoryInvertedIndex;
+ return (InMemoryFreePageManager) memoryBTreeInvertedIndex.getBTree().getFreePageManager();
+ }
+
+ @Override
+ public void resetInMemoryComponent() throws HyracksDataException {
+ // TODO This code should be changed more generally if IInMemoryInvertedIndex interface is defined and
+ // InMemoryBtreeInvertedIndex implements IInMemoryInvertedIndex
+ InMemoryInvertedIndex memoryBTreeInvertedIndex = (InMemoryInvertedIndex) memoryInvertedIndex;
+ BTree memBTree = memoryBTreeInvertedIndex.getBTree();
+ InMemoryFreePageManager memFreePageManager = (InMemoryFreePageManager) memBTree.getFreePageManager();
+ memFreePageManager.reset();
+ memBTree.create(memBTree.getFileId());
+ }
+
+ @Override
+ public List<Object> getDiskComponents() {
+ return diskInvertedIndexList;
+ }
+
+ @Override
+ public ILSMComponentFinalizer getComponentFinalizer() {
+ return componentFinalizer;
+ }
+
+ @Override
+ public IInvertedListCursor createInvertedListCursor() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public void openInvertedListCursor(IInvertedListCursor listCursor, ITupleReference tupleReference)
+ throws HyracksDataException, IndexException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public IBinaryComparatorFactory[] getInvListElementCmpFactories() {
+ return memoryInvertedIndex.getInvListCmpFactories();
+ }
+
+ @Override
+ public ITypeTraits[] getTypeTraits() {
+ return memoryInvertedIndex.getInvListTypeTraits();
+ }
+
+ @Override
+ public void create() throws HyracksDataException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void activate() throws HyracksDataException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void clear() throws HyracksDataException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void deactivate() throws HyracksDataException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void destroy() throws HyracksDataException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public IIndexAccessor createAccessor(IModificationOperationCallback modificationCallback,
+ ISearchOperationCallback searchCallback) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public void validate() throws HyracksDataException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public long getInMemorySize() {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput) throws IndexException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public ILSMIOOperation createMergeOperation(ILSMIOOperationCallback callback) throws HyracksDataException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public Object merge(List<Object> mergedComponents, ILSMIOOperation operation) throws HyracksDataException,
+ IndexException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public Object flush(ILSMIOOperation operation) throws HyracksDataException, IndexException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public ILSMFlushController getFlushController() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public ILSMOperationTracker getOperationTracker() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public ILSMIOOperationScheduler getIOScheduler() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
new file mode 100644
index 0000000..e042892
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexOpContext;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMHarness;
+
+public class LSMInvertedIndexAccessor implements ILSMIndexAccessor {
+
+ protected LSMHarness lsmHarness;
+ protected IIndexOpContext ctx;
+
+ public LSMInvertedIndexAccessor(LSMHarness lsmHarness, IIndexOpContext ctx) {
+ this.lsmHarness = lsmHarness;
+ this.ctx = ctx;
+ }
+
+ public void insert(ITupleReference tuple) throws HyracksDataException, IndexException {
+ ctx.reset(IndexOp.INSERT);
+ lsmHarness.insertUpdateOrDelete(tuple, ctx);
+ }
+
+ public void update(ITupleReference tuple) throws HyracksDataException, IndexException {
+ //not supported yet
+ }
+
+ public void delete(ITupleReference tuple) throws HyracksDataException, IndexException {
+ //not supported yet
+ }
+
+ public IIndexCursor createSearchCursor() {
+ return new LSMInvertedIndexSearchCursor();
+ }
+
+ public void search(IIndexCursor cursor, ISearchPredicate searchPred) throws HyracksDataException, IndexException {
+ ctx.reset(IndexOp.SEARCH);
+ //search include in-memory components
+ lsmHarness.search(cursor, searchPred, ctx, true);
+ }
+
+ public void flush() throws HyracksDataException, IndexException {
+ lsmHarness.flush();
+ }
+
+ public void merge() throws HyracksDataException, IndexException {
+ lsmHarness.merge();
+ }
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexCursorInitialState.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexCursorInitialState.java
new file mode 100644
index 0000000..4b859ac
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexCursorInitialState.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import edu.uci.ics.hyracks.storage.am.common.api.ICursorInitialState;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexOpContext;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMHarness;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
+
+public class LSMInvertedIndexCursorInitialState implements ICursorInitialState {
+
+ private final boolean includeMemComponent;
+ private final AtomicInteger searcherfRefCount;
+ private final LSMHarness lsmHarness;
+ private final List<IIndexAccessor> indexAccessors;
+ private final IIndexOpContext opContext;
+
+ public LSMInvertedIndexCursorInitialState(List<IIndexAccessor> indexAccessors, IIndexOpContext ctx,
+ boolean includeMemComponent, AtomicInteger searcherfRefCount, LSMHarness lsmHarness) {
+ this.indexAccessors = indexAccessors;
+ this.includeMemComponent = includeMemComponent;
+ this.searcherfRefCount = searcherfRefCount;
+ this.lsmHarness = lsmHarness;
+ this.opContext = ctx;
+ }
+
+ @Override
+ public ICachedPage getPage() {
+ return null;
+ }
+
+ @Override
+ public void setPage(ICachedPage page) {
+ }
+
+ public List<IIndexAccessor> getIndexAccessors() {
+ return indexAccessors;
+ }
+
+ public AtomicInteger getSearcherRefCount() {
+ return searcherfRefCount;
+ }
+
+ public boolean getIncludeMemComponent() {
+ return includeMemComponent;
+ }
+
+ public LSMHarness getLSMHarness() {
+ return lsmHarness;
+ }
+
+ public IIndexOpContext getOpContext() {
+ return opContext;
+ }
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java
new file mode 100644
index 0000000..4c893a9
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java
@@ -0,0 +1,181 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls;
+
+import java.io.File;
+import java.io.FilenameFilter;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.control.nc.io.IOManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMTreeFileManager;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+
+public class LSMInvertedIndexFileManager extends LSMTreeFileManager {
+
+ private static final String INVERTED_STRING = "i";
+ private static final String BTREE_STRING = "b";
+
+ private static FilenameFilter btreeFilter = new FilenameFilter() {
+ public boolean accept(File dir, String name) {
+ return !name.startsWith(".") && name.endsWith(BTREE_STRING);
+ }
+ };
+
+ private static FilenameFilter invertedFilter = new FilenameFilter() {
+ public boolean accept(File dir, String name) {
+ return !name.startsWith(".") && name.endsWith(INVERTED_STRING);
+ }
+ };
+
+ public LSMInvertedIndexFileManager(IOManager ioManager, IFileMapProvider fileMapProvider, String baseDir) {
+ super(ioManager, fileMapProvider, baseDir);
+ }
+
+ @Override
+ public Object getRelFlushFileName() {
+ String baseName = (String) super.getRelFlushFileName();
+ return new LSMInvertedFileNameComponent(baseName + SPLIT_STRING + INVERTED_STRING, baseName + SPLIT_STRING
+ + BTREE_STRING);
+
+ }
+
+ @Override
+ public Object getRelMergeFileName(String firstFileName, String lastFileName) throws HyracksDataException {
+ String baseName = (String) super.getRelMergeFileName(firstFileName, lastFileName);
+ return new LSMInvertedFileNameComponent(baseName + SPLIT_STRING + INVERTED_STRING, baseName + SPLIT_STRING
+ + BTREE_STRING);
+ }
+
+// @Override
+// public List<Object> cleanupAndGetValidFiles(Object lsmComponent, ILSMComponentFinalizer componentFinalizer) throws HyracksDataException {
+// List<Object> validFiles = new ArrayList<Object>();
+// ArrayList<ComparableFileName> allInvertedFiles = new ArrayList<ComparableFileName>();
+// ArrayList<ComparableFileName> allBTreeFiles = new ArrayList<ComparableFileName>();
+// LSMInvertedComponent component = (LSMInvertedComponent) lsmComponent;
+//
+// // Gather files from all IODeviceHandles.
+// for (IODeviceHandle dev : ioManager.getIODevices()) {
+// getValidFiles(dev, btreeFilter, component.getBTree(), componentFinalizer, allBTreeFiles);
+// HashSet<String> btreeFilesSet = new HashSet<String>();
+// for (ComparableFileName cmpFileName : allBTreeFiles) {
+// int index = cmpFileName.fileName.lastIndexOf(SPLIT_STRING);
+// btreeFilesSet.add(cmpFileName.fileName.substring(0, index));
+// }
+// // List of valid Inverted files that may or may not have a BTree buddy. Will check for buddies below.
+// ArrayList<ComparableFileName> tmpAllInvertedFiles = new ArrayList<ComparableFileName>();
+// getValidFiles(dev, invertedFilter, component.getInverted(), componentFinalizer, tmpAllInvertedFiles);
+// // Look for buddy BTrees for all valid Inverteds.
+// // If no buddy is found, delete the file, otherwise add the Inverted to allInvertedFiles.
+// for (ComparableFileName cmpFileName : tmpAllInvertedFiles) {
+// int index = cmpFileName.fileName.lastIndexOf(SPLIT_STRING);
+// String file = cmpFileName.fileName.substring(0, index);
+// if (btreeFilesSet.contains(file)) {
+// allInvertedFiles.add(cmpFileName);
+// } else {
+// // Couldn't find the corresponding BTree file; thus, delete
+// // the Inverted file.
+// File invalidInvertedFile = new File(cmpFileName.fullPath);
+// invalidInvertedFile.delete();
+// }
+// }
+// }
+// // Sanity check.
+// if (allInvertedFiles.size() != allBTreeFiles.size()) {
+// throw new HyracksDataException("Unequal number of valid Inverted and BTree files found. Aborting cleanup.");
+// }
+//
+// // Trivial cases.
+// if (allInvertedFiles.isEmpty() || allBTreeFiles.isEmpty()) {
+// return validFiles;
+// }
+//
+// if (allInvertedFiles.size() == 1 && allBTreeFiles.size() == 1) {
+// validFiles.add(new LSMInvertedFileNameComponent(allInvertedFiles.get(0).fullPath, allBTreeFiles.get(0).fullPath));
+// return validFiles;
+// }
+//
+// // Sorts files names from earliest to latest timestamp.
+// Collections.sort(allInvertedFiles);
+// Collections.sort(allBTreeFiles);
+//
+// List<ComparableFileName> validComparableInvertedFiles = new ArrayList<ComparableFileName>();
+// ComparableFileName lastInverted = allInvertedFiles.get(0);
+// validComparableInvertedFiles.add(lastInverted);
+//
+// List<ComparableFileName> validComparableBTreeFiles = new ArrayList<ComparableFileName>();
+// ComparableFileName lastBTree = allBTreeFiles.get(0);
+// validComparableBTreeFiles.add(lastBTree);
+//
+// for (int i = 1; i < allInvertedFiles.size(); i++) {
+// ComparableFileName currentInverted = allInvertedFiles.get(i);
+// ComparableFileName currentBTree = allBTreeFiles.get(i);
+// // Current start timestamp is greater than last stop timestamp.
+// if (currentInverted.interval[0].compareTo(lastInverted.interval[1]) > 0
+// && currentBTree.interval[0].compareTo(lastBTree.interval[1]) > 0) {
+// validComparableInvertedFiles.add(currentInverted);
+// validComparableBTreeFiles.add(currentBTree);
+// lastInverted = currentInverted;
+// lastBTree = currentBTree;
+// } else if (currentInverted.interval[0].compareTo(lastInverted.interval[0]) >= 0
+// && currentInverted.interval[1].compareTo(lastInverted.interval[1]) <= 0
+// && currentBTree.interval[0].compareTo(lastBTree.interval[0]) >= 0
+// && currentBTree.interval[1].compareTo(lastBTree.interval[1]) <= 0) {
+// // Invalid files are completely contained in last interval.
+// File invalidInvertedFile = new File(currentInverted.fullPath);
+// invalidInvertedFile.delete();
+// File invalidBTreeFile = new File(currentBTree.fullPath);
+// invalidBTreeFile.delete();
+// } else {
+// // This scenario should not be possible.
+// throw new HyracksDataException("Found LSM files with overlapping but not contained timetamp intervals.");
+// }
+// }
+//
+// // Sort valid files in reverse lexicographical order, such that newer
+// // files come first.
+// Collections.sort(validComparableInvertedFiles, recencyCmp);
+// Collections.sort(validComparableBTreeFiles, recencyCmp);
+//
+// Iterator<ComparableFileName> invertedFileIter = validComparableInvertedFiles.iterator();
+// Iterator<ComparableFileName> btreeFileIter = validComparableBTreeFiles.iterator();
+// while (invertedFileIter.hasNext() && btreeFileIter.hasNext()) {
+// ComparableFileName cmpInvertedFileName = invertedFileIter.next();
+// ComparableFileName cmpBTreeFileName = btreeFileIter.next();
+// validFiles.add(new LSMInvertedFileNameComponent(cmpInvertedFileName.fullPath, cmpBTreeFileName.fullPath));
+// }
+//
+// return validFiles;
+// }
+
+ public class LSMInvertedFileNameComponent {
+ private final String invertedFileName;
+ private final String btreeFileName;
+
+ LSMInvertedFileNameComponent(String invertedFileName, String btreeFileName) {
+ this.invertedFileName = invertedFileName;
+ this.btreeFileName = btreeFileName;
+ }
+
+ public String getInvertedFileName() {
+ return invertedFileName;
+ }
+
+ public String getBTreeFileName() {
+ return btreeFileName;
+ }
+ }
+
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java
new file mode 100644
index 0000000..3ea50e0
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls;
+
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexOpContext;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.inmemory.InMemoryInvertedIndex;
+
+public class LSMInvertedIndexOpContext implements IIndexOpContext {
+
+ private IndexOp op;
+ private final MultiComparator cmp;
+ private final int invListFieldCount;
+ private final int tokenFieldCount;
+
+ public LSMInvertedIndexOpContext(IInvertedIndex memoryInvertedIndex) {
+ InMemoryInvertedIndex memoryBTreeInvertedIndex = (InMemoryInvertedIndex)memoryInvertedIndex;
+ BTree btree = memoryBTreeInvertedIndex.getBTree();
+ this.cmp = MultiComparator.create(btree.getComparatorFactories());
+ this.invListFieldCount = memoryBTreeInvertedIndex.getInvListCmpFactories().length;
+ this.tokenFieldCount = cmp.getKeyFieldCount() - invListFieldCount;
+ }
+
+ @Override
+ public void reset() {
+ // TODO Auto-generated method stub
+ }
+
+ @Override
+ public void reset(IndexOp newOp) {
+ op = newOp;
+ }
+
+ public int getInvListFieldCount() {
+ return invListFieldCount;
+ }
+
+ public int getTokenFieldCount() {
+ return tokenFieldCount;
+ }
+
+ public MultiComparator getComparator() {
+ return cmp;
+ }
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursor.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursor.java
new file mode 100644
index 0000000..4bce429
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursor.java
@@ -0,0 +1,268 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.util.TupleUtils;
+import edu.uci.ics.hyracks.storage.am.common.api.ICursorInitialState;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMHarness;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.InvertedIndexAccessor;
+
+public class LSMInvertedIndexRangeSearchCursor implements IIndexCursor {
+
+ private LSMHarness harness;
+ private boolean includeMemComponent;
+ private AtomicInteger searcherRefCount;
+ private List<IIndexAccessor> indexAccessors;
+ private List<IIndexCursor> indexCursors;
+ private boolean flagEOF = false;
+ private boolean flagFirstNextCall = true;
+
+ private PriorityQueue<PriorityQueueElement> outputPriorityQueue;
+ private MultiComparator memoryInvertedIndexComparator;
+ private PriorityQueueComparator pqCmp;
+ private int tokenFieldCount;
+ private int invListFieldCount;
+ private ITupleReference resultTuple;
+ private BitSet closedCursors;
+
+ @Override
+ public void open(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException {
+ LSMInvertedIndexCursorInitialState lsmInitialState = (LSMInvertedIndexCursorInitialState) initialState;
+ this.harness = lsmInitialState.getLSMHarness();
+ this.includeMemComponent = lsmInitialState.getIncludeMemComponent();
+ this.searcherRefCount = lsmInitialState.getSearcherRefCount();
+ this.indexAccessors = lsmInitialState.getIndexAccessors();
+ this.indexCursors = new ArrayList<IIndexCursor>(indexAccessors.size());
+ LSMInvertedIndexOpContext opContext = (LSMInvertedIndexOpContext) lsmInitialState.getOpContext();
+ this.memoryInvertedIndexComparator = opContext.getComparator();
+ this.tokenFieldCount = opContext.getTokenFieldCount();
+ this.invListFieldCount = opContext.getInvListFieldCount();
+ closedCursors = new BitSet(indexAccessors.size());
+
+ //create all cursors
+ IIndexCursor cursor;
+ for (IIndexAccessor accessor : indexAccessors) {
+ InvertedIndexAccessor invIndexAccessor = (InvertedIndexAccessor) accessor;
+ cursor = invIndexAccessor.createRangeSearchCursor();
+ try {
+ invIndexAccessor.rangeSearch(cursor, searchPred);
+ } catch (IndexException e) {
+ throw new HyracksDataException(e);
+ }
+ indexCursors.add(cursor);
+ }
+ }
+
+ @Override
+ public boolean hasNext() throws HyracksDataException {
+
+ if (flagEOF) {
+ return false;
+ }
+
+ if (flagFirstNextCall) {
+ for (IIndexCursor c : indexCursors) {
+ if (c.hasNext()) {
+ return true;
+ }
+ }
+ } else {
+ if (outputPriorityQueue.size() > 0) {
+ return true;
+ }
+ }
+
+ flagEOF = true;
+ return false;
+ }
+
+ @Override
+ public void next() throws HyracksDataException {
+
+ PriorityQueueElement pqElement;
+ IIndexCursor cursor;
+ int cursorIndex;
+
+ if (flagEOF) {
+ return;
+ }
+
+ //When the next() is called for the first time, initialize priority queue.
+ if (flagFirstNextCall) {
+ flagFirstNextCall = false;
+
+ //create and initialize PriorityQueue
+ pqCmp = new PriorityQueueComparator(memoryInvertedIndexComparator);
+ outputPriorityQueue = new PriorityQueue<PriorityQueueElement>(indexCursors.size(), pqCmp);
+
+ //read the first tuple from each cursor and insert into outputPriorityQueue
+
+ for (int i = 0; i < indexCursors.size(); i++) {
+ cursor = indexCursors.get(i);
+ if (cursor.hasNext()) {
+ cursor.next();
+ pqElement = new PriorityQueueElement(cursor.getTuple(), i);
+ outputPriorityQueue.offer(pqElement);
+ }
+ //else {
+ // //do nothing for the cursor who reached EOF.
+ //}
+ }
+ }
+
+ //If you reach here, priority queue is set up to provide the smallest <tokenFields, invListFields>
+ //Get the smallest element from priority queue.
+ //This element will be the result tuple which will be served to the caller when getTuple() is called.
+ //Then, insert new element from the cursor where the smallest element came from.
+ pqElement = outputPriorityQueue.poll();
+ if (pqElement != null) {
+ resultTuple = pqElement.getTuple();
+ cursorIndex = pqElement.getCursorIndex();
+ cursor = indexCursors.get(cursorIndex);
+ if (cursor.hasNext()) {
+ cursor.next();
+ pqElement = new PriorityQueueElement(cursor.getTuple(), cursorIndex);
+ outputPriorityQueue.offer(pqElement);
+ } else {
+ cursor.close();
+ closedCursors.set(cursorIndex, true);
+
+// If the current cursor reached EOF, read a tuple from another cursor and insert into the priority queue.
+ for (int i = 0; i < indexCursors.size(); i++) {
+ if (closedCursors.get(i))
+ continue;
+
+ cursor = indexCursors.get(i);
+ if (cursor.hasNext()) {
+ cursor.next();
+ pqElement = new PriorityQueueElement(cursor.getTuple(), i);
+ outputPriorityQueue.offer(pqElement);
+ break;
+ } else {
+ cursor.close();
+ closedCursors.set(i, true);
+ }
+ }
+ //if (i == indexCursors.size()) {
+ // all cursors reached EOF and the only tuples that you have are in the priority queue.
+ // do nothing here!.
+ //}
+ }
+ }
+ //else {
+ // do nothing!!
+ // which means when the getTuple() is called, the pre-existing result tuple or null will be returned to the caller.
+ //}
+
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ try {
+ for (int i = 0; i < indexCursors.size(); i++) {
+ if (closedCursors.get(i)) {
+ continue;
+ }
+ indexCursors.get(i).close();
+ closedCursors.set(i, true);
+ }
+ } finally {
+ harness.closeSearchCursor(searcherRefCount, includeMemComponent);
+ }
+ }
+
+ @Override
+ public void reset() {
+ // TODO Auto-generated method stub
+ }
+
+ @Override
+ public ITupleReference getTuple() {
+ return resultTuple;
+ }
+
+ public class PriorityQueueComparator implements Comparator<PriorityQueueElement> {
+
+ private final MultiComparator cmp;
+
+ public PriorityQueueComparator(MultiComparator cmp) {
+ this.cmp = cmp;
+ }
+
+ @Override
+ public int compare(PriorityQueueElement elementA, PriorityQueueElement elementB) {
+ int result = cmp.compare(elementA.getTuple(), elementB.getTuple());
+ if (result != 0) {
+ return result;
+ }
+ if (elementA.getCursorIndex() > elementB.getCursorIndex()) {
+ return 1;
+ } else {
+ return -1;
+ }
+ }
+
+ public MultiComparator getMultiComparator() {
+ return cmp;
+ }
+ }
+
+ public class PriorityQueueElement {
+ private ITupleReference tuple;
+ private int cursorIndex;
+
+ public PriorityQueueElement(ITupleReference tuple, int cursorIndex) {
+// reset(tuple, cursorIndex);
+ try {
+ reset(TupleUtils.copyTuple(tuple), cursorIndex);
+ } catch (HyracksDataException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ public ITupleReference getTuple() {
+ return tuple;
+ }
+
+ public int getCursorIndex() {
+ return cursorIndex;
+ }
+
+ public void reset(ITupleReference tuple, int cursorIndex) {
+ this.tuple = tuple;
+ this.cursorIndex = cursorIndex;
+ }
+ }
+
+}
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursor.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursor.java
new file mode 100644
index 0000000..c5c8645
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursor.java
@@ -0,0 +1,174 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.ICursorInitialState;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMHarness;
+
+public class LSMInvertedIndexSearchCursor implements IIndexCursor {
+
+ private int cursorIndex = -1;
+ private LSMHarness harness;
+ private boolean includeMemComponent;
+ private AtomicInteger searcherRefCount;
+ private List<IIndexAccessor> indexAccessors;
+ private List<IIndexCursor> indexCursors;// = new ArrayList<IIndexCursor>();
+ private ISearchPredicate searchPred;
+
+ public LSMInvertedIndexSearchCursor() {
+ indexCursors = new ArrayList<IIndexCursor>();
+ }
+
+ @Override
+ public void open(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException {
+ LSMInvertedIndexCursorInitialState lsmInitialState = (LSMInvertedIndexCursorInitialState) initialState;
+ harness = lsmInitialState.getLSMHarness();
+ includeMemComponent = lsmInitialState.getIncludeMemComponent();
+ searcherRefCount = lsmInitialState.getSearcherRefCount();
+ indexAccessors = lsmInitialState.getIndexAccessors();
+ indexCursors.clear();
+// indexCursors = new ArrayList<IIndexCursor>(indexAccessors.size());
+ cursorIndex = 0;
+ this.searchPred = searchPred;
+
+ IIndexAccessor currentAccessor;
+ IIndexCursor currentCursor;
+ while (cursorIndex < indexAccessors.size()) {
+ // Open cursors and perform search lazily as each component is passed over
+ currentAccessor = indexAccessors.get(cursorIndex);
+ currentCursor = currentAccessor.createSearchCursor();
+ try {
+ currentAccessor.search(currentCursor, searchPred);
+ } catch (IndexException e) {
+ throw new HyracksDataException(e);
+ }
+ indexCursors.add(currentCursor);
+
+ if (currentCursor.hasNext()) {
+ break;
+ }
+
+ // Close as we go to release any resources
+ currentCursor.close();
+ cursorIndex++;
+ }
+
+ }
+
+ @Override
+ public boolean hasNext() throws HyracksDataException {
+ IIndexAccessor currentAccessor;
+ IIndexCursor currentCursor;
+
+ if(cursorIndex >= indexAccessors.size()) {
+ return false;
+ }
+
+ currentCursor = indexCursors.get(cursorIndex);
+ if (currentCursor.hasNext()) {
+ return true;
+ } else {
+ currentCursor.close();
+ cursorIndex++;
+ while (cursorIndex < indexAccessors.size()) {
+ currentAccessor = indexAccessors.get(cursorIndex);
+ currentCursor = currentAccessor.createSearchCursor();
+ try {
+ currentAccessor.search(currentCursor, searchPred);
+ } catch (IndexException e) {
+ throw new HyracksDataException(e);
+ }
+ indexCursors.add(currentCursor);
+
+ if (currentCursor.hasNext()) {
+ return true;
+ } else {
+ currentCursor.close();
+ cursorIndex++;
+ }
+ }
+ }
+
+ return false;
+ }
+
+ @Override
+ public void next() throws HyracksDataException {
+ IIndexAccessor currentAccessor;
+ IIndexCursor currentCursor = indexCursors.get(cursorIndex);
+
+ if (currentCursor.hasNext()) {
+ currentCursor.next();
+ } else {
+ currentCursor.close();
+ cursorIndex++;
+ while (cursorIndex < indexAccessors.size()) {
+ currentAccessor = indexAccessors.get(cursorIndex);
+ currentCursor = currentAccessor.createSearchCursor();
+ try {
+ currentAccessor.search(currentCursor, searchPred);
+ } catch (IndexException e) {
+ throw new HyracksDataException(e);
+ }
+ indexCursors.add(currentCursor);
+
+ if (currentCursor.hasNext()) {
+ currentCursor.next();
+ break;
+ } else {
+ cursorIndex++;
+ }
+ }
+ }
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ cursorIndex = -1;
+ for (int i = 0; i < indexCursors.size(); i++) {
+ indexCursors.get(i).close();
+ }
+ indexCursors.clear();
+ harness.closeSearchCursor(searcherRefCount, includeMemComponent);
+ }
+
+ @Override
+ public void reset() throws HyracksDataException {
+ cursorIndex = 0;
+ for (int i = 0; i < indexCursors.size(); i++) {
+ indexCursors.get(i).reset();
+ }
+ }
+
+ @Override
+ public ITupleReference getTuple() throws HyracksDataException {
+ if (cursorIndex < indexCursors.size()) {
+ return indexCursors.get(cursorIndex).getTuple();
+ } else {
+ return null;
+ }
+ }
+
+}