- Added support for concurrent inserts

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_indexes@392 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/api/IRTreeFrame.java b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/api/IRTreeFrame.java
index 7be43ff..9d60464 100644
--- a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/api/IRTreeFrame.java
+++ b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/api/IRTreeFrame.java
@@ -5,25 +5,28 @@
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleReference;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+import edu.uci.ics.hyracks.storage.am.rtree.impls.FindPathList;
 import edu.uci.ics.hyracks.storage.am.rtree.impls.Rectangle;
 import edu.uci.ics.hyracks.storage.am.rtree.impls.TupleEntryArrayList;
 
 public interface IRTreeFrame extends ITreeIndexFrame {
-    
+
+    public int findTuple(ITupleReference tuple, FindPathList findPathList, int parentId, MultiComparator cmp);
+
     public int findTuple(ITupleReference tuple, MultiComparator cmp);
-    
+
     public int getPageNsn();
-    
+
     public void setPageNsn(int pageNsn);
-    
+
     public int getRightPage();
 
     public void setRightPage(int rightPage);
 
     public int getBestChildPageId(MultiComparator cmp);
-    
-    public boolean checkEnlargement(ITupleReference tuple, TupleEntryArrayList entries, ITreeIndexTupleReference[] nodesMBRs,
-            MultiComparator cmp);
+
+    public boolean checkEnlargement(ITupleReference tuple, TupleEntryArrayList entries,
+            ITreeIndexTupleReference[] nodesMBRs, MultiComparator cmp);
 
     public void adjustNodeMBR(ITreeIndexTupleReference[] tuples, MultiComparator cmp);
 
@@ -38,6 +41,6 @@
     public Rectangle intersect(ITupleReference tuple, int tupleIndex, MultiComparator cmp);
 
     public int getChildPageIdIfIntersect(ITupleReference tuple, int tupleIndex, MultiComparator cmp);
-    
+
     public void enlarge(ITupleReference tuple, MultiComparator cmp);
 }
diff --git a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/NSMRTreeFrame.java b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/NSMRTreeFrame.java
index ed43240..7d0e886 100644
--- a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/NSMRTreeFrame.java
+++ b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/NSMRTreeFrame.java
@@ -1,6 +1,5 @@
 package edu.uci.ics.hyracks.storage.am.rtree.frames;
 
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -15,6 +14,7 @@
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
 import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeFrame;
 import edu.uci.ics.hyracks.storage.am.rtree.impls.EntriesOrder;
+import edu.uci.ics.hyracks.storage.am.rtree.impls.FindPathList;
 import edu.uci.ics.hyracks.storage.am.rtree.impls.RTreeSplitKey;
 import edu.uci.ics.hyracks.storage.am.rtree.impls.Rectangle;
 import edu.uci.ics.hyracks.storage.am.rtree.impls.TupleEntryArrayList;
@@ -42,23 +42,23 @@
         }
         cmpFrameTuple = tupleWriter.createTupleReference();
     }
-    
+
     @Override
     public void initBuffer(byte level) {
         super.initBuffer(level);
         buf.putInt(pageNsnOff, 0);
         buf.putInt(rightPageOff, -1);
     }
-    
+
     public void setTupleCount(int tupleCount) {
         buf.putInt(tupleCountOff, tupleCount);
     }
-    
+
     @Override
     public void setPageNsn(int pageNsn) {
         buf.putInt(pageNsnOff, pageNsn);
     }
-    
+
     @Override
     public int getPageNsn() {
         return buf.getInt(pageNsnOff);
@@ -80,14 +80,10 @@
         buf.putInt(rightPageOff, rightPage);
     }
 
