- Started to add support for concurrency control
- Fixed a bug in the split method

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_indexes@391 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-rtree/.classpath b/hyracks-storage-am-rtree/.classpath
new file mode 100644
index 0000000..1f3c1ff
--- /dev/null
+++ b/hyracks-storage-am-rtree/.classpath
@@ -0,0 +1,7 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<classpath>
+	<classpathentry kind="src" output="target/classes" path="src/main/java"/>
+	<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"/>
+	<classpathentry kind="con" path="org.maven.ide.eclipse.MAVEN2_CLASSPATH_CONTAINER"/>
+	<classpathentry kind="output" path="target/classes"/>
+</classpath>
diff --git a/hyracks-storage-am-rtree/pom.xml b/hyracks-storage-am-rtree/pom.xml
index d0c48c1..ff158ce 100644
--- a/hyracks-storage-am-rtree/pom.xml
+++ b/hyracks-storage-am-rtree/pom.xml
@@ -51,14 +51,7 @@
   		<version>0.1.4</version>
   		<type>jar</type>
   		<scope>compile</scope>
-  	</dependency>  	
-  	<dependency>
-  		<groupId>edu.uci.ics.hyracks</groupId>
-  		<artifactId>hyracks-storage-am-btree</artifactId>
-  		<version>0.1.4</version>
-  		<type>jar</type>
-  		<scope>compile</scope>
-  	</dependency>  	
+  	</dependency>  		
   	<dependency>
   		<groupId>junit</groupId>
   		<artifactId>junit</artifactId>
