Added bulk loader for lsm inverted index and test (no multi-bulk-load test yet).

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_inverted_index_updates_new@1835 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
index 560072c..fb943e7 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
@@ -40,6 +40,7 @@
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
 import edu.uci.ics.hyracks.storage.am.common.api.IndexType;
+import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponentFinalizer;
@@ -292,12 +293,6 @@
     }
     
     @Override
-    public long getInMemorySize() {
-        InMemoryBufferCache memBufferCache = (InMemoryBufferCache) memComponent.getInvIndex().getBufferCache();
-        return memBufferCache.getNumPages() * memBufferCache.getPageSize();
-    }
-
-    @Override
     public ILSMIOOperation createMergeOperation(ILSMIOOperationCallback callback) throws HyracksDataException {
         // TODO Auto-generated method stub
         return null;
@@ -305,8 +300,7 @@
     
     @Override
     public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput) throws IndexException {
-        // TODO Auto-generated method stub
-        return null;
+        return new LSMInvertedIndexBulkLoader(fillFactor, verifyInput);
     }
     
     public boolean insertUpdateOrDelete(ITupleReference tuple, IIndexOpContext ictx) throws HyracksDataException,
@@ -374,11 +368,8 @@
         }
         invIndexBulkLoader.end();
 
-        // Create an empty deleted keys BTree.
-        ITreeIndex deletedKeysBTree = createDiskTree(deletedKeysBTreeFactory, mergeOp.getDeletedKeysBTreeMergeTarget(),
-                true);
-        IIndexBulkLoader deletedKeysBulkLoader = deletedKeysBTree.createBulkLoader(1.0f, false);
-        deletedKeysBulkLoader.end();
+        // Create an empty deleted keys BTree (do nothing with the returned index).
+        createDiskTree(deletedKeysBTreeFactory, mergeOp.getDeletedKeysBTreeMergeTarget(), true);
 
         return mergedDiskInvertedIndex;
     }
@@ -459,11 +450,60 @@
         diskComponents.addFirst(index);
     }
 
-    @Override
-    public InMemoryFreePageManager getInMemoryFreePageManager() {
-        return memFreePageManager;
-    }
+    public class LSMInvertedIndexBulkLoader implements IIndexBulkLoader {
+        private final IInvertedIndex invIndex;
+        private final BTree deletedKeysBTree;
+        private final IIndexBulkLoader invIndexBulkLoader;
 
+        public LSMInvertedIndexBulkLoader(float fillFactor, boolean verifyInput) throws IndexException {
+            // Note that by using a flush target file name, we state that the
+            // new bulk loaded tree is "newer" than any other merged tree.
+            try {
+                LSMInvertedIndexFileNameComponent fileNameComponent = (LSMInvertedIndexFileNameComponent) fileManager
+                        .getRelFlushFileName();
+                FileReference dictBTreeFileRef = fileManager.createFlushFile(fileNameComponent.getDictBTreeFileName());
+                invIndex = createDiskInvIndex(diskInvIndexFactory, dictBTreeFileRef, true);
+                // Create an empty deleted-keys BTree.
+                FileReference deletedKeysBTreeFile = fileManager.createFlushFile(fileNameComponent
+                        .getDeletedKeysBTreeFileName());
+                deletedKeysBTree = (BTree) createDiskTree(deletedKeysBTreeFactory, deletedKeysBTreeFile, true);
+            } catch (HyracksDataException e) {
+                throw new TreeIndexException(e);
+            }
+            invIndexBulkLoader = invIndex.createBulkLoader(fillFactor, verifyInput);
+        }
+
+        @Override
+        public void add(ITupleReference tuple) throws IndexException, HyracksDataException {
+            try {
+                invIndexBulkLoader.add(tuple);
+            } catch (IndexException e) {
+                handleException();
+                throw e;
+            } catch (HyracksDataException e) {
+                handleException();
+                throw e;
+            } catch (RuntimeException e) {
+                handleException();
+                throw e;
+            }
+        }
+
+        protected void handleException() throws HyracksDataException {
+            invIndex.deactivate();
+            invIndex.destroy();
+            deletedKeysBTree.deactivate();
+            deletedKeysBTree.destroy();
+        }
+
+        @Override
+        public void end() throws IndexException, HyracksDataException {
+            invIndexBulkLoader.end();
+            LSMInvertedIndexComponent diskComponent = new LSMInvertedIndexComponent(invIndex, deletedKeysBTree);
+            lsmHarness.addBulkLoadedComponent(diskComponent);
+        }
+    }
+    
     @Override
     public void resetInMemoryComponent() throws HyracksDataException {
         memFreePageManager.reset();
@@ -472,6 +512,17 @@
     }
 
     @Override
+    public long getInMemorySize() {
+        InMemoryBufferCache memBufferCache = (InMemoryBufferCache) memComponent.getInvIndex().getBufferCache();
+        return memBufferCache.getNumPages() * memBufferCache.getPageSize();
+    }
+    
+    @Override
+    public InMemoryFreePageManager getInMemoryFreePageManager() {
+        return memFreePageManager;
+    }
+    
+    @Override
     public List<Object> getDiskComponents() {
         return diskComponents;
     }
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
index 91018bf..35a177f 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/ondisk/OnDiskInvertedIndex.java
@@ -391,7 +391,10 @@
 
         @Override
         public void end() throws IndexException, HyracksDataException {
-            createAndInsertBTreeTuple();
+            // The last tuple builder is empty if add() was never called.
+            if (lastTupleBuilder.getSize() != 0) {
+                createAndInsertBTreeTuple();
+            }
             btreeBulkloader.end();
 
             if (currentPage != null) {
diff --git a/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/LSMInvertedIndexBulkLoadTest.java b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/LSMInvertedIndexBulkLoadTest.java
new file mode 100644
index 0000000..7bb5f4b
--- /dev/null
+++ b/hyracks-tests/hyracks-storage-am-lsm-invertedindex-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/LSMInvertedIndexBulkLoadTest.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex;
+
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.common.AbstractInvertedIndexLoadTest;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.InvertedIndexTestContext.InvertedIndexType;
+
+public class LSMInvertedIndexBulkLoadTest extends AbstractInvertedIndexLoadTest {
+
+    public LSMInvertedIndexBulkLoadTest() {
+        super(InvertedIndexType.LSM, true);
+    }
+}