Fixed a latch deadlock in the BTree. Since our leaves are double-linked, Mohan's protocol needed minor modifications. However, reverse scanning the tree could currently still lead to latch deadlock. Avoiding it is rather complex, and I'm deferring it for now.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@848 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java
index 57f1b79..4113e9f 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/impls/BTree.java
@@ -58,8 +58,6 @@
     private final static int MAX_RESTARTS = 10;
     private final static int rootPage = 1;
         
-    private boolean created = false;
-
     private final IFreePageManager freePageManager;
     private final IBufferCache bufferCache;    
     private final ITreeIndexFrameFactory interiorFrameFactory;
@@ -86,15 +84,11 @@
     public void create(int fileId) throws HyracksDataException {
         treeLatch.writeLock().lock();
         try {
-            if (created) {
-                return;
-            }
             ITreeIndexFrame leafFrame = leafFrameFactory.createFrame();
             ITreeIndexMetaDataFrame metaFrame = freePageManager.getMetaDataFrameFactory().createFrame();
             this.fileId = fileId;
             freePageManager.init(metaFrame, rootPage);
             initRoot(leafFrame, true);
-            created = true;
         } finally {
             treeLatch.writeLock().unlock();
         }
@@ -305,8 +299,9 @@
         insertUpdateOrDelete(tuple, ctx);
     }
     
-    private void insertLeaf(ICachedPage node, int pageId, ITupleReference tuple, BTreeOpContext ctx) throws Exception {
+    private boolean insertLeaf(ICachedPage node, int pageId, ITupleReference tuple, BTreeOpContext ctx) throws Exception {
         ctx.leafFrame.setPage(node);
+        boolean restartOp = false;
         int targetTupleIndex = ctx.leafFrame.findInsertTupleIndex(tuple);
         FrameOpSpaceStatus spaceStatus = ctx.leafFrame.hasSpaceInsert(tuple);
         switch (spaceStatus) {
@@ -324,7 +319,7 @@
                 ctx.splitKey.reset();
                 break;
             }
-            case INSUFFICIENT_SPACE: {
+            case INSUFFICIENT_SPACE: {            	
                 // Try compressing the page first and see if there is space available.
                 boolean reCompressed = ctx.leafFrame.compress();
                 if (reCompressed) {
@@ -336,31 +331,34 @@
                     ctx.leafFrame.insert(tuple, targetTupleIndex);
                     ctx.splitKey.reset();
                 } else {
-                    performLeafSplit(pageId, tuple, ctx);
+                	restartOp = performLeafSplit(pageId, tuple, ctx);
                 }
                 break;
             }
         }
         node.releaseWriteLatch();
         bufferCache.unpin(node);
+        return restartOp;
     }
     
-    private void performLeafSplit(int pageId, ITupleReference tuple, BTreeOpContext ctx) throws Exception {
-        int rightSiblingPageId = ctx.leafFrame.getNextLeaf();
+    private boolean performLeafSplit(int pageId, ITupleReference tuple, BTreeOpContext ctx) throws Exception {    	
+    	// Lock is released in unsetSmPages(), after sm has fully completed.
+        if (!treeLatch.writeLock().tryLock()) {
+        	return true;
+        }
+    	int rightSiblingPageId = ctx.leafFrame.getNextLeaf();
         ICachedPage rightSibling = null;
         if (rightSiblingPageId > 0) {
             rightSibling = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, rightSiblingPageId),
                     false);
         }
-        // Lock is released in unsetSmPages(), after sm has fully completed.
-        treeLatch.writeLock().lock(); 
         try {
             int rightPageId = freePageManager.getFreePage(ctx.metaFrame);
             ICachedPage rightNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, rightPageId),
                     true);
             rightNode.acquireWriteLatch();
             try {
-                IBTreeLeafFrame rightFrame = ctx.createLeafFrame();                
+                IBTreeLeafFrame rightFrame = ctx.createLeafFrame();
                 rightFrame.setPage(rightNode);
                 rightFrame.initBuffer((byte) 0);
                 rightFrame.setMultiComparator(cmp);
@@ -385,7 +383,7 @@
                 ctx.splitKey.setPages(pageId, rightPageId);
                 
                 if (rightSibling != null) {
-                    rightSibling.acquireWriteLatch();
+                	rightSibling.acquireWriteLatch();
                     try {
                         // Reuse rightFrame for modification.
                         rightFrame.setPage(rightSibling);
@@ -406,12 +404,14 @@
                 bufferCache.unpin(rightSibling);
             }
         }
+        return false;
     }
     