diff --git a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/api/IRTreeFrame.java b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/api/IRTreeFrame.java
index f95eb74..7be43ff 100644
--- a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/api/IRTreeFrame.java
+++ b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/api/IRTreeFrame.java
@@ -9,12 +9,20 @@
 import edu.uci.ics.hyracks.storage.am.rtree.impls.TupleEntryArrayList;
 
 public interface IRTreeFrame extends ITreeIndexFrame {
-
+    
+    public int findTuple(ITupleReference tuple, MultiComparator cmp);
+    
+    public int getPageNsn();
+    
+    public void setPageNsn(int pageNsn);
+    
     public int getRightPage();
 
     public void setRightPage(int rightPage);
 
-    public int getChildPageId(ITupleReference tuple, TupleEntryArrayList entries, ITreeIndexTupleReference[] nodesMBRs,
+    public int getBestChildPageId(MultiComparator cmp);
+    
+    public boolean checkEnlargement(ITupleReference tuple, TupleEntryArrayList entries, ITreeIndexTupleReference[] nodesMBRs,
             MultiComparator cmp);
 
     public void adjustNodeMBR(ITreeIndexTupleReference[] tuples, MultiComparator cmp);
@@ -30,4 +38,6 @@
     public Rectangle intersect(ITupleReference tuple, int tupleIndex, MultiComparator cmp);
 
     public int getChildPageIdIfIntersect(ITupleReference tuple, int tupleIndex, MultiComparator cmp);
+    
+    public void enlarge(ITupleReference tuple, MultiComparator cmp);
 }
diff --git a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/NSMRTreeFrame.java b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/NSMRTreeFrame.java
index 3735dd8..ed43240 100644
--- a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/NSMRTreeFrame.java
+++ b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/NSMRTreeFrame.java
@@ -1,5 +1,6 @@
 package edu.uci.ics.hyracks.storage.am.rtree.frames;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -21,9 +22,8 @@
 import edu.uci.ics.hyracks.storage.am.rtree.tuples.RTreeTypeAwareTupleWriter;
 
 public class NSMRTreeFrame extends TreeIndexNSMFrame implements IRTreeFrame {
-
     protected static final int pageNsnOff = smFlagOff + 1;
-    private static final int rightPageOff = pageNsnOff + 4;
+    protected static final int rightPageOff = pageNsnOff + 4;
 
     private ITreeIndexTupleReference[] tuples;
 
@@ -46,13 +46,23 @@
     @Override
     public void initBuffer(byte level) {
         super.initBuffer(level);
-        buf.putInt(pageNsnOff, -1);
+        buf.putInt(pageNsnOff, 0);
         buf.putInt(rightPageOff, -1);
     }
-
+    
     public void setTupleCount(int tupleCount) {
         buf.putInt(tupleCountOff, tupleCount);
     }
+    
+    @Override
+    public void setPageNsn(int pageNsn) {
+        buf.putInt(pageNsnOff, pageNsn);
+    }
+    
+    @Override
+    public int getPageNsn() {
+        return buf.getInt(pageNsnOff);
+    }
 
     @Override
     protected void resetSpaceParams() {
@@ -175,7 +185,8 @@
     @Override
     public int split(ITreeIndexFrame rightFrame, ITupleReference tuple, MultiComparator cmp, ISplitKey splitKey,
             TupleEntryArrayList entries1, TupleEntryArrayList entries2, Rectangle[] rec) throws Exception {
-
+        
+        
         // RTreeSplitKey rTreeSplitKey = ((RTreeSplitKey) splitKey);
         // RTreeTypeAwareTupleWriter rTreeTupleWriterLeftFrame =
         // ((RTreeTypeAwareTupleWriter) tupleWriter);
@@ -208,8 +219,7 @@
         // int src = rightFrame.getSlotManager().getSlotEndOff();
         // int dest = rightFrame.getSlotManager().getSlotEndOff() + tuplesToLeft
         // * rightFrame.getSlotManager().getSlotSize();
-        // int length = rightFrame.getSlotManager().getSlotSize() *
-        // tuplesToRight;
+        // int length = rightFrame.getSlotManager().getSlotSize() * tuplesToRight;
         // System.arraycopy(right.array(), src, right.array(), dest, length);
         // right.putInt(tupleCountOff, tuplesToRight);
         //
@@ -246,7 +256,8 @@
         // 0);
         //
         // return 0;
-
+        
+        
         RTreeSplitKey rTreeSplitKey = ((RTreeSplitKey) splitKey);
         RTreeTypeAwareTupleWriter rTreeTupleWriterLeftFrame = ((RTreeTypeAwareTupleWriter) tupleWriter);
         RTreeTypeAwareTupleWriter rTreeTupleWriterRightFrame = ((RTreeTypeAwareTupleWriter) rightFrame.getTupleWriter());
@@ -378,13 +389,13 @@
         buf.putInt(totalFreeSpaceOff, buf.getInt(totalFreeSpaceOff) + totalBytes
                 + (slotManager.getSlotSize() * numOfDeletedTuples));
 
-        if (!tupleInserted) {
-            insert(tuple, cmp);
-        }
-
         // compact both pages
         rightFrame.compact(cmp);
         compact(cmp);
+        
+        if (!tupleInserted) {
+            insert(tuple, cmp);
+        }
 
         int tupleOff = slotManager.getTupleOff(slotManager.getSlotEndOff());
         frameTuple.resetByTupleOffset(buf, tupleOff);
@@ -433,7 +444,12 @@
     }
 
     @Override
-    public int getChildPageId(ITupleReference tuple, TupleEntryArrayList entries, ITreeIndexTupleReference[] nodesMBRs,
+    public int getBestChildPageId(MultiComparator cmp) {
+        return buf.getInt(frameTuple.getFieldStart(cmp.getKeyFieldCount()));
+    }
+    
+    @Override
+    public boolean checkEnlargement(ITupleReference tuple, TupleEntryArrayList entries, ITreeIndexTupleReference[] nodesMBRs,
             MultiComparator cmp) {
 
         cmpFrameTuple.setFieldCount(cmp.getFieldCount());
@@ -528,17 +544,128 @@
                 }
             }
         }
+        entries.clear();
+        
         frameTuple.resetByTupleIndex(this, bestChild);
         if (minEnlargedArea > 0.0) {
-            enlarge(frameTuple, tuple, cmp);
+            return true;
+            //enlarge(frameTuple, tuple, cmp);
+        } else {
+            return false;
         }
-        nodesMBRs[(int) getLevel() - 1].resetByTupleIndex(this, bestChild);
+        
+        //nodesMBRs[(int) getLevel() - 1].resetByTupleIndex(this, bestChild);
 
-        entries.clear();
+        
 
         // return the page id of the bestChild tuple
-        return buf.getInt(frameTuple.getFieldStart(cmp.getKeyFieldCount()));
+        //return buf.getInt(frameTuple.getFieldStart(cmp.getKeyFieldCount()));
     }
+    
+//    @Override
+//    public int getChildPageId(ITupleReference tuple, TupleEntryArrayList entries, ITreeIndexTupleReference[] nodesMBRs,
+//            MultiComparator cmp) {
+//
+//        cmpFrameTuple.setFieldCount(cmp.getFieldCount());
+//        frameTuple.setFieldCount(cmp.getFieldCount());
+//
+//        int bestChild = 0;
+//        double minEnlargedArea = Double.MAX_VALUE;
+//
+//        // the children pointers in the node point to leaves
+//        if (getLevel() == 1) {
+//            // find least overlap enlargement, use minimum enlarged area to
+//            // break tie, if tie still exists use minimum area to break it
+//            for (int i = 0; i < getTupleCount(); ++i) {
+//                frameTuple.resetByTupleIndex(this, i);
+//                double enlargedArea = enlargedArea(frameTuple, tuple, cmp);
+//                entries.add(i, enlargedArea);
+//                if (enlargedArea < minEnlargedArea) {
+//                    minEnlargedArea = enlargedArea;
+//                    bestChild = i;
+//                }
+//            }
+//            if (minEnlargedArea < entries.getDoubleEpsilon() || minEnlargedArea > entries.getDoubleEpsilon()) {
+//                minEnlargedArea = Double.MAX_VALUE;
+//                int k;
+//                if (getTupleCount() > nearMinimumOverlapFactor) {
+//                    // sort the entries based on their area enlargement needed
+//                    // to include the object
+//                    entries.sort(EntriesOrder.ASCENDING, getTupleCount());
+//                    k = nearMinimumOverlapFactor;
+//                } else {
+//                    k = getTupleCount();
+//                }
+//
+//                double minOverlap = Double.MAX_VALUE;
+//                int id = 0;
+//                for (int i = 0; i < k; ++i) {
+//                    double difference = 0.0;
+//                    for (int j = 0; j < getTupleCount(); ++j) {
+//                        frameTuple.resetByTupleIndex(this, j);
+//                        cmpFrameTuple.resetByTupleIndex(this, entries.get(i).getTupleIndex());
+//
+//                        int c = cmp.getIntCmp().compare(frameTuple.getFieldData(cmp.getKeyFieldCount()),
+//                                frameTuple.getFieldStart(cmp.getKeyFieldCount()),
+//                                frameTuple.getFieldLength(cmp.getKeyFieldCount()),
+//                                cmpFrameTuple.getFieldData(cmp.getKeyFieldCount()),
+//                                cmpFrameTuple.getFieldStart(cmp.getKeyFieldCount()),
+//                                cmpFrameTuple.getFieldLength(cmp.getKeyFieldCount()));
+//                        if (c != 0) {
+//                            double intersection = overlappedArea(frameTuple, tuple, cmpFrameTuple, cmp);
+//                            if (intersection != 0.0) {
+//                                difference += intersection - overlappedArea(frameTuple, null, cmpFrameTuple, cmp);
+//                            }
+//                        } else {
+//                            id = j;
+//                        }
+//                    }
+//
+//                    double enlargedArea = enlargedArea(cmpFrameTuple, tuple, cmp);
+//                    if (difference < minOverlap) {
+//                        minOverlap = difference;
+//                        minEnlargedArea = enlargedArea;
+//                        bestChild = id;
+//                    } else if (difference == minOverlap) {
+//                        if (enlargedArea < minEnlargedArea) {
+//                            minEnlargedArea = enlargedArea;
+//                            bestChild = id;
+//                        } else if (enlargedArea == minEnlargedArea) {
+//                            double area = area(cmpFrameTuple, cmp);
+//                            frameTuple.resetByTupleIndex(this, bestChild);
+//                            double minArea = area(frameTuple, cmp);
+//                            if (area < minArea) {
+//                                bestChild = id;
+//                            }
+//                        }
+//                    }
+//                }
+//            }
+//        } else { // find minimum enlarged area, use minimum area to break tie
+//            for (int i = 0; i < getTupleCount(); i++) {
+//                frameTuple.resetByTupleIndex(this, i);
+//                double enlargedArea = enlargedArea(frameTuple, tuple, cmp);
+//                if (enlargedArea < minEnlargedArea) {
+//                    minEnlargedArea = enlargedArea;
+//                    bestChild = i;
+//                } else if (enlargedArea == minEnlargedArea) {
+//                    double area = area(frameTuple, cmp);
+//                    frameTuple.resetByTupleIndex(this, bestChild);
+//                    double minArea = area(frameTuple, cmp);
+//                    if (area < minArea) {
+//                        bestChild = i;
+//                    }
+//                }
+//            }
+//        }
+//        frameTuple.resetByTupleIndex(this, bestChild);
+//        nodesMBRs[(int) getLevel() - 1].resetByTupleIndex(this, bestChild);
+//
+//        entries.clear();
+//
+//        // return the page id of the bestChild tuple
+//        return buf.getInt(frameTuple.getFieldStart(cmp.getKeyFieldCount()));
+//    }
 
     @Override
     public void reinsert(ITupleReference tuple, ITreeIndexTupleReference nodeMBR, TupleEntryArrayList entries,
@@ -701,23 +828,24 @@
         return areaAfterEnlarge - areaBeforeEnlarge;
     }
 
-    public void enlarge(ITupleReference tuple, ITupleReference tupleToBeInserted, MultiComparator cmp) {
+    @Override
+    public void enlarge(ITupleReference tuple, MultiComparator cmp) {
         int maxFieldPos = cmp.getKeyFieldCount() / 2;
         for (int i = 0; i < maxFieldPos; i++) {
             int j = maxFieldPos + i;
-            int c = cmp.getComparators()[i].compare(tuple.getFieldData(i), tuple.getFieldStart(i),
-                    tuple.getFieldLength(i), tupleToBeInserted.getFieldData(i), tupleToBeInserted.getFieldStart(i),
-                    tupleToBeInserted.getFieldLength(i));
+            int c = cmp.getComparators()[i].compare(frameTuple.getFieldData(i), frameTuple.getFieldStart(i),
+                    frameTuple.getFieldLength(i), tuple.getFieldData(i), tuple.getFieldStart(i),
+                    tuple.getFieldLength(i));
             if (c > 0) {
-                System.arraycopy(tupleToBeInserted.getFieldData(i), tupleToBeInserted.getFieldStart(i),
-                        tuple.getFieldData(i), tuple.getFieldStart(i), tupleToBeInserted.getFieldLength(i));
+                System.arraycopy(tuple.getFieldData(i), tuple.getFieldStart(i),
+                        frameTuple.getFieldData(i), frameTuple.getFieldStart(i), tuple.getFieldLength(i));
             }
-            c = cmp.getComparators()[j].compare(tuple.getFieldData(j), tuple.getFieldStart(j), tuple.getFieldLength(j),
-                    tupleToBeInserted.getFieldData(j), tupleToBeInserted.getFieldStart(j),
-                    tupleToBeInserted.getFieldLength(j));
+            c = cmp.getComparators()[j].compare(frameTuple.getFieldData(j), frameTuple.getFieldStart(j), frameTuple.getFieldLength(j),
+                    tuple.getFieldData(j), tuple.getFieldStart(j),
+                    tuple.getFieldLength(j));
             if (c < 0) {
-                System.arraycopy(tupleToBeInserted.getFieldData(j), tupleToBeInserted.getFieldStart(j),
-                        tuple.getFieldData(j), tuple.getFieldStart(j), tupleToBeInserted.getFieldLength(j));
+                System.arraycopy(tuple.getFieldData(j), tuple.getFieldStart(j),
+                        frameTuple.getFieldData(j), frameTuple.getFieldStart(j), tuple.getFieldLength(j));
             }
         }
     }
@@ -725,11 +853,17 @@
     @Override
     public void adjustKey(ITupleReference tuple, MultiComparator cmp) {
         frameTuple.setFieldCount(cmp.getFieldCount());
-        int tupleIndex = slotManager.findTupleIndex(tuple, frameTuple, cmp, null, null);
+        int tupleIndex = findTuple(tuple, cmp);
         if (tupleIndex != -1) {
             tupleWriter.writeTuple(tuple, buf, getTupleOffset(tupleIndex));
         }
     }
+    
+    @Override
+    public int findTuple(ITupleReference tuple, MultiComparator cmp) {
+        frameTuple.setFieldCount(cmp.getFieldCount());
+        return slotManager.findTupleIndex(tuple, frameTuple, cmp, null, null);
+    }
 
     @Override
     public void adjustNodeMBR(ITreeIndexTupleReference[] tuples, MultiComparator cmp) {
@@ -782,6 +916,27 @@
         return buf.getInt(frameTuple.getFieldStart(cmp.getKeyFieldCount()));
     }
 
+    public boolean contains(ITupleReference tuple, MultiComparator cmp) {
+        frameTuple.setFieldCount(cmp.getFieldCount());
+        int maxFieldPos = cmp.getKeyFieldCount() / 2;
+        for (int i = 0; i < maxFieldPos; i++) {
+            int j = maxFieldPos + i;
+            int c = cmp.getComparators()[i].compare(tuple.getFieldData(i), tuple.getFieldStart(i),
+                    tuple.getFieldLength(i), frameTuple.getFieldData(j), frameTuple.getFieldStart(j),
+                    frameTuple.getFieldLength(j));
+            if (c > 0) {
+                return false;
+            }
+            c = cmp.getComparators()[i].compare(tuple.getFieldData(j), tuple.getFieldStart(j), tuple.getFieldLength(j),
+                    frameTuple.getFieldData(i), frameTuple.getFieldStart(i), frameTuple.getFieldLength(i));
+
+            if (c < 0) {
+                return false;
+            }
+        }
+        return true;
+    }
+    
     @Override
     public Rectangle intersect(ITupleReference tuple, int tupleIndex, MultiComparator cmp) {
         frameTuple.setFieldCount(cmp.getFieldCount());
diff --git a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTree.java b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTree.java
index c0d1501..6554bb4 100644
--- a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTree.java
+++ b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTree.java
@@ -2,6 +2,7 @@
 
 import java.util.ArrayList;
 import java.util.Stack;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -22,7 +23,7 @@
     private boolean created = false;
     private final int rootPage = 1; // the root page never changes
 
-    private int nsn = 0; // Global node sequence number
+    private final AtomicInteger globalNsn; // Global node sequence number
     private int numOfPages = 1;
 
     private final IFreePageManager freePageManager;
@@ -33,7 +34,7 @@
     private final IRTreeFrameFactory leafFrameFactory;
     private final MultiComparator interiorCmp;
     private final MultiComparator leafCmp;
-    public int dim;
+    public final int dim;
 
     public int rootSplits = 0;
     public int[] splitsByLevel = new int[500];
@@ -55,6 +56,15 @@
         this.interiorCmp = interiorCmp;
         this.leafCmp = leafCmp;
         this.dim = dim;
+        globalNsn = new AtomicInteger();
+    }
+
+    public synchronized void incrementGlobalNsn() {
+        globalNsn.incrementAndGet();
+    }
+
+    public synchronized int getGlobalNsn() {
+        return globalNsn.get();
     }
 
     public String printStats() {
@@ -249,24 +259,6 @@
         }
     }
 
-    public void insert(ITupleReference tuple, RTreeOpContext ctx) throws Exception {
-        ctx.reset();
-        ctx.setTuple(tuple);
-        ctx.splitKey.reset();
-        ctx.splitKey.getLeftTuple().setFieldCount(interiorCmp.getFieldCount());
-        ctx.splitKey.getRightTuple().setFieldCount(interiorCmp.getFieldCount());
-        ctx.interiorFrame.setPageTupleFieldCount(interiorCmp.getFieldCount());
-        for (int i = 0; i < currentLevel; i++) {
-            ctx.overflowArray.add((byte) 0);
-        }
-        insertImpl(rootPage, null, (byte) 0, ctx);
-
-        // we split the root, here is the key for a new root
-        if (ctx.splitKey.getLeftPageBuffer() != null) {
-            createNewRoot(ctx);
-        }
-    }
-
     // public void insert(ITupleReference tuple, RTreeOpContext ctx) throws
     // Exception {
     // ctx.reset();
@@ -278,10 +270,6 @@
     // for (int i = 0; i < currentLevel; i++) {
     // ctx.overflowArray.add((byte) 0);
     // }
-    //
-    // findLeaf(ctx);
-    //
-    //
     // insertImpl(rootPage, null, (byte) 0, ctx);
     //
     // // we split the root, here is the key for a new root
@@ -289,82 +277,311 @@
     // createNewRoot(ctx);
     // }
     // }
-    //
-    // public void findLeaf(RTreeOpContext ctx) throws Exception {
+
+    // public void insertImpl(int pageId, ICachedPage parent, byte desiredLevel,
+    // RTreeOpContext ctx) throws Exception {
     // ICachedPage node =
-    // bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, rootPage),
-    // false);
+    // bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, pageId), false);
     // pins++;
-    // while (true) {
+    // ctx.interiorFrame.setPage(node);
     // boolean isLeaf = ctx.interiorFrame.isLeaf();
     // acquireLatch(node, ctx.op, isLeaf);
+    //
+    // // latch coupling TODO: check the correctness of this
+    // if (parent != null) {
+    // parent.releaseReadLatch();
+    // readLatchesReleased++;
+    // bufferCache.unpin(parent);
+    // unpins++;
     // }
     //
+    // if (ctx.interiorFrame.getLevel() > desiredLevel) {
+    // int childPageId = ctx.interiorFrame.checkEnlargement(ctx.getTuple(),
+    // ctx.tupleEntries1, ctx.nodesMBRs,
+    // interiorCmp);
     //
-    //
+    // insertImpl(childPageId, node, desiredLevel, ctx);
+    // if (ctx.splitKey.getLeftPageBuffer() != null) {
+    // node = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, pageId),
+    // false);
+    // pins++;
+    // node.acquireWriteLatch();
+    // writeLatchesAcquired++;
+    // try {
+    // ctx.interiorFrame.setPage(node);
+    // ctx.interiorFrame.adjustKey(ctx.splitKey.getLeftTuple(), interiorCmp);
+    // insertTuple(pageId, ctx.splitKey.getRightTuple(), ctx, isLeaf);
+    // } finally {
+    // node.releaseWriteLatch();
+    // writeLatchesReleased++;
+    // bufferCache.unpin(node);
+    // unpins++;
+    // }
+    // }
+    // } else {
+    // try {
+    // if (isLeaf) {
+    // ctx.leafFrame.setPage(node);
+    // ctx.leafFrame.setPageTupleFieldCount(leafCmp.getFieldCount());
+    // }
+    // insertTuple(pageId, ctx.getTuple(), ctx, isLeaf);
+    // } finally {
+    // node.releaseWriteLatch();
+    // writeLatchesReleased++;
+    // bufferCache.unpin(node);
+    // unpins++;
+    // }
+    // }
     // }
 
-    public void insertImpl(int pageId, ICachedPage parent, byte desiredLevel, RTreeOpContext ctx) throws Exception {
-        ICachedPage node = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, pageId), false);
+    public void insert(ITupleReference tuple, RTreeOpContext ctx) throws Exception {
+        ctx.reset();
+        ctx.setTuple(tuple);
+        ctx.splitKey.reset();
+        ctx.splitKey.getLeftTuple().setFieldCount(interiorCmp.getFieldCount());
+        ctx.splitKey.getRightTuple().setFieldCount(interiorCmp.getFieldCount());
+        ctx.interiorFrame.setPageTupleFieldCount(interiorCmp.getFieldCount());
+        for (int i = 0; i < currentLevel; i++) {
+            ctx.overflowArray.add((byte) 0);
+        }
+
+        ICachedPage leafNode = findLeaf(ctx);
+
+        int pageId = ctx.path.getLast();
+        ctx.path.removeLast();
+        ctx.pageLsns.removeLast();
+        insertTuple(leafNode, pageId, ctx.getTuple(), ctx, ctx.leafFrame.isLeaf());
+
+        while (true) {
+            if (ctx.splitKey.getLeftPageBuffer() != null) {
+                updateParent(ctx);
+            } else {
+                break;
+            }
+        }
+
+        leafNode.releaseWriteLatch();
+        writeLatchesReleased++;
+        bufferCache.unpin(leafNode);
+        unpins++;
+    }
+
+    public void updateParent(RTreeOpContext ctx) throws Exception {
+        int parentId = ctx.path.getLast();
+        ICachedPage parentNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, parentId), false);
         pins++;
