Fixed various bugs in the insert method when tested using multiple threads

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_indexes@445 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTree.java b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTree.java
index 41f74f0..a524388 100644
--- a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTree.java
+++ b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTree.java
@@ -67,6 +67,22 @@
         globalNsn.incrementAndGet();
     }
 
+    public synchronized void readLatchesAcquired() {
+        readLatchesAcquired++;
+    }
+    
+    public synchronized void readLatchesReleased() {
+        readLatchesReleased++;
+    }
+   
+    public synchronized void writeLatchesAcquired() {
+        writeLatchesAcquired++;
+    }
+    
+    public synchronized void writeLatchesReleased() {
+        writeLatchesReleased++;
+    }
+    
     public synchronized int getGlobalNsn() {
         return globalNsn.get();
     }
@@ -101,12 +117,12 @@
         ICachedPage node = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, pageId), false);
         pins++;
         node.acquireReadLatch();
-        readLatchesAcquired++;
+        readLatchesAcquired();
 
         try {
             if (parent != null && unpin == true) {
                 parent.releaseReadLatch();
-                readLatchesReleased++;
+                readLatchesReleased();
                 bufferCache.unpin(parent);
                 unpins++;
             }
@@ -136,13 +152,13 @@
             } else {
                 totalTuplesInserted += interiorFrame.getTupleCount();
                 node.releaseReadLatch();
-                readLatchesReleased++;
+                readLatchesReleased();
                 bufferCache.unpin(node);
                 unpins++;
             }
         } catch (Exception e) {
             node.releaseReadLatch();
-            readLatchesReleased++;
+            readLatchesReleased();
             bufferCache.unpin(node);
             unpins++;
             e.printStackTrace();
@@ -167,13 +183,13 @@
             pins++;
 
             rootNode.acquireWriteLatch();
-            writeLatchesAcquired++;
+            writeLatchesAcquired();
             try {
                 leafFrame.setPage(rootNode);
                 leafFrame.initBuffer((byte) 0);
             } finally {
                 rootNode.releaseWriteLatch();
-                writeLatchesReleased++;
+                writeLatchesReleased();
                 bufferCache.unpin(rootNode);
                 unpins++;
             }
@@ -194,9 +210,9 @@
     }
 
     public RTreeOpContext createOpContext(TreeIndexOp op, IRTreeFrame interiorFrame, IRTreeFrame leafFrame,
-            ITreeIndexMetaDataFrame metaFrame) {
+            ITreeIndexMetaDataFrame metaFrame, String name) {
         // TODO: figure out better tree-height hint
-        return new RTreeOpContext(op, interiorFrame, leafFrame, metaFrame, 8, dim);
+        return new RTreeOpContext(op, interiorFrame, leafFrame, metaFrame, 8, dim, name);
     }
 
     public void insert(ITupleReference tuple, RTreeOpContext ctx) throws Exception {
@@ -212,6 +228,7 @@
 
         int pageId = ctx.pathList.getLastPageId();
         ctx.pathList.removeLast();
+        System.out.println(ctx.name + " Trying to insert in pageId: " + pageId);
         insertTuple(leafNode, pageId, ctx.getTuple(), ctx, true);
 
         while (true) {
@@ -221,11 +238,12 @@
                 break;
             }
         }
-
+        
         leafNode.releaseWriteLatch();
-        writeLatchesReleased++;
+        writeLatchesReleased();
         bufferCache.unpin(leafNode);
         unpins++;
+
     }
 
     public ICachedPage findLeaf(RTreeOpContext ctx) throws Exception {
@@ -242,46 +260,66 @@
                 ctx.interiorFrame.setPage(node);
                 isLeaf = ctx.interiorFrame.isLeaf();
                 if (isLeaf) {
+                    System.out.println(ctx.name + " trying to get 555555555555 write lock: " + pageId);
                     node.acquireWriteLatch();
-                    writeLatchesAcquired++;
+                    writeLatchesAcquired();
                     writeLatched = true;
+                    System.out.println(ctx.name + " Got write lock: " + pageId);
+
+                    if (!ctx.interiorFrame.isLeaf()) {
+                        System.out.println(ctx.name + " Released write lock77777777: " + pageId);
+                        node.releaseWriteLatch();
+                        writeLatchesReleased();
+                        bufferCache.unpin(node);
+                        unpins++;
+                        writeLatched = false;
+                        continue;
+                    }
                 } else {
                     // Be optimistic and grab read latch first. We will swap it
                     // to write latch if we need to enlarge the best child
                     // tuple.
+                    System.out.println(ctx.name + " trying to get read lock: " + pageId);
                     node.acquireReadLatch();
-                    readLatchesAcquired++;
+                    readLatchesAcquired();
+                    System.out.println(ctx.name + " Got read lock: " + pageId);
                 }
             }
