Added the split method
Fixed various bugs

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_indexes@386 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/MultiComparator.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/MultiComparator.java
index 7ed20bc..4b3445e 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/MultiComparator.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/ophelpers/MultiComparator.java
@@ -24,6 +24,7 @@
 import edu.uci.ics.hyracks.api.dataflow.value.ITypeTrait;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.comparators.IntegerBinaryComparatorFactory;
 
 public class MultiComparator {
 
@@ -31,6 +32,12 @@
 
     private IBinaryComparator[] cmps = null;
     private ITypeTrait[] typeTraits;
+    
+    private IBinaryComparator intCmp = IntegerBinaryComparatorFactory.INSTANCE.createBinaryComparator();
+
+    public IBinaryComparator getIntCmp() {
+        return intCmp;
+    }
 
     public MultiComparator(ITypeTrait[] typeTraits, IBinaryComparator[] cmps) {
         this.typeTraits = typeTraits;
diff --git a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/api/IRTreeFrame.java b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/api/IRTreeFrame.java
index c8f5231..f95eb74 100644
--- a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/api/IRTreeFrame.java
+++ b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/api/IRTreeFrame.java
@@ -1,16 +1,33 @@
 package edu.uci.ics.hyracks.storage.am.rtree.api;
 
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.ISplitKey;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleReference;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
-import edu.uci.ics.hyracks.storage.am.rtree.impls.RTreeSplitKey;
+import edu.uci.ics.hyracks.storage.am.rtree.impls.Rectangle;
+import edu.uci.ics.hyracks.storage.am.rtree.impls.TupleEntryArrayList;
 
 public interface IRTreeFrame extends ITreeIndexFrame {
 
-    public int getChildPageId(ITupleReference tuple, MultiComparator cmp);
+    public int getRightPage();
 
-    public void adjustNode(ITreeIndexTupleReference[] tuples, MultiComparator cmp);
+    public void setRightPage(int rightPage);
 
-    public void adjustTuple(ITupleReference tuple, MultiComparator cmp);
+    public int getChildPageId(ITupleReference tuple, TupleEntryArrayList entries, ITreeIndexTupleReference[] nodesMBRs,
+            MultiComparator cmp);
+
+    public void adjustNodeMBR(ITreeIndexTupleReference[] tuples, MultiComparator cmp);
+
+    public void adjustKey(ITupleReference tuple, MultiComparator cmp);
+
+    public int split(ITreeIndexFrame rightFrame, ITupleReference tuple, MultiComparator cmp, ISplitKey splitKey,
+            TupleEntryArrayList entries1, TupleEntryArrayList entries2, Rectangle[] rec) throws Exception;
+
+    public void reinsert(ITupleReference tuple, ITreeIndexTupleReference nodeMBR, TupleEntryArrayList entries,
+            ISplitKey splitKey, MultiComparator cmp) throws Exception;
+
+    public Rectangle intersect(ITupleReference tuple, int tupleIndex, MultiComparator cmp);
+
+    public int getChildPageIdIfIntersect(ITupleReference tuple, int tupleIndex, MultiComparator cmp);
 }
diff --git a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/NSMRTreeFrame.java b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/NSMRTreeFrame.java
index 9c37434..3735dd8 100644
--- a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/NSMRTreeFrame.java
+++ b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/NSMRTreeFrame.java
@@ -1,12 +1,9 @@
 package edu.uci.ics.hyracks.storage.am.rtree.frames;
 
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 
-import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
-import edu.uci.ics.hyracks.dataflow.common.data.comparators.IntegerBinaryComparatorFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.DoubleSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import edu.uci.ics.hyracks.storage.am.common.api.ISplitKey;
@@ -16,22 +13,61 @@
 import edu.uci.ics.hyracks.storage.am.common.frames.TreeIndexNSMFrame;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
 import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeFrame;
+import edu.uci.ics.hyracks.storage.am.rtree.impls.EntriesOrder;
 import edu.uci.ics.hyracks.storage.am.rtree.impls.RTreeSplitKey;
+import edu.uci.ics.hyracks.storage.am.rtree.impls.Rectangle;
+import edu.uci.ics.hyracks.storage.am.rtree.impls.TupleEntryArrayList;
 import edu.uci.ics.hyracks.storage.am.rtree.impls.UnorderedSlotManager;
 import edu.uci.ics.hyracks.storage.am.rtree.tuples.RTreeTypeAwareTupleWriter;
 
 public class NSMRTreeFrame extends TreeIndexNSMFrame implements IRTreeFrame {
 
-    public ITreeIndexTupleReference[] tuples;
-    private final IBinaryComparator intComp = IntegerBinaryComparatorFactory.INSTANCE.createBinaryComparator();
+    protected static final int pageNsnOff = smFlagOff + 1;
+    private static final int rightPageOff = pageNsnOff + 4;
+
+    private ITreeIndexTupleReference[] tuples;
+
+    private ITreeIndexTupleReference cmpFrameTuple;
+
+    private static final double reinsertFactor = 0.3;
+    private static final double splitFactor = 0.4;
+    private static final int nearMinimumOverlapFactor = 32;
 
     public NSMRTreeFrame(ITreeIndexTupleWriter tupleWriter) {
         super(tupleWriter, new UnorderedSlotManager());
-        this.tuples = new ITreeIndexTupleReference[4]; // change this to number
-                                                       // of dim * 2
+        // TODO: change this to number of dim * 2 (at least it must be 4)
+        this.tuples = new ITreeIndexTupleReference[4];
         for (int i = 0; i < 4; i++) {
             this.tuples[i] = tupleWriter.createTupleReference();
         }
+        cmpFrameTuple = tupleWriter.createTupleReference();
+    }
+    
+    @Override
+    public void initBuffer(byte level) {
+        super.initBuffer(level);
+        buf.putInt(pageNsnOff, -1);
+        buf.putInt(rightPageOff, -1);
+    }
+
+    public void setTupleCount(int tupleCount) {
+        buf.putInt(tupleCountOff, tupleCount);
+    }
+
+    @Override
+    protected void resetSpaceParams() {
+        buf.putInt(freeSpaceOff, rightPageOff + 4);
+        buf.putInt(totalFreeSpaceOff, buf.capacity() - (rightPageOff + 4));
+    }
+
+    @Override
+    public int getRightPage() {
+        return buf.getInt(rightPageOff);
+    }
+
+    @Override
+    public void setRightPage(int rightPage) {
+        buf.putInt(rightPageOff, rightPage);
     }
 
     public ITreeIndexTupleReference[] getTuples() {
@@ -60,73 +96,333 @@
         return ret;
     }
 
+    // @Override
+    // public int split(ITreeIndexFrame rightFrame, ITupleReference tuple,
+    // MultiComparator cmp, ISplitKey splitKey)
+    // throws Exception {
+    //
+    // RTreeSplitKey rTreeSplitKey = ((RTreeSplitKey) splitKey);
+    // RTreeTypeAwareTupleWriter rTreeTupleWriterLeftFrame =
+    // ((RTreeTypeAwareTupleWriter) tupleWriter);
+    // RTreeTypeAwareTupleWriter rTreeTupleWriterRightFrame =
+    // ((RTreeTypeAwareTupleWriter) rightFrame.getTupleWriter());
+    // frameTuple.setFieldCount(cmp.getFieldCount());
+    // rightFrame.setPageTupleFieldCount(cmp.getFieldCount());
+    //
+    // ByteBuffer right = rightFrame.getBuffer();
+    // int tupleCount = getTupleCount();
+    //
+    // int tuplesToLeft;
+    // int mid = tupleCount / 2;
+    // ITreeIndexFrame targetFrame = null;
+    // int tupleOff = slotManager.getTupleOff(slotManager.getSlotOff(mid));
+    // frameTuple.resetByTupleOffset(buf, tupleOff);
+    // if (cmp.compare(tuple, frameTuple) >= 0) {
+    // tuplesToLeft = mid + (tupleCount % 2);
+    // targetFrame = rightFrame;
+    // } else {
+    // tuplesToLeft = mid;
+    // targetFrame = this;
+    // }
+    // int tuplesToRight = tupleCount - tuplesToLeft;
+    //
+    // // copy entire page
+    // System.arraycopy(buf.array(), 0, right.array(), 0, buf.capacity());
+    //
+    // // on right page we need to copy rightmost slots to left
+    // int src = rightFrame.getSlotManager().getSlotEndOff();
+    // int dest = rightFrame.getSlotManager().getSlotEndOff() + tuplesToLeft
+    // * rightFrame.getSlotManager().getSlotSize();
+    // int length = rightFrame.getSlotManager().getSlotSize() * tuplesToRight;
+    // System.arraycopy(right.array(), src, right.array(), dest, length);
+    // right.putInt(tupleCountOff, tuplesToRight);
+    //
+    // // on left page only change the tupleCount indicator
+    // buf.putInt(tupleCountOff, tuplesToLeft);
+    //
+    // // compact both pages
+    // rightFrame.compact(cmp);
+    // compact(cmp);
+    //
+    // // insert last key
+    // targetFrame.insert(tuple, cmp);
+    //
+    // // set split key to be highest value in left page
+    // // TODO: find a better way to find the key size
+    // tupleOff = slotManager.getTupleOff(slotManager.getSlotEndOff());
+    // frameTuple.resetByTupleOffset(buf, tupleOff);
+    //
+    // int splitKeySize = tupleWriter.bytesRequired(frameTuple, 0,
+    // cmp.getKeyFieldCount());
+    // splitKey.initData(splitKeySize);
+    // this.adjustNodeMBR(tuples, cmp);
+    // rTreeTupleWriterLeftFrame.writeTupleFields(tuples, 0,
+    // rTreeSplitKey.getLeftPageBuffer(), 0);
+    // rTreeSplitKey.getLeftTuple().resetByTupleOffset(rTreeSplitKey.getLeftPageBuffer(),
+    // 0);
+    //
+    // ((IRTreeFrame) rightFrame).adjustNodeMBR(((NSMRTreeFrame)
+    // rightFrame).getTuples(), cmp);
+    // rTreeTupleWriterRightFrame.writeTupleFields(((NSMRTreeFrame)
+    // rightFrame).getTuples(), 0,
+    // rTreeSplitKey.getRightPageBuffer(), 0);
+    // rTreeSplitKey.getRightTuple().resetByTupleOffset(rTreeSplitKey.getRightPageBuffer(),
+    // 0);
+    //
+    // return 0;
+    // }
+
     @Override
-    public int split(ITreeIndexFrame rightFrame, ITupleReference tuple, MultiComparator cmp, ISplitKey splitKey)
-            throws Exception {
+    public int split(ITreeIndexFrame rightFrame, ITupleReference tuple, MultiComparator cmp, ISplitKey splitKey,
+            TupleEntryArrayList entries1, TupleEntryArrayList entries2, Rectangle[] rec) throws Exception {
+
+        // RTreeSplitKey rTreeSplitKey = ((RTreeSplitKey) splitKey);
+        // RTreeTypeAwareTupleWriter rTreeTupleWriterLeftFrame =
+        // ((RTreeTypeAwareTupleWriter) tupleWriter);
+        // RTreeTypeAwareTupleWriter rTreeTupleWriterRightFrame =
+        // ((RTreeTypeAwareTupleWriter) rightFrame.getTupleWriter());
+        // frameTuple.setFieldCount(cmp.getFieldCount());
+        // rightFrame.setPageTupleFieldCount(cmp.getFieldCount());
+        //
+        // ByteBuffer right = rightFrame.getBuffer();
+        // int tupleCount = getTupleCount();
+        //
+        // int tuplesToLeft;
+        // int mid = tupleCount / 2;
+        // ITreeIndexFrame targetFrame = null;
+        // int tupleOff = slotManager.getTupleOff(slotManager.getSlotOff(mid));
+        // frameTuple.resetByTupleOffset(buf, tupleOff);
+        // if (cmp.compare(tuple, frameTuple) >= 0) {
+        // tuplesToLeft = mid + (tupleCount % 2);
+        // targetFrame = rightFrame;
+        // } else {
+        // tuplesToLeft = mid;
+        // targetFrame = this;
+        // }
+        // int tuplesToRight = tupleCount - tuplesToLeft;
+        //
+        // // copy entire page
+        // System.arraycopy(buf.array(), 0, right.array(), 0, buf.capacity());
+        //
+        // // on right page we need to copy rightmost slots to left
+        // int src = rightFrame.getSlotManager().getSlotEndOff();
+        // int dest = rightFrame.getSlotManager().getSlotEndOff() + tuplesToLeft
+        // * rightFrame.getSlotManager().getSlotSize();
+        // int length = rightFrame.getSlotManager().getSlotSize() *
+        // tuplesToRight;
+        // System.arraycopy(right.array(), src, right.array(), dest, length);
+        // right.putInt(tupleCountOff, tuplesToRight);
+        //
+        // // on left page only change the tupleCount indicator
+        // buf.putInt(tupleCountOff, tuplesToLeft);
+        //
+        // // compact both pages
+        // rightFrame.compact(cmp);
+        // compact(cmp);
+        //
+        // // insert last key
+        // targetFrame.insert(tuple, cmp);
+        //
+        // // set split key to be highest value in left page
+        // // TODO: find a better way to find the key size
+        // tupleOff = slotManager.getTupleOff(slotManager.getSlotEndOff());
+        // frameTuple.resetByTupleOffset(buf, tupleOff);
+        //
+        // int splitKeySize = tupleWriter.bytesRequired(frameTuple, 0,
+        // cmp.getKeyFieldCount());
+        // splitKey.initData(splitKeySize);
+        // this.adjustNodeMBR(tuples, cmp);
+        // rTreeTupleWriterLeftFrame.writeTupleFields(tuples, 0,
+        // rTreeSplitKey.getLeftPageBuffer(), 0);
+        // rTreeSplitKey.getLeftTuple().resetByTupleOffset(rTreeSplitKey.getLeftPageBuffer(),
+        // 0);
+        //
+        // ((IRTreeFrame) rightFrame).adjustNodeMBR(((NSMRTreeFrame)
+        // rightFrame).getTuples(), cmp);
+        // rTreeTupleWriterRightFrame.writeTupleFields(((NSMRTreeFrame)
+        // rightFrame).getTuples(), 0,
+        // rTreeSplitKey.getRightPageBuffer(), 0);
+        // rTreeSplitKey.getRightTuple().resetByTupleOffset(rTreeSplitKey.getRightPageBuffer(),
+        // 0);
+        //
+        // return 0;
 
         RTreeSplitKey rTreeSplitKey = ((RTreeSplitKey) splitKey);
         RTreeTypeAwareTupleWriter rTreeTupleWriterLeftFrame = ((RTreeTypeAwareTupleWriter) tupleWriter);
         RTreeTypeAwareTupleWriter rTreeTupleWriterRightFrame = ((RTreeTypeAwareTupleWriter) rightFrame.getTupleWriter());
-        frameTuple.setFieldCount(cmp.getFieldCount());
         rightFrame.setPageTupleFieldCount(cmp.getFieldCount());
+        frameTuple.setFieldCount(cmp.getFieldCount());
+        cmpFrameTuple.setFieldCount(cmp.getFieldCount());
 
-        ByteBuffer right = rightFrame.getBuffer();
-        int tupleCount = getTupleCount();
+        // calculations are based on the R*-tree paper
+        int m = (int) Math.floor((getTupleCount() + 1) * splitFactor);
+        int splitDistribution = getTupleCount() - (2 * m) + 2;
 
-        int tuplesToLeft;
-        int mid = tupleCount / 2;
-        ITreeIndexFrame targetFrame = null;
-        int tupleOff = slotManager.getTupleOff(slotManager.getSlotOff(mid));
-        frameTuple.resetByTupleOffset(buf, tupleOff);
-        if (cmp.compare(tuple, frameTuple) >= 0) {
-            tuplesToLeft = mid + (tupleCount % 2);
-            targetFrame = rightFrame;
-        } else {
-            tuplesToLeft = mid;
-            targetFrame = this;
+        // to calculate the minimum margin in order to pick the split axis
+        double minMargin = Double.MAX_VALUE;
+        int splitAxis = 0, sortOrder = 0;
+
+        int maxFieldPos = cmp.getKeyFieldCount() / 2;
+        for (int i = 0; i < maxFieldPos; i++) {
+            int j = maxFieldPos + i;
+            for (int k = 0; k < getTupleCount(); ++k) {
+
+                frameTuple.resetByTupleIndex(this, k);
+
+                double LowerKey = DoubleSerializerDeserializer.getDouble(frameTuple.getFieldData(i),
+                        frameTuple.getFieldStart(i));
+                double UpperKey = DoubleSerializerDeserializer.getDouble(frameTuple.getFieldData(j),
+                        frameTuple.getFieldStart(j));
+
+                entries1.add(k, LowerKey);
+                entries2.add(k, UpperKey);
+            }
+            double LowerKey = DoubleSerializerDeserializer.getDouble(tuple.getFieldData(i), tuple.getFieldStart(i));
+            double UpperKey = DoubleSerializerDeserializer.getDouble(tuple.getFieldData(j), tuple.getFieldStart(j));
+
+            entries1.add(-1, LowerKey);
+            entries2.add(-1, UpperKey);
+
+            entries1.sort(EntriesOrder.ASCENDING, getTupleCount() + 1);
+            entries2.sort(EntriesOrder.ASCENDING, getTupleCount() + 1);
+
+            double lowerMargin = 0.0, upperMargin = 0.0;
+            // generate distribution
+            for (int k = 1; k <= splitDistribution; ++k) {
+                int d = m - 1 + k;
+
+                generateDist(tuple, entries1, rec[0], 0, d);
+                generateDist(tuple, entries2, rec[1], 0, d);
+                generateDist(tuple, entries1, rec[2], d, getTupleCount() + 1);
+                generateDist(tuple, entries2, rec[3], d, getTupleCount() + 1);
+
+                // calculate the margin of the distributions
+                lowerMargin += rec[0].margin() + rec[2].margin();
+                upperMargin += rec[1].margin() + rec[3].margin();
+            }
+            double margin = Math.min(lowerMargin, upperMargin);
+
+            // store minimum margin as split axis
+            if (margin < minMargin) {
+                minMargin = margin;
+                splitAxis = i;
+                sortOrder = (lowerMargin < upperMargin) ? 0 : 2;
+            }
+
+            entries1.clear();
+            entries2.clear();
         }
-        int tuplesToRight = tupleCount - tuplesToLeft;
 
-        // copy entire page
-        System.arraycopy(buf.array(), 0, right.array(), 0, buf.capacity());
+        for (int i = 0; i < getTupleCount(); ++i) {
+            frameTuple.resetByTupleIndex(this, i);
+            double key = DoubleSerializerDeserializer.getDouble(frameTuple.getFieldData(splitAxis + sortOrder),
+                    frameTuple.getFieldStart(splitAxis + sortOrder));
+            entries1.add(i, key);
+        }
+        double key = DoubleSerializerDeserializer.getDouble(tuple.getFieldData(splitAxis + sortOrder),
+                tuple.getFieldStart(splitAxis + sortOrder));
+        entries1.add(-1, key);
+        entries1.sort(EntriesOrder.ASCENDING, getTupleCount() + 1);
 
-        // on right page we need to copy rightmost slots to left
-        int src = rightFrame.getSlotManager().getSlotEndOff();
-        int dest = rightFrame.getSlotManager().getSlotEndOff() + tuplesToLeft
-                * rightFrame.getSlotManager().getSlotSize();
-        int length = rightFrame.getSlotManager().getSlotSize() * tuplesToRight;
-        System.arraycopy(right.array(), src, right.array(), dest, length);
-        right.putInt(tupleCountOff, tuplesToRight);
+        double minArea = Double.MAX_VALUE;
+        double minOverlap = Double.MAX_VALUE;
+        int splitPoint = 0;
+        for (int i = 1; i <= splitDistribution; ++i) {
+            int d = m - 1 + i;
 
-        // on left page only change the tupleCount indicator
-        buf.putInt(tupleCountOff, tuplesToLeft);
+            generateDist(tuple, entries1, rec[0], 0, d);
+            generateDist(tuple, entries1, rec[2], d, getTupleCount() + 1);
+
+            double overlap = rec[0].overlappedArea(rec[2]);
+            if (overlap < minOverlap) {
+                splitPoint = d;
+                minOverlap = overlap;
+                minArea = rec[0].area() + rec[2].area();
+            } else if (overlap == minOverlap) {
+                double area = rec[0].area() + rec[2].area();
+                if (area < minArea) {
+                    splitPoint = d;
+                    minArea = area;
+                }
+            }
+        }
+        int startIndex, endIndex;
+        if (splitPoint < (getTupleCount() + 1) / 2) {
+            startIndex = 0;
+            endIndex = splitPoint;
+        } else {
+            startIndex = splitPoint;
+            endIndex = (getTupleCount() + 1);
+        }
+        boolean tupleInserted = false;
+        int totalBytes = 0, numOfDeletedTuples = 0;
+        for (int i = startIndex; i < endIndex; i++) { // TODO: is there a better
+                                                      // way
+            // to split the entries?
+            if (entries1.get(i).getTupleIndex() != -1) {
+                frameTuple.resetByTupleIndex(this, entries1.get(i).getTupleIndex());
+                rightFrame.insert(frameTuple, cmp);
+                ((UnorderedSlotManager) slotManager).modifySlot(
+                        slotManager.getSlotOff(entries1.get(i).getTupleIndex()), -1);
+                totalBytes += tupleWriter.bytesRequired(frameTuple);
+                numOfDeletedTuples++;
+            } else {
+                rightFrame.insert(tuple, cmp);
+                tupleInserted = true;
+            }
+        }
+
+        ((UnorderedSlotManager) slotManager).deleteEmptySlots();
+
+        // maintain space information
+        buf.putInt(totalFreeSpaceOff, buf.getInt(totalFreeSpaceOff) + totalBytes
+                + (slotManager.getSlotSize() * numOfDeletedTuples));
+
+        if (!tupleInserted) {
+            insert(tuple, cmp);
+        }
 
         // compact both pages
         rightFrame.compact(cmp);
         compact(cmp);
 
-        // insert last key
-        targetFrame.insert(tuple, cmp);
-
-        // set split key to be highest value in left page
-        // TODO: find a better way to find the key size
-        tupleOff = slotManager.getTupleOff(slotManager.getSlotEndOff());
+        int tupleOff = slotManager.getTupleOff(slotManager.getSlotEndOff());
         frameTuple.resetByTupleOffset(buf, tupleOff);
-
         int splitKeySize = tupleWriter.bytesRequired(frameTuple, 0, cmp.getKeyFieldCount());
+
         splitKey.initData(splitKeySize);
-        this.adjustNode(tuples, cmp);
+        this.adjustNodeMBR(tuples, cmp);
         rTreeTupleWriterLeftFrame.writeTupleFields(tuples, 0, rTreeSplitKey.getLeftPageBuffer(), 0);
         rTreeSplitKey.getLeftTuple().resetByTupleOffset(rTreeSplitKey.getLeftPageBuffer(), 0);
 
-        ((IRTreeFrame) rightFrame).adjustNode(((NSMRTreeFrame) rightFrame).getTuples(), cmp);
+        ((IRTreeFrame) rightFrame).adjustNodeMBR(((NSMRTreeFrame) rightFrame).getTuples(), cmp);
         rTreeTupleWriterRightFrame.writeTupleFields(((NSMRTreeFrame) rightFrame).getTuples(), 0,
                 rTreeSplitKey.getRightPageBuffer(), 0);
         rTreeSplitKey.getRightTuple().resetByTupleOffset(rTreeSplitKey.getRightPageBuffer(), 0);
 
+        entries1.clear();
         return 0;
     }
 
+    public void generateDist(ITupleReference tuple, TupleEntryArrayList entries, Rectangle rec, int start, int end) {
+        int j = 0;
+        while (entries.get(j).getTupleIndex() == -1) {
+            j++;
+        }
+        frameTuple.resetByTupleIndex(this, entries.get(j).getTupleIndex());
+        rec.set(frameTuple);
+        for (int i = start; i < end; ++i) {
+            if (i != j) {
+                if (entries.get(i).getTupleIndex() != -1) {
+                    frameTuple.resetByTupleIndex(this, entries.get(i).getTupleIndex());
+                    rec.enlarge(frameTuple);
+                } else {
+                    rec.enlarge(tuple);
+                }
+            }
+        }
+    }
+
     @Override
     public void insertSorted(ITupleReference tuple, MultiComparator cmp) throws HyracksDataException {
         try {
@@ -137,44 +433,183 @@
     }
 
     @Override
-    public int getChildPageId(ITupleReference tuple, MultiComparator cmp) {
-        // find least overlap enlargement, use minimum enlarged area to
-        // break tie, if tie still exists use minimum area to break it
+    public int getChildPageId(ITupleReference tuple, TupleEntryArrayList entries, ITreeIndexTupleReference[] nodesMBRs,
+            MultiComparator cmp) {
+
+        cmpFrameTuple.setFieldCount(cmp.getFieldCount());
+        frameTuple.setFieldCount(cmp.getFieldCount());
 
         int bestChild = 0;
         double minEnlargedArea = Double.MAX_VALUE;
 
-        // if (getLevel() == 1) {
-        for (int i = 0; i < getTupleCount(); i++) {
-            frameTuple.resetByTupleIndex(this, i);
-            double enlargedArea = enlargedArea(frameTuple, tuple, cmp);
-            if (enlargedArea < minEnlargedArea) {
-                minEnlargedArea = enlargedArea;
-                bestChild = i;
-            }
-
-            else if (enlargedArea == minEnlargedArea) {
-                double area = area(frameTuple, cmp);
-                frameTuple.resetByTupleIndex(this, bestChild);
-                double bestArea = area(frameTuple, cmp);
-                if (area < bestArea) {
+        // the children pointers in the node point to leaves
+        if (getLevel() == 1) {
+            // find least overlap enlargement, use minimum enlarged area to
+            // break tie, if tie still exists use minimum area to break it
+            for (int i = 0; i < getTupleCount(); ++i) {
+                frameTuple.resetByTupleIndex(this, i);
+                double enlargedArea = enlargedArea(frameTuple, tuple, cmp);
+                entries.add(i, enlargedArea);
+                if (enlargedArea < minEnlargedArea) {
+                    minEnlargedArea = enlargedArea;
                     bestChild = i;
                 }
             }
-        }
-        // } else { // find minimum enlarged area, use minimum area to break
-        // tie
+            if (minEnlargedArea < entries.getDoubleEpsilon() || minEnlargedArea > entries.getDoubleEpsilon()) {
+                minEnlargedArea = Double.MAX_VALUE;
+                int k;
+                if (getTupleCount() > nearMinimumOverlapFactor) {
+                    // sort the entries based on their area enlargement needed
+                    // to include the object
+                    entries.sort(EntriesOrder.ASCENDING, getTupleCount());
+                    k = nearMinimumOverlapFactor;
+                } else {
+                    k = getTupleCount();
+                }
 
-        // }
+                double minOverlap = Double.MAX_VALUE;
+                int id = 0;
+                for (int i = 0; i < k; ++i) {
+                    double difference = 0.0;
+                    for (int j = 0; j < getTupleCount(); ++j) {
+                        frameTuple.resetByTupleIndex(this, j);
+                        cmpFrameTuple.resetByTupleIndex(this, entries.get(i).getTupleIndex());
+
+                        int c = cmp.getIntCmp().compare(frameTuple.getFieldData(cmp.getKeyFieldCount()),
+                                frameTuple.getFieldStart(cmp.getKeyFieldCount()),
+                                frameTuple.getFieldLength(cmp.getKeyFieldCount()),
+                                cmpFrameTuple.getFieldData(cmp.getKeyFieldCount()),
+                                cmpFrameTuple.getFieldStart(cmp.getKeyFieldCount()),
+                                cmpFrameTuple.getFieldLength(cmp.getKeyFieldCount()));
+                        if (c != 0) {
+                            double intersection = overlappedArea(frameTuple, tuple, cmpFrameTuple, cmp);
+                            if (intersection != 0.0) {
+                                difference += intersection - overlappedArea(frameTuple, null, cmpFrameTuple, cmp);
+                            }
+                        } else {
+                            id = j;
+                        }
+                    }
+
+                    double enlargedArea = enlargedArea(cmpFrameTuple, tuple, cmp);
+                    if (difference < minOverlap) {
+                        minOverlap = difference;
+                        minEnlargedArea = enlargedArea;
+                        bestChild = id;
+                    } else if (difference == minOverlap) {
+                        if (enlargedArea < minEnlargedArea) {
+                            minEnlargedArea = enlargedArea;
+                            bestChild = id;
+                        } else if (enlargedArea == minEnlargedArea) {
+                            double area = area(cmpFrameTuple, cmp);
+                            frameTuple.resetByTupleIndex(this, bestChild);
+                            double minArea = area(frameTuple, cmp);
+                            if (area < minArea) {
+                                bestChild = id;
+                            }
+                        }
+                    }
+                }
+            }
+        } else { // find minimum enlarged area, use minimum area to break tie
+            for (int i = 0; i < getTupleCount(); i++) {
+                frameTuple.resetByTupleIndex(this, i);
+                double enlargedArea = enlargedArea(frameTuple, tuple, cmp);
+                if (enlargedArea < minEnlargedArea) {
+                    minEnlargedArea = enlargedArea;
+                    bestChild = i;
+                } else if (enlargedArea == minEnlargedArea) {
+                    double area = area(frameTuple, cmp);
+                    frameTuple.resetByTupleIndex(this, bestChild);
+                    double minArea = area(frameTuple, cmp);
+                    if (area < minArea) {
+                        bestChild = i;
+                    }
+                }
+            }
+        }
         frameTuple.resetByTupleIndex(this, bestChild);
         if (minEnlargedArea > 0.0) {
             enlarge(frameTuple, tuple, cmp);
         }
+        nodesMBRs[(int) getLevel() - 1].resetByTupleIndex(this, bestChild);
+
+        entries.clear();
 
         // return the page id of the bestChild tuple
         return buf.getInt(frameTuple.getFieldStart(cmp.getKeyFieldCount()));
     }
 
+    @Override
+    public void reinsert(ITupleReference tuple, ITreeIndexTupleReference nodeMBR, TupleEntryArrayList entries,
+            ISplitKey splitKey, MultiComparator cmp) throws Exception {
+
+        nodeMBR.setFieldCount(cmp.getFieldCount());
+        int maxFieldPos = cmp.getKeyFieldCount() / 2;
+        for (int i = 0; i < getTupleCount() + 1; ++i) {
+            if (i < getTupleCount()) {
+                frameTuple.resetByTupleIndex(this, i);
+            }
+            double centerDistance = 0.0;
+            for (int j = 0; j < maxFieldPos; j++) {
+                int k = maxFieldPos + j;
+
+                // TODO: an optimization can be done here, compute nodeCenter
+                // only once and reuse it
+                double nodeCenter = (DoubleSerializerDeserializer.getDouble(nodeMBR.getFieldData(j),
+                        nodeMBR.getFieldStart(j)) + DoubleSerializerDeserializer.getDouble(nodeMBR.getFieldData(k),
+                        nodeMBR.getFieldStart(k))) / 2.0;
+
+                double childCenter;
+                if (i < getTupleCount()) {
+                    childCenter = (DoubleSerializerDeserializer.getDouble(frameTuple.getFieldData(j),
+                            frameTuple.getFieldStart(j)) + DoubleSerializerDeserializer.getDouble(
+                            frameTuple.getFieldData(k), frameTuple.getFieldStart(k))) / 2.0;
+                } else { // special case to deal with the new tuple to be
+                         // inserted
+                    childCenter = (DoubleSerializerDeserializer
+                            .getDouble(tuple.getFieldData(j), tuple.getFieldStart(j)) + DoubleSerializerDeserializer
+                            .getDouble(tuple.getFieldData(k), tuple.getFieldStart(k))) / 2.0;
+                }
+
+                double d = childCenter - nodeCenter;
+                centerDistance += d * d;
+            }
+            if (i < getTupleCount()) {
+                entries.add(i, centerDistance);
+            } else { // special case to deal with the new tuple to be inserted
+                entries.add(-1, centerDistance);
+            }
+
+        }
+        entries.sort(EntriesOrder.DESCENDING, getTupleCount() + 1);
+
+        int j = (int) Math.floor((getTupleCount() + 1) * reinsertFactor);
+        for (int i = 0; i < j; i++) {
+            if (entries.get(i).getTupleIndex() != -1) {
+                frameTuple.resetByTupleIndex(this, i);
+                delete(frameTuple, cmp, false);
+            } else {
+                delete(tuple, cmp, false);
+            }
+
+        }
+
+        // rebuild the node's MBR
+        RTreeSplitKey rTreeSplitKey = ((RTreeSplitKey) splitKey);
+        RTreeTypeAwareTupleWriter rTreeTupleWriterLeftFrame = ((RTreeTypeAwareTupleWriter) tupleWriter);
+        this.adjustNodeMBR(tuples, cmp);
+
+        int tupleOff = slotManager.getTupleOff(slotManager.getSlotEndOff());
+        frameTuple.resetByTupleOffset(buf, tupleOff);
+        int splitKeySize = tupleWriter.bytesRequired(frameTuple, 0, cmp.getKeyFieldCount());
+        rTreeSplitKey.initData(splitKeySize);
+
+        rTreeTupleWriterLeftFrame.writeTupleFields(tuples, 0, rTreeSplitKey.getLeftPageBuffer(), 0);
+
+        entries.clear();
+    }
+
     public double area(ITupleReference tuple, MultiComparator cmp) {
         double area = 1.0;
         int maxFieldPos = cmp.getKeyFieldCount() / 2;
@@ -186,6 +621,54 @@
         return area;
     }
 
+    public double overlappedArea(ITupleReference tuple1, ITupleReference tupleToBeInserted, ITupleReference tuple2,
+            MultiComparator cmp) {
+        double area = 1.0;
+        double f1, f2;
+
+        int maxFieldPos = cmp.getKeyFieldCount() / 2;
+        for (int i = 0; i < maxFieldPos; i++) {
+            int j = maxFieldPos + i;
+            double pHigh1, pLow1;
+            if (tupleToBeInserted != null) {
+                int c = cmp.getComparators()[i].compare(tuple1.getFieldData(i), tuple1.getFieldStart(i),
+                        tuple1.getFieldLength(i), tupleToBeInserted.getFieldData(i),
+                        tupleToBeInserted.getFieldStart(i), tupleToBeInserted.getFieldLength(i));
+                if (c < 0) {
+                    pLow1 = DoubleSerializerDeserializer.getDouble(tuple1.getFieldData(i), tuple1.getFieldStart(i));
+                } else {
+                    pLow1 = DoubleSerializerDeserializer.getDouble(tupleToBeInserted.getFieldData(i),
+                            tupleToBeInserted.getFieldStart(i));
+                }
+
+                c = cmp.getComparators()[j].compare(tuple1.getFieldData(j), tuple1.getFieldStart(j),
+                        tuple1.getFieldLength(j), tupleToBeInserted.getFieldData(j),
+                        tupleToBeInserted.getFieldStart(j), tupleToBeInserted.getFieldLength(j));
+                if (c > 0) {
+                    pHigh1 = DoubleSerializerDeserializer.getDouble(tuple1.getFieldData(j), tuple1.getFieldStart(j));
+                } else {
+                    pHigh1 = DoubleSerializerDeserializer.getDouble(tupleToBeInserted.getFieldData(j),
+                            tupleToBeInserted.getFieldStart(j));
+                }
+            } else {
+                pLow1 = DoubleSerializerDeserializer.getDouble(tuple1.getFieldData(i), tuple1.getFieldStart(i));
+                pHigh1 = DoubleSerializerDeserializer.getDouble(tuple1.getFieldData(j), tuple1.getFieldStart(j));
+            }
+
+            double pLow2 = DoubleSerializerDeserializer.getDouble(tuple2.getFieldData(i), tuple2.getFieldStart(i));
+            double pHigh2 = DoubleSerializerDeserializer.getDouble(tuple2.getFieldData(j), tuple2.getFieldStart(j));
+
+            if (pLow1 > pHigh2 || pHigh1 < pLow2) {
+                return 0.0;
+            }
+
+            f1 = Math.max(pLow1, pLow2);
+            f2 = Math.min(pHigh1, pHigh2);
+            area *= f2 - f1;
+        }
+        return area;
+    }
+
     public double enlargedArea(ITupleReference tuple, ITupleReference tupleToBeInserted, MultiComparator cmp) {
         double areaBeforeEnlarge = area(tuple, cmp);
         double areaAfterEnlarge = 1.0;
@@ -240,24 +723,16 @@
     }
 
     @Override
-    public void adjustTuple(ITupleReference tuple, MultiComparator cmp) {
+    public void adjustKey(ITupleReference tuple, MultiComparator cmp) {
         frameTuple.setFieldCount(cmp.getFieldCount());
-        for (int i = 0; i < getTupleCount(); i++) {
-            frameTuple.resetByTupleIndex(this, i);
-
-            int c = intComp.compare(frameTuple.getFieldData(cmp.getKeyFieldCount()),
-                    frameTuple.getFieldStart(cmp.getKeyFieldCount()),
-                    frameTuple.getFieldLength(cmp.getKeyFieldCount()), tuple.getFieldData(cmp.getKeyFieldCount()),
-                    tuple.getFieldStart(cmp.getKeyFieldCount()), tuple.getFieldLength(cmp.getKeyFieldCount()));
-            if (c == 0) {
-                tupleWriter.writeTuple(tuple, buf, getTupleOffset(i));
-                break;
-            }
+        int tupleIndex = slotManager.findTupleIndex(tuple, frameTuple, cmp, null, null);
+        if (tupleIndex != -1) {
+            tupleWriter.writeTuple(tuple, buf, getTupleOffset(tupleIndex));
         }
     }
 
     @Override
-    public void adjustNode(ITreeIndexTupleReference[] tuples, MultiComparator cmp) {
+    public void adjustNodeMBR(ITreeIndexTupleReference[] tuples, MultiComparator cmp) {
         for (int i = 0; i < tuples.length; i++) {
             tuples[i].setFieldCount(cmp.getKeyFieldCount());
             tuples[i].resetByTupleIndex(this, 0);
@@ -283,4 +758,59 @@
             }
         }
     }
+
+    @Override
+    public int getChildPageIdIfIntersect(ITupleReference tuple, int tupleIndex, MultiComparator cmp) {
+        frameTuple.setFieldCount(cmp.getFieldCount());
+        frameTuple.resetByTupleIndex(this, tupleIndex);
+        int maxFieldPos = cmp.getKeyFieldCount() / 2;
+        for (int i = 0; i < maxFieldPos; i++) {
+            int j = maxFieldPos + i;
+            int c = cmp.getComparators()[i].compare(tuple.getFieldData(i), tuple.getFieldStart(i),
+                    tuple.getFieldLength(i), frameTuple.getFieldData(j), frameTuple.getFieldStart(j),
+                    frameTuple.getFieldLength(j));
+            if (c > 0) {
+                return -1;
+            }
+            c = cmp.getComparators()[i].compare(tuple.getFieldData(j), tuple.getFieldStart(j), tuple.getFieldLength(j),
+                    frameTuple.getFieldData(i), frameTuple.getFieldStart(i), frameTuple.getFieldLength(i));
+
+            if (c < 0) {
+                return -1;
+            }
+        }
+        return buf.getInt(frameTuple.getFieldStart(cmp.getKeyFieldCount()));
+    }
+
+    @Override
+    public Rectangle intersect(ITupleReference tuple, int tupleIndex, MultiComparator cmp) {
+        frameTuple.setFieldCount(cmp.getFieldCount());
+        frameTuple.resetByTupleIndex(this, tupleIndex);
+        int maxFieldPos = cmp.getKeyFieldCount() / 2;
+        for (int i = 0; i < maxFieldPos; i++) {
+            int j = maxFieldPos + i;
+            int c = cmp.getComparators()[i].compare(tuple.getFieldData(i), tuple.getFieldStart(i),
+                    tuple.getFieldLength(i), frameTuple.getFieldData(j), frameTuple.getFieldStart(j),
+                    frameTuple.getFieldLength(j));
+            if (c > 0) {
+                return null;
+            }
+            c = cmp.getComparators()[i].compare(tuple.getFieldData(j), tuple.getFieldStart(j), tuple.getFieldLength(j),
+                    frameTuple.getFieldData(i), frameTuple.getFieldStart(i), frameTuple.getFieldLength(i));
+
+            if (c < 0) {
+                return null;
+            }
+        }
+        Rectangle rec = new Rectangle(maxFieldPos);
+        rec.set(frameTuple);
+        return rec;
+    }
+
+    @Override
+    public int split(ITreeIndexFrame rightFrame, ITupleReference tuple, MultiComparator cmp, ISplitKey splitKey)
+            throws Exception {
+        // TODO Auto-generated method stub
+        return 0;
+    }
 }
diff --git a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/ByteArrayList.java b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/ByteArrayList.java
index 7fa5ba4..c6aebea 100644
--- a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/ByteArrayList.java
+++ b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/ByteArrayList.java
@@ -31,13 +31,17 @@
     }
 
     // WARNING: caller is responsible for checking size > 0
-    public int getLast() {
+    public byte getLast() {
         return data[size - 1];
     }
 
-    public int get(int i) {
+    public byte get(int i) {
         return data[i];
     }
+    
+    public void set(int i, byte b) {
+        data[i] = b;
+    }
 
     public void clear() {
         size = 0;
diff --git a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/EntriesOrder.java b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/EntriesOrder.java
new file mode 100644
index 0000000..a04f23e
--- /dev/null
+++ b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/EntriesOrder.java
@@ -0,0 +1,5 @@
+package edu.uci.ics.hyracks.storage.am.rtree.impls;
+
+public enum EntriesOrder {
+    ASCENDING, DESCENDING
+}
diff --git a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/IntArrayList.java b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/IntArrayList.java
new file mode 100644
index 0000000..1cc95d1
--- /dev/null
+++ b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/IntArrayList.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.impls;
+
+public class IntArrayList {
+    private int[] data;
+    private int size;
+    private final int growth;
+
+    public IntArrayList(int initialCapacity, int growth) {
+        data = new int[initialCapacity];
+        size = 0;
+        this.growth = growth;
+    }
+
+    public int size() {
+        return size;
+    }
+
+    public void add(int i) {
+        if (size == data.length) {
+            int[] newData = new int[data.length + growth];
+            System.arraycopy(data, 0, newData, 0, data.length);
+            data = newData;
+        }
+
+        data[size++] = i;
+    }
+
+    public void removeLast() {
+        if (size > 0)
+            size--;
+    }
+
+    // WARNING: caller is responsible for checking size > 0
+    public int getLast() {
+        return data[size - 1];
+    }
+
+    public int get(int i) {
+        return data[i];
+    }
+
+    public void clear() {
+        size = 0;
+    }
+
+    public boolean isEmpty() {
+        return size == 0;
+    }
+}
diff --git a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTree.java b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTree.java
index 573aea2..c0d1501 100644
--- a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTree.java
+++ b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTree.java
@@ -1,11 +1,10 @@
 package edu.uci.ics.hyracks.storage.am.rtree.impls;
 
 import java.util.ArrayList;
+import java.util.Stack;
 
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.DoubleSerializerDeserializer;
-import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
 import edu.uci.ics.hyracks.storage.am.common.api.IFreePageManager;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrame;
 import edu.uci.ics.hyracks.storage.am.common.frames.FrameOpSpaceStatus;
@@ -21,10 +20,11 @@
 public class RTree {
 
     private boolean created = false;
-    private final int metaDataPage = 0; // page containing meta data, e.g.,
-                                        // maxPage
     private final int rootPage = 1; // the root page never changes
 
+    private int nsn = 0; // Global node sequence number
+    private int numOfPages = 1;
+
     private final IFreePageManager freePageManager;
     private final IBufferCache bufferCache;
     private int fileId;
@@ -33,6 +33,7 @@
     private final IRTreeFrameFactory leafFrameFactory;
     private final MultiComparator interiorCmp;
     private final MultiComparator leafCmp;
+    public int dim;
 
     public int rootSplits = 0;
     public int[] splitsByLevel = new int[500];
@@ -43,15 +44,17 @@
     public long pins = 0;
     public long unpins = 0;
     public byte currentLevel = 0;
+    public long totalTuplesInserted = 0;
 
     public RTree(IBufferCache bufferCache, IFreePageManager freePageManager, IRTreeFrameFactory interiorFrameFactory,
-            IRTreeFrameFactory leafFrameFactory, MultiComparator interiorCmp, MultiComparator leafCmp) {
+            IRTreeFrameFactory leafFrameFactory, MultiComparator interiorCmp, MultiComparator leafCmp, int dim) {
         this.bufferCache = bufferCache;
         this.freePageManager = freePageManager;
         this.interiorFrameFactory = interiorFrameFactory;
         this.leafFrameFactory = leafFrameFactory;
         this.interiorCmp = interiorCmp;
         this.leafCmp = leafCmp;
+        this.dim = dim;
     }
 
     public String printStats() {
@@ -65,12 +68,17 @@
         strBuilder.append(String.format("READ LATCHES:  %10d %10d\n", readLatchesAcquired, readLatchesReleased));
         strBuilder.append(String.format("WRITE LATCHES: %10d %10d\n", writeLatchesAcquired, writeLatchesReleased));
         strBuilder.append(String.format("PINS:          %10d %10d\n", pins, unpins));
+
+        strBuilder.append(String.format("Num of Pages:          %10d\n", numOfPages));
+
         return strBuilder.toString();
     }
 
     public void printTree(IRTreeFrame leafFrame, IRTreeFrame interiorFrame, ISerializerDeserializer[] fields)
             throws Exception {
+        totalTuplesInserted = 0;
         printTree(rootPage, null, false, leafFrame, interiorFrame, fields);
+        System.out.println(totalTuplesInserted);
     }
 
     public void printTree(int pageId, ICachedPage parent, boolean unpin, IRTreeFrame leafFrame,
@@ -112,6 +120,7 @@
                     printTree(children.get(i), node, i == children.size() - 1, leafFrame, interiorFrame, fields);
                 }
             } else {
+                totalTuplesInserted += interiorFrame.getTupleCount();
                 node.releaseReadLatch();
                 readLatchesReleased++;
                 bufferCache.unpin(node);
@@ -168,7 +177,7 @@
     public RTreeOpContext createOpContext(TreeIndexOp op, IRTreeFrame interiorFrame, IRTreeFrame leafFrame,
             ITreeIndexMetaDataFrame metaFrame) {
         // TODO: figure out better tree-height hint
-        return new RTreeOpContext(op, interiorFrame, leafFrame, metaFrame, 6);
+        return new RTreeOpContext(op, interiorFrame, leafFrame, metaFrame, 8, dim);
     }
 
     private void createNewRoot(RTreeOpContext ctx) throws Exception {
@@ -247,6 +256,9 @@
         ctx.splitKey.getLeftTuple().setFieldCount(interiorCmp.getFieldCount());
         ctx.splitKey.getRightTuple().setFieldCount(interiorCmp.getFieldCount());
         ctx.interiorFrame.setPageTupleFieldCount(interiorCmp.getFieldCount());
+        for (int i = 0; i < currentLevel; i++) {
+            ctx.overflowArray.add((byte) 0);
+        }
         insertImpl(rootPage, null, (byte) 0, ctx);
 
         // we split the root, here is the key for a new root
@@ -255,6 +267,43 @@
         }
     }
 
+    // public void insert(ITupleReference tuple, RTreeOpContext ctx) throws
+    // Exception {
+    // ctx.reset();
+    // ctx.setTuple(tuple);
+    // ctx.splitKey.reset();
+    // ctx.splitKey.getLeftTuple().setFieldCount(interiorCmp.getFieldCount());
+    // ctx.splitKey.getRightTuple().setFieldCount(interiorCmp.getFieldCount());
+    // ctx.interiorFrame.setPageTupleFieldCount(interiorCmp.getFieldCount());
+    // for (int i = 0; i < currentLevel; i++) {
+    // ctx.overflowArray.add((byte) 0);
+    // }
+    //
+    // findLeaf(ctx);
+    //
+    //
+    // insertImpl(rootPage, null, (byte) 0, ctx);
+    //
+    // // we split the root, here is the key for a new root
+    // if (ctx.splitKey.getLeftPageBuffer() != null) {
+    // createNewRoot(ctx);
+    // }
+    // }
+    //
+    // public void findLeaf(RTreeOpContext ctx) throws Exception {
+    // ICachedPage node =
+    // bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, rootPage),
+    // false);
+    // pins++;
+    // while (true) {
+    // boolean isLeaf = ctx.interiorFrame.isLeaf();
+    // acquireLatch(node, ctx.op, isLeaf);
+    // }
+    //
+    //
+    //
+    // }
+
     public void insertImpl(int pageId, ICachedPage parent, byte desiredLevel, RTreeOpContext ctx) throws Exception {
         ICachedPage node = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, pageId), false);
         pins++;
@@ -272,9 +321,13 @@
 
         // the children pointers in the node point to leaves
         if (ctx.interiorFrame.getLevel() > desiredLevel) {
-            int childPageId = ctx.interiorFrame.getChildPageId(ctx.getTuple(), interiorCmp);
-            insertImpl(childPageId, node, desiredLevel, ctx);
+            int childPageId = ctx.interiorFrame.getChildPageId(ctx.getTuple(), ctx.tupleEntries1, ctx.nodesMBRs,
+                    interiorCmp);
+            if (childPageId < 0) {
+                System.out.println();
+            }
 
+            insertImpl(childPageId, node, desiredLevel, ctx);
             if (ctx.splitKey.getLeftPageBuffer() != null) {
                 node = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, pageId), false);
                 pins++;
@@ -282,11 +335,8 @@
                 writeLatchesAcquired++;
                 try {
                     ctx.interiorFrame.setPage(node);
-                    ctx.interiorFrame.adjustTuple(ctx.splitKey.getLeftTuple(), interiorCmp);
+                    ctx.interiorFrame.adjustKey(ctx.splitKey.getLeftTuple(), interiorCmp);
                     insertTuple(pageId, ctx.splitKey.getRightTuple(), ctx, isLeaf);
-                    // RTreeSplitKey splitKey =
-                    // ctx.rightSplitKey.duplicate(ctx.interiorFrame.getTupleWriter().createTupleReference());
-                    // insertTuple(pageId, splitKey.getTuple(), ctx, isLeaf);
                 } finally {
                     node.releaseWriteLatch();
                     writeLatchesReleased++;
@@ -309,6 +359,10 @@
             }
         }
     }
+    
+    private void findLeaf() {
+        
+    }
 
     private void insertTuple(int pageId, ITupleReference tuple, RTreeOpContext ctx, boolean isLeaf) throws Exception {
         FrameOpSpaceStatus spaceStatus;
@@ -342,6 +396,7 @@
             }
 
             case INSUFFICIENT_SPACE: {
+                System.out.println("Split");
                 int rightPageId = freePageManager.getFreePage(ctx.metaFrame);
                 ICachedPage rightNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, rightPageId), true);
                 pins++;
@@ -349,15 +404,33 @@
                 writeLatchesAcquired++;
 
                 try {
+                    // if the level is not the root level and this is the
+                    // first overflow treatment in the given level during the
+                    // insertion of one data rectangle
+                    /*
+                     * if (ctx.overflowArray.get((int)
+                     * ctx.interiorFrame.getLevel()) == 0 && pageId != rootPage)
+                     * { if (!isLeaf) { ctx.overflowArray.set((int)
+                     * ctx.interiorFrame.getLevel(), (byte) 1);
+                     * ctx.interiorFrame.reinsert(tuple,
+                     * ctx.nodesMBRs[ctx.interiorFrame.getLevel()], ctx.entries,
+                     * ctx.splitKey, interiorCmp); } else {
+                     * ctx.overflowArray.set(0, (byte) 1);
+                     * ctx.leafFrame.reinsert(tuple, ctx.nodesMBRs[0],
+                     * ctx.entries, ctx.splitKey, leafCmp); } } else {
+                     */
                     IRTreeFrame rightFrame;
                     int ret;
+                    numOfPages++; // debug
                     if (!isLeaf) {
                         splitsByLevel[ctx.interiorFrame.getLevel()]++; // debug
                         rightFrame = interiorFrameFactory.getFrame();
                         rightFrame.setPage(rightNode);
                         rightFrame.initBuffer((byte) ctx.interiorFrame.getLevel());
                         rightFrame.setPageTupleFieldCount(interiorCmp.getFieldCount());
-                        ret = ctx.interiorFrame.split(rightFrame, tuple, interiorCmp, ctx.splitKey);
+                        ret = ctx.interiorFrame.split(rightFrame, tuple, interiorCmp, ctx.splitKey, ctx.tupleEntries1,
+                                ctx.tupleEntries2, ctx.rec);
+                        ctx.interiorFrame.setRightPage(rightPageId);
                         rightFrame.setPageLsn(rightFrame.getPageLsn() + 1);
                         ctx.interiorFrame.setPageLsn(ctx.interiorFrame.getPageLsn() + 1);
                     } else {
@@ -366,7 +439,9 @@
                         rightFrame.setPage(rightNode);
                         rightFrame.initBuffer((byte) 0);
                         rightFrame.setPageTupleFieldCount(leafCmp.getFieldCount());
-                        ret = ctx.leafFrame.split(rightFrame, tuple, leafCmp, ctx.splitKey);
+                        ret = ctx.leafFrame.split(rightFrame, tuple, leafCmp, ctx.splitKey, ctx.tupleEntries1,
+                                ctx.tupleEntries2, ctx.rec);
+                        ctx.leafFrame.setRightPage(rightPageId);
                         rightFrame.setPageLsn(rightFrame.getPageLsn() + 1);
                         ctx.leafFrame.setPageLsn(ctx.leafFrame.getPageLsn() + 1);
                     }
@@ -376,7 +451,7 @@
                     } else {
                         ctx.splitKey.setPages(pageId, rightPageId);
                     }
-
+                    // }
                 } finally {
                     rightNode.releaseWriteLatch();
                     writeLatchesReleased++;
@@ -388,6 +463,58 @@
         }
     }
 
+    public void search(Stack<Integer> s, ITupleReference tuple, RTreeOpContext ctx, ArrayList<Rectangle> results) throws Exception {
+        ctx.reset();
+        ctx.setTuple(tuple);
+        ctx.interiorFrame.setPageTupleFieldCount(interiorCmp.getFieldCount());
+        ctx.leafFrame.setPageTupleFieldCount(leafCmp.getFieldCount());
+        s.push(rootPage);
+        while (!s.isEmpty()) {
+            int pageId = s.pop();
+            ICachedPage node = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, pageId), false);
+            pins++;
+            node.acquireReadLatch();
+            readLatchesAcquired++;
+
+            try {
+
+                ctx.interiorFrame.setPage(node);
+                boolean isLeaf = ctx.interiorFrame.isLeaf();
+                int tupleCount = ctx.interiorFrame.getTupleCount();
+
+                if (!isLeaf) {
+                    for (int i = 0; i < tupleCount; i++) {
+                        // check intersection, if intersect, call search
+                        int childPageId = ctx.interiorFrame.getChildPageIdIfIntersect(ctx.tuple, i, interiorCmp);
+                        if (childPageId != -1) {
+                            s.push(childPageId);
+                        }
+                    }
+
+                } else {
+                    for (int i = 0; i < tupleCount; i++) {
+                        ctx.leafFrame.setPage(node);
+
+                        // check intersection, if intersect, add the tuple to the
+                        // result set
+                        Rectangle rec = ctx.leafFrame.intersect(ctx.tuple, i, leafCmp);
+                        if (rec != null) {
+                            // add the tuple to the result set
+                            results.add(rec);
+                        }
+                    }
+
+                }
+
+            } finally {
+                node.releaseReadLatch();
+                readLatchesReleased++;
+                bufferCache.unpin(node);
+                unpins++;
+            }
+        }
+    }
+
     public IRTreeFrameFactory getInteriorFrameFactory() {
         return interiorFrameFactory;
     }
diff --git a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeOpContext.java b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeOpContext.java
index f23eb48..d72dfda 100644
--- a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeOpContext.java
+++ b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeOpContext.java
@@ -2,6 +2,8 @@
 
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrame;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleReference;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.TreeIndexOp;
 import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeFrame;
 
@@ -12,18 +14,37 @@
     public final ITreeIndexMetaDataFrame metaFrame;
     public final ByteArrayList overflowArray;
     public final RTreeSplitKey splitKey;
-    public int insertLevel;
+    public final SpatialUtils spatialUtils;
     public ITupleReference tuple;
+    public TupleEntryArrayList tupleEntries1;
+    public TupleEntryArrayList tupleEntries2;
+    public ITreeIndexTupleReference[] nodesMBRs;
+    public IntArrayList path; // used like a stack
+    public Rectangle[] rec;
 
     public RTreeOpContext(TreeIndexOp op, IRTreeFrame interiorFrame, IRTreeFrame leafFrame,
-            ITreeIndexMetaDataFrame metaFrame, int treeHeightHint) {
+            ITreeIndexMetaDataFrame metaFrame, int treeHeightHint, int dim) {
         this.op = op;
         this.interiorFrame = interiorFrame;
         this.leafFrame = leafFrame;
         this.metaFrame = metaFrame;
-        splitKey = new RTreeSplitKey(leafFrame.getTupleWriter().createTupleReference(), leafFrame.getTupleWriter()
-                .createTupleReference());
+        splitKey = new RTreeSplitKey(interiorFrame.getTupleWriter().createTupleReference(), interiorFrame
+                .getTupleWriter().createTupleReference());
         overflowArray = new ByteArrayList(treeHeightHint, treeHeightHint);
+        spatialUtils = new SpatialUtils();
+        // TODO: find a better way to know number of entries per node
+        tupleEntries1 = new TupleEntryArrayList(100, 100, spatialUtils);
+        tupleEntries2 = new TupleEntryArrayList(100, 100, spatialUtils);
+        nodesMBRs = new ITreeIndexTupleReference[treeHeightHint];
+        path = new IntArrayList(treeHeightHint, treeHeightHint);
+        for (int i = 0; i < treeHeightHint; i++) {
+            nodesMBRs[i] = interiorFrame.getTupleWriter().createTupleReference();
+            nodesMBRs[i].setFieldCount(nodesMBRs[i].getFieldCount());
+        }
+        rec = new Rectangle[4];
+        for (int i = 0; i < 4; i++) {
+            rec[i] = new Rectangle(dim);
+        }
     }
 
     public ITupleReference getTuple() {
@@ -35,7 +56,14 @@
     }
 
     public void reset() {
-        if (overflowArray != null)
+        if (overflowArray != null) {
             overflowArray.clear();
+        }
+        if (tupleEntries1 != null) {
+            tupleEntries1.clear();
+        }
+        if (tupleEntries2 != null) {
+            tupleEntries2.clear();
+        }
     }
 }
diff --git a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/Rectangle.java b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/Rectangle.java
new file mode 100644
index 0000000..4fa9179
--- /dev/null
+++ b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/Rectangle.java
@@ -0,0 +1,126 @@
+package edu.uci.ics.hyracks.storage.am.rtree.impls;
+
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.DoubleSerializerDeserializer;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+
+public class Rectangle {
+    private int dim;
+    private double[] low;
+    private double[] high;
+
+    public Rectangle(int dim) {
+        this.dim = dim;
+        low = new double[this.dim];
+        high = new double[this.dim];
+    }
+
+    public int getDim() {
+        return dim;
+    }
+
+    public double getLow(int i) {
+        return low[i];
+    }
+
+    public double getHigh(int i) {
+        return high[i];
+    }
+
+    public void setLow(int i, double value) {
+        low[i] = value;
+    }
+
+    public void setHigh(int i, double value) {
+        high[i] = value;
+    }
+
+    public void set(ITupleReference tuple) {
+        for (int i = 0; i < getDim(); i++) {
+            int j = i + getDim();
+            setLow(i, DoubleSerializerDeserializer.getDouble(tuple.getFieldData(i), tuple.getFieldStart(i)));
+            setHigh(i, DoubleSerializerDeserializer.getDouble(tuple.getFieldData(j), tuple.getFieldStart(j)));
+        }
+    }
+
+    public void enlarge(ITupleReference tupleToBeInserted) {
+        for (int i = 0; i < getDim(); i++) {
+            int j = getDim() + i;
+            double low = DoubleSerializerDeserializer.getDouble(tupleToBeInserted.getFieldData(i),
+                    tupleToBeInserted.getFieldStart(i));
+            if (getLow(i) > low) {
+                setLow(i, low);
+            }
+            double high = DoubleSerializerDeserializer.getDouble(tupleToBeInserted.getFieldData(j),
+                    tupleToBeInserted.getFieldStart(j));
+            if (getHigh(i) < high) {
+                setHigh(i, high);
+            }
+        }
+    }
+
+    public double margin() {
+        double margin = 0.0;
+        double mul = Math.pow(2, (double) getDim() - 1.0);
+        for (int i = 0; i < getDim(); i++) {
+            margin += (getHigh(i) - getLow(i)) * mul;
+        }
+        return margin;
+    }
+    
+    public double overlappedArea(ITupleReference tuple) {
+        double area = 1.0;
+        double f1, f2;
+        
+        for (int i = 0; i < getDim(); i++)
+        {
+            int j = getDim() + i;
+            double low = DoubleSerializerDeserializer.getDouble(tuple.getFieldData(i),
+                    tuple.getFieldStart(i));
+            double high = DoubleSerializerDeserializer.getDouble(tuple.getFieldData(j),
+                    tuple.getFieldStart(j));
+            if (getLow(i) > high || getHigh(i) < low) {
+                return 0.0;
+            }
+            f1 = Math.max(getLow(i), low);
+            f2 = Math.min(getHigh(i), high);
+            area *= f2 - f1;
+        }
+        return area;
+    }
+    
+    public double overlappedArea(Rectangle rec) {
+        double area = 1.0;
+        double f1, f2;
+        
+        for (int i = 0; i < getDim(); i++)
+        {
+            if (getLow(i) > rec.getHigh(i) || getHigh(i) < rec.getLow(i)) {
+                return 0.0;
+            }
+
+            f1 = Math.max(getLow(i), rec.getLow(i));
+            f2 = Math.min(getHigh(i), rec.getHigh(i));
+            area *= f2 - f1;
+        }
+        return area;
+    }
+    
+    public double area(ITupleReference tuple) {
+        double area = 1.0;
+        for (int i = 0; i < getDim(); i++) {
+            int j = getDim() + i;
+            area *= DoubleSerializerDeserializer.getDouble(tuple.getFieldData(j), tuple.getFieldStart(j))
+                    - DoubleSerializerDeserializer.getDouble(tuple.getFieldData(i), tuple.getFieldStart(i));
+        }
+        return area;
+    }
+    
+    public double area() {
+        double area = 1.0;
+        for (int i = 0; i < getDim(); i++) {
+            area *= getHigh(i) - getLow(i);
+        }
+        return area;
+    }
+}
diff --git a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/SpatialUtils.java b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/SpatialUtils.java
new file mode 100644
index 0000000..791999f
--- /dev/null
+++ b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/SpatialUtils.java
@@ -0,0 +1,18 @@
+package edu.uci.ics.hyracks.storage.am.rtree.impls;
+
+
+public class SpatialUtils {
+    private double doubleEpsilon;
+
+    public SpatialUtils() {
+        double temp = 0.5;
+        while (1 + temp > 1) {
+            temp /= 2;
+        }
+        this.doubleEpsilon = temp;
+    }
+    
+    public double getDoubleEpsilon() {
+        return doubleEpsilon;
+    }
+}
diff --git a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/TupleEntry.java b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/TupleEntry.java
new file mode 100644
index 0000000..326fc4c
--- /dev/null
+++ b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/TupleEntry.java
@@ -0,0 +1,42 @@
+package edu.uci.ics.hyracks.storage.am.rtree.impls;
+
+
+public class TupleEntry implements Comparable <TupleEntry> {
+    private int tupleIndex;
+    private double value;
+    private final double doubleEpsilon;
+    
+    public TupleEntry(double doubleEpsilon) {
+        this.doubleEpsilon = doubleEpsilon;
+    }
+    
+    public int getTupleIndex() {
+        return tupleIndex;
+    }
+    
+    public void setTupleIndex(int tupleIndex) {
+        this.tupleIndex = tupleIndex;
+    }
+    
+    public double getValue() {
+        return value;
+    }
+    
+    public void setValue(double value) {
+        this.value = value;
+    }
+    
+    public double getDoubleEpsilon() {
+        return doubleEpsilon;
+    }
+
+    public int compareTo(TupleEntry tupleEntry) {
+        double cmp = this.getValue() - tupleEntry.getValue();
+        if (cmp > getDoubleEpsilon())
+            return 1;
+        cmp = tupleEntry.getValue() - this.getValue();
+        if (cmp > getDoubleEpsilon())
+            return -1;
+        return 0;
+    }
+}
diff --git a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/TupleEntryArrayList.java b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/TupleEntryArrayList.java
new file mode 100644
index 0000000..2eca75b
--- /dev/null
+++ b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/TupleEntryArrayList.java
@@ -0,0 +1,70 @@
+package edu.uci.ics.hyracks.storage.am.rtree.impls;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+public class TupleEntryArrayList {
+    private TupleEntry[] data;
+    private int size;
+    private final int growth;
+    private final double doubleEpsilon;
+
+    public TupleEntryArrayList(int initialCapacity, int growth, SpatialUtils spatialUtils) {
+        doubleEpsilon = spatialUtils.getDoubleEpsilon();
+        data = new TupleEntry[initialCapacity];
+        size = 0;
+        this.growth = growth;
+    }
+
+    public double getDoubleEpsilon() {
+        return doubleEpsilon;
+    }
+
+    public int size() {
+        return size;
+    }
+
+    public void add(int tupleIndex, double value) {
+        if (size == data.length) {
+            TupleEntry[] newData = new TupleEntry[data.length + growth];
+            System.arraycopy(data, 0, newData, 0, data.length);
+            data = newData;
+        }
+        if (data[size] == null) {
+            data[size] = new TupleEntry(doubleEpsilon);
+        }
+        data[size].setTupleIndex(tupleIndex);
+        data[size].setValue(value);
+        size++;
+    }
+
+    public void removeLast() {
+        if (size > 0)
+            size--;
+    }
+
+    // WARNING: caller is responsible for checking size > 0
+    public TupleEntry getLast() {
+        return data[size - 1];
+    }
+
+    public TupleEntry get(int i) {
+        return data[i];
+    }
+
+    public void clear() {
+        size = 0;
+    }
+
+    public boolean isEmpty() {
+        return size == 0;
+    }
+
+    public void sort(EntriesOrder order, int tupleCount) {
+        if (order == EntriesOrder.ASCENDING) {
+            Arrays.sort(data, 0, tupleCount);
+        } else {
+            Arrays.sort(data, 0, tupleCount, Collections.reverseOrder());
+        }
+    }
+}
diff --git a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/UnorderedSlotManager.java b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/UnorderedSlotManager.java
index 95a89bc..428ecd6 100644
--- a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/UnorderedSlotManager.java
+++ b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/UnorderedSlotManager.java
@@ -6,17 +6,22 @@
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.FindTupleMode;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.FindTupleNoExactMatchPolicy;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.rtree.frames.NSMRTreeFrame;
 
 public class UnorderedSlotManager extends AbstractSlotManager {
+    @Override
     public int findTupleIndex(ITupleReference searchKey, ITreeIndexTupleReference frameTuple, MultiComparator multiCmp,
             FindTupleMode mode, FindTupleNoExactMatchPolicy matchPolicy) {
-        if (mode == FindTupleMode.FTM_EXACT) {
-            for (int i = 0; i < frame.getTupleCount(); i++) {
-                frameTuple.resetByTupleIndex(frame, i);
-                int cmp = multiCmp.compare(searchKey, frameTuple);
-                if (cmp == 0) {
-                    return i;
-                }
+        for (int i = 0; i < frame.getTupleCount(); i++) {
+            frameTuple.resetByTupleIndex(frame, i);
+            int cmp = multiCmp.getIntCmp().compare(frameTuple.getFieldData(multiCmp.getKeyFieldCount()),
+                    frameTuple.getFieldStart(multiCmp.getKeyFieldCount()),
+                    frameTuple.getFieldLength(multiCmp.getKeyFieldCount()),
+                    searchKey.getFieldData(multiCmp.getKeyFieldCount()),
+                    searchKey.getFieldStart(multiCmp.getKeyFieldCount()),
+                    searchKey.getFieldLength(multiCmp.getKeyFieldCount()));
+            if (cmp == 0) {
+                return i;
             }
         }
         return -1;
@@ -29,4 +34,23 @@
         return slotOff;
     }
 
+    public void modifySlot(int slotOff, int tupleOff) {
+        setSlot(slotOff, tupleOff);
+    }
+
+    public void deleteEmptySlots() {
+        int slotOff = getSlotStartOff();
+        int numOfSlots = ((getSlotStartOff() - getSlotEndOff()) / slotSize) + 1;
+        for (int i = 0; i < numOfSlots; i++) {
+            if (frame.getBuffer().getInt(slotOff) == -1) {
+                int slotStartOff = getSlotEndOff();
+                int length = slotOff - slotStartOff;
+                System.arraycopy(frame.getBuffer().array(), slotStartOff, frame.getBuffer().array(),
+                        slotStartOff + slotSize, length);
+                ((NSMRTreeFrame)frame).setTupleCount(frame.getTupleCount() - 1);
+            } else {
+                slotOff -= slotSize;
+            }
+        }
+    }
 }
diff --git a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/tuples/RTreeTypeAwareTupleWriter.java b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/tuples/RTreeTypeAwareTupleWriter.java
index e932624..b2cf5e9 100644
--- a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/tuples/RTreeTypeAwareTupleWriter.java
+++ b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/tuples/RTreeTypeAwareTupleWriter.java
@@ -3,7 +3,6 @@
 import java.nio.ByteBuffer;
 
 import edu.uci.ics.hyracks.api.dataflow.value.ITypeTrait;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.DoubleSerializerDeserializer;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleReference;
 import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriter;
 
@@ -33,8 +32,6 @@
 
         // write data
         for (int i = 0; i < refs.length; i++) {
-            double d3 = DoubleSerializerDeserializer.getDouble(refs[i].getFieldData(i), refs[i].getFieldStart(i));
-
             System.arraycopy(refs[i].getFieldData(i), refs[i].getFieldStart(i), targetBuf.array(), runner,
                     refs[i].getFieldLength(i));
             runner += refs[i].getFieldLength(i);
diff --git a/hyracks-tests/hyracks-storage-am-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/rtree/RTreeTest.java b/hyracks-tests/hyracks-storage-am-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/rtree/RTreeTest.java
index 0e561fa..91c786b 100644
--- a/hyracks-tests/hyracks-storage-am-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/rtree/RTreeTest.java
+++ b/hyracks-tests/hyracks-storage-am-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/rtree/RTreeTest.java
@@ -1,9 +1,13 @@
 package edu.uci.ics.hyracks.storage.am.rtree;
 
+import java.io.BufferedReader;
 import java.io.DataOutput;
 import java.io.File;
+import java.io.FileReader;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Random;
+import java.util.Stack;
 
 import org.junit.Test;
 
@@ -35,6 +39,7 @@
 import edu.uci.ics.hyracks.storage.am.rtree.frames.NSMRTreeFrameFactory;
 import edu.uci.ics.hyracks.storage.am.rtree.impls.RTree;
 import edu.uci.ics.hyracks.storage.am.rtree.impls.RTreeOpContext;
+import edu.uci.ics.hyracks.storage.am.rtree.impls.Rectangle;
 import edu.uci.ics.hyracks.storage.am.rtree.tuples.RTreeTypeAwareTupleWriterFactory;
 import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
 import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
@@ -43,12 +48,12 @@
 
 public class RTreeTest extends AbstractRTreeTest {
 
-    private static final int PAGE_SIZE = 256;
-    private static final int NUM_PAGES = 10;
+    private static final int PAGE_SIZE = 1024;
+    private static final int NUM_PAGES = 1000;
     private static final int HYRACKS_FRAME_SIZE = 128;
     private IHyracksStageletContext ctx = TestUtils.create(HYRACKS_FRAME_SIZE);
 
-    @Test
+    // @Test
     public void test01() throws Exception {
 
         TestStorageManagerComponentHolder.init(PAGE_SIZE, NUM_PAGES);
@@ -60,8 +65,8 @@
         bufferCache.openFile(fileId);
 
         // declare interior-frame-tuple fields
-        int interiorTieldCount = 5;
-        ITypeTrait[] interiorTypeTraits = new ITypeTrait[interiorTieldCount];
+        int interiorFieldCount = 5;
+        ITypeTrait[] interiorTypeTraits = new ITypeTrait[interiorFieldCount];
         interiorTypeTraits[0] = new TypeTrait(8);
         interiorTypeTraits[1] = new TypeTrait(8);
         interiorTypeTraits[2] = new TypeTrait(8);
@@ -101,8 +106,9 @@
         IRTreeFrame leafFrame = leafFrameFactory.getFrame();
         IFreePageManager freePageManager = new LinkedListFreePageManager(bufferCache, fileId, 0);
 
+        int dim = 2;
         RTree rtree = new RTree(bufferCache, freePageManager, interiorFrameFactory, leafFrameFactory, interiorCmp,
-                leafCmp);
+                leafCmp, dim);
         rtree.create(fileId, leafFrame, metaFrame);
         rtree.open(fileId);
 
@@ -124,7 +130,7 @@
 
         Random rnd = new Random();
         rnd.setSeed(50);
-
+        Stack<Integer> s = new Stack<Integer>();
         for (int i = 0; i < 10000; i++) {
 
             double p1x = rnd.nextDouble();
@@ -151,11 +157,9 @@
 
             tuple.reset(accessor, 0);
 
-            // if (i % 1000 == 0) {
             long end = System.currentTimeMillis();
             print("INSERTING " + i + " " + Math.min(p1x, p2x) + " " + Math.min(p1y, p2y) + " " + Math.max(p1x, p2x)
                     + " " + Math.max(p1y, p2y) + "\n");
-            // }
 
             try {
                 rtree.insert(tuple, insertOpCtx);
@@ -165,9 +169,225 @@
             }
         }
 
+        // rtree.printTree(leafFrame, interiorFrame, recDescSers);
+        // System.out.println();
+
+        RTreeOpContext searchOpCtx = rtree.createOpContext(TreeIndexOp.TI_SEARCH, interiorFrame, leafFrame, metaFrame);
+        ArrayList<Rectangle> results = new ArrayList<Rectangle>();
+        rtree.search(s, tuple, searchOpCtx, results);
+
+        // for (int i = 0; i < results.size(); i++) {
+        // for (int j = 0; j < dim; j++) {
+        // System.out.print(results.get(i).getLow(j) + " " +
+        // results.get(i).getHigh(j));
+        // }
+        // System.out.println();
+        // }
+        System.out.println("Number of Results: " + results.size());
+
+        String stats = rtree.printStats();
+        print(stats);
+
+        rtree.close();
+        bufferCache.closeFile(fileId);
+        bufferCache.close();
+
+    }
+
+    @Test
+    public void test02() throws Exception {
+
+        TestStorageManagerComponentHolder.init(PAGE_SIZE, NUM_PAGES);
+        IBufferCache bufferCache = TestStorageManagerComponentHolder.getBufferCache(ctx);
+        IFileMapProvider fmp = TestStorageManagerComponentHolder.getFileMapProvider(ctx);
+        FileReference file = new FileReference(new File(fileName));
+        bufferCache.createFile(file);
+        int fileId = fmp.lookupFileId(file);
+        bufferCache.openFile(fileId);
+
+        // declare interior-frame-tuple fields
+        int interiorFieldCount = 5;
+        ITypeTrait[] interiorTypeTraits = new ITypeTrait[interiorFieldCount];
+        interiorTypeTraits[0] = new TypeTrait(8);
+        interiorTypeTraits[1] = new TypeTrait(8);
+        interiorTypeTraits[2] = new TypeTrait(8);
+        interiorTypeTraits[3] = new TypeTrait(8);
+        interiorTypeTraits[4] = new TypeTrait(4);
+
+        // declare keys
+        int keyFieldCount = 4;
+        IBinaryComparator[] cmps = new IBinaryComparator[keyFieldCount];
+        cmps[0] = DoubleBinaryComparatorFactory.INSTANCE.createBinaryComparator();
+        cmps[1] = cmps[0];
+        cmps[2] = cmps[0];
+        cmps[3] = cmps[0];
+
+        // declare leaf-frame-tuple fields
+        int leafFieldCount = 5;
+        ITypeTrait[] leafTypeTraits = new ITypeTrait[leafFieldCount];
+        leafTypeTraits[0] = new TypeTrait(8);
+        leafTypeTraits[1] = new TypeTrait(8);
+        leafTypeTraits[2] = new TypeTrait(8);
+        leafTypeTraits[3] = new TypeTrait(8);
+        leafTypeTraits[4] = new TypeTrait(4);
+
+        MultiComparator interiorCmp = new MultiComparator(interiorTypeTraits, cmps);
+        MultiComparator leafCmp = new MultiComparator(leafTypeTraits, cmps);
+
+        RTreeTypeAwareTupleWriterFactory interiorTupleWriterFactory = new RTreeTypeAwareTupleWriterFactory(
+                interiorTypeTraits);
+        RTreeTypeAwareTupleWriterFactory leafTupleWriterFactory = new RTreeTypeAwareTupleWriterFactory(leafTypeTraits);
+
+        IRTreeFrameFactory interiorFrameFactory = new NSMRTreeFrameFactory(interiorTupleWriterFactory);
+        IRTreeFrameFactory leafFrameFactory = new NSMRTreeFrameFactory(leafTupleWriterFactory);
+        ITreeIndexMetaDataFrameFactory metaFrameFactory = new LIFOMetaDataFrameFactory();
+        ITreeIndexMetaDataFrame metaFrame = metaFrameFactory.getFrame();
+
+        IRTreeFrame interiorFrame = interiorFrameFactory.getFrame();
+        IRTreeFrame leafFrame = leafFrameFactory.getFrame();
+        IFreePageManager freePageManager = new LinkedListFreePageManager(bufferCache, fileId, 0);
+
+        int dim = 2;
+        RTree rtree = new RTree(bufferCache, freePageManager, interiorFrameFactory, leafFrameFactory, interiorCmp,
+                leafCmp, dim);
+        rtree.create(fileId, leafFrame, metaFrame);
+        rtree.open(fileId);
+
+        ByteBuffer hyracksFrame = ctx.allocateFrame();
+        FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
+        ArrayTupleBuilder tb = new ArrayTupleBuilder(leafCmp.getFieldCount());
+        DataOutput dos = tb.getDataOutput();
+
+        @SuppressWarnings("rawtypes")
+        ISerializerDeserializer[] recDescSers = { DoubleSerializerDeserializer.INSTANCE,
+                DoubleSerializerDeserializer.INSTANCE, DoubleSerializerDeserializer.INSTANCE,
+                DoubleSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE };
+        RecordDescriptor recDesc = new RecordDescriptor(recDescSers);
+        IFrameTupleAccessor accessor = new FrameTupleAccessor(ctx.getFrameSize(), recDesc);
+        accessor.reset(hyracksFrame);
+        FrameTupleReference tuple = new FrameTupleReference();
+
+        RTreeOpContext insertOpCtx = rtree.createOpContext(TreeIndexOp.TI_INSERT, interiorFrame, leafFrame, metaFrame);
+
+        File datasetFile = new File("/home/salsubaiee/dataset.txt");
+        BufferedReader reader = new BufferedReader(new FileReader(datasetFile));
+
+        Random rnd = new Random();
+        rnd.setSeed(50);
+        String inputLine = reader.readLine();
+        int index = 0;
+
+        while (inputLine != null) {
+
+            String[] splittedLine1 = inputLine.split(",");
+            String[] splittedLine2 = splittedLine1[0].split("\\s");
+
+            double p1x = Double.valueOf(splittedLine2[1].trim()).doubleValue();
+            double p1y = Double.valueOf(splittedLine2[2].trim()).doubleValue();
+            double p2x = p1x;
+            double p2y = p1y;
+
+            int pk = rnd.nextInt();
+
+            tb.reset();
+            DoubleSerializerDeserializer.INSTANCE.serialize(p1x, dos);
+            tb.addFieldEndOffset();
+            DoubleSerializerDeserializer.INSTANCE.serialize(p1y, dos);
+            tb.addFieldEndOffset();
+            DoubleSerializerDeserializer.INSTANCE.serialize(p2x, dos);
+            tb.addFieldEndOffset();
+            DoubleSerializerDeserializer.INSTANCE.serialize(p2y, dos);
+            tb.addFieldEndOffset();
+            IntegerSerializerDeserializer.INSTANCE.serialize(pk, dos);
+            tb.addFieldEndOffset();
+
+            appender.reset(hyracksFrame, true);
+            appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
+
+            tuple.reset(accessor, 0);
+
+            long end = System.currentTimeMillis();
+            print("INSERTING " + index + " " + Math.min(p1x, p2x) + " " + Math.min(p1y, p2y) + " " + Math.max(p1x, p2x)
+                    + " " + Math.max(p1y, p2y) + "\n");
+
+            try {
+                rtree.insert(tuple, insertOpCtx);
+            } catch (TreeIndexException e) {
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+            
+            if (index == 10000) {
+                break;
+            }
+            inputLine = reader.readLine();
+            index++;
+
+//            rtree.printTree(leafFrame, interiorFrame, recDescSers);
+//            System.out.println();
+        }
+
         rtree.printTree(leafFrame, interiorFrame, recDescSers);
         System.out.println();
 
+        RTreeOpContext searchOpCtx = rtree.createOpContext(TreeIndexOp.TI_SEARCH, interiorFrame, leafFrame, metaFrame);
+
+        File querysetFile = new File("/home/salsubaiee/queryset.txt");
+        BufferedReader reader2 = new BufferedReader(new FileReader(querysetFile));
+
+        inputLine = reader2.readLine();
+        int totalResults = 0;
+        index = 0;
+        Stack<Integer> s = new Stack<Integer>();
+        while (inputLine != null) {
+
+            String[] splittedLine1 = inputLine.split(",");
+            String[] splittedLine2 = splittedLine1[0].split("\\s");
+
+            double p1x = Double.valueOf(splittedLine2[1].trim()).doubleValue();
+            double p1y = Double.valueOf(splittedLine2[2].trim()).doubleValue();
+            double p2x = Double.valueOf(splittedLine2[3].trim()).doubleValue();
+            double p2y = Double.valueOf(splittedLine2[4].trim()).doubleValue();
+
+            int pk = rnd.nextInt();
+
+            tb.reset();
+            DoubleSerializerDeserializer.INSTANCE.serialize(p1x, dos);
+            tb.addFieldEndOffset();
+            DoubleSerializerDeserializer.INSTANCE.serialize(p1y, dos);
+            tb.addFieldEndOffset();
+            DoubleSerializerDeserializer.INSTANCE.serialize(p2x, dos);
+            tb.addFieldEndOffset();
+            DoubleSerializerDeserializer.INSTANCE.serialize(p2y, dos);
+            tb.addFieldEndOffset();
+            IntegerSerializerDeserializer.INSTANCE.serialize(pk, dos);
+            tb.addFieldEndOffset();
+
+            appender.reset(hyracksFrame, true);
+            appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
+
+            tuple.reset(accessor, 0);
+
+            long end = System.currentTimeMillis();
+            print("SEARCHING " + index + " " + Math.min(p1x, p2x) + " " + Math.min(p1y, p2y) + " " + Math.max(p1x, p2x)
+                    + " " + Math.max(p1y, p2y) + "\n");
+
+            try {
+                ArrayList<Rectangle> results = new ArrayList<Rectangle>();
+                rtree.search(s, tuple, searchOpCtx, results);
+                totalResults += results.size();
+            } catch (TreeIndexException e) {
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+
+            inputLine = reader2.readLine();
+            index++;
+
+        }
+
+        System.out.println("Number of Results: " + totalResults);
+
         String stats = rtree.printStats();
         print(stats);