-    public ITreeIndexTupleReference[] getTuples() {
+    private ITreeIndexTupleReference[] getTuples() {
         return tuples;
     }
 
-    public void setTuples(ITreeIndexTupleReference[] tuples) {
-        this.tuples = tuples;
-    }
-
     // for debugging
     public ArrayList<Integer> getChildren(MultiComparator cmp) {
         ArrayList<Integer> ret = new ArrayList<Integer>();
@@ -185,8 +181,7 @@
     @Override
     public int split(ITreeIndexFrame rightFrame, ITupleReference tuple, MultiComparator cmp, ISplitKey splitKey,
             TupleEntryArrayList entries1, TupleEntryArrayList entries2, Rectangle[] rec) throws Exception {
-        
-        
+
         // RTreeSplitKey rTreeSplitKey = ((RTreeSplitKey) splitKey);
         // RTreeTypeAwareTupleWriter rTreeTupleWriterLeftFrame =
         // ((RTreeTypeAwareTupleWriter) tupleWriter);
@@ -219,7 +214,8 @@
         // int src = rightFrame.getSlotManager().getSlotEndOff();
         // int dest = rightFrame.getSlotManager().getSlotEndOff() + tuplesToLeft
         // * rightFrame.getSlotManager().getSlotSize();
-        // int length = rightFrame.getSlotManager().getSlotSize() * tuplesToRight;
+        // int length = rightFrame.getSlotManager().getSlotSize() *
+        // tuplesToRight;
         // System.arraycopy(right.array(), src, right.array(), dest, length);
         // right.putInt(tupleCountOff, tuplesToRight);
         //
@@ -256,8 +252,7 @@
         // 0);
         //
         // return 0;
-        
-        
+
         RTreeSplitKey rTreeSplitKey = ((RTreeSplitKey) splitKey);
         RTreeTypeAwareTupleWriter rTreeTupleWriterLeftFrame = ((RTreeTypeAwareTupleWriter) tupleWriter);
         RTreeTypeAwareTupleWriter rTreeTupleWriterRightFrame = ((RTreeTypeAwareTupleWriter) rightFrame.getTupleWriter());
@@ -392,7 +387,7 @@
         // compact both pages
         rightFrame.compact(cmp);
         compact(cmp);
-        
+
         if (!tupleInserted) {
             insert(tuple, cmp);
         }
@@ -415,7 +410,7 @@
         return 0;
     }
 
-    public void generateDist(ITupleReference tuple, TupleEntryArrayList entries, Rectangle rec, int start, int end) {
+    private void generateDist(ITupleReference tuple, TupleEntryArrayList entries, Rectangle rec, int start, int end) {
         int j = 0;
         while (entries.get(j).getTupleIndex() == -1) {
             j++;
@@ -447,10 +442,10 @@
     public int getBestChildPageId(MultiComparator cmp) {
         return buf.getInt(frameTuple.getFieldStart(cmp.getKeyFieldCount()));
     }
-    
+
     @Override
-    public boolean checkEnlargement(ITupleReference tuple, TupleEntryArrayList entries, ITreeIndexTupleReference[] nodesMBRs,
-            MultiComparator cmp) {
+    public boolean checkEnlargement(ITupleReference tuple, TupleEntryArrayList entries,
+            ITreeIndexTupleReference[] nodesMBRs, MultiComparator cmp) {
 
         cmpFrameTuple.setFieldCount(cmp.getFieldCount());
         frameTuple.setFieldCount(cmp.getFieldCount());
@@ -545,127 +540,130 @@
             }
         }
         entries.clear();
-        
+
         frameTuple.resetByTupleIndex(this, bestChild);
         if (minEnlargedArea > 0.0) {
             return true;
-            //enlarge(frameTuple, tuple, cmp);
+            // enlarge(frameTuple, tuple, cmp);
         } else {
             return false;
         }
-        
-        //nodesMBRs[(int) getLevel() - 1].resetByTupleIndex(this, bestChild);
 
-        
+        // nodesMBRs[(int) getLevel() - 1].resetByTupleIndex(this, bestChild);
 
         // return the page id of the bestChild tuple
-        //return buf.getInt(frameTuple.getFieldStart(cmp.getKeyFieldCount()));
+        // return buf.getInt(frameTuple.getFieldStart(cmp.getKeyFieldCount()));
     }
-    
-//    @Override
-//    public int getChildPageId(ITupleReference tuple, TupleEntryArrayList entries, ITreeIndexTupleReference[] nodesMBRs,
-//            MultiComparator cmp) {
-//
-//        cmpFrameTuple.setFieldCount(cmp.getFieldCount());
-//        frameTuple.setFieldCount(cmp.getFieldCount());
-//
-//        int bestChild = 0;
-//        double minEnlargedArea = Double.MAX_VALUE;
-//
-//        // the children pointers in the node point to leaves
-//        if (getLevel() == 1) {
-//            // find least overlap enlargement, use minimum enlarged area to
-//            // break tie, if tie still exists use minimum area to break it
-//            for (int i = 0; i < getTupleCount(); ++i) {
-//                frameTuple.resetByTupleIndex(this, i);
-//                double enlargedArea = enlargedArea(frameTuple, tuple, cmp);
-//                entries.add(i, enlargedArea);
-//                if (enlargedArea < minEnlargedArea) {
-//                    minEnlargedArea = enlargedArea;
-//                    bestChild = i;
-//                }
-//            }
-//            if (minEnlargedArea < entries.getDoubleEpsilon() || minEnlargedArea > entries.getDoubleEpsilon()) {
-//                minEnlargedArea = Double.MAX_VALUE;
-//                int k;
-//                if (getTupleCount() > nearMinimumOverlapFactor) {
-//                    // sort the entries based on their area enlargement needed
-//                    // to include the object
-//                    entries.sort(EntriesOrder.ASCENDING, getTupleCount());
-//                    k = nearMinimumOverlapFactor;
-//                } else {
-//                    k = getTupleCount();
-//                }
-//
-//                double minOverlap = Double.MAX_VALUE;
-//                int id = 0;
-//                for (int i = 0; i < k; ++i) {
-//                    double difference = 0.0;
-//                    for (int j = 0; j < getTupleCount(); ++j) {
-//                        frameTuple.resetByTupleIndex(this, j);
-//                        cmpFrameTuple.resetByTupleIndex(this, entries.get(i).getTupleIndex());
-//
-//                        int c = cmp.getIntCmp().compare(frameTuple.getFieldData(cmp.getKeyFieldCount()),
-//                                frameTuple.getFieldStart(cmp.getKeyFieldCount()),
-//                                frameTuple.getFieldLength(cmp.getKeyFieldCount()),
-//                                cmpFrameTuple.getFieldData(cmp.getKeyFieldCount()),
-//                                cmpFrameTuple.getFieldStart(cmp.getKeyFieldCount()),
-//                                cmpFrameTuple.getFieldLength(cmp.getKeyFieldCount()));
-//                        if (c != 0) {
-//                            double intersection = overlappedArea(frameTuple, tuple, cmpFrameTuple, cmp);
-//                            if (intersection != 0.0) {
-//                                difference += intersection - overlappedArea(frameTuple, null, cmpFrameTuple, cmp);
-//                            }
-//                        } else {
-//                            id = j;
-//                        }
-//                    }
-//
-//                    double enlargedArea = enlargedArea(cmpFrameTuple, tuple, cmp);
-//                    if (difference < minOverlap) {
-//                        minOverlap = difference;
-//                        minEnlargedArea = enlargedArea;
-//                        bestChild = id;
-//                    } else if (difference == minOverlap) {
-//                        if (enlargedArea < minEnlargedArea) {
-//                            minEnlargedArea = enlargedArea;
-//                            bestChild = id;
-//                        } else if (enlargedArea == minEnlargedArea) {
-//                            double area = area(cmpFrameTuple, cmp);
-//                            frameTuple.resetByTupleIndex(this, bestChild);
-//                            double minArea = area(frameTuple, cmp);
-//                            if (area < minArea) {
-//                                bestChild = id;
-//                            }
-//                        }
-//                    }
-//                }
-//            }
-//        } else { // find minimum enlarged area, use minimum area to break tie
-//            for (int i = 0; i < getTupleCount(); i++) {
-//                frameTuple.resetByTupleIndex(this, i);
-//                double enlargedArea = enlargedArea(frameTuple, tuple, cmp);
-//                if (enlargedArea < minEnlargedArea) {
-//                    minEnlargedArea = enlargedArea;
-//                    bestChild = i;
-//                } else if (enlargedArea == minEnlargedArea) {
-//                    double area = area(frameTuple, cmp);
-//                    frameTuple.resetByTupleIndex(this, bestChild);
-//                    double minArea = area(frameTuple, cmp);
-//                    if (area < minArea) {
-//                        bestChild = i;
-//                    }
-//                }
-//            }
-//        }
-//        frameTuple.resetByTupleIndex(this, bestChild);
-//        nodesMBRs[(int) getLevel() - 1].resetByTupleIndex(this, bestChild);
-//
-//        entries.clear();
-//
-//        // return the page id of the bestChild tuple
-//        return buf.getInt(frameTuple.getFieldStart(cmp.getKeyFieldCount()));
-//    }
+
+    // @Override
+    // public int getChildPageId(ITupleReference tuple, TupleEntryArrayList
+    // entries, ITreeIndexTupleReference[] nodesMBRs,
+    // MultiComparator cmp) {
+    //
+    // cmpFrameTuple.setFieldCount(cmp.getFieldCount());
+    // frameTuple.setFieldCount(cmp.getFieldCount());
+    //
+    // int bestChild = 0;
+    // double minEnlargedArea = Double.MAX_VALUE;
+    //
+    // // the children pointers in the node point to leaves
+    // if (getLevel() == 1) {
+    // // find least overlap enlargement, use minimum enlarged area to
+    // // break tie, if tie still exists use minimum area to break it
+    // for (int i = 0; i < getTupleCount(); ++i) {
+    // frameTuple.resetByTupleIndex(this, i);
+    // double enlargedArea = enlargedArea(frameTuple, tuple, cmp);
+    // entries.add(i, enlargedArea);
+    // if (enlargedArea < minEnlargedArea) {
+    // minEnlargedArea = enlargedArea;
+    // bestChild = i;
+    // }
+    // }
+    // if (minEnlargedArea < entries.getDoubleEpsilon() || minEnlargedArea >
+    // entries.getDoubleEpsilon()) {
+    // minEnlargedArea = Double.MAX_VALUE;
+    // int k;
+    // if (getTupleCount() > nearMinimumOverlapFactor) {
+    // // sort the entries based on their area enlargement needed
+    // // to include the object
+    // entries.sort(EntriesOrder.ASCENDING, getTupleCount());
+    // k = nearMinimumOverlapFactor;
+    // } else {
+    // k = getTupleCount();
+    // }
+    //
+    // double minOverlap = Double.MAX_VALUE;
+    // int id = 0;
+    // for (int i = 0; i < k; ++i) {
+    // double difference = 0.0;
+    // for (int j = 0; j < getTupleCount(); ++j) {
+    // frameTuple.resetByTupleIndex(this, j);
+    // cmpFrameTuple.resetByTupleIndex(this, entries.get(i).getTupleIndex());
+    //
+    // int c =
+    // cmp.getIntCmp().compare(frameTuple.getFieldData(cmp.getKeyFieldCount()),
+    // frameTuple.getFieldStart(cmp.getKeyFieldCount()),
+    // frameTuple.getFieldLength(cmp.getKeyFieldCount()),
+    // cmpFrameTuple.getFieldData(cmp.getKeyFieldCount()),
+    // cmpFrameTuple.getFieldStart(cmp.getKeyFieldCount()),
+    // cmpFrameTuple.getFieldLength(cmp.getKeyFieldCount()));
+    // if (c != 0) {
+    // double intersection = overlappedArea(frameTuple, tuple, cmpFrameTuple,
+    // cmp);
+    // if (intersection != 0.0) {
+    // difference += intersection - overlappedArea(frameTuple, null,
+    // cmpFrameTuple, cmp);
+    // }
+    // } else {
+    // id = j;
+    // }
+    // }
+    //
+    // double enlargedArea = enlargedArea(cmpFrameTuple, tuple, cmp);
+    // if (difference < minOverlap) {
+    // minOverlap = difference;
+    // minEnlargedArea = enlargedArea;
+    // bestChild = id;
+    // } else if (difference == minOverlap) {
+    // if (enlargedArea < minEnlargedArea) {
+    // minEnlargedArea = enlargedArea;
+    // bestChild = id;
+    // } else if (enlargedArea == minEnlargedArea) {
+    // double area = area(cmpFrameTuple, cmp);
+    // frameTuple.resetByTupleIndex(this, bestChild);
+    // double minArea = area(frameTuple, cmp);
+    // if (area < minArea) {
+    // bestChild = id;
+    // }
+    // }
+    // }
+    // }
+    // }
+    // } else { // find minimum enlarged area, use minimum area to break tie
+    // for (int i = 0; i < getTupleCount(); i++) {
+    // frameTuple.resetByTupleIndex(this, i);
+    // double enlargedArea = enlargedArea(frameTuple, tuple, cmp);
+    // if (enlargedArea < minEnlargedArea) {
+    // minEnlargedArea = enlargedArea;
+    // bestChild = i;
+    // } else if (enlargedArea == minEnlargedArea) {
+    // double area = area(frameTuple, cmp);
+    // frameTuple.resetByTupleIndex(this, bestChild);
+    // double minArea = area(frameTuple, cmp);
+    // if (area < minArea) {
+    // bestChild = i;
+    // }
+    // }
+    // }
+    // }
+    // frameTuple.resetByTupleIndex(this, bestChild);
+    // nodesMBRs[(int) getLevel() - 1].resetByTupleIndex(this, bestChild);
+    //
+    // entries.clear();
+    //
+    // // return the page id of the bestChild tuple
+    // return buf.getInt(frameTuple.getFieldStart(cmp.getKeyFieldCount()));
+    // }
 
     @Override
     public void reinsert(ITupleReference tuple, ITreeIndexTupleReference nodeMBR, TupleEntryArrayList entries,
@@ -737,7 +735,7 @@
         entries.clear();
     }
 
-    public double area(ITupleReference tuple, MultiComparator cmp) {
+    private double area(ITupleReference tuple, MultiComparator cmp) {
         double area = 1.0;
         int maxFieldPos = cmp.getKeyFieldCount() / 2;
         for (int i = 0; i < maxFieldPos; i++) {
@@ -748,7 +746,7 @@
         return area;
     }
 
-    public double overlappedArea(ITupleReference tuple1, ITupleReference tupleToBeInserted, ITupleReference tuple2,
+    private double overlappedArea(ITupleReference tuple1, ITupleReference tupleToBeInserted, ITupleReference tuple2,
             MultiComparator cmp) {
         double area = 1.0;
         double f1, f2;
@@ -796,7 +794,7 @@
         return area;
     }
 
-    public double enlargedArea(ITupleReference tuple, ITupleReference tupleToBeInserted, MultiComparator cmp) {
+    private double enlargedArea(ITupleReference tuple, ITupleReference tupleToBeInserted, MultiComparator cmp) {
         double areaBeforeEnlarge = area(tuple, cmp);
         double areaAfterEnlarge = 1.0;
 
@@ -837,15 +835,15 @@
                     frameTuple.getFieldLength(i), tuple.getFieldData(i), tuple.getFieldStart(i),
                     tuple.getFieldLength(i));
             if (c > 0) {
-                System.arraycopy(tuple.getFieldData(i), tuple.getFieldStart(i),
-                        frameTuple.getFieldData(i), frameTuple.getFieldStart(i), tuple.getFieldLength(i));
+                System.arraycopy(tuple.getFieldData(i), tuple.getFieldStart(i), frameTuple.getFieldData(i),
+                        frameTuple.getFieldStart(i), tuple.getFieldLength(i));
             }
-            c = cmp.getComparators()[j].compare(frameTuple.getFieldData(j), frameTuple.getFieldStart(j), frameTuple.getFieldLength(j),
-                    tuple.getFieldData(j), tuple.getFieldStart(j),
+            c = cmp.getComparators()[j].compare(frameTuple.getFieldData(j), frameTuple.getFieldStart(j),
+                    frameTuple.getFieldLength(j), tuple.getFieldData(j), tuple.getFieldStart(j),
                     tuple.getFieldLength(j));
             if (c < 0) {
-                System.arraycopy(tuple.getFieldData(j), tuple.getFieldStart(j),
-                        frameTuple.getFieldData(j), frameTuple.getFieldStart(j), tuple.getFieldLength(j));
+                System.arraycopy(tuple.getFieldData(j), tuple.getFieldStart(j), frameTuple.getFieldData(j),
+                        frameTuple.getFieldStart(j), tuple.getFieldLength(j));
             }
         }
     }
@@ -858,11 +856,41 @@
             tupleWriter.writeTuple(tuple, buf, getTupleOffset(tupleIndex));
         }
     }
-    
+
     @Override
     public int findTuple(ITupleReference tuple, MultiComparator cmp) {
         frameTuple.setFieldCount(cmp.getFieldCount());
-        return slotManager.findTupleIndex(tuple, frameTuple, cmp, null, null);
+        for (int i = 0; i < getTupleCount(); i++) {
+            frameTuple.resetByTupleIndex(this, i);
+            int c = cmp.getIntCmp().compare(frameTuple.getFieldData(cmp.getKeyFieldCount()),
+                    frameTuple.getFieldStart(cmp.getKeyFieldCount()),
+                    frameTuple.getFieldLength(cmp.getKeyFieldCount()), tuple.getFieldData(cmp.getKeyFieldCount()),
+                    tuple.getFieldStart(cmp.getKeyFieldCount()), tuple.getFieldLength(cmp.getKeyFieldCount()));
+            if (c == 0) {
+                return i;
+            }
+        }
+        return -1;
+    }
+
+    @Override
+    public int findTuple(ITupleReference tuple, FindPathList findPathList, int parentIndex, MultiComparator cmp) {
+        frameTuple.setFieldCount(cmp.getFieldCount());
+        for (int i = 0; i < getTupleCount(); i++) {
+            frameTuple.resetByTupleIndex(this, i);
+            int c = cmp.getIntCmp().compare(frameTuple.getFieldData(cmp.getKeyFieldCount()),
+                    frameTuple.getFieldStart(cmp.getKeyFieldCount()),
+                    frameTuple.getFieldLength(cmp.getKeyFieldCount()), tuple.getFieldData(cmp.getKeyFieldCount()),
+                    tuple.getFieldStart(cmp.getKeyFieldCount()), tuple.getFieldLength(cmp.getKeyFieldCount()));
+            if (c == 0) {
+                return i;
+            } else {
+                int pageId = IntegerSerializerDeserializer.getInt(frameTuple.getFieldData(cmp.getKeyFieldCount()),
+                        frameTuple.getFieldStart(cmp.getKeyFieldCount()));
+                findPathList.add(pageId, parentIndex);
+            }
+        }
+        return -1;
     }
 
     @Override
@@ -916,27 +944,6 @@
         return buf.getInt(frameTuple.getFieldStart(cmp.getKeyFieldCount()));
     }
 
-    public boolean contains(ITupleReference tuple, MultiComparator cmp) {
-        frameTuple.setFieldCount(cmp.getFieldCount());
-        int maxFieldPos = cmp.getKeyFieldCount() / 2;
-        for (int i = 0; i < maxFieldPos; i++) {
-            int j = maxFieldPos + i;
-            int c = cmp.getComparators()[i].compare(tuple.getFieldData(i), tuple.getFieldStart(i),
-                    tuple.getFieldLength(i), frameTuple.getFieldData(j), frameTuple.getFieldStart(j),
-                    frameTuple.getFieldLength(j));
-            if (c > 0) {
-                return false;
-            }
-            c = cmp.getComparators()[i].compare(tuple.getFieldData(j), tuple.getFieldStart(j), tuple.getFieldLength(j),
-                    frameTuple.getFieldData(i), frameTuple.getFieldStart(i), frameTuple.getFieldLength(i));
-
-            if (c < 0) {
-                return false;
-            }
-        }
-        return true;
-    }
-    
     @Override
     public Rectangle intersect(ITupleReference tuple, int tupleIndex, MultiComparator cmp) {
         frameTuple.setFieldCount(cmp.getFieldCount());
diff --git a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/FindPathList.java b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/FindPathList.java
new file mode 100644
index 0000000..2522e17
--- /dev/null
+++ b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/FindPathList.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.rtree.impls;
+
+public class FindPathList {
+    private int[] pageIds;
+    private int[] pageLsns;
+    private int[] parentIndexes;
+    private int size;
+    private final int growth;
+    private int first;
+
+    public FindPathList(int initialCapacity, int growth) {
+        pageIds = new int[initialCapacity];
+        pageLsns = new int[initialCapacity];
+        parentIndexes = new int[initialCapacity];
+        size = 0;
+        this.growth = growth;
+        first = 0;
+    }
+
+    public int size() {
+        return size;
+    }
+
+    public void add(int pageId, int parentId) {
+        if (size == pageIds.length) {
+            int[] newPageIds = new int[pageIds.length + growth];
+            System.arraycopy(pageIds, 0, newPageIds, 0, pageIds.length);
+            pageIds = newPageIds;
+
+            int[] newPageLsns = new int[pageLsns.length + growth];
+            System.arraycopy(pageLsns, 0, newPageLsns, 0, pageLsns.length);
+            pageLsns = newPageLsns;
+            
+            int[] newParentIds = new int[parentIndexes.length + growth];
+            System.arraycopy(parentIndexes, 0, newParentIds, 0, parentIndexes.length);
+            parentIndexes = newParentIds;
+        }
+
+        pageIds[size] = pageId;
+        parentIndexes[size] = parentId;
+        size++;
+    }
+
+    public void moveFirst() {
+        first++;
+    }
+
+    public int getFirstPageId() {
+        return pageIds[first];
+    }
+    
+    public int getFirstPageLsn() {
+        return pageLsns[first];
+    }
+
+    public int getFirstParentIndex() {
+        return parentIndexes[first];
+    }
+
+    public int getPageId(int i) {
+        return pageIds[i];
+    }
+    
+    public int getPageLsn(int i) {
+        return pageLsns[i];
+    }
+    
+    public void setPageLsn(int i, int pageLsn) {
+        pageLsns[i] = pageLsn;
+    }
+
+    public int getParentIndex(int i) {
+        return parentIndexes[i];
+    }
+    
+    public boolean isLast() {
+        return size == first;
+    }
+
+    public void clear() {
+        size = 0;
+        first = 0;
+    }
+
+    public boolean isEmpty() {
+        return size == 0;
+    }
+}
diff --git a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTree.java b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTree.java
index 6554bb4..534ca50 100644
--- a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTree.java
+++ b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTree.java
@@ -400,7 +400,7 @@
                 writeLatchesAcquired++;
                 ctx.interiorFrame.setPage(parentNode);
             }
-        }
+        } 
         if (foundParent) {
             ctx.interiorFrame.adjustKey(ctx.splitKey.getLeftTuple(), interiorCmp);
             insertTuple(parentNode, parentId, ctx.splitKey.getRightTuple(), ctx, ctx.interiorFrame.isLeaf());
@@ -411,71 +411,68 @@
             writeLatchesReleased++;
             bufferCache.unpin(parentNode);
             unpins++;
+            return;
         }
 
-        // // very rare situation when the there is a root split, search for the
-        // parent tuple
-        // ctx.path.clear();
-        // ctx.pageLsns.clear();
-        // findPath(ctx, isLeaf);
+        // very rare situation when the there is a root split, do an exhaustive
+        // breadth-first traversal looking for the parent tuple
 
-        // newstack = findPath( item->parent )
-        // replace part of stack to new one
-        // latch( parent->page, X-mode )
-        // findParent( item );
-
+        ctx.path.clear();
+        ctx.pageLsns.clear();
+        ctx.findPathList.clear();
+        findPath(ctx);
+        updateParent(ctx);
     }
 
-    // public void findPath(RTreeOpContext ctx, boolean isLeaf) throws Exception
-    // {
-    // int pageId = rootPage;
-    // int parentLsn = 0;
-    // ctx.path.add(pageId);
-    // ctx.pageLsns.add(parentLsn);
-    //
-    // while (!ctx.path.isEmpty()) {
-    // pageId = ctx.path.getLast();
-    // parentLsn = ctx.pageLsns.getLast();
-    // ICachedPage node =
-    // bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, pageId), false);
-    // pins++;
-    // node.acquireReadLatch();
-    // readLatchesAcquired++;
-    // ctx.interiorFrame.setPage(node);
-    //
-    // if (pageId != rootPage && parentLsn < ctx.interiorFrame.getPageNsn()) {
-    // ctx.path.add(ctx.interiorFrame.getRightPage());
-    // ctx.pageLsns.add(0);
-    // }
-    //
-    //
-    // }
-    //
-    // push stack, [root, 0, 0] // page, LSN, parent
-    // while( stack )
-    // ptr = top of stack
-    // latch( ptr->page, S-mode )
-    // if ( ptr->parent->page->lsn < ptr->page->nsn )
-    // push stack, [ ptr->page->rightlink, 0, ptr->parent ]
-    // end
-    // for( each tuple on page )
-    // if ( tuple->pagepointer == item->page )
-    // return stack
-    // else
-    // add to stack at the end [tuple->pagepointer,0, ptr]
-    // end
-    // end
-    // unlatch( ptr->page )
-    // pop stack
-    // end
-    //
-    //
-    //
-    //
-    //
-    //
-    // }
+    public void findPath(RTreeOpContext ctx) throws Exception {
+        int pageId = rootPage;
+        int parentIndex = -1;
+        int parentLsn = 0;
+        int pageLsn, pageIndex;
+        ctx.findPathList.add(pageId, parentIndex);
 
+        while (!ctx.findPathList.isLast()) {
+            pageId = ctx.findPathList.getFirstPageId();
+            parentIndex = ctx.findPathList.getFirstParentIndex();
+            ctx.findPathList.moveFirst();
+            ICachedPage node = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, pageId), false);
+            pins++;
+            node.acquireReadLatch();
+            readLatchesAcquired++;
+            ctx.interiorFrame.setPage(node);
+            pageLsn = ctx.interiorFrame.getPageLsn();
+            pageIndex = ctx.findPathList.size() - 1;
+            ctx.findPathList.setPageLsn(pageIndex, pageLsn);
+
+            if (pageId != rootPage && parentLsn < ctx.interiorFrame.getPageNsn()) {
+                ctx.findPathList.add(ctx.interiorFrame.getRightPage(), parentIndex);
+            }
+            parentLsn = pageLsn;
+            
+            if (ctx.interiorFrame.findTuple(ctx.splitKey.getLeftTuple(), ctx.findPathList, pageIndex, interiorCmp) != -1) {
+                fillPath(ctx, pageIndex);
+
+                node.releaseReadLatch();
+                readLatchesReleased++;
+                bufferCache.unpin(node);
+                unpins++;
+                return;
+            }
+            node.releaseReadLatch();
+            readLatchesReleased++;
+            bufferCache.unpin(node);
+            unpins++;
+        } 
+    }
+
+    public void fillPath(RTreeOpContext ctx, int pageIndex) throws Exception {
+        if (pageIndex != -1) {
+            fillPath(ctx, ctx.findPathList.getParentIndex(pageIndex));
+            ctx.path.add(ctx.findPathList.getPageId(pageIndex));
+            ctx.pageLsns.add(ctx.findPathList.getPageLsn(pageIndex));
+        }
+    }
+    
     public ICachedPage findLeaf(RTreeOpContext ctx) throws Exception {
         int pageId = rootPage;
         boolean writeLatched = false;
diff --git a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeOpContext.java b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeOpContext.java
index f321efc..6a07c93 100644
--- a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeOpContext.java
+++ b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeOpContext.java
@@ -20,6 +20,7 @@
     public ITreeIndexTupleReference[] nodesMBRs;
     public final IntArrayList path;
     public final IntArrayList pageLsns;
+    public final FindPathList findPathList; // works as a queue
     public Rectangle[] rec;
 
     public RTreeOpContext(TreeIndexOp op, IRTreeFrame interiorFrame, IRTreeFrame leafFrame,
@@ -38,6 +39,7 @@
         nodesMBRs = new ITreeIndexTupleReference[treeHeightHint];
         path = new IntArrayList(treeHeightHint, treeHeightHint);
         pageLsns = new IntArrayList(treeHeightHint, treeHeightHint);
+        findPathList = new FindPathList(100, 100);
         for (int i = 0; i < treeHeightHint; i++) {
             nodesMBRs[i] = interiorFrame.getTupleWriter().createTupleReference();
             nodesMBRs[i].setFieldCount(nodesMBRs[i].getFieldCount());
@@ -72,5 +74,8 @@
         if (pageLsns != null) {
             pageLsns.clear();
         }
+        if (findPathList != null) {
+            pageLsns.clear();
+        }
     }
 }
diff --git a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/UnorderedSlotManager.java b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/UnorderedSlotManager.java
index d65aff1..77bddd7 100644
--- a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/UnorderedSlotManager.java
+++ b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/UnorderedSlotManager.java
@@ -12,18 +12,6 @@
     @Override
     public int findTupleIndex(ITupleReference searchKey, ITreeIndexTupleReference frameTuple, MultiComparator multiCmp,
             FindTupleMode mode, FindTupleNoExactMatchPolicy matchPolicy) {
-        for (int i = 0; i < frame.getTupleCount(); i++) {
-            frameTuple.resetByTupleIndex(frame, i);
-            int cmp = multiCmp.getIntCmp().compare(frameTuple.getFieldData(multiCmp.getKeyFieldCount()),
-                    frameTuple.getFieldStart(multiCmp.getKeyFieldCount()),
-                    frameTuple.getFieldLength(multiCmp.getKeyFieldCount()),
-                    searchKey.getFieldData(multiCmp.getKeyFieldCount()),
-                    searchKey.getFieldStart(multiCmp.getKeyFieldCount()),
-                    searchKey.getFieldLength(multiCmp.getKeyFieldCount()));
-            if (cmp == 0) {
-                return i;
-            }
-        }
         return -1;
     }