-            pageLsn = ctx.interiorFrame.getPageLsn();
-            ctx.pathList.add(pageId, pageLsn, -1);
 
             if (pageId != rootPage && parentLsn < ctx.interiorFrame.getPageNsn()) {
                 // Concurrent split detected, go back to parent and re-choose
                 // the best child
-                if (isLeaf) {
+                if (writeLatched) {
+                    System.out.println(ctx.name + " Released write lock888888888: " + pageId);
                     node.releaseWriteLatch();
-                    writeLatchesReleased++;
+                    writeLatchesReleased();
                     bufferCache.unpin(node);
                     unpins++;
+                    writeLatched = false;
                 } else {
+                    System.out.println(ctx.name + " Released read lock11111111111: " + pageId);
                     node.releaseReadLatch();
-                    readLatchesReleased++;
+                    readLatchesReleased();
                     bufferCache.unpin(node);
                     unpins++;
                 }
 
-                ctx.pathList.removeLast();
-
                 pageId = ctx.pathList.getLastPageId();
                 if (pageId != rootPage) {
                     parentLsn = ctx.pathList.getPageLsn(ctx.pathList.size() - 2);
                 }
-
-                writeLatched = false;
+                ctx.pathList.removeLast();
                 continue;
             }
 
+            pageLsn = ctx.interiorFrame.getPageLsn();
+            ctx.pathList.add(pageId, pageLsn, -1);
+
+            for (int i = 0; i < ctx.pathList.size(); i++) {
+                System.out.println(ctx.name + " pageId: " + ctx.pathList.getPageId(i) + " pageLsn: "
+                        + ctx.pathList.getPageLsn(i));
+            }
+
             if (!isLeaf) {
                 // checkEnlargement must be called *before* getBestChildPageId
                 boolean needsEnlargement = ctx.interiorFrame.findBestChild(ctx.getTuple(), ctx.tupleEntries1,
@@ -290,22 +328,28 @@
 
                 if (needsEnlargement) {
                     if (!writeLatched) {
+                        System.out.println(ctx.name + " Released read lock2222222222222222: " + pageId);
                         node.releaseReadLatch();
-                        readLatchesReleased++;
+                        readLatchesReleased();
                         // TODO: do we need to un-pin and pin again?
                         bufferCache.unpin(node);
                         unpins++;
-
+                        
+                        System.out.println(ctx.name + " trying to get 6666666666666 write lock: " + pageId);
                         node = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, pageId), false);
                         pins++;
                         node.acquireWriteLatch();
-                        writeLatchesAcquired++;
+                        writeLatchesAcquired();
                         ctx.interiorFrame.setPage(node);
                         writeLatched = true;
+                        System.out.println(ctx.name + " Got write lock: " + pageId);
 
                         if (ctx.interiorFrame.getPageLsn() != pageLsn) {
                             // The page was changed while we unlocked it; thus,
                             // retry (re-choose best child)
+
+                            System.out.println(ctx.name
+                                    + "  changed while we unlocked it; thus, retry (re-choose best child): " + pageId);
                             ctx.pathList.removeLast();
                             continue;
                         }
@@ -315,16 +359,27 @@
                     // already pointing to the best child
                     ctx.interiorFrame.enlarge(ctx.getTuple(), interiorCmp);
 
+                    System.out.println(ctx.name + " Released write lock999999999: " + pageId);
                     node.releaseWriteLatch();
-                    writeLatchesReleased++;
+                    writeLatchesReleased();
                     bufferCache.unpin(node);
                     unpins++;
                     writeLatched = false;
                 } else {
-                    node.releaseReadLatch();
-                    readLatchesReleased++;
-                    bufferCache.unpin(node);
-                    unpins++;
+                    if (writeLatched) {
+                        System.out.println(ctx.name + " Released write lock1010101010: " + pageId);
+                        node.releaseWriteLatch();
+                        writeLatchesReleased();
+                        bufferCache.unpin(node);
+                        unpins++;
+                        writeLatched = false;
+                    } else {
+                        System.out.println(ctx.name + " Released read lock33333333333333333: " + pageId);
+                        node.releaseReadLatch();
+                        readLatchesReleased();
+                        bufferCache.unpin(node);
+                        unpins++;
+                    }
                 }
                 pageId = childPageId;
                 parentLsn = pageLsn;
@@ -340,16 +395,24 @@
         FrameOpSpaceStatus spaceStatus;
         if (!isLeaf) {
             spaceStatus = ctx.interiorFrame.hasSpaceInsert(ctx.getTuple(), interiorCmp);
+            System.out.println(ctx.name + " Number of tuples: " + ctx.interiorFrame.getTupleCount());
         } else {
             spaceStatus = ctx.leafFrame.hasSpaceInsert(ctx.getTuple(), leafCmp);
+            System.out.println(ctx.name + " Number of tuples: " + ctx.leafFrame.getTupleCount());
         }
 
         switch (spaceStatus) {
             case SUFFICIENT_CONTIGUOUS_SPACE: {
                 if (!isLeaf) {
                     ctx.interiorFrame.insert(tuple, interiorCmp);
+                    incrementGlobalNsn();
+                    ctx.interiorFrame.setPageLsn(getGlobalNsn());
+                    System.out.println(ctx.name + " Number of tuples: " + ctx.interiorFrame.getTupleCount());
                 } else {
                     ctx.leafFrame.insert(tuple, leafCmp);
+                    incrementGlobalNsn();
+                    ctx.leafFrame.setPageLsn(getGlobalNsn());
+                    System.out.println(ctx.name + " Number of tuples: " + ctx.leafFrame.getTupleCount());
                 }
                 ctx.splitKey.reset();
                 break;
@@ -359,22 +422,28 @@
                 if (!isLeaf) {
                     ctx.interiorFrame.compact(interiorCmp);
                     ctx.interiorFrame.insert(tuple, interiorCmp);
+                    incrementGlobalNsn();
+                    ctx.interiorFrame.setPageLsn(getGlobalNsn());
+                    System.out.println(ctx.name + " Number of tuples: " + ctx.interiorFrame.getTupleCount());
                 } else {
                     ctx.leafFrame.compact(leafCmp);
                     ctx.leafFrame.insert(tuple, leafCmp);
+                    incrementGlobalNsn();
+                    ctx.leafFrame.setPageLsn(getGlobalNsn());
+                    System.out.println(ctx.name + " Number of tuples: " + ctx.leafFrame.getTupleCount());
                 }
                 ctx.splitKey.reset();
                 break;
             }
 
             case INSUFFICIENT_SPACE: {
-                System.out.println("Split");
+                System.out.println(ctx.name + " Split");
                 int rightPageId = freePageManager.getFreePage(ctx.metaFrame);
                 ICachedPage rightNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, rightPageId), true);
                 pins++;
                 rightNode.acquireWriteLatch();
-                writeLatchesAcquired++;
-
+                writeLatchesAcquired();
+                
                 try {
                     IRTreeFrame rightFrame;
                     int ret;
@@ -388,9 +457,12 @@
                         ret = ctx.interiorFrame.split(rightFrame, tuple, interiorCmp, ctx.splitKey, ctx.tupleEntries1,
                                 ctx.tupleEntries2, ctx.rec);
                         ctx.interiorFrame.setRightPage(rightPageId);
-                        rightFrame.setPageLsn(ctx.interiorFrame.getPageNsn());
+                        rightFrame.setPageNsn(ctx.interiorFrame.getPageNsn());
                         incrementGlobalNsn();
-                        ctx.interiorFrame.setPageLsn(getGlobalNsn());
+                        int newNsn = getGlobalNsn();
+                        rightFrame.setPageLsn(newNsn);
+                        ctx.interiorFrame.setPageNsn(newNsn);
+                        ctx.interiorFrame.setPageLsn(newNsn);
                     } else {
                         splitsByLevel[0]++; // debug
                         rightFrame = leafFrameFactory.getFrame();
@@ -399,11 +471,14 @@
                         rightFrame.setPageTupleFieldCount(leafCmp.getFieldCount());
                         ret = ctx.leafFrame.split(rightFrame, tuple, leafCmp, ctx.splitKey, ctx.tupleEntries1,
                                 ctx.tupleEntries2, ctx.rec);
-                        rightFrame.setPageLsn(ctx.leafFrame.getPageNsn());
+                        ctx.leafFrame.setRightPage(rightPageId);
+                        rightFrame.setPageNsn(ctx.leafFrame.getPageNsn());
                         incrementGlobalNsn();
-                        ctx.leafFrame.setPageLsn(getGlobalNsn());
+                        int newNsn = getGlobalNsn();
+                        rightFrame.setPageLsn(newNsn);
+                        ctx.leafFrame.setPageNsn(newNsn);
+                        ctx.leafFrame.setPageLsn(newNsn);
                     }
-
                     if (ret != 0) {
                         ctx.splitKey.reset();
                     } else {
@@ -421,7 +496,7 @@
                                 true);
                         pins++;
                         newLeftNode.acquireWriteLatch();
-                        writeLatchesAcquired++;
+                        writeLatchesAcquired();
                         try {
                             // copy left child to new left child
                             System.arraycopy(node.getBuffer().array(), 0, newLeftNode.getBuffer().array(), 0,
@@ -437,10 +512,12 @@
                             ctx.interiorFrame.insert(ctx.splitKey.getRightTuple(), interiorCmp);
 
                             incrementGlobalNsn();
-                            ctx.interiorFrame.setPageLsn(getGlobalNsn());
+                            int newNsn = getGlobalNsn();
+                            ctx.interiorFrame.setPageLsn(newNsn);
+                            ctx.interiorFrame.setPageNsn(newNsn);
                         } finally {
                             newLeftNode.releaseWriteLatch();
-                            writeLatchesReleased++;
+                            writeLatchesReleased();
                             bufferCache.unpin(newLeftNode);
                             unpins++;
                         }
@@ -448,7 +525,7 @@
                         ctx.splitKey.reset();
                     }
                     rightNode.releaseWriteLatch();
-                    writeLatchesReleased++;
+                    writeLatchesReleased();
                     bufferCache.unpin(rightNode);
                     unpins++;
                 }
@@ -456,13 +533,13 @@
             }
         }
     }
