Added in-memory sorting algorithm for the in memory RTree to be used prior to flushing.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1359 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
index a1e25ae..8370221 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
@@ -57,6 +57,7 @@
 import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeInteriorFrame;
 import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeLeafFrame;
 import edu.uci.ics.hyracks.storage.am.rtree.impls.RTree;
+import edu.uci.ics.hyracks.storage.am.rtree.impls.RTreeSearchCursor;
 import edu.uci.ics.hyracks.storage.am.rtree.impls.SearchPredicate;
 import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
 import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
@@ -89,6 +90,11 @@
     private final static int MEM_RTREE_FILE_ID = 0;
     private final static int MEM_BTREE_FILE_ID = 1;
 
+    // This is used to estimate number of tuples in the memory RTree for
+    // efficient memory allocation in the sort operation prior to flushing
+    private int memRTreeTuples = 0;
+    private RTreeTupleSorter rTreeTupleSorter = null;
+
     // On-disk components.
     private final ILSMFileManager fileManager;
     protected final IBufferCache diskBufferCache;
@@ -291,7 +297,7 @@
         if (ctx.getIndexOp() == IndexOp.PHYSICALDELETE) {
             throw new UnsupportedOperationException("Physical delete not yet supported in LSM R-tree");
         }
-        
+
         if (ctx.getIndexOp() == IndexOp.INSERT) {
             // Before each insert, we must check whether there exist a killer
             // tuple in the memBTree. If we find a killer tuple, we must truly
@@ -322,6 +328,7 @@
                 }
             }
             ctx.memRTreeAccessor.insert(tuple);