-        ctx.interiorFrame.setPage(node);
-        boolean isLeaf = ctx.interiorFrame.isLeaf();
-        acquireLatch(node, ctx.op, isLeaf);
+        parentNode.acquireWriteLatch();
+        writeLatchesAcquired++;
+        ctx.interiorFrame.setPage(parentNode);
+        boolean foundParent = true;
 
-        // latch coupling TODO: check the correctness of this
-        if (parent != null) {
-            parent.releaseReadLatch();
-            readLatchesReleased++;
-            bufferCache.unpin(parent);
+        if (ctx.interiorFrame.getPageLsn() != ctx.pageLsns.getLast()) {
+            foundParent = false;
+            while (true) {
+                if (ctx.interiorFrame.findTuple(ctx.splitKey.getLeftTuple(), interiorCmp) != -1) {
+                    // found the parent
+                    foundParent = true;
+                    break;
+                }
+                int rightPage = ctx.interiorFrame.getRightPage();
+                parentNode.releaseWriteLatch();
+                writeLatchesReleased++;
+                bufferCache.unpin(parentNode);
+                unpins++;
+
+                if (rightPage == -1) {
+                    break;
+                }
+
+                parentId = rightPage;
+                parentNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, parentId), false);
+                pins++;
+                parentNode.acquireWriteLatch();
+                writeLatchesAcquired++;
+                ctx.interiorFrame.setPage(parentNode);
+            }
+        }
+        if (foundParent) {
+            ctx.interiorFrame.adjustKey(ctx.splitKey.getLeftTuple(), interiorCmp);
+            insertTuple(parentNode, parentId, ctx.splitKey.getRightTuple(), ctx, ctx.interiorFrame.isLeaf());
+            ctx.path.removeLast();
+            ctx.pageLsns.removeLast();
+
+            parentNode.releaseWriteLatch();
+            writeLatchesReleased++;
+            bufferCache.unpin(parentNode);
             unpins++;
         }
 
