Fixed a bug in the LSMRTree delete operation reported by Markus in issue 58. For each delete operation we need to make sure that we run a true in-memory RTree delete operation to avoid return duplicate keys in case of consequent inserts and deletes for the same tuple before flush occur.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1193 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
index e746c02..7bcfd19 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
@@ -25,7 +25,6 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
-import edu.uci.ics.hyracks.dataflow.common.util.TupleUtils;
 import edu.uci.ics.hyracks.storage.am.btree.exceptions.BTreeDuplicateKeyException;
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
 import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
@@ -293,23 +292,32 @@
                     ctx.getBTreeMultiComparator(), ctx.getBTreeMultiComparator());
             ITreeIndexCursor cursor = ctx.memBTreeAccessor.createSearchCursor();
             ctx.memBTreeAccessor.search(cursor, btreeRangePredicate);
-            ITupleReference tupleCopy = null;
+            boolean foundTupleInMemoryBTree = false;
             try {
                 if (cursor.hasNext()) {
-                    cursor.next();
-                    tupleCopy = TupleUtils.copyTuple(cursor.getTuple());
+                    foundTupleInMemoryBTree = true;
                 }
             } finally {
                 cursor.close();
             }
-            if (tupleCopy != null) {
-                ctx.memRTreeAccessor.insert(tupleCopy);
-            } else {
-                ctx.memRTreeAccessor.insert(tuple);
+            if (foundTupleInMemoryBTree) {
+                ctx.memBTreeAccessor.delete(tuple);
             }
+            ctx.memRTreeAccessor.insert(tuple);
 
         } else {
-
+            // For each delete operation, we make sure that we run a true
+            // in-memory RTree delete operation besides from inserting a delete
+            // tuple in the in-memory BTree. The reason for running the RTree
+            // delete operation is that to avoid the following scenario:
+            // 1) Inserter inserts tupleA to the in-memory RTree.
+            // 2) Deleter inserts tupleA to the in-memory BTree.
+            // 3) Inserter inserts tupleA to the in-memory RTree.
+            // Note that all the above operations happened before flushing the
+            // in-memory trees Now, when we search using the LSMRTree search
+            // cursor, it will return tupleA twice! which is not correct! Thus
+            // we run a true RTree delete operation.
+            ctx.memRTreeAccessor.delete(tuple);
             try {
                 ctx.memBTreeAccessor.insert(tuple);
             } catch (BTreeDuplicateKeyException e) {
diff --git a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/OrderedIndexTestUtils.java b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/OrderedIndexTestUtils.java
index 36e5395..8519c02 100644
--- a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/OrderedIndexTestUtils.java
+++ b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/OrderedIndexTestUtils.java
@@ -357,6 +357,13 @@
     }
 
     @Override
+    protected void setIntPayloadFields(int[] fieldValues, int numKeyFields, int numFields) {
+        for (int j = numKeyFields; j < numFields; j++) {
+            fieldValues[j] = j;
+        }
+    }
+
+    @Override
     protected Collection createCheckTuplesCollection() {
         return new TreeSet<CheckTuple>();
     }
@@ -365,16 +372,17 @@
     protected ArrayTupleBuilder createDeleteTupleBuilder(ITreeIndexTestContext ctx) {
         return new ArrayTupleBuilder(ctx.getKeyFieldCount());
     }
-    
+
     @Override
-    protected boolean checkDiskOrderScanResult(ITupleReference tuple, CheckTuple checkTuple, ITreeIndexTestContext ctx) throws HyracksDataException {    	
-    	@SuppressWarnings("unchecked")
-		TreeSet<CheckTuple> checkTuples = (TreeSet<CheckTuple>) ctx.getCheckTuples();
-    	CheckTuple matchingCheckTuple = checkTuples.floor(checkTuple);
-    	if (matchingCheckTuple == null) {
-    		return false;
-    	}
-    	compareActualAndExpected(tuple, matchingCheckTuple, ctx.getFieldSerdes());
-    	return true;
+    protected boolean checkDiskOrderScanResult(ITupleReference tuple, CheckTuple checkTuple, ITreeIndexTestContext ctx)
+            throws HyracksDataException {
+        @SuppressWarnings("unchecked")
+        TreeSet<CheckTuple> checkTuples = (TreeSet<CheckTuple>) ctx.getCheckTuples();
+        CheckTuple matchingCheckTuple = checkTuples.floor(checkTuple);
+        if (matchingCheckTuple == null) {
+            return false;
+        }
+        compareActualAndExpected(tuple, matchingCheckTuple, ctx.getFieldSerdes());
+        return true;
     }
 }
diff --git a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/common/TreeIndexTestUtils.java b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/common/TreeIndexTestUtils.java
index 4c1d6f8..6e2a7ee 100644
--- a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/common/TreeIndexTestUtils.java
+++ b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/common/TreeIndexTestUtils.java
@@ -38,12 +38,15 @@
 
     protected abstract void setIntKeyFields(int[] fieldValues, int numKeyFields, int maxValue, Random rnd);
 
+    protected abstract void setIntPayloadFields(int[] fieldValues, int numKeyFields, int numFields);
+
     protected abstract Collection createCheckTuplesCollection();
 
     protected abstract ArrayTupleBuilder createDeleteTupleBuilder(ITreeIndexTestContext ctx);
-    
+
     // See if tuple with corresponding checkTuple exists in ctx.checkTuples.
-    protected abstract boolean checkDiskOrderScanResult(ITupleReference tuple, CheckTuple checkTuple, ITreeIndexTestContext ctx) throws HyracksDataException;
+    protected abstract boolean checkDiskOrderScanResult(ITupleReference tuple, CheckTuple checkTuple,
+            ITreeIndexTestContext ctx) throws HyracksDataException;
 
     @SuppressWarnings("unchecked")
     public static void createTupleFromCheckTuple(CheckTuple checkTuple, ArrayTupleBuilder tupleBuilder,
@@ -137,9 +140,7 @@
             // Set keys.
             setIntKeyFields(fieldValues, numKeyFields, maxValue, rnd);
             // Set values.
-            for (int j = numKeyFields; j < fieldCount; j++) {
-                fieldValues[j] = j;
-            }
+            setIntPayloadFields(fieldValues, numKeyFields, fieldCount);
             TupleUtils.createIntegerTuple(ctx.getTupleBuilder(), ctx.getTuple(), fieldValues);
             if (LOGGER.isLoggable(Level.INFO)) {
                 if ((i + 1) % (numTuples / Math.min(10, numTuples)) == 0) {
@@ -167,9 +168,7 @@
             // Set keys.
             setIntKeyFields(fieldValues, numKeyFields, maxValue, rnd);
             // Set values.
-            for (int j = numKeyFields; j < fieldCount; j++) {
-                fieldValues[j] = j;
-            }
+            setIntPayloadFields(fieldValues, numKeyFields, fieldCount);
 
             // Set expected values. (We also use these as the pre-sorted stream
             // for ordered indexes bulk loading).
@@ -240,6 +239,7 @@
             checkTuples[checkTupleIdx] = tmp;
             numCheckTuples--;
         }
+        System.out.println();
     }
 
 }
diff --git a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/RTreeTestUtils.java b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/RTreeTestUtils.java
index c966088..889318c 100644
--- a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/RTreeTestUtils.java
+++ b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/rtree/RTreeTestUtils.java
@@ -27,6 +27,8 @@
 @SuppressWarnings("rawtypes")
 public class RTreeTestUtils extends TreeIndexTestUtils {
     private static final Logger LOGGER = Logger.getLogger(RTreeTestUtils.class.getName());
+    private int intPayloadValue = 0;
+    private double doublePayloadValue = 0.0;
 
     @SuppressWarnings("unchecked")
     // Create a new ArrayList containing the elements satisfying the search key
@@ -78,9 +80,7 @@
             // Set keys.
             setDoubleKeyFields(fieldValues, numKeyFields, maxValue, rnd);
             // Set values.
-            for (int j = numKeyFields; j < fieldCount; j++) {
-                fieldValues[j] = j;
-            }
+            setDoublePayloadFields(fieldValues, numKeyFields, fieldCount);
             TupleUtils.createDoubleTuple(ctx.getTupleBuilder(), ctx.getTuple(), fieldValues);
             if (LOGGER.isLoggable(Level.INFO)) {
                 if ((i + 1) % (numTuples / Math.min(10, numTuples)) == 0) {
@@ -98,7 +98,7 @@
         }
     }
 
-    protected void setDoubleKeyFields(double[] fieldValues, int numKeyFields, double maxValue, Random rnd) {
+    private void setDoubleKeyFields(double[] fieldValues, int numKeyFields, double maxValue, Random rnd) {
         int maxFieldPos = numKeyFields / 2;
         for (int j = 0; j < maxFieldPos; j++) {
             int k = maxFieldPos + j;
@@ -111,6 +111,12 @@
             fieldValues[k] = secondValue;
         }
     }
+    
+    private void setDoublePayloadFields(double[] fieldValues, int numKeyFields, int numFields) {
+        for (int j = numKeyFields; j < numFields; j++) {
+            fieldValues[j] = doublePayloadValue++;
+        }
+    }
 
     @SuppressWarnings("unchecked")
     protected CheckTuple createDoubleCheckTuple(double[] fieldValues, int numKeyFields) {
@@ -132,9 +138,7 @@
             // Set keys.
             setDoubleKeyFields(fieldValues, numKeyFields, maxValue, rnd);
             // Set values.
-            for (int j = numKeyFields; j < fieldCount; j++) {
-                fieldValues[j] = j;
-            }
+            setDoublePayloadFields(fieldValues, numKeyFields, fieldCount);
 
             // Set expected values.
             ctx.insertCheckTuple(createDoubleCheckTuple(fieldValues, ctx.getKeyFieldCount()), tmpCheckTuples);
@@ -209,6 +213,13 @@
             fieldValues[k] = secondValue;
         }
     }
+    
+    @Override
+    protected void setIntPayloadFields(int[] fieldValues, int numKeyFields, int numFields) {
+        for (int j = numKeyFields; j < numFields; j++) {
+            fieldValues[j] = intPayloadValue++;
+        }
+    }
 
     @Override
     protected Collection createCheckTuplesCollection() {
@@ -219,9 +230,10 @@
     protected ArrayTupleBuilder createDeleteTupleBuilder(ITreeIndexTestContext ctx) {
         return new ArrayTupleBuilder(ctx.getFieldCount());
     }
-    
+
     @Override
-    protected boolean checkDiskOrderScanResult(ITupleReference tuple, CheckTuple checkTuple, ITreeIndexTestContext ctx) throws HyracksDataException {
-    	return ctx.getCheckTuples().contains(checkTuple);
+    protected boolean checkDiskOrderScanResult(ITupleReference tuple, CheckTuple checkTuple, ITreeIndexTestContext ctx)
+            throws HyracksDataException {
+        return ctx.getCheckTuples().contains(checkTuple);
     }
 }
diff --git a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/util/LSMRTreeTestContext.java b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/util/LSMRTreeTestContext.java
index 2d75c6b..3024621 100644
--- a/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/util/LSMRTreeTestContext.java
+++ b/hyracks-tests/hyracks-storage-am-lsm-rtree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/util/LSMRTreeTestContext.java
@@ -62,16 +62,17 @@
     }
 
     public static LSMRTreeTestContext create(InMemoryBufferCache memBufferCache,
-            InMemoryFreePageManager memFreePageManager, IOManager ioManager, String onDiskDir, IBufferCache diskBufferCache,
-            IFileMapProvider diskFileMapProvider, ISerializerDeserializer[] fieldSerdes,
+            InMemoryFreePageManager memFreePageManager, IOManager ioManager, String onDiskDir,
+            IBufferCache diskBufferCache, IFileMapProvider diskFileMapProvider, ISerializerDeserializer[] fieldSerdes,
             IPrimitiveValueProviderFactory[] valueProviderFactories, int numKeyFields, int fileId) throws Exception {
         ITypeTraits[] typeTraits = SerdeUtils.serdesToTypeTraits(fieldSerdes);
         IBinaryComparatorFactory[] rtreeCmpFactories = SerdeUtils
                 .serdesToComparatorFactories(fieldSerdes, numKeyFields);
         IBinaryComparatorFactory[] btreeCmpFactories = SerdeUtils.serdesToComparatorFactories(fieldSerdes,
                 fieldSerdes.length);
-        LSMRTree lsmTree = LSMRTreeUtils.createLSMTree(memBufferCache, memFreePageManager, ioManager, onDiskDir, diskBufferCache,
-                diskFileMapProvider, typeTraits, rtreeCmpFactories, btreeCmpFactories, valueProviderFactories);
+        LSMRTree lsmTree = LSMRTreeUtils.createLSMTree(memBufferCache, memFreePageManager, ioManager, onDiskDir,
+                diskBufferCache, diskFileMapProvider, typeTraits, rtreeCmpFactories, btreeCmpFactories,
+                valueProviderFactories);
         lsmTree.create(fileId);
         lsmTree.open(fileId);
         LSMRTreeTestContext testCtx = new LSMRTreeTestContext(fieldSerdes, lsmTree);