Changed the RTree and RStarTree split algorithms to do double split if the new tuple cannot fit. All test cases pass. 

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@2352 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RStarTreePolicy.java b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RStarTreePolicy.java
index 9b80614..039acc8 100644
--- a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RStarTreePolicy.java
+++ b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RStarTreePolicy.java
@@ -24,6 +24,7 @@
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleReference;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
+import edu.uci.ics.hyracks.storage.am.common.frames.FrameOpSpaceStatus;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
 import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeFrame;
 import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreePolicy;
@@ -173,7 +174,7 @@
             startIndex = splitPoint;
             endIndex = (leftRTreeFrame.getTupleCount() + 1);
         }
-        boolean tupleInserted = false;
+        boolean insertedNewTupleInRightFrame = false;
         int totalBytes = 0, numOfDeletedTuples = 0;
         for (int i = startIndex; i < endIndex; i++) {
             if (tupleEntries1.get(i).getTupleIndex() != -1) {
@@ -184,8 +185,7 @@
                 totalBytes += leftRTreeFrame.getTupleSize(frameTuple);
                 numOfDeletedTuples++;
             } else {
-                rightFrame.insert(tuple, -1);
-                tupleInserted = true;
+                insertedNewTupleInRightFrame = true;
             }
         }
 
@@ -198,9 +198,18 @@
         // compact both pages
         rightFrame.compact();
         leftRTreeFrame.compact();
-
-        if (!tupleInserted) {
-            leftRTreeFrame.insert(tuple, -1);
+        
+        boolean insertedNewTuple = false;
+        if (insertedNewTupleInRightFrame) {
+            if (rightFrame.hasSpaceInsert(tuple) == FrameOpSpaceStatus.SUFFICIENT_CONTIGUOUS_SPACE) {
+                rightFrame.insert(tuple, -1);
+                insertedNewTuple = true;
+            }
+        } else {
+            if (leftRTreeFrame.hasSpaceInsert(tuple) == FrameOpSpaceStatus.SUFFICIENT_CONTIGUOUS_SPACE) {
+                leftRTreeFrame.insert(tuple, -1);
+                insertedNewTuple = true;
+            }
         }
 
         int tupleOff = slotManager.getTupleOff(slotManager.getSlotEndOff());
@@ -220,7 +229,7 @@
 
         tupleEntries1.clear();
         tupleEntries2.clear();
-        return true;
+        return insertedNewTuple;
     }
 
     public void generateDist(ITreeIndexFrame leftRTreeFrame, ITreeIndexTupleReference frameTuple,
diff --git a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreePolicy.java b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreePolicy.java
index 27dacf7..da9588d 100644
--- a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreePolicy.java
+++ b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/frames/RTreePolicy.java
@@ -24,6 +24,7 @@
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleReference;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
+import edu.uci.ics.hyracks.storage.am.common.frames.FrameOpSpaceStatus;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
 import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreeFrame;
 import edu.uci.ics.hyracks.storage.am.rtree.api.IRTreePolicy;
@@ -54,8 +55,8 @@
         }
     }
 