-        // the children pointers in the node point to leaves
-        if (ctx.interiorFrame.getLevel() > desiredLevel) {
-            int childPageId = ctx.interiorFrame.getChildPageId(ctx.getTuple(), ctx.tupleEntries1, ctx.nodesMBRs,
-                    interiorCmp);
-            if (childPageId < 0) {
-                System.out.println();
-            }
+        // // very rare situation when the there is a root split, search for the
+        // parent tuple
+        // ctx.path.clear();
+        // ctx.pageLsns.clear();
+        // findPath(ctx, isLeaf);
 
-            insertImpl(childPageId, node, desiredLevel, ctx);
-            if (ctx.splitKey.getLeftPageBuffer() != null) {
+        // newstack = findPath( item->parent )
+        // replace part of stack to new one
+        // latch( parent->page, X-mode )
+        // findParent( item );
+
+    }
+
+    // public void findPath(RTreeOpContext ctx, boolean isLeaf) throws Exception
+    // {
+    // int pageId = rootPage;
+    // int parentLsn = 0;
+    // ctx.path.add(pageId);
+    // ctx.pageLsns.add(parentLsn);
+    //
+    // while (!ctx.path.isEmpty()) {
+    // pageId = ctx.path.getLast();
+    // parentLsn = ctx.pageLsns.getLast();
+    // ICachedPage node =
+    // bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, pageId), false);
+    // pins++;
+    // node.acquireReadLatch();
+    // readLatchesAcquired++;
+    // ctx.interiorFrame.setPage(node);
+    //
+    // if (pageId != rootPage && parentLsn < ctx.interiorFrame.getPageNsn()) {
+    // ctx.path.add(ctx.interiorFrame.getRightPage());
+    // ctx.pageLsns.add(0);
+    // }
+    //
+    //
+    // }
+    //
+    // push stack, [root, 0, 0] // page, LSN, parent
+    // while( stack )
+    // ptr = top of stack
+    // latch( ptr->page, S-mode )
+    // if ( ptr->parent->page->lsn < ptr->page->nsn )
+    // push stack, [ ptr->page->rightlink, 0, ptr->parent ]
+    // end
+    // for( each tuple on page )
+    // if ( tuple->pagepointer == item->page )
+    // return stack
+    // else
+    // add to stack at the end [tuple->pagepointer,0, ptr]
+    // end
+    // end
+    // unlatch( ptr->page )
+    // pop stack
+    // end
+    //
+    //
+    //
+    //
+    //
+    //
+    // }
+
+    public ICachedPage findLeaf(RTreeOpContext ctx) throws Exception {
+        int pageId = rootPage;
+        boolean writeLatched = false;
+        ICachedPage node = null;
+        boolean isLeaf = false;
+        int pageLsn = 0, parentLsn = 0;
+
+        while (true) {
+            if (!writeLatched) {
                 node = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, pageId), false);
                 pins++;
-                node.acquireWriteLatch();
-                writeLatchesAcquired++;
-                try {
-                    ctx.interiorFrame.setPage(node);
-                    ctx.interiorFrame.adjustKey(ctx.splitKey.getLeftTuple(), interiorCmp);
-                    insertTuple(pageId, ctx.splitKey.getRightTuple(), ctx, isLeaf);
-                } finally {
+                ctx.interiorFrame.setPage(node);
+                isLeaf = ctx.interiorFrame.isLeaf();
+                if (isLeaf) {
+                    node.acquireWriteLatch();
+                    writeLatchesAcquired++;
+                    writeLatched = true;
+                } else {
+                    // Be optimistic and grab read latch first. We will swap it
+                    // to write latch if we need to enlarge the best child
+                    // tuple.
+                    node.acquireReadLatch();
+                    readLatchesAcquired++;
+                }
+            }
+            ctx.path.add(pageId);
+            pageLsn = ctx.interiorFrame.getPageLsn();
+            ctx.pageLsns.add(pageLsn);
+
+            if (pageId != rootPage && parentLsn < ctx.interiorFrame.getPageNsn()) {
+                // Concurrent split detected, go back to parent and re-choose
+                // the best child
+                if (isLeaf) {
                     node.releaseWriteLatch();
                     writeLatchesReleased++;
                     bufferCache.unpin(node);
                     unpins++;
+                } else {
+                    node.releaseReadLatch();
+                    readLatchesReleased++;
+                    bufferCache.unpin(node);
+                    unpins++;
                 }
+
+                ctx.path.removeLast();
+                ctx.pageLsns.removeLast();
+                writeLatched = false;
+                continue;
             }
-        } else {
-            try {
-                if (isLeaf) {
-                    ctx.leafFrame.setPage(node);
-                    ctx.leafFrame.setPageTupleFieldCount(leafCmp.getFieldCount());
+            parentLsn = pageLsn;
+
+            if (!isLeaf) {
+                // checkEnlargement must be called *before* getBestChildPageId
+                boolean needsEnlargement = ctx.interiorFrame.checkEnlargement(ctx.getTuple(), ctx.tupleEntries1,
+                        ctx.nodesMBRs, interiorCmp);
+                int childPageId = ctx.interiorFrame.getBestChildPageId(interiorCmp);
+
+                if (needsEnlargement) {
+                    if (!writeLatched) {
+                        node.releaseReadLatch();
+                        readLatchesReleased++;
+                        // TODO: do we need to un-pin and pin again?
+                        bufferCache.unpin(node);
+                        unpins++;
+
+                        node = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, pageId), false);
+                        pins++;
+                        node.acquireWriteLatch();
+                        writeLatchesAcquired++;
+                        ctx.interiorFrame.setPage(node);
+                        writeLatched = true;
+
+                        if (ctx.interiorFrame.getPageLsn() != pageLsn) {
+                            // The page was changed while we unlocked it; thus,
+                            // retry
+                            continue;
+                        }
+                    }
+
+                    // We don't need to reset the frameTuple because it is
+                    // already pointing to the best child
+                    ctx.interiorFrame.enlarge(ctx.getTuple(), interiorCmp);
+
+                    node.releaseWriteLatch();
+                    writeLatchesReleased++;
+                    bufferCache.unpin(node);
+                    unpins++;
+                    writeLatched = false;
+                } else {
+                    node.releaseReadLatch();
+                    readLatchesReleased++;
+                    bufferCache.unpin(node);
+                    unpins++;
                 }
-                insertTuple(pageId, ctx.getTuple(), ctx, isLeaf);
-            } finally {
-                node.releaseWriteLatch();
-                writeLatchesReleased++;
-                bufferCache.unpin(node);
-                unpins++;
+                pageId = childPageId;
+            } else {
+                ctx.leafFrame.setPage(node);
+                ctx.leafFrame.setPageTupleFieldCount(leafCmp.getFieldCount());
+                return node;
             }
         }
-    }
-    
-    private void findLeaf() {
-        
+
     }
 
