Started implementing a simple merge.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1045 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-lsmtree-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/rtree/impls/LSMRTree.java b/hyracks-storage-am-lsmtree-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/rtree/impls/LSMRTree.java
index 2c837bf..251c5a0 100644
--- a/hyracks-storage-am-lsmtree-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/rtree/impls/LSMRTree.java
+++ b/hyracks-storage-am-lsmtree-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/rtree/impls/LSMRTree.java
@@ -61,7 +61,6 @@
     private final RTreeFactory rTreeFactory;
     private final BTreeFactory bTreeFactory;
     private final IFileMapManager fileMapManager;
-    private int threadReferenceCounter;
     private int threadRefCount;
     private boolean flushFlag;
 
@@ -84,7 +83,6 @@
         this.inDiskBTreeList = new LinkedList<ITreeIndex>();
         this.inDiskTreeCounter = 0;
         this.fileMapManager = fileMapManager;
-        this.threadReferenceCounter = 0;
         this.threadRefCount = 0;
         this.created = false;
         this.flushFlag = false;
@@ -195,7 +193,106 @@
 
     @Override
     public void merge() throws Exception {
-        // TODO Auto-generated method stub
+
+        // Cursor setting -- almost the same as search, only difference is
+        // "no cursor for in-memory tree"
+        int numberOfInDiskTrees;   
+        synchronized (inDiskRTreeList) {
+            numberOfInDiskTrees = inDiskRTreeList.size();
+        }
+        
+        RTreeSearchCursor[] rtreeCursors = new RTreeSearchCursor[numberOfInDiskTrees];
+        BTreeRangeSearchCursor[] btreeCursors = new BTreeRangeSearchCursor[numberOfInDiskTrees];
+        
+        for (int i = 0; i < numberOfInDiskTrees; i++) {
+            rtreeCursors[i] = new RTreeSearchCursor((IRTreeInteriorFrame) rtreeInteriorFrameFactory.createFrame(),
+                    (IRTreeLeafFrame) rtreeLeafFrameFactory.createFrame());
+
+            btreeCursors[i] = new BTreeRangeSearchCursor((IBTreeLeafFrame) btreeLeafFrameFactory.createFrame(), false);
+        }
+        
+        // Create a new in-Disk RTree
+        // Register the RTree information into system.
+        inDiskTreeCounter++;
+        FileReference rtreeFile = new FileReference(new File(getNextFileName(rtreeFileName, inDiskTreeCounter)));
+        // TODO: Delete the file during cleanup.
+        bufferCache.createFile(rtreeFile);
+        int newDiskRTreeId = fileMapManager.lookupFileId(rtreeFile);
+        // TODO: Close the file during cleanup.
+        bufferCache.openFile(newDiskRTreeId);
+
+        // Create new in-Disk RTree.
+        RTree mergedRTree = (RTree) rTreeFactory.createIndexInstance(newDiskRTreeId);
+        mergedRTree.create(newDiskRTreeId);
+        // TODO: Close the RTree during cleanup.
+        mergedRTree.open(newDiskRTreeId);
+
+        
+        // Create a new in-Disk BTree, which have full fillfactor.
+        // Register the BTree information into system.
+        FileReference btreeFile = new FileReference(new File(getNextFileName(btreeFileName, inDiskTreeCounter)));
+        // TODO: Delete the file during cleanup.
+        bufferCache.createFile(btreeFile);
+        int newDiskBTreeId = fileMapManager.lookupFileId(btreeFile);
+        // TODO: Close the file during cleanup.
+        bufferCache.openFile(newDiskBTreeId);
+
+        // Create new in-Disk BTree.
+        BTree mergedBTree = (BTree) bTreeFactory.createIndexInstance(newDiskBTreeId);
+        mergedBTree.create(newDiskBTreeId);
+        // TODO: Close the BTree during cleanup.
+        mergedBTree.open(newDiskBTreeId);
+        
+        // BulkLoad the tuples from the RTree into the new disk RTree.
+        IIndexBulkLoadContext rtreeBulkLoadCtx = mergedRTree.beginBulkLoad(1.0f);
+        
+        // BulkLoad the tuples from the in-memory tree into the new disk BTree.
+        IIndexBulkLoadContext btreeBulkLoadCtx = mergedBTree.beginBulkLoad(1.0f);
+        
+        for (int i = 0; i < numberOfInDiskTrees; i++) {
+            
+            // scan the RTrees
+            ITreeIndexCursor rtreeScanCursor = new RTreeSearchCursor(
+                    (IRTreeInteriorFrame) rtreeInteriorFrameFactory.createFrame(),
+                    (IRTreeLeafFrame) rtreeLeafFrameFactory.createFrame());
+            SearchPredicate rtreeNullPredicate = new SearchPredicate(null, null);
+
+            ITreeIndexAccessor onDiskRTreeAccessor = inDiskRTreeList.get(i).createAccessor();
+            onDiskRTreeAccessor.search(rtreeScanCursor, rtreeNullPredicate);
+
+            try {
+                while (rtreeScanCursor.hasNext()) {
+                    rtreeScanCursor.next();
+                    ITupleReference frameTuple = rtreeScanCursor.getTuple();
+                    mergedRTree.bulkLoadAddTuple(frameTuple, rtreeBulkLoadCtx);
+                }
+            } finally {
+                rtreeScanCursor.close();
+            }
+           
+
+            // scan the BTrees
+            ITreeIndexCursor btreeScanCursor = new BTreeRangeSearchCursor(
+                    (IBTreeLeafFrame) btreeLeafFrameFactory.createFrame(), false);
+            RangePredicate btreeNullPredicate = new RangePredicate(true, null, null, true, true, null, null);
+            ITreeIndexAccessor onDiskBTreeAccessor = inDiskBTreeList.get(i).createAccessor();
+            onDiskBTreeAccessor.search(btreeScanCursor, btreeNullPredicate);
+            
+            try {
+                while (btreeScanCursor.hasNext()) {
+                    btreeScanCursor.next();
+                    ITupleReference frameTuple = btreeScanCursor.getTuple();
+                    mergedBTree.bulkLoadAddTuple(frameTuple, btreeBulkLoadCtx);
+                }
+            } finally {
+                btreeScanCursor.close();
+            }
+            
+        }
+        mergedRTree.endBulkLoad(rtreeBulkLoadCtx);
+        mergedBTree.endBulkLoad(btreeBulkLoadCtx);
+
+        // TODO: complete the merge code
 
     }
 