-    public boolean split(ITreeIndexFrame leftFrame, ByteBuffer buf, ITreeIndexFrame rightFrame, ISlotManager slotManager,
-            ITreeIndexTupleReference frameTuple, ITupleReference tuple, ISplitKey splitKey) {
+    public boolean split(ITreeIndexFrame leftFrame, ByteBuffer buf, ITreeIndexFrame rightFrame,
+            ISlotManager slotManager, ITreeIndexTupleReference frameTuple, ITupleReference tuple, ISplitKey splitKey) {
         RTreeSplitKey rTreeSplitKey = ((RTreeSplitKey) splitKey);
         RTreeTypeAwareTupleWriter rTreeTupleWriterLeftFrame = ((RTreeTypeAwareTupleWriter) tupleWriter);
         RTreeTypeAwareTupleWriter rTreeTupleWriterRightFrame = ((RTreeTypeAwareTupleWriter) rightFrame.getTupleWriter());
@@ -68,15 +69,18 @@
         for (int i = 0; i < maxFieldPos; i++) {
             int j = maxFieldPos + i;
             frameTuple.resetByTupleIndex(leftRTreeFrame, 0);
-            double leastLowerValue = keyValueProviders[i].getValue(tuple.getFieldData(i), tuple.getFieldStart(i));
-            double greatestUpperValue = keyValueProviders[j].getValue(tuple.getFieldData(j), tuple.getFieldStart(j));
+            double leastLowerValue = keyValueProviders[i].getValue(frameTuple.getFieldData(i),
+                    frameTuple.getFieldStart(i));
+            double greatestUpperValue = keyValueProviders[j].getValue(frameTuple.getFieldData(j),
+                    frameTuple.getFieldStart(j));
             double leastUpperValue = leastLowerValue;
             double greatestLowerValue = greatestUpperValue;
-            int leastUpperIndex = -1;
-            int greatestLowerIndex = -1;
+            int leastUpperIndex = 0;
+            int greatestLowerIndex = 0;
             double width;
 
-            for (int k = 0; k < leftRTreeFrame.getTupleCount(); ++k) {
+            int tupleCount = leftRTreeFrame.getTupleCount();
+            for (int k = 1; k < tupleCount; ++k) {
                 frameTuple.resetByTupleIndex(leftRTreeFrame, k);
                 double lowerValue = keyValueProviders[i].getValue(frameTuple.getFieldData(i),
                         frameTuple.getFieldStart(i));
@@ -113,35 +117,26 @@
             }
         }
 
-        if (seed1 == -1 && seed1 == seed2) {
-            seed2 = 0;
-        } else if (seed1 == seed2) {
-            if (seed2 == 0) {
-                ++seed2;
+        if (seed1 == seed2) {
+            if (seed1 == 0) {
+                seed2 = 1;
             } else {
                 --seed2;
             }
         }
 
         int totalBytes = 0, numOfDeletedTuples = 0;
-        if (seed1 == -1) {
-            rightFrame.insert(tuple, -1);
-            rec[0].set(tuple, keyValueProviders);
-        } else {
-            frameTuple.resetByTupleIndex(leftRTreeFrame, seed1);
-            rec[0].set(frameTuple, keyValueProviders);
-            rightFrame.insert(frameTuple, -1);
-            ((UnorderedSlotManager) slotManager).modifySlot(slotManager.getSlotOff(seed1), -1);
-            totalBytes += leftRTreeFrame.getTupleSize(frameTuple);
-            numOfDeletedTuples++;
-        }
 
-        if (seed2 == -1) {
-            rec[1].set(tuple, keyValueProviders);
-        } else {
-            frameTuple.resetByTupleIndex(leftRTreeFrame, seed2);
-            rec[1].set(frameTuple, keyValueProviders);
-        }
+        frameTuple.resetByTupleIndex(leftRTreeFrame, seed1);
+        rec[0].set(frameTuple, keyValueProviders);
+        rightFrame.insert(frameTuple, -1);
+        ((UnorderedSlotManager) slotManager).modifySlot(slotManager.getSlotOff(seed1), -1);
+        totalBytes += leftRTreeFrame.getTupleSize(frameTuple);
+        numOfDeletedTuples++;
+
+        frameTuple.resetByTupleIndex(leftRTreeFrame, seed2);
+        rec[1].set(frameTuple, keyValueProviders);
+
         int remainingTuplestoBeInsertedInRightFrame;
         for (int k = 0; k < leftRTreeFrame.getTupleCount(); ++k) {
             remainingTuplestoBeInsertedInRightFrame = leftRTreeFrame.getTupleCount() / 2 - rightFrame.getTupleCount();
@@ -175,13 +170,14 @@
         rightFrame.compact();
         leftRTreeFrame.compact();
 
-        if (seed2 == -1) {
-            leftRTreeFrame.insert(tuple, -1);
-        } else if (seed1 != -1
-                && rec[0].enlargedArea(tuple, keyValueProviders) < rec[1].enlargedArea(tuple, keyValueProviders)) {
+        boolean insertedNewTuple = false;
+        if (rec[0].enlargedArea(tuple, keyValueProviders) < rec[1].enlargedArea(tuple, keyValueProviders)
+                && rightFrame.hasSpaceInsert(tuple) == FrameOpSpaceStatus.SUFFICIENT_CONTIGUOUS_SPACE) {
             rightFrame.insert(tuple, -1);
-        } else if (seed1 != -1) {
+            insertedNewTuple = true;
+        } else if (leftRTreeFrame.hasSpaceInsert(tuple) == FrameOpSpaceStatus.SUFFICIENT_CONTIGUOUS_SPACE) {
             leftRTreeFrame.insert(tuple, -1);
+            insertedNewTuple = true;
         }
 
         int tupleOff = slotManager.getTupleOff(slotManager.getSlotEndOff());
@@ -197,7 +193,7 @@
         rTreeTupleWriterRightFrame.writeTupleFields(((RTreeNSMFrame) rightFrame).getTuples(), 0,
                 rTreeSplitKey.getRightPageBuffer(), 0);
         rTreeSplitKey.getRightTuple().resetByTupleOffset(rTreeSplitKey.getRightPageBuffer(), 0);
-        return true;
+        return insertedNewTuple;
     }
 
     public boolean findBestChild(ITreeIndexFrame frame, ITupleReference tuple, ITreeIndexTupleReference frameTuple,
diff --git a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTree.java b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTree.java
index 9eb8ed4..11742bb 100644
--- a/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTree.java
+++ b/hyracks-storage-am-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/impls/RTree.java
@@ -26,15 +26,15 @@
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 import edu.uci.ics.hyracks.storage.am.common.api.IFreePageManager;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoader;
-import edu.uci.ics.hyracks.storage.am.common.api.IIndexOperationContext;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexOperationContext;
 import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrame;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleReference;
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
 import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
@@ -163,33 +163,36 @@
                 throw new IllegalArgumentException("The low key point has larger coordinates than the high key point.");
             }
         }
-        try {
-            ICachedPage leafNode = findLeaf(ctx);
+        boolean tupleInserted = false;
+        while (!tupleInserted) {
+            try {
+                ICachedPage leafNode = findLeaf(ctx);
 
-            int pageId = ctx.pathList.getLastPageId();
-            ctx.pathList.moveLast();
-            insertTuple(leafNode, pageId, ctx.getTuple(), ctx, true);
+                int pageId = ctx.pathList.getLastPageId();
+                ctx.pathList.moveLast();
+                tupleInserted = insertTuple(leafNode, pageId, ctx.getTuple(), ctx, true);
 
-            while (true) {
-                if (ctx.splitKey.getLeftPageBuffer() != null) {
-                    updateParentForInsert(ctx);
-                } else {
-                    break;
+                while (true) {
+                    if (ctx.splitKey.getLeftPageBuffer() != null) {
+                        updateParentForInsert(ctx);
+                    } else {
+                        break;
+                    }
                 }
-            }
-        } finally {
-            for (int i = ctx.NSNUpdates.size() - 1; i >= 0; i--) {
-                ICachedPage node = ctx.NSNUpdates.get(i);
-                ctx.interiorFrame.setPage(node);
-                ctx.interiorFrame.setPageNsn(incrementGlobalNsn());
-            }
+            } finally {
+                for (int i = ctx.NSNUpdates.size() - 1; i >= 0; i--) {
+                    ICachedPage node = ctx.NSNUpdates.get(i);
+                    ctx.interiorFrame.setPage(node);
+                    ctx.interiorFrame.setPageNsn(incrementGlobalNsn());
+                }
 
-            for (int i = ctx.LSNUpdates.size() - 1; i >= 0; i--) {
-                ICachedPage node = ctx.LSNUpdates.get(i);
-                ctx.interiorFrame.setPage(node);
-                ctx.interiorFrame.setPageLsn(incrementGlobalNsn());
-                node.releaseWriteLatch();
-                bufferCache.unpin(node);
+                for (int i = ctx.LSNUpdates.size() - 1; i >= 0; i--) {
+                    ICachedPage node = ctx.LSNUpdates.get(i);
+                    ctx.interiorFrame.setPage(node);
+                    ctx.interiorFrame.setPageLsn(incrementGlobalNsn());
+                    node.releaseWriteLatch();
+                    bufferCache.unpin(node);
+                }
             }
         }
     }
@@ -319,7 +322,7 @@
         }
     }
 
-    private void insertTuple(ICachedPage node, int pageId, ITupleReference tuple, RTreeOpContext ctx, boolean isLeaf)
+    private boolean insertTuple(ICachedPage node, int pageId, ITupleReference tuple, RTreeOpContext ctx, boolean isLeaf)
             throws HyracksDataException, TreeIndexException {
         boolean succeeded = false;
         FrameOpSpaceStatus spaceStatus;
@@ -329,6 +332,7 @@
             spaceStatus = ctx.leafFrame.hasSpaceInsert(tuple);
         }
 
+        boolean tupleInserted = true;
         switch (spaceStatus) {
             case SUFFICIENT_CONTIGUOUS_SPACE: {
                 try {
@@ -398,7 +402,7 @@
                         rightFrame.initBuffer((byte) 0);
                         rightFrame.setRightPage(ctx.interiorFrame.getRightPage());
                         ctx.modificationCallback.found(null);
-                        ctx.leafFrame.split(rightFrame, tuple, ctx.splitKey);
+                        tupleInserted = ctx.leafFrame.split(rightFrame, tuple, ctx.splitKey);
                         ctx.leafFrame.setRightPage(rightPageId);
                     }
                     succeeded = true;
@@ -473,6 +477,7 @@
                 break;
             }
         }
+        return tupleInserted;
     }
 
     private void updateParentForInsert(RTreeOpContext ctx) throws HyracksDataException, TreeIndexException {