-    private void updateLeaf(ICachedPage node, int pageId, ITupleReference tuple, BTreeOpContext ctx) throws Exception {
+    private boolean updateLeaf(ICachedPage node, int pageId, ITupleReference tuple, BTreeOpContext ctx) throws Exception {
         ctx.leafFrame.setPage(node);
         int oldTupleIndex = ctx.leafFrame.findUpdateTupleIndex(tuple);
         FrameOpSpaceStatus spaceStatus = ctx.leafFrame.hasSpaceUpdate(tuple, oldTupleIndex);
+        boolean restartOp = false;
         switch (spaceStatus) {
             case SUFFICIENT_INPLACE_SPACE: {
                 ctx.leafFrame.update(tuple, oldTupleIndex, true);
@@ -443,13 +443,14 @@
                     ctx.leafFrame.insert(tuple, targetTupleIndex);
                     ctx.splitKey.reset();
                 } else {
-                    performLeafSplit(pageId, tuple, ctx);
+                    restartOp = performLeafSplit(pageId, tuple, ctx);
                 }
                 break;
             }
         }
         node.releaseWriteLatch();
         bufferCache.unpin(node);
+        return restartOp;
     }
 
     private void insertInterior(ICachedPage node, int pageId, ITupleReference tuple, BTreeOpContext ctx)
@@ -506,95 +507,102 @@
         }
     }
 