-    private void insertTuple(int pageId, ITupleReference tuple, RTreeOpContext ctx, boolean isLeaf) throws Exception {
+    private void insertTuple(ICachedPage leafNode, int pageId, ITupleReference tuple, RTreeOpContext ctx, boolean isLeaf)
+            throws Exception {
         FrameOpSpaceStatus spaceStatus;
         if (!isLeaf) {
             spaceStatus = ctx.interiorFrame.hasSpaceInsert(ctx.getTuple(), interiorCmp);
@@ -431,8 +648,9 @@
                         ret = ctx.interiorFrame.split(rightFrame, tuple, interiorCmp, ctx.splitKey, ctx.tupleEntries1,
                                 ctx.tupleEntries2, ctx.rec);
                         ctx.interiorFrame.setRightPage(rightPageId);
-                        rightFrame.setPageLsn(rightFrame.getPageLsn() + 1);
-                        ctx.interiorFrame.setPageLsn(ctx.interiorFrame.getPageLsn() + 1);
+                        rightFrame.setPageLsn(ctx.interiorFrame.getPageNsn());
+                        incrementGlobalNsn();
+                        ctx.interiorFrame.setPageLsn(getGlobalNsn());
                     } else {
                         splitsByLevel[0]++; // debug
                         rightFrame = leafFrameFactory.getFrame();
@@ -441,9 +659,9 @@
                         rightFrame.setPageTupleFieldCount(leafCmp.getFieldCount());
                         ret = ctx.leafFrame.split(rightFrame, tuple, leafCmp, ctx.splitKey, ctx.tupleEntries1,
                                 ctx.tupleEntries2, ctx.rec);
-                        ctx.leafFrame.setRightPage(rightPageId);
-                        rightFrame.setPageLsn(rightFrame.getPageLsn() + 1);
-                        ctx.leafFrame.setPageLsn(ctx.leafFrame.getPageLsn() + 1);
+                        rightFrame.setPageLsn(ctx.leafFrame.getPageNsn());
+                        incrementGlobalNsn();
+                        ctx.leafFrame.setPageLsn(getGlobalNsn());
                     }
 
                     if (ret != 0) {
@@ -453,6 +671,42 @@
                     }
                     // }
                 } finally {
+                    if (pageId == rootPage) {
+                        rootSplits++; // debug
+                        splitsByLevel[currentLevel]++;
+                        currentLevel++;
+
+                        int newLeftId = freePageManager.getFreePage(ctx.metaFrame);
+                        ICachedPage newLeftNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, newLeftId),
+                                true);
+                        pins++;
+                        newLeftNode.acquireWriteLatch();
+                        writeLatchesAcquired++;
+                        try {
+                            // copy left child to new left child
+                            System.arraycopy(leafNode.getBuffer().array(), 0, newLeftNode.getBuffer().array(), 0,
+                                    newLeftNode.getBuffer().capacity());
+
+                            // initialize new root (leftNode becomes new root)
+                            ctx.interiorFrame.setPage(leafNode);
+                            ctx.interiorFrame.initBuffer((byte) (ctx.interiorFrame.getLevel() + 1));
+
+                            ctx.splitKey.setLeftPage(newLeftId);
+
+                            ctx.interiorFrame.insert(ctx.splitKey.getLeftTuple(), interiorCmp);
+                            ctx.interiorFrame.insert(ctx.splitKey.getRightTuple(), interiorCmp);
+
+                            incrementGlobalNsn();
+                            ctx.interiorFrame.setPageLsn(getGlobalNsn());
+                        } finally {
+                            newLeftNode.releaseWriteLatch();
+                            writeLatchesReleased++;
+                            bufferCache.unpin(newLeftNode);
+                            unpins++;
+                        }
+
+                        ctx.splitKey.reset();
+                    }
                     rightNode.releaseWriteLatch();
                     writeLatchesReleased++;
                     bufferCache.unpin(rightNode);