-
+    
     public void updateParentForInsert(RTreeOpContext ctx) throws Exception {
         int parentId = ctx.pathList.getLastPageId();
         ICachedPage parentNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, parentId), false);
         pins++;
         parentNode.acquireWriteLatch();
-        writeLatchesAcquired++;
+        writeLatchesAcquired();
         ctx.interiorFrame.setPage(parentNode);
         boolean foundParent = true;
 
@@ -476,7 +553,7 @@
                 }
                 int rightPage = ctx.interiorFrame.getRightPage();
                 parentNode.releaseWriteLatch();
-                writeLatchesReleased++;
+                writeLatchesReleased();
                 bufferCache.unpin(parentNode);
                 unpins++;
 
@@ -488,17 +565,18 @@
                 parentNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, parentId), false);
                 pins++;
                 parentNode.acquireWriteLatch();
-                writeLatchesAcquired++;
+                writeLatchesAcquired();
                 ctx.interiorFrame.setPage(parentNode);
             }
         }
         if (foundParent) {
             ctx.interiorFrame.adjustKey(ctx.splitKey.getLeftTuple(), -1, interiorCmp);
+            System.out.println(ctx.name + " Trying to insert from updateParentForInsert in pageId: " + parentId);
             insertTuple(parentNode, parentId, ctx.splitKey.getRightTuple(), ctx, ctx.interiorFrame.isLeaf());
             ctx.pathList.removeLast();
 
             parentNode.releaseWriteLatch();
-            writeLatchesReleased++;
+            writeLatchesReleased();
             bufferCache.unpin(parentNode);
             unpins++;
             return;
@@ -526,7 +604,7 @@
             ICachedPage node = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, pageId), false);
             pins++;
             node.acquireReadLatch();