@@ -207,10 +304,10 @@
         ITreeIndexCursor rtreeScanCursor = new RTreeSearchCursor(
                 (IRTreeInteriorFrame) rtreeInteriorFrameFactory.createFrame(),
                 (IRTreeLeafFrame) rtreeLeafFrameFactory.createFrame());
-        SearchPredicate searchPredicate = new SearchPredicate(null, null);
+        SearchPredicate rtreeNullPredicate = new SearchPredicate(null, null);
 
         ITreeIndexAccessor memRTreeAccessor = memRTree.createAccessor();
-        memRTreeAccessor.search(rtreeScanCursor, searchPredicate);
+        memRTreeAccessor.search(rtreeScanCursor, rtreeNullPredicate);
 
         // Create a new in-Disk RTree
 
@@ -299,21 +396,6 @@
         memRTree.create(rtreeFileId);
         memBTree.create(btreeFileId);
     }
-
-    public void decreaseThreadReferenceCounter() throws Exception {
-        synchronized (this) {
-            threadReferenceCounter--;
-            if (flushFlag == true) {
-                if (threadReferenceCounter == 0) {
-                    flush();
-                    flushFlag = false;
-                    return;
-                } else if (threadReferenceCounter < 0) {
-                    throw new Error("Thread reference counter is below zero. This indicates a programming error!");
-                }
-            }
-        }
-    }
     
     public void threadEnter() {
         threadRefCount++;
@@ -379,7 +461,7 @@
         while (continuePerformOp == false) {
             synchronized (this) {
                 if (!flushFlag) {
-                    threadReferenceCounter++;
+                    threadRefCount++;
                     continuePerformOp = true;
                 }
             }
diff --git a/hyracks-storage-am-lsmtree-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/rtree/impls/LSMRTreeSearchCursor.java b/hyracks-storage-am-lsmtree-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/rtree/impls/LSMRTreeSearchCursor.java
index 17f2d95..e8f29d4 100644
--- a/hyracks-storage-am-lsmtree-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/rtree/impls/LSMRTreeSearchCursor.java
+++ b/hyracks-storage-am-lsmtree-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsmtree/rtree/impls/LSMRTreeSearchCursor.java
@@ -19,8 +19,8 @@
 
 public class LSMRTreeSearchCursor implements ITreeIndexCursor {
 
-    private BTreeRangeSearchCursor[] btreeCursors;
     private RTreeSearchCursor[] rtreeCursors;
+    private BTreeRangeSearchCursor[] btreeCursors;
     private BTree.BTreeAccessor memBtreeAccessor;
     private ITreeIndexAccessor[] onDiskBTreeAccessors;
     private int currentCursror;
@@ -100,15 +100,8 @@
         onDiskBTreeAccessors = new ITreeIndexAccessor[numberOfTrees - 1];
 
         for (int i = 0; i < numberOfTrees; i++) {
-            if (i < ((LSMRTreeCursorInitialState) initialState).getNumberOfTrees() - 1) { // we
-                                                                                          // already
-                                                                                          // have
-                                                                                          // an
-                                                                                          // accessor
-                                                                                          // for
-                                                                                          // the
-                                                                                          // in-memory
-                                                                                          // b-tree
+            // we already have an accessor for the in-memory b-tree
+            if (i < numberOfTrees - 1) {
                 onDiskBTreeAccessors[i] = lsmRTree.getInDiskBTreeList().get(i).createAccessor();
             }
 
@@ -131,7 +124,7 @@
 
     @Override
     public void close() throws Exception {
-        lsmRTree.decreaseThreadReferenceCounter();
+        lsmRTree.threadExit();
         for (int i = 0; i < numberOfTrees; i++) {
             rtreeCursors[i].close();
             btreeCursors[i].close();