fixed minor bugs in lsm btree search cursor (related to opcallback) and added opcallback tests for BTree and LSM BTree
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1782 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
index 6ffbf2c..ef7c2b4 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
@@ -22,6 +22,7 @@
import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
@@ -89,7 +90,8 @@
private void setMemBTreeAccessor() {
if (memBTreeAccessor == null) {
- memBTreeAccessor = (BTree.BTreeAccessor) memBTree.createAccessor(modificationCallback, searchCallback);
+ memBTreeAccessor = (BTree.BTreeAccessor) memBTree.createAccessor(modificationCallback,
+ NoOpOperationCallback.INSTANCE);
memBTreeOpCtx = memBTreeAccessor.getOpContext();
}
}
diff --git a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
index 720e89c..832f88e 100644
--- a/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
+++ b/hyracks-storage-am-lsm-btree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
@@ -68,7 +68,7 @@
// PQ is empty
return false;
}
-
+
assert outputElement == null;
if (searchCallback.proceed(pqHead.getTuple())) {
@@ -125,6 +125,7 @@
if (!pushIntoPriorityQueue(inMemElement)) {
return !outputPriorityQueue.isEmpty();
}
+ checkPriorityQueue();
} else {
searchCallback.reconcile(pqHead.getTuple());
}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallback.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallback.java
index 96e0a51..47f556d 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallback.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallback.java
@@ -16,10 +16,6 @@
if (!notified) {
this.wait();
}
- }
-
- public synchronized void reset() {
notified = false;
}
-
}
diff --git a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/AbstractModificationOperationCallbackTest.java b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/AbstractModificationOperationCallbackTest.java
new file mode 100644
index 0000000..302efec
--- /dev/null
+++ b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/AbstractModificationOperationCallbackTest.java
@@ -0,0 +1,83 @@
+package edu.uci.ics.hyracks.storage.am.btree;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.common.util.TupleUtils;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.config.AccessMethodTestsConfig;
+
+public abstract class AbstractModificationOperationCallbackTest extends AbstractOperationCallbackTest {
+
+ protected final ArrayTupleBuilder builder;
+ protected final ArrayTupleReference tuple;
+ protected final IModificationOperationCallback cb;
+
+ protected boolean isFoundNull;
+
+ public AbstractModificationOperationCallbackTest() {
+ this.builder = new ArrayTupleBuilder(NUM_KEY_FIELDS);
+ this.tuple = new ArrayTupleReference();
+ this.cb = new VeriyfingModificationCallback();
+ this.isFoundNull = true;
+ }
+
+ @Before
+ public void setup() throws Exception {
+ super.setup();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ super.tearDown();
+ }
+
+ @Test
+ public void modificationCallbackTest() throws Exception {
+ IIndexAccessor accessor = index.createAccessor(cb, NoOpOperationCallback.INSTANCE);
+
+ isFoundNull = true;
+ for (int i = 0; i < AccessMethodTestsConfig.BTREE_NUM_TUPLES_TO_INSERT; i++) {
+ TupleUtils.createIntegerTuple(builder, tuple, i);
+ accessor.insert(tuple);
+ }
+
+ isFoundNull = false;
+ for (int i = 0; i < AccessMethodTestsConfig.BTREE_NUM_TUPLES_TO_INSERT; i++) {
+ TupleUtils.createIntegerTuple(builder, tuple, i);
+ accessor.upsert(tuple);
+ }
+
+ isFoundNull = false;
+ for (int i = 0; i < AccessMethodTestsConfig.BTREE_NUM_TUPLES_TO_INSERT; i++) {
+ TupleUtils.createIntegerTuple(builder, tuple, i);
+ accessor.delete(tuple);
+ }
+ }
+
+ private class VeriyfingModificationCallback implements IModificationOperationCallback {
+
+ @Override
+ public void before(ITupleReference tuple) {
+ Assert.assertEquals(0, cmp.compare(AbstractModificationOperationCallbackTest.this.tuple, tuple));
+ }
+
+ @Override
+ public void found(ITupleReference tuple) {
+ if (isFoundNull) {
+ Assert.assertEquals(null, tuple);
+ } else {
+ Assert.assertEquals(0, cmp.compare(AbstractModificationOperationCallbackTest.this.tuple, tuple));
+ }
+ }
+
+ }
+
+}
diff --git a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/AbstractOperationCallbackTest.java b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/AbstractOperationCallbackTest.java
new file mode 100644
index 0000000..1086ce3
--- /dev/null
+++ b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/AbstractOperationCallbackTest.java
@@ -0,0 +1,35 @@
+package edu.uci.ics.hyracks.storage.am.btree;
+
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.util.SerdeUtils;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
+
+public abstract class AbstractOperationCallbackTest {
+ protected static final int NUM_KEY_FIELDS = 1;
+
+ @SuppressWarnings("rawtypes")
+ protected final ISerializerDeserializer[] keySerdes;
+ protected final MultiComparator cmp;
+
+ protected IIndex index;
+
+ protected abstract void createIndexInstance() throws Exception;
+
+ public AbstractOperationCallbackTest() {
+ this.keySerdes = new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE };
+ this.cmp = MultiComparator.create(SerdeUtils.serdesToComparatorFactories(keySerdes, keySerdes.length));
+ }
+
+ public void setup() throws Exception {
+ createIndexInstance();
+ index.create();
+ index.activate();
+ }
+
+ public void tearDown() throws Exception {
+ index.deactivate();
+ index.destroy();
+ }
+}
diff --git a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/AbstractSearchOperationCallbackTest.java b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/AbstractSearchOperationCallbackTest.java
new file mode 100644
index 0000000..8094a1d
--- /dev/null
+++ b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/btree/AbstractSearchOperationCallbackTest.java
@@ -0,0 +1,216 @@
+package edu.uci.ics.hyracks.storage.am.btree;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.common.util.TupleUtils;
+import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+
+public abstract class AbstractSearchOperationCallbackTest extends AbstractOperationCallbackTest {
+ private static final int NUM_TASKS = 2;
+
+ protected final Lock lock;
+ protected final Condition condition;
+
+ protected ExecutorService executor;
+ protected boolean insertTaskStarted;
+
+ public AbstractSearchOperationCallbackTest() {
+ this.lock = new ReentrantLock(true);
+ this.condition = lock.newCondition();
+ this.insertTaskStarted = false;
+ }
+
+ @Before
+ public void setup() throws Exception {
+ executor = Executors.newFixedThreadPool(NUM_TASKS);
+ super.setup();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ executor.shutdown();
+ super.tearDown();
+ }
+
+ @Test
+ public void searchCallbackTest() throws Exception {
+ Future<Boolean> insertFuture = executor.submit(new InsertionTask());
+ Future<Boolean> searchFuture = executor.submit(new SearchTask());
+ Assert.assertTrue(searchFuture.get());
+ Assert.assertTrue(insertFuture.get());
+ }
+
+ private class SearchTask implements Callable<Boolean> {
+ private final ISearchOperationCallback cb;
+ private final IIndexAccessor accessor;
+ private final IIndexCursor cursor;
+ private final RangePredicate predicate;
+ private final ArrayTupleBuilder builder;
+ private final ArrayTupleReference tuple;
+
+ private boolean blockOnHigh;
+ private int expectedAfterBlock;
+
+ public SearchTask() {
+ this.cb = new SynchronizingSearchOperationCallback();
+ this.accessor = index.createAccessor(NoOpOperationCallback.INSTANCE, cb);
+ this.cursor = accessor.createSearchCursor();
+ this.predicate = new RangePredicate();
+ this.builder = new ArrayTupleBuilder(NUM_KEY_FIELDS);
+ this.tuple = new ArrayTupleReference();
+
+ this.blockOnHigh = false;
+ this.expectedAfterBlock = -1;
+ }
+
+ @Override
+ public Boolean call() throws Exception {
+ lock.lock();
+ try {
+ if (!insertTaskStarted) {
+ condition.await();
+ }
+
+ // begin a search on [101, +inf), blocking on 101
+ TupleUtils.createIntegerTuple(builder, tuple, 101);
+ predicate.setLowKey(tuple, true);
+ predicate.setHighKey(null, true);
+ accessor.search(cursor, predicate);
+ consumeIntTupleRange(101, 101, true, 101);
+
+ // consume tuples [102, 152], blocking on 151
+ consumeIntTupleRange(102, 151, true, 152);
+
+ // consume tuples [153, 300]
+ consumeIntTupleRange(153, 300, false, -1);
+
+ cursor.close();
+ } finally {
+ lock.unlock();
+ }
+
+ return true;
+ }
+
+ private void consumeIntTupleRange(int begin, int end, boolean blockOnHigh, int expectedAfterBlock)
+ throws Exception {
+ if (end < begin) {
+ throw new IllegalArgumentException("Invalid range: [" + begin + ", " + end + "]");
+ }
+
+ for (int i = begin; i <= end; i++) {
+ if (blockOnHigh == true && i == end) {
+ this.blockOnHigh = true;
+ this.expectedAfterBlock = expectedAfterBlock;
+ }
+ TupleUtils.createIntegerTuple(builder, tuple, i);
+ if (!cursor.hasNext()) {
+ Assert.fail("Failed to consume entire tuple range since cursor is exhausted.");
+ }
+ cursor.next();
+
+ if (this.blockOnHigh) {
+ TupleUtils.createIntegerTuple(builder, tuple, expectedAfterBlock);
+ }
+ Assert.assertEquals(0, cmp.compare(tuple, cursor.getTuple()));
+ }
+ }
+
+ private class SynchronizingSearchOperationCallback implements ISearchOperationCallback {
+
+ @Override
+ public boolean proceed(ITupleReference tuple) {
+ Assert.assertEquals(0, cmp.compare(SearchTask.this.tuple, tuple));
+ return false;
+ }
+
+ @Override
+ public void reconcile(ITupleReference tuple) {
+ Assert.assertEquals(0, cmp.compare(SearchTask.this.tuple, tuple));
+ if (blockOnHigh) {
+ try {
+ TupleUtils.createIntegerTuple(builder, SearchTask.this.tuple, expectedAfterBlock);
+ } catch (HyracksDataException e) {
+ e.printStackTrace();
+ }
+ condition.signal();
+ condition.awaitUninterruptibly();
+ blockOnHigh = false;
+ }
+ }
+
+ }
+ }
+
+ private class InsertionTask implements Callable<Boolean> {
+ private final IIndexAccessor accessor;
+ private final ArrayTupleBuilder builder;
+ private final ArrayTupleReference tuple;
+
+ public InsertionTask() {
+ this.accessor = index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ this.builder = new ArrayTupleBuilder(NUM_KEY_FIELDS);
+ this.tuple = new ArrayTupleReference();
+ }
+
+ @Override
+ public Boolean call() throws Exception {
+ lock.lock();
+ try {
+ insertTaskStarted = true;
+
+ // insert tuples [101, 200]
+ insertIntTupleRange(101, 200);
+ condition.signal();
+ condition.await();
+
+ // insert tuples [1, 100]
+ insertIntTupleRange(1, 100);
+ condition.signal();
+ condition.await();
+
+ // insert tuples [201, 300] and delete tuple 151
+ insertIntTupleRange(201, 300);
+ TupleUtils.createIntegerTuple(builder, tuple, 151);
+ accessor.delete(tuple);
+ condition.signal();
+ } finally {
+ lock.unlock();
+ }
+
+ return true;
+ }
+
+ private void insertIntTupleRange(int begin, int end) throws Exception {
+ if (end < begin) {
+ throw new IllegalArgumentException("Invalid range: [" + begin + ", " + end + "]");
+ }
+
+ for (int i = begin; i <= end; i++) {
+ TupleUtils.createIntegerTuple(builder, tuple, i);
+ accessor.insert(tuple);
+ }
+ }
+
+ }
+
+}
diff --git a/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/btree/BTreeModificationOperationCallbackTest.java b/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/btree/BTreeModificationOperationCallbackTest.java
new file mode 100644
index 0000000..b5cbca3
--- /dev/null
+++ b/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/btree/BTreeModificationOperationCallbackTest.java
@@ -0,0 +1,35 @@
+package edu.uci.ics.hyracks.storage.am.btree;
+
+import edu.uci.ics.hyracks.dataflow.common.util.SerdeUtils;
+import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeLeafFrameType;
+import edu.uci.ics.hyracks.storage.am.btree.util.BTreeTestHarness;
+import edu.uci.ics.hyracks.storage.am.btree.util.BTreeUtils;
+
+public class BTreeModificationOperationCallbackTest extends AbstractModificationOperationCallbackTest {
+ private final BTreeTestHarness harness;
+
+ public BTreeModificationOperationCallbackTest() {
+ harness = new BTreeTestHarness();
+ }
+
+ @Override
+ protected void createIndexInstance() throws Exception {
+ index = BTreeUtils.createBTree(harness.getBufferCache(), harness.getFileMapProvider(),
+ SerdeUtils.serdesToTypeTraits(keySerdes),
+ SerdeUtils.serdesToComparatorFactories(keySerdes, keySerdes.length), BTreeLeafFrameType.REGULAR_NSM,
+ harness.getFileReference());
+ }
+
+ @Override
+ public void setup() throws Exception {
+ harness.setUp();
+ super.setup();
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ super.tearDown();
+ harness.tearDown();
+ }
+
+}
diff --git a/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/btree/BTreeSearchOperationCallbackTest.java b/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/btree/BTreeSearchOperationCallbackTest.java
new file mode 100644
index 0000000..037d992
--- /dev/null
+++ b/hyracks-tests/hyracks-storage-am-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/btree/BTreeSearchOperationCallbackTest.java
@@ -0,0 +1,35 @@
+package edu.uci.ics.hyracks.storage.am.btree;
+
+import edu.uci.ics.hyracks.dataflow.common.util.SerdeUtils;
+import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeLeafFrameType;
+import edu.uci.ics.hyracks.storage.am.btree.util.BTreeTestHarness;
+import edu.uci.ics.hyracks.storage.am.btree.util.BTreeUtils;
+
+public class BTreeSearchOperationCallbackTest extends AbstractSearchOperationCallbackTest {
+ private final BTreeTestHarness harness;
+
+ public BTreeSearchOperationCallbackTest() {
+ harness = new BTreeTestHarness();
+ }
+
+ @Override
+ protected void createIndexInstance() throws Exception {
+ index = BTreeUtils.createBTree(harness.getBufferCache(), harness.getFileMapProvider(),
+ SerdeUtils.serdesToTypeTraits(keySerdes),
+ SerdeUtils.serdesToComparatorFactories(keySerdes, keySerdes.length), BTreeLeafFrameType.REGULAR_NSM,
+ harness.getFileReference());
+ }
+
+ @Override
+ public void setup() throws Exception {
+ harness.setUp();
+ super.setup();
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ super.tearDown();
+ harness.tearDown();
+ }
+
+}
diff --git a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java
new file mode 100644
index 0000000..660197c
--- /dev/null
+++ b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java
@@ -0,0 +1,104 @@
+package edu.uci.ics.hyracks.storage.am.lsm.btree;
+
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.dataflow.common.util.SerdeUtils;
+import edu.uci.ics.hyracks.dataflow.common.util.TupleUtils;
+import edu.uci.ics.hyracks.storage.am.btree.AbstractModificationOperationCallbackTest;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.btree.util.LSMBTreeTestHarness;
+import edu.uci.ics.hyracks.storage.am.lsm.btree.util.LSMBTreeUtils;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BlockingIOOperationCallback;
+
+public class LSMBTreeModificationOperationCallbackTest extends AbstractModificationOperationCallbackTest {
+ private static final int NUM_TUPLES = 11;
+
+ private final LSMBTreeTestHarness harness;
+ private final BlockingIOOperationCallback ioOpCallback;
+
+ public LSMBTreeModificationOperationCallbackTest() {
+ super();
+ this.ioOpCallback = new BlockingIOOperationCallback();
+ harness = new LSMBTreeTestHarness();
+ }
+
+ @Override
+ protected void createIndexInstance() throws Exception {
+ ILSMOperationTracker tracker = new ILSMOperationTracker() {
+ @Override
+ public void threadExit(ILSMIndex index) {
+ // Do nothing
+ }
+
+ @Override
+ public void threadEnter(ILSMIndex index) {
+ // Do nothing
+ }
+ };
+ index = LSMBTreeUtils.createLSMTree(harness.getMemBufferCache(), harness.getMemFreePageManager(),
+ harness.getIOManager(), harness.getFileReference(), harness.getDiskBufferCache(),
+ harness.getDiskFileMapProvider(), SerdeUtils.serdesToTypeTraits(keySerdes),
+ SerdeUtils.serdesToComparatorFactories(keySerdes, keySerdes.length), harness.getFlushController(),
+ harness.getMergePolicy(), tracker, harness.getIOScheduler());
+ }
+
+ @Override
+ public void setup() throws Exception {
+ harness.setUp();
+ super.setup();
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ super.tearDown();
+ harness.tearDown();
+ }
+
+ @Test
+ public void modificationCallbackTest() throws Exception {
+ IIndexAccessor accessor = index.createAccessor(cb, NoOpOperationCallback.INSTANCE);
+ ILSMIOOperation flushOp = ((ILSMIndexAccessor) accessor).createFlushOperation(ioOpCallback);
+
+ for (int j = 0; j < 2; j++) {
+ isFoundNull = true;
+ for (int i = 0; i < NUM_TUPLES; i++) {
+ TupleUtils.createIntegerTuple(builder, tuple, i);
+ accessor.insert(tuple);
+ }
+
+ if (j == 1) {
+ harness.getIOScheduler().scheduleOperation(flushOp);
+ ioOpCallback.waitForIO();
+ isFoundNull = true;
+ } else {
+ isFoundNull = false;
+ }
+
+ for (int i = 0; i < NUM_TUPLES; i++) {
+ TupleUtils.createIntegerTuple(builder, tuple, i);
+ accessor.upsert(tuple);
+ }
+
+ if (j == 1) {
+ harness.getIOScheduler().scheduleOperation(flushOp);
+ ioOpCallback.waitForIO();
+ isFoundNull = true;
+ } else {
+ isFoundNull = false;
+ }
+
+ for (int i = 0; i < NUM_TUPLES; i++) {
+ TupleUtils.createIntegerTuple(builder, tuple, i);
+ accessor.delete(tuple);
+ }
+
+ harness.getIOScheduler().scheduleOperation(flushOp);
+ ioOpCallback.waitForIO();
+ }
+ }
+}
diff --git a/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java
new file mode 100644
index 0000000..7534bf1
--- /dev/null
+++ b/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/edu/uci/ics/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java
@@ -0,0 +1,246 @@
+package edu.uci.ics.hyracks.storage.am.lsm.btree;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleReference;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.dataflow.common.util.SerdeUtils;
+import edu.uci.ics.hyracks.dataflow.common.util.TupleUtils;
+import edu.uci.ics.hyracks.storage.am.btree.AbstractSearchOperationCallbackTest;
+import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoader;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.btree.util.LSMBTreeTestHarness;
+import edu.uci.ics.hyracks.storage.am.lsm.btree.util.LSMBTreeUtils;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+
+public class LSMBTreeSearchOperationCallbackTest extends AbstractSearchOperationCallbackTest {
+ private final LSMBTreeTestHarness harness;
+
+ public LSMBTreeSearchOperationCallbackTest() {
+ harness = new LSMBTreeTestHarness();
+ }
+
+ @Override
+ protected void createIndexInstance() throws Exception {
+ ILSMOperationTracker tracker = new ILSMOperationTracker() {
+ @Override
+ public void threadExit(ILSMIndex index) {
+ // Do nothing
+ }
+
+ @Override
+ public void threadEnter(ILSMIndex index) {
+ // Do nothing
+ }
+ };
+ index = LSMBTreeUtils.createLSMTree(harness.getMemBufferCache(), harness.getMemFreePageManager(),
+ harness.getIOManager(), harness.getFileReference(), harness.getDiskBufferCache(),
+ harness.getDiskFileMapProvider(), SerdeUtils.serdesToTypeTraits(keySerdes),
+ SerdeUtils.serdesToComparatorFactories(keySerdes, keySerdes.length), harness.getFlushController(),
+ harness.getMergePolicy(), tracker, harness.getIOScheduler());
+ }
+
+ @Override
+ public void setup() throws Exception {
+ harness.setUp();
+ super.setup();
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ super.tearDown();
+ harness.tearDown();
+ }
+
+ @Test
+ public void searchCallbackTest() throws Exception {
+ Future<Boolean> insertFuture = executor.submit(new InsertionTask());
+ Future<Boolean> searchFuture = executor.submit(new SearchTask());
+ Assert.assertTrue(searchFuture.get());
+ Assert.assertTrue(insertFuture.get());
+ }
+
+ private class SearchTask implements Callable<Boolean> {
+ private final ISearchOperationCallback cb;
+ private final IIndexAccessor accessor;
+ private final IIndexCursor cursor;
+ private final RangePredicate predicate;
+ private final ArrayTupleBuilder builder;
+ private final ArrayTupleReference tuple;
+
+ private boolean blockOnHigh;
+ private int expectedAfterBlock;
+
+ public SearchTask() {
+ this.cb = new SynchronizingSearchOperationCallback();
+ this.accessor = index.createAccessor(NoOpOperationCallback.INSTANCE, cb);
+ this.cursor = accessor.createSearchCursor();
+ this.predicate = new RangePredicate();
+ this.builder = new ArrayTupleBuilder(NUM_KEY_FIELDS);
+ this.tuple = new ArrayTupleReference();
+
+ this.blockOnHigh = false;
+ this.expectedAfterBlock = -1;
+ }
+
+ @Override
+ public Boolean call() throws Exception {
+ lock.lock();
+ try {
+ if (!insertTaskStarted) {
+ condition.await();
+ }
+
+ // begin a search on [50, +inf), blocking on 75
+ TupleUtils.createIntegerTuple(builder, tuple, 50);
+ predicate.setLowKey(tuple, true);
+ predicate.setHighKey(null, true);
+ accessor.search(cursor, predicate);
+ consumeIntTupleRange(50, 75, true, 76);
+
+ // consume tuples [77, 150], blocking on 151
+ consumeIntTupleRange(77, 150, true, 150);
+
+ // consume tuples [152, 300]
+ consumeIntTupleRange(152, 300, false, -1);
+
+ cursor.close();
+ } finally {
+ lock.unlock();
+ }
+
+ return true;
+ }
+
+ private void consumeIntTupleRange(int begin, int end, boolean blockOnHigh, int expectedAfterBlock)
+ throws Exception {
+ if (end < begin) {
+ throw new IllegalArgumentException("Invalid range: [" + begin + ", " + end + "]");
+ }
+
+ for (int i = begin; i <= end; i++) {
+ if (blockOnHigh == true && i == end) {
+ this.blockOnHigh = true;
+ this.expectedAfterBlock = expectedAfterBlock;
+ }
+ TupleUtils.createIntegerTuple(builder, tuple, i);
+ if (!cursor.hasNext()) {
+ Assert.fail("Failed to consume entire tuple range since cursor is exhausted.");
+ }
+ cursor.next();
+
+ if (this.blockOnHigh) {
+ TupleUtils.createIntegerTuple(builder, tuple, expectedAfterBlock);
+ }
+ Assert.assertEquals(0, cmp.compare(tuple, cursor.getTuple()));
+ }
+ }
+
+ private class SynchronizingSearchOperationCallback implements ISearchOperationCallback {
+
+ @Override
+ public boolean proceed(ITupleReference tuple) {
+ Assert.assertEquals(0, cmp.compare(SearchTask.this.tuple, tuple));
+ return false;
+ }
+
+ @Override
+ public void reconcile(ITupleReference tuple) {
+ Assert.assertEquals(0, cmp.compare(SearchTask.this.tuple, tuple));
+ if (blockOnHigh) {
+ try {
+ TupleUtils.createIntegerTuple(builder, SearchTask.this.tuple, expectedAfterBlock);
+ } catch (HyracksDataException e) {
+ e.printStackTrace();
+ }
+ condition.signal();
+ condition.awaitUninterruptibly();
+ blockOnHigh = false;
+ }
+ }
+
+ }
+ }
+
+ private class InsertionTask implements Callable<Boolean> {
+ private final IIndexAccessor accessor;
+ private final ArrayTupleBuilder builder;
+ private final ArrayTupleReference tuple;
+
+ public InsertionTask() {
+ this.accessor = index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ this.builder = new ArrayTupleBuilder(NUM_KEY_FIELDS);
+ this.tuple = new ArrayTupleReference();
+ }
+
+ @Override
+ public Boolean call() throws Exception {
+ lock.lock();
+ try {
+ insertTaskStarted = true;
+
+ // bulkload [101, 150] & [151, 200] as two separate disk components
+ // insert [50, 100] & [301, 350] to the in-memory component
+ // delete tuple 151
+ bulkloadIntTupleRange(101, 150);
+ bulkloadIntTupleRange(151, 200);
+ insertIntTupleRange(50, 100);
+ insertIntTupleRange(301, 350);
+ TupleUtils.createIntegerTuple(builder, tuple, 151);
+ accessor.delete(tuple);
+ condition.signal();
+ condition.await();
+
+ // delete tuple 75
+ TupleUtils.createIntegerTuple(builder, tuple, 75);
+ accessor.delete(tuple);
+ condition.signal();
+ condition.await();
+
+ // insert tuples [201, 300] and delete tuple 151
+ insertIntTupleRange(201, 300);
+ condition.signal();
+ } finally {
+ lock.unlock();
+ }
+
+ return true;
+ }
+
+ private void insertIntTupleRange(int begin, int end) throws Exception {
+ if (end < begin) {
+ throw new IllegalArgumentException("Invalid range: [" + begin + ", " + end + "]");
+ }
+
+ for (int i = begin; i <= end; i++) {
+ TupleUtils.createIntegerTuple(builder, tuple, i);
+ accessor.insert(tuple);
+ }
+ }
+
+ private void bulkloadIntTupleRange(int begin, int end) throws Exception {
+ if (end < begin) {
+ throw new IllegalArgumentException("Invalid range: [" + begin + ", " + end + "]");
+ }
+
+ IIndexBulkLoader bulkloader = index.createBulkLoader(1.0f);
+ for (int i = begin; i <= end; i++) {
+ TupleUtils.createIntegerTuple(builder, tuple, i);
+ bulkloader.add(tuple);
+ }
+ bulkloader.end();
+ }
+
+ }
+}