@@ -463,7 +717,8 @@
         }
     }
 
-    public void search(Stack<Integer> s, ITupleReference tuple, RTreeOpContext ctx, ArrayList<Rectangle> results) throws Exception {
+    public void search(Stack<Integer> s, ITupleReference tuple, RTreeOpContext ctx, ArrayList<Rectangle> results)
+            throws Exception {
         ctx.reset();
         ctx.setTuple(tuple);
         ctx.interiorFrame.setPageTupleFieldCount(interiorCmp.getFieldCount());
@@ -495,7 +750,8 @@
                     for (int i = 0; i < tupleCount; i++) {
                         ctx.leafFrame.setPage(node);
 
-                        // check intersection, if intersect, add the tuple to the
+                        // check intersection, if intersect, add the tuple to
+                        // the
                         // result set
                         Rectangle rec = ctx.leafFrame.intersect(ctx.tuple, i, leafCmp);
                         if (rec != null) {
diff --git a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeOpContext.java b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeOpContext.java
index d72dfda..f321efc 100644
--- a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeOpContext.java
+++ b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeOpContext.java
@@ -3,7 +3,6 @@
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrame;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleReference;
-import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.TreeIndexOp;
 import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeFrame;
 
@@ -19,7 +18,8 @@
     public TupleEntryArrayList tupleEntries1;
     public TupleEntryArrayList tupleEntries2;
     public ITreeIndexTupleReference[] nodesMBRs;
-    public IntArrayList path; // used like a stack
+    public final IntArrayList path;
+    public final IntArrayList pageLsns;
     public Rectangle[] rec;
 
     public RTreeOpContext(TreeIndexOp op, IRTreeFrame interiorFrame, IRTreeFrame leafFrame,
@@ -37,6 +37,7 @@
         tupleEntries2 = new TupleEntryArrayList(100, 100, spatialUtils);
         nodesMBRs = new ITreeIndexTupleReference[treeHeightHint];
         path = new IntArrayList(treeHeightHint, treeHeightHint);
+        pageLsns = new IntArrayList(treeHeightHint, treeHeightHint);
         for (int i = 0; i < treeHeightHint; i++) {
             nodesMBRs[i] = interiorFrame.getTupleWriter().createTupleReference();
             nodesMBRs[i].setFieldCount(nodesMBRs[i].getFieldCount());
@@ -65,5 +66,11 @@
         if (tupleEntries2 != null) {
             tupleEntries2.clear();
         }
+        if (path != null) {
+            path.clear();
+        }
+        if (pageLsns != null) {
+            pageLsns.clear();
+        }
     }
 }
diff --git a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/UnorderedSlotManager.java b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/UnorderedSlotManager.java
index 428ecd6..d65aff1 100644
--- a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/UnorderedSlotManager.java
+++ b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/UnorderedSlotManager.java
@@ -45,9 +45,9 @@
             if (frame.getBuffer().getInt(slotOff) == -1) {
                 int slotStartOff = getSlotEndOff();
                 int length = slotOff - slotStartOff;
-                System.arraycopy(frame.getBuffer().array(), slotStartOff, frame.getBuffer().array(),
-                        slotStartOff + slotSize, length);
-                ((NSMRTreeFrame)frame).setTupleCount(frame.getTupleCount() - 1);
+                System.arraycopy(frame.getBuffer().array(), slotStartOff, frame.getBuffer().array(), slotStartOff
+                        + slotSize, length);
+                ((NSMRTreeFrame) frame).setTupleCount(frame.getTupleCount() - 1);
             } else {
                 slotOff -= slotSize;
             }