fixed minor bugs in lsm btree search cursor (related to opcallback) and added opcallback tests for BTree and LSM BTree

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1782 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
index 6ffbf2c..ef7c2b4 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
@@ -22,6 +22,7 @@
 import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
 
@@ -89,7 +90,8 @@
 
     private void setMemBTreeAccessor() {
         if (memBTreeAccessor == null) {
-            memBTreeAccessor = (BTree.BTreeAccessor) memBTree.createAccessor(modificationCallback, searchCallback);
+            memBTreeAccessor = (BTree.BTreeAccessor) memBTree.createAccessor(modificationCallback,
+                    NoOpOperationCallback.INSTANCE);
             memBTreeOpCtx = memBTreeAccessor.getOpContext();
         }
     }
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
index 720e89c..832f88e 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
@@ -68,7 +68,7 @@
             // PQ is empty
             return false;
         }
-        
+
         assert outputElement == null;
 
         if (searchCallback.proceed(pqHead.getTuple())) {
@@ -125,6 +125,7 @@
             if (!pushIntoPriorityQueue(inMemElement)) {
                 return !outputPriorityQueue.isEmpty();
             }
+            checkPriorityQueue();
         } else {
             searchCallback.reconcile(pqHead.getTuple());
         }
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallback.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallback.java
index 96e0a51..47f556d 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallback.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallback.java
@@ -16,10 +16,6 @@
         if (!notified) {
             this.wait();
         }
-    }
-
-    public synchronized void reset() {
         notified = false;
     }
-
 }
