Fixed bugs in the delete algorithm because we didn't update the page lsns

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_indexes@449 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTree.java b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTree.java
index 2e574ad..9cc6e00 100644
--- a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTree.java
+++ b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTree.java
@@ -3,6 +3,7 @@
 import java.util.ArrayList;
 import java.util.Stack;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -41,14 +42,13 @@
 
     public int rootSplits = 0;
     public int[] splitsByLevel = new int[500];
-    public long readLatchesAcquired = 0;
-    public long readLatchesReleased = 0;
-    public long writeLatchesAcquired = 0;
-    public long writeLatchesReleased = 0;
-    public long pins = 0;
-    public long unpins = 0;
+    public AtomicLong readLatchesAcquired = new AtomicLong();
+    public AtomicLong readLatchesReleased = new AtomicLong();
+    public AtomicLong writeLatchesAcquired = new AtomicLong();
+    public AtomicLong writeLatchesReleased = new AtomicLong();
+    public AtomicLong pins = new AtomicLong();
+    public AtomicLong unpins = new AtomicLong();
     public byte currentLevel = 0;
-    public long totalTuplesInserted = 0;
 
     public RTree(IBufferCache bufferCache, IFreePageManager freePageManager, IRTreeFrameFactory interiorFrameFactory,
             IRTreeFrameFactory leafFrameFactory, MultiComparator interiorCmp, MultiComparator leafCmp, int dim) {
@@ -63,38 +63,39 @@
         this.treeLatch = new ReentrantReadWriteLock(true);
     }
 