-    private void deleteLeaf(ICachedPage node, int pageId, ITupleReference tuple, BTreeOpContext ctx) throws Exception {
+    private boolean deleteLeaf(ICachedPage node, int pageId, ITupleReference tuple, BTreeOpContext ctx) throws Exception {
         ctx.leafFrame.setPage(node);
         int tupleIndex = ctx.leafFrame.findDeleteTupleIndex(tuple);
-
+        
         // Will this leaf become empty?
         if (ctx.leafFrame.getTupleCount() > 1) {
             // Leaf will not become empty.
             ctx.leafFrame.delete(tuple, tupleIndex);
             node.releaseWriteLatch();
             bufferCache.unpin(node);
-            return;
+            return false;
         }
         
         // Leaf will become empty. 
-        IBTreeLeafFrame siblingFrame = ctx.createLeafFrame();
+        IBTreeLeafFrame siblingFrame = (IBTreeLeafFrame) leafFrameFactory.createFrame();
         siblingFrame.setMultiComparator(cmp);
         ICachedPage leftNode = null;
         ICachedPage rightNode = null;
         int nextLeaf = ctx.leafFrame.getNextLeaf();
         int prevLeaf = ctx.leafFrame.getPrevLeaf();
-        if (prevLeaf > 0) {
-            leftNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, prevLeaf), false);
+        // Try to get the tree latch, if it's already taken, then restart this operation
+        // to avoid latch deadlock.
+        if (!treeLatch.writeLock().tryLock()) {
+        	return true;
         }
         try {
-            if (nextLeaf > 0) {
-                rightNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, nextLeaf), false);
-            }
-            try {
-                treeLatch.writeLock().lock();
-                try {
-                    ctx.leafFrame.delete(tuple, tupleIndex);
-                    // To propagate the deletion we only need to make the
-                    // splitKey != null.
-                    // Reuse data to identify which key to delete in the parent.
-                    ctx.splitKey.initData(1);
-                } catch (Exception e) {
-                    // Don't propagate deletion.
-                    ctx.splitKey.reset();
-                    throw e;
-                }
+        	if (prevLeaf > 0) {
+        		leftNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, prevLeaf), false);
+        	}
+        	try {
+        		if (nextLeaf > 0) {
+        			rightNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, nextLeaf), false);
+        		}
+        		try {
+        			try {
+        				ctx.leafFrame.delete(tuple, tupleIndex);
+        				// To propagate the deletion we only need to make the
+        				// splitKey != null.
+        				// Reuse data to identify which key to delete in the parent.
+        				ctx.splitKey.initData(1);
+        			} catch (Exception e) {
+        				// Don't propagate deletion.
+        				ctx.splitKey.reset();
+        				throw e;
+        			}
 
-                // TODO: Tie together with logging.
-                ctx.leafFrame.setPageLsn(ctx.leafFrame.getPageLsn() + 1);
-                ctx.leafFrame.setLevel(freePageManager.getFreePageLevelIndicator());
+        			// TODO: Tie together with logging.
+        			ctx.leafFrame.setPageLsn(ctx.leafFrame.getPageLsn() + 1);
+        			ctx.leafFrame.setLevel(freePageManager.getFreePageLevelIndicator());
 
-                ctx.smPages.add(pageId);
-                ctx.leafFrame.setSmFlag(true);
+        			ctx.smPages.add(pageId);
+        			ctx.leafFrame.setSmFlag(true);
 
-                node.releaseWriteLatch();
-                bufferCache.unpin(node);
+        			node.releaseWriteLatch();
+        			bufferCache.unpin(node);
 
-                if (leftNode != null) {
-                    leftNode.acquireWriteLatch();
-                    try {
-                        siblingFrame.setPage(leftNode);
-                        siblingFrame.setNextLeaf(nextLeaf);
-                        // TODO: Tie together with logging.
-                        siblingFrame.setPageLsn(siblingFrame.getPageLsn() + 1);
-                    } finally {
-                        leftNode.releaseWriteLatch();
-                    }
-                }
+        			if (leftNode != null) {
+        				leftNode.acquireWriteLatch();
+        				try {
+        					siblingFrame.setPage(leftNode);
+        					siblingFrame.setNextLeaf(nextLeaf);
+        					// TODO: Tie together with logging.
+        					siblingFrame.setPageLsn(siblingFrame.getPageLsn() + 1);
+        				} finally {
+        					leftNode.releaseWriteLatch();
+        				}
+        			}
 
-                if (rightNode != null) {
-                    rightNode.acquireWriteLatch();
-                    try {
-                        siblingFrame.setPage(rightNode);
-                        siblingFrame.setPrevLeaf(prevLeaf);
-                        // TODO: Tie together with logging.
-                        siblingFrame.setPageLsn(siblingFrame.getPageLsn() + 1);
-                    } finally {
-                        rightNode.releaseWriteLatch();
-                    }
-                }
-                // Register pageId as a free.
-                ctx.freePages.add(pageId);
-            } catch (Exception e) {
-                treeLatch.writeLock().unlock();
-                throw e;
-            } finally {
-                if (rightNode != null) {
-                    bufferCache.unpin(rightNode);
-                }
-            }
-        } finally {
-            if (leftNode != null) {
-                bufferCache.unpin(leftNode);
-            }
+        			if (rightNode != null) {
+        				rightNode.acquireWriteLatch();
+        				try {
+        					siblingFrame.setPage(rightNode);
+        					siblingFrame.setPrevLeaf(prevLeaf);
+        					// TODO: Tie together with logging.
+        					siblingFrame.setPageLsn(siblingFrame.getPageLsn() + 1);
+        				} finally {
+        					rightNode.releaseWriteLatch();
+        				}
+        			}
+        			// Register pageId as a free.
+        			ctx.freePages.add(pageId);
+        		} finally {
+        			if (rightNode != null) {
+                		bufferCache.unpin(rightNode);
+                	}
+        		}
+        	} finally {
+        		if (leftNode != null) {
+        			bufferCache.unpin(leftNode);
+        		}
+        	}
+        } catch (Exception e) {
+        	treeLatch.writeLock().unlock();
+        	throw e;
         }