-            readLatchesAcquired++;
+            readLatchesAcquired();
             ctx.interiorFrame.setPage(node);
             pageLsn = ctx.interiorFrame.getPageLsn();
             pageIndex = ctx.traverseList.first();
@@ -547,13 +625,13 @@
                 fillPath(ctx, pageIndex);
 
                 node.releaseReadLatch();
-                readLatchesReleased++;
+                readLatchesReleased();
                 bufferCache.unpin(node);
                 unpins++;
                 return;
             }
             node.releaseReadLatch();
-            readLatchesReleased++;
+            readLatchesReleased();
             bufferCache.unpin(node);
             unpins++;
         }
@@ -590,7 +668,7 @@
             }
 
             ctx.leafFrame.getPage().releaseWriteLatch();
-            writeLatchesReleased++;
+            writeLatchesReleased();
             bufferCache.unpin(ctx.leafFrame.getPage());
             unpins++;
         }
@@ -601,7 +679,7 @@
         ICachedPage parentNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, parentId), false);
         pins++;
         parentNode.acquireWriteLatch();
-        writeLatchesAcquired++;
+        writeLatchesAcquired();
         ctx.interiorFrame.setPage(parentNode);
         boolean foundParent = true;
         int tupleIndex = -1;
@@ -617,7 +695,7 @@
                 }
                 int rightPage = ctx.interiorFrame.getRightPage();
                 parentNode.releaseWriteLatch();