-    public synchronized void incrementGlobalNsn() {
+    public void incrementGlobalNsn() {
         globalNsn.incrementAndGet();
     }
 
-    public synchronized void incrementReadLatchesAcquired() {
-        readLatchesAcquired++;
-    }
-    
-    public synchronized void incrementReadLatchesReleased() {
-        readLatchesReleased++;
-    }
-   
-    public synchronized void incrementWriteLatchesAcquired() {
-        writeLatchesAcquired++;
-    }
-    
-    public synchronized void incrementWriteLatchesReleased() {
-        writeLatchesReleased++;
-    }
-    
-    public synchronized void incrementPins() {
-        pins++;
-    }
-    
-    public synchronized void incrementUnpins() {
-        unpins++;
-    }
-    
-    public synchronized int getGlobalNsn() {
+    public int getGlobalNsn() {
         return globalNsn.get();
     }
 
+    public void incrementReadLatchesAcquired() {
+        readLatchesAcquired.incrementAndGet();
+    }
+
+    public void incrementReadLatchesReleased() {
+        readLatchesReleased.incrementAndGet();
+        ;
+    }
+
+    public void incrementWriteLatchesAcquired() {
+        writeLatchesAcquired.incrementAndGet();
+    }
+
+    public void incrementWriteLatchesReleased() {
+        writeLatchesReleased.incrementAndGet();
+    }
+
+    public void incrementPins() {
+        pins.incrementAndGet();
+    }
+
+    public void incrementUnpins() {
+        unpins.incrementAndGet();
+    }
+
     public String printStats() {
         StringBuilder strBuilder = new StringBuilder();
         strBuilder.append("\n");
@@ -103,9 +104,11 @@
         for (int i = 0; i < currentLevel; i++) {
             strBuilder.append(String.format("%3d ", i) + String.format("%8d ", splitsByLevel[i]) + "\n");
         }
-        strBuilder.append(String.format("READ LATCHES:  %10d %10d\n", readLatchesAcquired, readLatchesReleased));
-        strBuilder.append(String.format("WRITE LATCHES: %10d %10d\n", writeLatchesAcquired, writeLatchesReleased));
-        strBuilder.append(String.format("PINS:          %10d %10d\n", pins, unpins));
+        strBuilder.append(String.format("READ LATCHES:  %10d %10d\n", readLatchesAcquired.get(),
+                readLatchesReleased.get()));
+        strBuilder.append(String.format("WRITE LATCHES: %10d %10d\n", writeLatchesAcquired.get(),
+                writeLatchesReleased.get()));
+        strBuilder.append(String.format("PINS:          %10d %10d\n", pins.get(), unpins.get()));
 
         strBuilder.append(String.format("Num of Pages:          %10d\n", numOfPages));
 
@@ -114,9 +117,7 @@
 
     public void printTree(IRTreeFrame leafFrame, IRTreeFrame interiorFrame, ISerializerDeserializer[] fields)
             throws Exception {
-        totalTuplesInserted = 0;
         printTree(rootPage, null, false, leafFrame, interiorFrame, fields);
-        System.out.println(totalTuplesInserted);
     }
 
     public void printTree(int pageId, ICachedPage parent, boolean unpin, IRTreeFrame leafFrame,
@@ -158,7 +159,6 @@
                     printTree(children.get(i), node, i == children.size() - 1, leafFrame, interiorFrame, fields);
                 }
             } else {
-                totalTuplesInserted += interiorFrame.getTupleCount();
                 node.releaseReadLatch();
                 incrementReadLatchesReleased();
                 bufferCache.unpin(node);
@@ -218,9 +218,9 @@
     }
 
     public RTreeOpContext createOpContext(TreeIndexOp op, IRTreeFrame interiorFrame, IRTreeFrame leafFrame,
-            ITreeIndexMetaDataFrame metaFrame, String name) {
+            ITreeIndexMetaDataFrame metaFrame, String threadName) {
         // TODO: figure out better tree-height hint
-        return new RTreeOpContext(op, interiorFrame, leafFrame, metaFrame, 8, dim, name);
+        return new RTreeOpContext(op, interiorFrame, leafFrame, metaFrame, 8, dim, threadName);
     }
 
     public void insert(ITupleReference tuple, RTreeOpContext ctx) throws Exception {
@@ -236,7 +236,6 @@
 
         int pageId = ctx.pathList.getLastPageId();
         ctx.pathList.removeLast();
-        System.out.println(ctx.name + " Trying to insert in pageId: " + pageId);
         insertTuple(leafNode, pageId, ctx.getTuple(), ctx, true);
 
         while (true) {
@@ -246,7 +245,7 @@
                 break;
             }
         }
-        
+
         leafNode.releaseWriteLatch();
         incrementWriteLatchesReleased();
         bufferCache.unpin(leafNode);
@@ -268,14 +267,11 @@
                 ctx.interiorFrame.setPage(node);
                 isLeaf = ctx.interiorFrame.isLeaf();
                 if (isLeaf) {
-                    System.out.println(ctx.name + " trying to get 555555555555 write lock: " + pageId);
                     node.acquireWriteLatch();
                     incrementWriteLatchesAcquired();
                     writeLatched = true;
-                    System.out.println(ctx.name + " Got write lock: " + pageId);
 
                     if (!ctx.interiorFrame.isLeaf()) {
-                        System.out.println(ctx.name + " Released write lock77777777: " + pageId);
                         node.releaseWriteLatch();
                         incrementWriteLatchesReleased();
                         bufferCache.unpin(node);
@@ -287,10 +283,8 @@
                     // Be optimistic and grab read latch first. We will swap it
                     // to write latch if we need to enlarge the best child
                     // tuple.
-                    System.out.println(ctx.name + " trying to get read lock: " + pageId);
                     node.acquireReadLatch();
                     incrementReadLatchesAcquired();
-                    System.out.println(ctx.name + " Got read lock: " + pageId);
                 }
             }
 
@@ -298,14 +292,12 @@
                 // Concurrent split detected, go back to parent and re-choose
                 // the best child
                 if (writeLatched) {
-                    System.out.println(ctx.name + " Released write lock888888888: " + pageId);
                     node.releaseWriteLatch();
                     incrementWriteLatchesReleased();
                     bufferCache.unpin(node);
                     incrementUnpins();
                     writeLatched = false;
                 } else {
-                    System.out.println(ctx.name + " Released read lock11111111111: " + pageId);
                     node.releaseReadLatch();
                     incrementReadLatchesReleased();
                     bufferCache.unpin(node);
@@ -323,11 +315,6 @@
             pageLsn = ctx.interiorFrame.getPageLsn();
             ctx.pathList.add(pageId, pageLsn, -1);
 
-            for (int i = 0; i < ctx.pathList.size(); i++) {
-                System.out.println(ctx.name + " pageId: " + ctx.pathList.getPageId(i) + " pageLsn: "
-                        + ctx.pathList.getPageLsn(i));
-            }
-
             if (!isLeaf) {
                 // checkEnlargement must be called *before* getBestChildPageId
                 boolean needsEnlargement = ctx.interiorFrame.findBestChild(ctx.getTuple(), ctx.tupleEntries1,
@@ -336,28 +323,23 @@
 
                 if (needsEnlargement) {
                     if (!writeLatched) {
-                        System.out.println(ctx.name + " Released read lock2222222222222222: " + pageId);
                         node.releaseReadLatch();
                         incrementReadLatchesReleased();
                         // TODO: do we need to un-pin and pin again?
                         bufferCache.unpin(node);
                         incrementUnpins();
-                        
-                        System.out.println(ctx.name + " trying to get 6666666666666 write lock: " + pageId);
+
                         node = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, pageId), false);
                         incrementPins();
                         node.acquireWriteLatch();
                         incrementWriteLatchesAcquired();
                         ctx.interiorFrame.setPage(node);
                         writeLatched = true;
-                        System.out.println(ctx.name + " Got write lock: " + pageId);
 
                         if (ctx.interiorFrame.getPageLsn() != pageLsn) {
                             // The page was changed while we unlocked it; thus,
                             // retry (re-choose best child)
 
-                            System.out.println(ctx.name
-                                    + "  changed while we unlocked it; thus, retry (re-choose best child): " + pageId);
                             ctx.pathList.removeLast();
                             continue;
                         }
@@ -367,7 +349,6 @@
                     // already pointing to the best child
                     ctx.interiorFrame.enlarge(ctx.getTuple(), interiorCmp);
 
-                    System.out.println(ctx.name + " Released write lock999999999: " + pageId);
                     node.releaseWriteLatch();
                     incrementWriteLatchesReleased();
                     bufferCache.unpin(node);
@@ -375,14 +356,12 @@
                     writeLatched = false;
                 } else {
                     if (writeLatched) {
-                        System.out.println(ctx.name + " Released write lock1010101010: " + pageId);
                         node.releaseWriteLatch();
                         incrementWriteLatchesReleased();
                         bufferCache.unpin(node);
                         incrementUnpins();
                         writeLatched = false;
                     } else {
-                        System.out.println(ctx.name + " Released read lock33333333333333333: " + pageId);
                         node.releaseReadLatch();
                         incrementReadLatchesReleased();
                         bufferCache.unpin(node);
@@ -403,24 +382,20 @@
         FrameOpSpaceStatus spaceStatus;
         if (!isLeaf) {
             spaceStatus = ctx.interiorFrame.hasSpaceInsert(ctx.getTuple(), interiorCmp);
-            System.out.println(ctx.name + " Number of tuples: " + ctx.interiorFrame.getTupleCount());
         } else {
             spaceStatus = ctx.leafFrame.hasSpaceInsert(ctx.getTuple(), leafCmp);
-            System.out.println(ctx.name + " Number of tuples: " + ctx.leafFrame.getTupleCount());
         }
 
         switch (spaceStatus) {
             case SUFFICIENT_CONTIGUOUS_SPACE: {
                 if (!isLeaf) {
-                    ctx.interiorFrame.insert(tuple, interiorCmp);
+                    ctx.interiorFrame.insert(tuple, interiorCmp, -1);
                     incrementGlobalNsn();
                     ctx.interiorFrame.setPageLsn(getGlobalNsn());
-                    System.out.println(ctx.name + " Number of tuples: " + ctx.interiorFrame.getTupleCount());
                 } else {
-                    ctx.leafFrame.insert(tuple, leafCmp);
+                    ctx.leafFrame.insert(tuple, leafCmp, -1);
                     incrementGlobalNsn();
                     ctx.leafFrame.setPageLsn(getGlobalNsn());
-                    System.out.println(ctx.name + " Number of tuples: " + ctx.leafFrame.getTupleCount());
                 }
                 ctx.splitKey.reset();
                 break;
@@ -429,29 +404,26 @@
             case SUFFICIENT_SPACE: {
                 if (!isLeaf) {
                     ctx.interiorFrame.compact(interiorCmp);
-                    ctx.interiorFrame.insert(tuple, interiorCmp);
+                    ctx.interiorFrame.insert(tuple, interiorCmp, -1);
                     incrementGlobalNsn();
                     ctx.interiorFrame.setPageLsn(getGlobalNsn());
-                    System.out.println(ctx.name + " Number of tuples: " + ctx.interiorFrame.getTupleCount());
                 } else {
                     ctx.leafFrame.compact(leafCmp);
-                    ctx.leafFrame.insert(tuple, leafCmp);
+                    ctx.leafFrame.insert(tuple, leafCmp, -1);
                     incrementGlobalNsn();
                     ctx.leafFrame.setPageLsn(getGlobalNsn());
-                    System.out.println(ctx.name + " Number of tuples: " + ctx.leafFrame.getTupleCount());
                 }
                 ctx.splitKey.reset();
                 break;
             }
 
             case INSUFFICIENT_SPACE: {
-                System.out.println(ctx.name + " Split");
                 int rightPageId = freePageManager.getFreePage(ctx.metaFrame);
                 ICachedPage rightNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, rightPageId), true);
                 incrementPins();
                 rightNode.acquireWriteLatch();
                 incrementWriteLatchesAcquired();
-                
+
                 try {
                     IRTreeFrame rightFrame;
                     int ret;
@@ -516,8 +488,8 @@
 
                             ctx.splitKey.setLeftPage(newLeftId);
 
-                            ctx.interiorFrame.insert(ctx.splitKey.getLeftTuple(), interiorCmp);
-                            ctx.interiorFrame.insert(ctx.splitKey.getRightTuple(), interiorCmp);
+                            ctx.interiorFrame.insert(ctx.splitKey.getLeftTuple(), interiorCmp, -1);
+                            ctx.interiorFrame.insert(ctx.splitKey.getRightTuple(), interiorCmp, -1);
 
                             incrementGlobalNsn();
                             int newNsn = getGlobalNsn();
@@ -541,7 +513,7 @@
             }
         }
     }
-    
+
     public void updateParentForInsert(RTreeOpContext ctx) throws Exception {
         int parentId = ctx.pathList.getLastPageId();
         ICachedPage parentNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, parentId), false);
@@ -579,7 +551,6 @@
         }
         if (foundParent) {
             ctx.interiorFrame.adjustKey(ctx.splitKey.getLeftTuple(), -1, interiorCmp);
-            System.out.println(ctx.name + " Trying to insert from updateParentForInsert in pageId: " + parentId);
             insertTuple(parentNode, parentId, ctx.splitKey.getRightTuple(), ctx, ctx.interiorFrame.isLeaf());
             ctx.pathList.removeLast();
 
@@ -729,6 +700,9 @@
                 ctx.interiorFrame.adjustKey(ctx.splitKey.getLeftTuple(), tupleIndex, interiorCmp);
                 ctx.pathList.removeLast();
 
+                incrementGlobalNsn();
+                ctx.interiorFrame.setPageLsn(getGlobalNsn());
+
                 ctx.splitKey.reset();
                 if (!ctx.pathList.isEmpty()) {
                     ctx.interiorFrame.computeMBR(ctx.splitKey, interiorCmp);
@@ -773,23 +747,23 @@
             boolean isLeaf = ctx.interiorFrame.isLeaf();
             int pageLsn = ctx.interiorFrame.getPageLsn();
             int parentIndex = ctx.traverseList.getParentIndex(pageIndex);
+            ctx.traverseList.setPageLsn(pageIndex, pageLsn);
 
             if (pageId != rootPage && parentLsn < ctx.interiorFrame.getPageNsn()) {
                 // Concurrent split detected, we need to visit the right page
                 int rightPage = ctx.interiorFrame.getRightPage();
                 if (rightPage != -1) {
-                    ctx.traverseList.add(rightPage, parentLsn, parentIndex);
+                    ctx.traverseList.add(rightPage, -1, parentIndex);
                     ctx.pathList.add(rightPage, parentLsn, ctx.traverseList.size() - 1);
                 }
             }
 
             if (!isLeaf) {
-                parentLsn = pageLsn;
                 for (int i = 0; i < ctx.interiorFrame.getTupleCount(); i++) {
                     int childPageId = ctx.interiorFrame.getChildPageIdIfIntersect(ctx.tuple, i, interiorCmp);
                     if (childPageId != -1) {
-                        ctx.traverseList.add(childPageId, parentLsn, pageIndex);
-                        ctx.pathList.add(childPageId, parentLsn, ctx.traverseList.size() - 1);
+                        ctx.traverseList.add(childPageId, -1, pageIndex);
+                        ctx.pathList.add(childPageId, pageLsn, ctx.traverseList.size() - 1);
                     }
                 }
             } else {
@@ -813,8 +787,12 @@
 
                         tupleIndex = ctx.leafFrame.findTupleIndex(ctx.tuple, leafCmp);
                         if (tupleIndex == -1) {
-                            ctx.traverseList.add(pageId, parentLsn, parentIndex);
+                            ctx.traverseList.add(pageId, -1, parentIndex);
                             ctx.pathList.add(pageId, parentLsn, ctx.traverseList.size() - 1);
+                        } else {
+                            ctx.pathList.clear();
+                            fillPath(ctx, pageIndex);
+                            return tupleIndex;
                         }
                     } else {
                         ctx.pathList.clear();
@@ -833,9 +811,11 @@
 
     public void deleteTuple(int pageId, int tupleIndex, RTreeOpContext ctx) throws Exception {
         ctx.leafFrame.delete(tupleIndex, leafCmp);
+        incrementGlobalNsn();
+        ctx.leafFrame.setPageLsn(getGlobalNsn());
 
         // if the page is empty, just leave it there for future inserts
-        if (ctx.leafFrame.getTupleCount() > 0) {
+        if (pageId != rootPage && ctx.leafFrame.getTupleCount() > 0) {
             ctx.leafFrame.computeMBR(ctx.splitKey, leafCmp);
             ctx.splitKey.setLeftPage(pageId);
         }
diff --git a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeOpContext.java b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeOpContext.java
index 7e4a646..0faea4e 100644
--- a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeOpContext.java
+++ b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeOpContext.java
@@ -18,10 +18,10 @@
     public final PathList pathList; // used to record the pageIds and pageLsns of the visited pages 
     public final TraverseList traverseList; // used for traversing the tree
     public Rectangle[] rec;
-    public String name;
+    public String threadName; // for debugging
 
     public RTreeOpContext(TreeIndexOp op, IRTreeFrame interiorFrame, IRTreeFrame leafFrame,
-            ITreeIndexMetaDataFrame metaFrame, int treeHeightHint, int dim, String name) {
+            ITreeIndexMetaDataFrame metaFrame, int treeHeightHint, int dim, String threadName) {
         this.op = op;
         this.interiorFrame = interiorFrame;
         this.leafFrame = leafFrame;
@@ -38,7 +38,7 @@
         for (int i = 0; i < 4; i++) {
             rec[i] = new Rectangle(dim);
         }
-        this.name = name;
+        this.threadName = threadName;
     }
 
     public ITupleReference getTuple() {