+        return false;
     }
 
     private void deleteInterior(ICachedPage node, int pageId, ITupleReference tuple, BTreeOpContext ctx)
@@ -665,7 +673,7 @@
         // this check performs an unprotected read in the page
         // the following could happen: TODO fill out
         boolean unsafeIsLeaf = ctx.interiorFrame.isLeaf();
-        acquireLatch(node, ctx, unsafeIsLeaf);        
+        acquireLatch(node, ctx, unsafeIsLeaf);
         boolean smFlag = ctx.interiorFrame.getSmFlag();
         // re-check leafness after latching
         boolean isLeaf = ctx.interiorFrame.isLeaf();
@@ -674,7 +682,6 @@
         // structure modification
         ctx.pageLsns.add(ctx.interiorFrame.getPageLsn());
         try {
-
             // latch coupling, note: parent should never be write latched,
             // otherwise something is wrong.
             if (parent != null) {
@@ -742,8 +749,6 @@
                     } // end while
                 } else { // smFlag
                     ctx.opRestarts++;
-                    System.out.println("ONGOING SM ON PAGE " + pageId + " AT LEVEL " + ctx.interiorFrame.getLevel()
-                            + ", RESTARTS: " + ctx.opRestarts);
                     releaseLatch(node, ctx, unsafeIsLeaf);
                     bufferCache.unpin(node);
 
@@ -752,9 +757,9 @@
                     // instead we just immediately release the lock. this is
                     // inefficient but still correct and will not cause
                     // latch-deadlock
-                    treeLatch.readLock().lock();
-                    treeLatch.readLock().unlock();
-
+                    treeLatch.writeLock().lock();
+                    treeLatch.writeLock().unlock();
+                    
                     // unwind recursion and restart operation, find lowest page
                     // with a pageLsn as seen by this operation during descent
                     ctx.pageLsns.removeLast(); // pop current page lsn
@@ -763,17 +768,19 @@
                     ctx.pageLsns.add(RESTART_OP);
                 }
             } else { // isLeaf and !smFlag
-                switch (ctx.op) {
+                // We may have to restart an op to avoid latch deadlock.
+            	boolean restartOp = false;
+            	switch (ctx.op) {
                     case INSERT: {
-                        insertLeaf(node, pageId, ctx.pred.getLowKey(), ctx);
+                        restartOp = insertLeaf(node, pageId, ctx.pred.getLowKey(), ctx);
                         break;
                     }
                     case UPDATE: {
-                        updateLeaf(node, pageId, ctx.pred.getLowKey(), ctx);
+                    	restartOp = updateLeaf(node, pageId, ctx.pred.getLowKey(), ctx);
                         break;
                     }
                     case DELETE: {
-                        deleteLeaf(node, pageId, ctx.pred.getLowKey(), ctx);
+                    	restartOp = deleteLeaf(node, pageId, ctx.pred.getLowKey(), ctx);
                         break;
                     }
                     case SEARCH: {
@@ -782,22 +789,27 @@
                         break;
                     }
                 }
+            	if (restartOp) {
+            		ctx.pageLsns.removeLast();
+                    ctx.pageLsns.add(RESTART_OP);
+            	}
             }
         } catch (TreeIndexException e) {
-            if (!ctx.exceptionHandled) {
+        	if (!ctx.exceptionHandled) {
                 releaseLatch(node, ctx, unsafeIsLeaf);
                 bufferCache.unpin(node);
                 ctx.exceptionHandled = true;
             }
             throw e;
         } catch (PageAllocationException e) {
-            if (!ctx.exceptionHandled) {
+        	if (!ctx.exceptionHandled) {
                 releaseLatch(node, ctx, unsafeIsLeaf);
                 bufferCache.unpin(node);
                 ctx.exceptionHandled = true;
             }
             throw e;
         } catch (Exception e) {
+        	e.printStackTrace();
             releaseLatch(node, ctx, unsafeIsLeaf);
             bufferCache.unpin(node);
             BTreeException wrappedException = new BTreeException(e);