+            memRTreeTuples++;
 
         } else {
             // For each delete operation, we make sure that we run a true
@@ -386,20 +393,38 @@
 
         // scan the memory RTree
         ITreeIndexAccessor memRTreeAccessor = memComponent.getRTree().createAccessor();
-        IIndexCursor rtreeScanCursor = memRTreeAccessor.createSearchCursor();
+        RTreeSearchCursor rtreeScanCursor = (RTreeSearchCursor) memRTreeAccessor.createSearchCursor();
         SearchPredicate rtreeNullPredicate = new SearchPredicate(null, null);
         memRTreeAccessor.search(rtreeScanCursor, rtreeNullPredicate);
         LSMRTreeFileNameComponent fileNames = (LSMRTreeFileNameComponent) fileManager.getRelFlushFileName();
         FileReference rtreeFile = fileManager.createFlushFile(fileNames.getRTreeFileName());
         RTree diskRTree = (RTree) createDiskTree(diskRTreeFactory, rtreeFile, true);
 
+        if (rTreeTupleSorter == null) {
+            // TODO: Pass the Hilbert cmps here
+            rTreeTupleSorter = new RTreeTupleSorter(memRTreeTuples, MEM_RTREE_FILE_ID, rtreeCmpFactories,
+                    rtreeLeafFrameFactory.createFrame(), rtreeLeafFrameFactory.createFrame(), memComponent.getRTree()
+                            .getBufferCache());
+        } else {
+            rTreeTupleSorter.reset();
+        }
         // BulkLoad the tuples from the in-memory tree into the new disk RTree.
         IIndexBulkLoadContext rtreeBulkLoadCtx = diskRTree.beginBulkLoad(1.0f);
 
         try {
             while (rtreeScanCursor.hasNext()) {
                 rtreeScanCursor.next();
-                ITupleReference frameTuple = rtreeScanCursor.getTuple();
+                rTreeTupleSorter.insertTupleEntry(rtreeScanCursor.getPageId(), rtreeScanCursor.getTupleOffset());
+            }
+        } finally {
+            rtreeScanCursor.close();
+        }
+        rTreeTupleSorter.sort();
+
+        try {
+            while (rTreeTupleSorter.hasNext()) {
+                rTreeTupleSorter.next();
+                ITupleReference frameTuple = rTreeTupleSorter.getTuple();
                 diskRTree.bulkLoadAddTuple(frameTuple, rtreeBulkLoadCtx);
             }
         } finally {
@@ -515,6 +540,7 @@
         memComponent.getRTree().create(MEM_RTREE_FILE_ID);
         memComponent.getBTree().create(MEM_BTREE_FILE_ID);
         memFreePageManager.reset();
+        memRTreeTuples = 0;
     }
 
     @Override
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/RTreeTupleSorter.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/RTreeTupleSorter.java
new file mode 100644
index 0000000..ab6eb59
--- /dev/null
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/RTreeTupleSorter.java
@@ -0,0 +1,188 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.rtree.impls;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleReference;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
+import edu.uci.ics.hyracks.storage.common.file.BufferedFileHandle;
+
+public class RTreeTupleSorter {
+    private final IBinaryComparator[] comparators;
+    private int numTuples;
+    private int currentTupleIndex;
+    private int[] tPointers;
+    private IBufferCache bufferCache;
+    private final ITreeIndexFrame leafFrame1;
+    private final ITreeIndexFrame leafFrame2;
+    private ITreeIndexTupleReference frameTuple1;
+    private ITreeIndexTupleReference frameTuple2;
+    private final int fileId;
+    private final static int ARRAY_GROWTH = 1000; // Must be at least of size 2
+
+    public RTreeTupleSorter(int initialSize, int fileId, IBinaryComparatorFactory[] comparatorFactories,
+            ITreeIndexFrame leafFrame1, ITreeIndexFrame leafFrame2, IBufferCache bufferCache) {
+        this.fileId = fileId;
+        comparators = new IBinaryComparator[comparatorFactories.length];
+        for (int i = 0; i < comparatorFactories.length; ++i) {
+            comparators[i] = comparatorFactories[i].createBinaryComparator();
+        }
+        this.leafFrame1 = leafFrame1;
+        this.leafFrame2 = leafFrame2;
+        this.bufferCache = bufferCache;
+        tPointers = new int[initialSize * 2];
+        frameTuple1 = leafFrame1.createTupleReference();
+        frameTuple2 = leafFrame2.createTupleReference();
+        currentTupleIndex = 0;
+    }
+
+    public void reset() {
+        numTuples = 0;
+        currentTupleIndex = 0;
+    }
+
+    public boolean hasNext() throws HyracksDataException {
+        if (numTuples <= currentTupleIndex) {
+            return false;
+        }
+        ICachedPage node1 = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, tPointers[currentTupleIndex * 2]),
+                false);
+        leafFrame1.setPage(node1);
+        frameTuple1.resetByTupleOffset(leafFrame1.getBuffer(), tPointers[currentTupleIndex * 2 + 1]);
+
+        return true;
+    }
+
+    public void next() {
+        currentTupleIndex++;
+    }
+
+    public ITupleReference getTuple() {
+        return frameTuple1;
+    }
+
+    public void insertTupleEntry(int pageId, int tupleOffset) {
+        if (numTuples * 2 == tPointers.length) {
+            int[] newData = new int[tPointers.length + ARRAY_GROWTH];
+            System.arraycopy(tPointers, 0, newData, 0, tPointers.length);
+            tPointers = newData;
+        }
+
+        tPointers[numTuples * 2] = pageId;
+        tPointers[numTuples * 2 + 1] = tupleOffset;
+        numTuples++;
+    }
+
+    public void sort() throws HyracksDataException {
+        sort(tPointers, 0, numTuples);
+    }
+
+    private void sort(int[] tPointers, int offset, int length) throws HyracksDataException {
+        int m = offset + (length >> 1);
+        int mi = tPointers[m * 2];
+        int mj = tPointers[m * 2 + 1];
+
+        int a = offset;
+        int b = a;
+        int c = offset + length - 1;
+        int d = c;
+        while (true) {
+            while (b <= c) {
+                int cmp = compare(tPointers, b, mi, mj);
+                if (cmp > 0) {
+                    break;
+                }
+                if (cmp == 0) {
+                    swap(tPointers, a++, b);
+                }
+                ++b;
+            }
+            while (c >= b) {
+                int cmp = compare(tPointers, c, mi, mj);
+                if (cmp < 0) {
+                    break;
+                }
+                if (cmp == 0) {
+                    swap(tPointers, c, d--);
+                }
+                --c;
+            }
+            if (b > c)
+                break;
+            swap(tPointers, b++, c--);
+        }
+
+        int s;
+        int n = offset + length;
+        s = Math.min(a - offset, b - a);
+        vecswap(tPointers, offset, b - s, s);
+        s = Math.min(d - c, n - d - 1);
+        vecswap(tPointers, b, n - s, s);
+
+        if ((s = b - a) > 1) {
+            sort(tPointers, offset, s);
+        }
+        if ((s = d - c) > 1) {
+            sort(tPointers, n - s, s);
+        }
+    }
+
+    private void swap(int x[], int a, int b) {
+        for (int i = 0; i < 2; ++i) {
+            int t = x[a * 2 + i];
+            x[a * 2 + i] = x[b * 2 + i];
+            x[b * 2 + i] = t;
+        }
+    }
+
+    private void vecswap(int x[], int a, int b, int n) {
+        for (int i = 0; i < n; i++, a++, b++) {
+            swap(x, a, b);
+        }
+    }
+
+    private int compare(int[] tPointers, int tp1, int tp2i, int tp2j) throws HyracksDataException {
+        int i1 = tPointers[tp1 * 2];
+        int j1 = tPointers[tp1 * 2 + 1];
+
+        int i2 = tp2i;
+        int j2 = tp2j;
+
+        ICachedPage node1 = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, i1), false);
+        leafFrame1.setPage(node1);
+        ICachedPage node2 = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, i2), false);
+        leafFrame2.setPage(node2);
+
+        frameTuple1.resetByTupleOffset(leafFrame1.getBuffer(), j1);
+        frameTuple2.resetByTupleOffset(leafFrame2.getBuffer(), j2);
+
+        for (int f = 0; f < comparators.length; ++f) {
+            int c = comparators[f].compare(frameTuple1.getFieldData(f), frameTuple1.getFieldStart(f),
+                    frameTuple1.getFieldLength(f), frameTuple2.getFieldData(f), frameTuple2.getFieldStart(f),
+                    frameTuple2.getFieldLength(f));
+            if (c != 0) {
+                return c;
+            }
+        }
+        return 0;
+    }
+
+}
\ No newline at end of file