diff --git a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/AbstractModificationOperationCallbackTest.java b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/AbstractModificationOperationCallbackTest.java
new file mode 100644
index 0000000..302efec
--- /dev/null
+++ b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/AbstractModificationOperationCallbackTest.java
@@ -0,0 +1,83 @@
+package edu.uci.ics.hyracks.storage.am.btree;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.common.util.TupleUtils;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.config.AccessMethodTestsConfig;
+
+public abstract class AbstractModificationOperationCallbackTest extends AbstractOperationCallbackTest {
+
+    protected final ArrayTupleBuilder builder;
+    protected final ArrayTupleReference tuple;
+    protected final IModificationOperationCallback cb;
+
+    protected boolean isFoundNull;
+
+    public AbstractModificationOperationCallbackTest() {
+        this.builder = new ArrayTupleBuilder(NUM_KEY_FIELDS);
+        this.tuple = new ArrayTupleReference();
+        this.cb = new VeriyfingModificationCallback();
+        this.isFoundNull = true;
+    }
+
+    @Before
+    public void setup() throws Exception {
+        super.setup();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        super.tearDown();
+    }
+
+    @Test
+    public void modificationCallbackTest() throws Exception {
+        IIndexAccessor accessor = index.createAccessor(cb, NoOpOperationCallback.INSTANCE);
+
+        isFoundNull = true;
+        for (int i = 0; i < AccessMethodTestsConfig.BTREE_NUM_TUPLES_TO_INSERT; i++) {
+            TupleUtils.createIntegerTuple(builder, tuple, i);
+            accessor.insert(tuple);
+        }
+
+        isFoundNull = false;
+        for (int i = 0; i < AccessMethodTestsConfig.BTREE_NUM_TUPLES_TO_INSERT; i++) {
+            TupleUtils.createIntegerTuple(builder, tuple, i);
+            accessor.upsert(tuple);
+        }
+
+        isFoundNull = false;
+        for (int i = 0; i < AccessMethodTestsConfig.BTREE_NUM_TUPLES_TO_INSERT; i++) {
+            TupleUtils.createIntegerTuple(builder, tuple, i);
+            accessor.delete(tuple);
+        }
+    }
+
+    private class VeriyfingModificationCallback implements IModificationOperationCallback {
+
+        @Override
+        public void before(ITupleReference tuple) {
+            Assert.assertEquals(0, cmp.compare(AbstractModificationOperationCallbackTest.this.tuple, tuple));
+        }
+
+        @Override
+        public void found(ITupleReference tuple) {
+            if (isFoundNull) {
+                Assert.assertEquals(null, tuple);
+            } else {
+                Assert.assertEquals(0, cmp.compare(AbstractModificationOperationCallbackTest.this.tuple, tuple));
+            }
+        }
+
+    }
+
+}
diff --git a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/AbstractOperationCallbackTest.java b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/AbstractOperationCallbackTest.java
new file mode 100644
index 0000000..1086ce3
--- /dev/null
+++ b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/AbstractOperationCallbackTest.java
@@ -0,0 +1,35 @@
+package edu.uci.ics.hyracks.storage.am.btree;
+
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.util.SerdeUtils;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+
+public abstract class AbstractOperationCallbackTest {
+    protected static final int NUM_KEY_FIELDS = 1;
+
+    @SuppressWarnings("rawtypes")
+    protected final ISerializerDeserializer[] keySerdes;
+    protected final MultiComparator cmp;
+
+    protected IIndex index;
+
+    protected abstract void createIndexInstance() throws Exception;
+
+    public AbstractOperationCallbackTest() {
+        this.keySerdes = new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE };
+        this.cmp = MultiComparator.create(SerdeUtils.serdesToComparatorFactories(keySerdes, keySerdes.length));
+    }
+
+    public void setup() throws Exception {
+        createIndexInstance();
+        index.create();
+        index.activate();
+    }
+
+    public void tearDown() throws Exception {
+        index.deactivate();
+        index.destroy();
+    }
+}
diff --git a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/AbstractSearchOperationCallbackTest.java b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/AbstractSearchOperationCallbackTest.java
new file mode 100644
index 0000000..8094a1d
--- /dev/null
+++ b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/AbstractSearchOperationCallbackTest.java
@@ -0,0 +1,216 @@
+package edu.uci.ics.hyracks.storage.am.btree;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.common.util.TupleUtils;
+import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+
+public abstract class AbstractSearchOperationCallbackTest extends AbstractOperationCallbackTest {
+    private static final int NUM_TASKS = 2;
+
+    protected final Lock lock;
+    protected final Condition condition;
+
+    protected ExecutorService executor;
+    protected boolean insertTaskStarted;
+
+    public AbstractSearchOperationCallbackTest() {
+        this.lock = new ReentrantLock(true);
+        this.condition = lock.newCondition();
+        this.insertTaskStarted = false;
+    }
+
+    @Before
+    public void setup() throws Exception {
+        executor = Executors.newFixedThreadPool(NUM_TASKS);
+        super.setup();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        executor.shutdown();
+        super.tearDown();
+    }
+
+    @Test
+    public void searchCallbackTest() throws Exception {
+        Future<Boolean> insertFuture = executor.submit(new InsertionTask());
+        Future<Boolean> searchFuture = executor.submit(new SearchTask());
+        Assert.assertTrue(searchFuture.get());
+        Assert.assertTrue(insertFuture.get());
+    }
+
+    private class SearchTask implements Callable<Boolean> {
+        private final ISearchOperationCallback cb;
+        private final IIndexAccessor accessor;
+        private final IIndexCursor cursor;
+        private final RangePredicate predicate;
+        private final ArrayTupleBuilder builder;
+        private final ArrayTupleReference tuple;
+
+        private boolean blockOnHigh;
+        private int expectedAfterBlock;
+
+        public SearchTask() {
+            this.cb = new SynchronizingSearchOperationCallback();
+            this.accessor = index.createAccessor(NoOpOperationCallback.INSTANCE, cb);
+            this.cursor = accessor.createSearchCursor();
+            this.predicate = new RangePredicate();
+            this.builder = new ArrayTupleBuilder(NUM_KEY_FIELDS);
+            this.tuple = new ArrayTupleReference();
+
+            this.blockOnHigh = false;
+            this.expectedAfterBlock = -1;
+        }
+
+        @Override
+        public Boolean call() throws Exception {
+            lock.lock();
+            try {
+                if (!insertTaskStarted) {
+                    condition.await();
+                }
+
+                // begin a search on [101, +inf), blocking on 101
+                TupleUtils.createIntegerTuple(builder, tuple, 101);
+                predicate.setLowKey(tuple, true);
+                predicate.setHighKey(null, true);
+                accessor.search(cursor, predicate);
+                consumeIntTupleRange(101, 101, true, 101);
+
+                // consume tuples [102, 152], blocking on 151
+                consumeIntTupleRange(102, 151, true, 152);
+
+                // consume tuples [153, 300]
+                consumeIntTupleRange(153, 300, false, -1);
+
+                cursor.close();
+            } finally {
+                lock.unlock();
+            }
+            
+            return true;
+        }
+
+        private void consumeIntTupleRange(int begin, int end, boolean blockOnHigh, int expectedAfterBlock)
+                throws Exception {
+            if (end < begin) {
+                throw new IllegalArgumentException("Invalid range: [" + begin + ", " + end + "]");
+            }
+
+            for (int i = begin; i <= end; i++) {
+                if (blockOnHigh == true && i == end) {
+                    this.blockOnHigh = true;
+                    this.expectedAfterBlock = expectedAfterBlock;
+                }
+                TupleUtils.createIntegerTuple(builder, tuple, i);
+                if (!cursor.hasNext()) {
+                    Assert.fail("Failed to consume entire tuple range since cursor is exhausted.");
+                }
+                cursor.next();
+
+                if (this.blockOnHigh) {
+                    TupleUtils.createIntegerTuple(builder, tuple, expectedAfterBlock);
+                }
+                Assert.assertEquals(0, cmp.compare(tuple, cursor.getTuple()));
+            }
+        }
+
+        private class SynchronizingSearchOperationCallback implements ISearchOperationCallback {
+
+            @Override
+            public boolean proceed(ITupleReference tuple) {
+                Assert.assertEquals(0, cmp.compare(SearchTask.this.tuple, tuple));
+                return false;
+            }
+
+            @Override
+            public void reconcile(ITupleReference tuple) {
+                Assert.assertEquals(0, cmp.compare(SearchTask.this.tuple, tuple));
+                if (blockOnHigh) {
+                    try {
+                        TupleUtils.createIntegerTuple(builder, SearchTask.this.tuple, expectedAfterBlock);
+                    } catch (HyracksDataException e) {
+                        e.printStackTrace();
+                    }
+                    condition.signal();
+                    condition.awaitUninterruptibly();
+                    blockOnHigh = false;
+                }
+            }
+
+        }
+    }
+
+    private class InsertionTask implements Callable<Boolean> {
+        private final IIndexAccessor accessor;
+        private final ArrayTupleBuilder builder;
+        private final ArrayTupleReference tuple;
+
+        public InsertionTask() {
+            this.accessor = index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+            this.builder = new ArrayTupleBuilder(NUM_KEY_FIELDS);
+            this.tuple = new ArrayTupleReference();
+        }
+
+        @Override
+        public Boolean call() throws Exception {
+            lock.lock();
+            try {
+                insertTaskStarted = true;
+
+                // insert tuples [101, 200]
+                insertIntTupleRange(101, 200);
+                condition.signal();
+                condition.await();
+
+                // insert tuples [1, 100]
+                insertIntTupleRange(1, 100);
+                condition.signal();
+                condition.await();
+
+                // insert tuples [201, 300] and delete tuple 151
+                insertIntTupleRange(201, 300);
+                TupleUtils.createIntegerTuple(builder, tuple, 151);
+                accessor.delete(tuple);
+                condition.signal();
+            } finally {
+                lock.unlock();
+            }
+
+            return true;
+        }
+
+        private void insertIntTupleRange(int begin, int end) throws Exception {
+            if (end < begin) {
+                throw new IllegalArgumentException("Invalid range: [" + begin + ", " + end + "]");
+            }
+
+            for (int i = begin; i <= end; i++) {
+                TupleUtils.createIntegerTuple(builder, tuple, i);
+                accessor.insert(tuple);
+            }
+        }
+
+    }
+
+}
diff --git a/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/btree/BTreeModificationOperationCallbackTest.java b/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/btree/BTreeModificationOperationCallbackTest.java
new file mode 100644
index 0000000..b5cbca3
--- /dev/null
+++ b/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/btree/BTreeModificationOperationCallbackTest.java
@@ -0,0 +1,35 @@
+package edu.uci.ics.hyracks.storage.am.btree;
+
+import edu.uci.ics.hyracks.dataflow.common.util.SerdeUtils;
+import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeLeafFrameType;
+import edu.uci.ics.hyracks.storage.am.btree.util.BTreeTestHarness;
+import edu.uci.ics.hyracks.storage.am.btree.util.BTreeUtils;
+
+public class BTreeModificationOperationCallbackTest extends AbstractModificationOperationCallbackTest {
+    private final BTreeTestHarness harness;
+
+    public BTreeModificationOperationCallbackTest() {
+        harness = new BTreeTestHarness();
+    }
+
+    @Override
+    protected void createIndexInstance() throws Exception {
+        index = BTreeUtils.createBTree(harness.getBufferCache(), harness.getFileMapProvider(),
+                SerdeUtils.serdesToTypeTraits(keySerdes),
+                SerdeUtils.serdesToComparatorFactories(keySerdes, keySerdes.length), BTreeLeafFrameType.REGULAR_NSM,
+                harness.getFileReference());
+    }
+
+    @Override
+    public void setup() throws Exception {
+        harness.setUp();
+        super.setup();
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        super.tearDown();
+        harness.tearDown();
+    }
+
+}
diff --git a/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/btree/BTreeSearchOperationCallbackTest.java b/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/btree/BTreeSearchOperationCallbackTest.java
new file mode 100644
index 0000000..037d992
--- /dev/null
+++ b/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/btree/BTreeSearchOperationCallbackTest.java
@@ -0,0 +1,35 @@
+package edu.uci.ics.hyracks.storage.am.btree;
+
+import edu.uci.ics.hyracks.dataflow.common.util.SerdeUtils;
+import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeLeafFrameType;
+import edu.uci.ics.hyracks.storage.am.btree.util.BTreeTestHarness;
+import edu.uci.ics.hyracks.storage.am.btree.util.BTreeUtils;
+
+public class BTreeSearchOperationCallbackTest extends AbstractSearchOperationCallbackTest {
+    private final BTreeTestHarness harness;
+
+    public BTreeSearchOperationCallbackTest() {
+        harness = new BTreeTestHarness();
+    }
+
+    @Override
+    protected void createIndexInstance() throws Exception {
+        index = BTreeUtils.createBTree(harness.getBufferCache(), harness.getFileMapProvider(),
+                SerdeUtils.serdesToTypeTraits(keySerdes),
+                SerdeUtils.serdesToComparatorFactories(keySerdes, keySerdes.length), BTreeLeafFrameType.REGULAR_NSM,
+                harness.getFileReference());
+    }
+
+    @Override
+    public void setup() throws Exception {
+        harness.setUp();
+        super.setup();
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        super.tearDown();
+        harness.tearDown();
+    }
+
+}
diff --git a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java
new file mode 100644
index 0000000..660197c
--- /dev/null
+++ b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java
@@ -0,0 +1,104 @@
+package edu.uci.ics.hyracks.storage.am.lsm.btree;
+
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.dataflow.common.util.SerdeUtils;
+import edu.uci.ics.hyracks.dataflow.common.util.TupleUtils;
+import edu.uci.ics.hyracks.storage.am.btree.AbstractModificationOperationCallbackTest;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.btree.util.LSMBTreeTestHarness;
+import edu.uci.ics.hyracks.storage.am.lsm.btree.util.LSMBTreeUtils;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BlockingIOOperationCallback;
+
+public class LSMBTreeModificationOperationCallbackTest extends AbstractModificationOperationCallbackTest {
+    private static final int NUM_TUPLES = 11;
+
+    private final LSMBTreeTestHarness harness;
+    private final BlockingIOOperationCallback ioOpCallback;
+
+    public LSMBTreeModificationOperationCallbackTest() {
+        super();
+        this.ioOpCallback = new BlockingIOOperationCallback();
+        harness = new LSMBTreeTestHarness();
+    }
+
+    @Override
+    protected void createIndexInstance() throws Exception {
+        ILSMOperationTracker tracker = new ILSMOperationTracker() {
+            @Override
+            public void threadExit(ILSMIndex index) {
+                // Do nothing
+            }
+
+            @Override
+            public void threadEnter(ILSMIndex index) {
+                // Do nothing
+            }
+        };
+        index = LSMBTreeUtils.createLSMTree(harness.getMemBufferCache(), harness.getMemFreePageManager(),
+                harness.getIOManager(), harness.getFileReference(), harness.getDiskBufferCache(),
+                harness.getDiskFileMapProvider(), SerdeUtils.serdesToTypeTraits(keySerdes),
+                SerdeUtils.serdesToComparatorFactories(keySerdes, keySerdes.length), harness.getFlushController(),
+                harness.getMergePolicy(), tracker, harness.getIOScheduler());
+    }
+
+    @Override
+    public void setup() throws Exception {
+        harness.setUp();
+        super.setup();
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        super.tearDown();
+        harness.tearDown();
+    }
+
+    @Test
+    public void modificationCallbackTest() throws Exception {
+        IIndexAccessor accessor = index.createAccessor(cb, NoOpOperationCallback.INSTANCE);
+        ILSMIOOperation flushOp = ((ILSMIndexAccessor) accessor).createFlushOperation(ioOpCallback);
+
+        for (int j = 0; j < 2; j++) {
+            isFoundNull = true;
+            for (int i = 0; i < NUM_TUPLES; i++) {
+                TupleUtils.createIntegerTuple(builder, tuple, i);
+                accessor.insert(tuple);
+            }
+
+            if (j == 1) {
+                harness.getIOScheduler().scheduleOperation(flushOp);
+                ioOpCallback.waitForIO();
+                isFoundNull = true;
+            } else {
+                isFoundNull = false;
+            }
+
+            for (int i = 0; i < NUM_TUPLES; i++) {
+                TupleUtils.createIntegerTuple(builder, tuple, i);
+                accessor.upsert(tuple);
+            }
+
+            if (j == 1) {
+                harness.getIOScheduler().scheduleOperation(flushOp);
+                ioOpCallback.waitForIO();
+                isFoundNull = true;
+            } else {
+                isFoundNull = false;
+            }
+
+            for (int i = 0; i < NUM_TUPLES; i++) {
+                TupleUtils.createIntegerTuple(builder, tuple, i);
+                accessor.delete(tuple);
+            }
+
+            harness.getIOScheduler().scheduleOperation(flushOp);
+            ioOpCallback.waitForIO();
+        }
+    }
+}
diff --git a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java
new file mode 100644
index 0000000..7534bf1
--- /dev/null
+++ b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java
@@ -0,0 +1,246 @@
+package edu.uci.ics.hyracks.storage.am.lsm.btree;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.common.util.SerdeUtils;
+import edu.uci.ics.hyracks.dataflow.common.util.TupleUtils;
+import edu.uci.ics.hyracks.storage.am.btree.AbstractSearchOperationCallbackTest;
+import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoader;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.btree.util.LSMBTreeTestHarness;
+import edu.uci.ics.hyracks.storage.am.lsm.btree.util.LSMBTreeUtils;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+
+public class LSMBTreeSearchOperationCallbackTest extends AbstractSearchOperationCallbackTest {
+    private final LSMBTreeTestHarness harness;
+
+    public LSMBTreeSearchOperationCallbackTest() {
+        harness = new LSMBTreeTestHarness();
+    }
+
+    @Override
+    protected void createIndexInstance() throws Exception {
+        ILSMOperationTracker tracker = new ILSMOperationTracker() {
+            @Override
+            public void threadExit(ILSMIndex index) {
+                // Do nothing
+            }
+
+            @Override
+            public void threadEnter(ILSMIndex index) {
+                // Do nothing
+            }
+        };
+        index = LSMBTreeUtils.createLSMTree(harness.getMemBufferCache(), harness.getMemFreePageManager(),
+                harness.getIOManager(), harness.getFileReference(), harness.getDiskBufferCache(),
+                harness.getDiskFileMapProvider(), SerdeUtils.serdesToTypeTraits(keySerdes),
+                SerdeUtils.serdesToComparatorFactories(keySerdes, keySerdes.length), harness.getFlushController(),
+                harness.getMergePolicy(), tracker, harness.getIOScheduler());
+    }
+
+    @Override
+    public void setup() throws Exception {
+        harness.setUp();
+        super.setup();
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        super.tearDown();
+        harness.tearDown();
+    }
+
+    @Test
+    public void searchCallbackTest() throws Exception {
+        Future<Boolean> insertFuture = executor.submit(new InsertionTask());
+        Future<Boolean> searchFuture = executor.submit(new SearchTask());
+        Assert.assertTrue(searchFuture.get());
+        Assert.assertTrue(insertFuture.get());
+    }
+
+    private class SearchTask implements Callable<Boolean> {
+        private final ISearchOperationCallback cb;
+        private final IIndexAccessor accessor;
+        private final IIndexCursor cursor;
+        private final RangePredicate predicate;
+        private final ArrayTupleBuilder builder;
+        private final ArrayTupleReference tuple;
+
+        private boolean blockOnHigh;
+        private int expectedAfterBlock;
+
+        public SearchTask() {
+            this.cb = new SynchronizingSearchOperationCallback();
+            this.accessor = index.createAccessor(NoOpOperationCallback.INSTANCE, cb);
+            this.cursor = accessor.createSearchCursor();
+            this.predicate = new RangePredicate();
+            this.builder = new ArrayTupleBuilder(NUM_KEY_FIELDS);
+            this.tuple = new ArrayTupleReference();
+
+            this.blockOnHigh = false;
+            this.expectedAfterBlock = -1;
+        }
+
+        @Override
+        public Boolean call() throws Exception {
+            lock.lock();
+            try {
+                if (!insertTaskStarted) {
+                    condition.await();
+                }
+
+                // begin a search on [50, +inf), blocking on 75
+                TupleUtils.createIntegerTuple(builder, tuple, 50);
+                predicate.setLowKey(tuple, true);
+                predicate.setHighKey(null, true);
+                accessor.search(cursor, predicate);
+                consumeIntTupleRange(50, 75, true, 76);
+
+                // consume tuples [77, 150], blocking on 151
+                consumeIntTupleRange(77, 150, true, 150);
+
+                // consume tuples [152, 300]
+                consumeIntTupleRange(152, 300, false, -1);
+
+                cursor.close();
+            } finally {
+                lock.unlock();
+            }
+
+            return true;
+        }
+
+        private void consumeIntTupleRange(int begin, int end, boolean blockOnHigh, int expectedAfterBlock)
+                throws Exception {
+            if (end < begin) {
+                throw new IllegalArgumentException("Invalid range: [" + begin + ", " + end + "]");
+            }
+
+            for (int i = begin; i <= end; i++) {
+                if (blockOnHigh == true && i == end) {
+                    this.blockOnHigh = true;
+                    this.expectedAfterBlock = expectedAfterBlock;
+                }
+                TupleUtils.createIntegerTuple(builder, tuple, i);
+                if (!cursor.hasNext()) {
+                    Assert.fail("Failed to consume entire tuple range since cursor is exhausted.");
+                }
+                cursor.next();
+
+                if (this.blockOnHigh) {
+                    TupleUtils.createIntegerTuple(builder, tuple, expectedAfterBlock);
+                }
+                Assert.assertEquals(0, cmp.compare(tuple, cursor.getTuple()));
+            }
+        }
+
+        private class SynchronizingSearchOperationCallback implements ISearchOperationCallback {
+
+            @Override
+            public boolean proceed(ITupleReference tuple) {
+                Assert.assertEquals(0, cmp.compare(SearchTask.this.tuple, tuple));
+                return false;
+            }
+
+            @Override
+            public void reconcile(ITupleReference tuple) {
+                Assert.assertEquals(0, cmp.compare(SearchTask.this.tuple, tuple));
+                if (blockOnHigh) {
+                    try {
+                        TupleUtils.createIntegerTuple(builder, SearchTask.this.tuple, expectedAfterBlock);
+                    } catch (HyracksDataException e) {
+                        e.printStackTrace();
+                    }
+                    condition.signal();
+                    condition.awaitUninterruptibly();
+                    blockOnHigh = false;
+                }
+            }
+
+        }
+    }
+
+    private class InsertionTask implements Callable<Boolean> {
+        private final IIndexAccessor accessor;
+        private final ArrayTupleBuilder builder;
+        private final ArrayTupleReference tuple;
+
+        public InsertionTask() {
+            this.accessor = index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+            this.builder = new ArrayTupleBuilder(NUM_KEY_FIELDS);
+            this.tuple = new ArrayTupleReference();
+        }
+
+        @Override
+        public Boolean call() throws Exception {
+            lock.lock();
+            try {
+                insertTaskStarted = true;
+
+                // bulkload [101, 150] & [151, 200] as two separate disk components 
+                // insert [50, 100] & [301, 350] to the in-memory component
+                // delete tuple 151
+                bulkloadIntTupleRange(101, 150);
+                bulkloadIntTupleRange(151, 200);
+                insertIntTupleRange(50, 100);
+                insertIntTupleRange(301, 350);
+                TupleUtils.createIntegerTuple(builder, tuple, 151);
+                accessor.delete(tuple);
+                condition.signal();
+                condition.await();
+
+                // delete tuple 75
+                TupleUtils.createIntegerTuple(builder, tuple, 75);
+                accessor.delete(tuple);
+                condition.signal();
+                condition.await();
+
+                // insert tuples [201, 300] and delete tuple 151
+                insertIntTupleRange(201, 300);
+                condition.signal();
+            } finally {
+                lock.unlock();
+            }
+
+            return true;
+        }
+
+        private void insertIntTupleRange(int begin, int end) throws Exception {
+            if (end < begin) {
+                throw new IllegalArgumentException("Invalid range: [" + begin + ", " + end + "]");
+            }
+
+            for (int i = begin; i <= end; i++) {
+                TupleUtils.createIntegerTuple(builder, tuple, i);
+                accessor.insert(tuple);
+            }
+        }
+
+        private void bulkloadIntTupleRange(int begin, int end) throws Exception {
+            if (end < begin) {
+                throw new IllegalArgumentException("Invalid range: [" + begin + ", " + end + "]");
+            }
+
+            IIndexBulkLoader bulkloader = index.createBulkLoader(1.0f);
+            for (int i = begin; i <= end; i++) {
+                TupleUtils.createIntegerTuple(builder, tuple, i);
+                bulkloader.add(tuple);
+            }
+            bulkloader.end();
+        }
+
+    }
+}