-                writeLatchesReleased++;
+                writeLatchesReleased();
                 bufferCache.unpin(parentNode);
                 unpins++;
 
@@ -629,7 +707,7 @@
                 parentNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, parentId), false);
                 pins++;
                 parentNode.acquireWriteLatch();
-                writeLatchesAcquired++;
+                writeLatchesAcquired();
                 ctx.interiorFrame.setPage(parentNode);
             }
         }
@@ -654,7 +732,7 @@
             }
 
             parentNode.releaseWriteLatch();
-            writeLatchesReleased++;
+            writeLatchesReleased();
             bufferCache.unpin(parentNode);
             unpins++;
             return;
@@ -682,7 +760,7 @@
             ICachedPage node = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, pageId), false);
             pins++;
             node.acquireReadLatch();
-            readLatchesAcquired++;
+            readLatchesAcquired();
             ctx.interiorFrame.setPage(node);
             boolean isLeaf = ctx.interiorFrame.isLeaf();
             int pageLsn = ctx.interiorFrame.getPageLsn();
@@ -712,14 +790,14 @@
                 if (tupleIndex != -1) {
 
                     node.releaseReadLatch();
-                    readLatchesReleased++;
+                    readLatchesReleased();
                     bufferCache.unpin(node);
                     unpins++;
 
                     node = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, pageId), false);
                     pins++;
                     node.acquireWriteLatch();
-                    writeLatchesAcquired++;
+                    writeLatchesAcquired();
                     ctx.leafFrame.setPage(node);
 
                     if (ctx.leafFrame.getPageLsn() != pageLsn) {
@@ -738,7 +816,7 @@
                 }
             }
             node.releaseReadLatch();
-            readLatchesReleased++;
+            readLatchesReleased();
             bufferCache.unpin(node);
             unpins++;
         }
@@ -747,7 +825,7 @@
 
     public void deleteTuple(int pageId, int tupleIndex, RTreeOpContext ctx) throws Exception {
         ctx.leafFrame.delete(tupleIndex, leafCmp);
-        
+
         // if the page is empty, just leave it there for future inserts
         if (ctx.leafFrame.getTupleCount() > 0) {
             ctx.leafFrame.computeMBR(ctx.splitKey, leafCmp);
@@ -767,7 +845,7 @@
             ICachedPage node = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, pageId), false);
             pins++;
             node.acquireReadLatch();
-            readLatchesAcquired++;
+            readLatchesAcquired();
 
             try {
 
@@ -802,7 +880,7 @@
 
             } finally {
                 node.releaseReadLatch();
-                readLatchesReleased++;
+                readLatchesReleased();
                 bufferCache.unpin(node);
                 unpins++;
             }
diff --git a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeOpContext.java b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeOpContext.java
index 4c8f198..7e4a646 100644
--- a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeOpContext.java
+++ b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTreeOpContext.java
@@ -18,9 +18,10 @@
     public final PathList pathList; // used to record the pageIds and pageLsns of the visited pages 
     public final TraverseList traverseList; // used for traversing the tree
     public Rectangle[] rec;
+    public String name;
 
     public RTreeOpContext(TreeIndexOp op, IRTreeFrame interiorFrame, IRTreeFrame leafFrame,
-            ITreeIndexMetaDataFrame metaFrame, int treeHeightHint, int dim) {
+            ITreeIndexMetaDataFrame metaFrame, int treeHeightHint, int dim, String name) {
         this.op = op;
         this.interiorFrame = interiorFrame;
         this.leafFrame = leafFrame;
@@ -37,6 +38,7 @@
         for (int i = 0; i < 4; i++) {
             rec[i] = new Rectangle(dim);
         }
+        this.name = name;
     }
 
     public ITupleReference getTuple() {