Properly Using op callbacks in the lsm inverted index.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_inverted_index_updates_new@1882 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeSearchCursor.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeSearchCursor.java
index e95bf86..4306527 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeSearchCursor.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMTreeSearchCursor.java
@@ -122,7 +122,7 @@
 
     @Override
     public ITupleReference getTuple() {
-        return (ITupleReference) outputElement.getTuple();
+        return outputElement.getTuple();
     }
 
     protected boolean pushIntoPriorityQueue(PriorityQueueElement e) throws HyracksDataException, IndexException {
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
index f6a7002..c10c01e 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
@@ -290,25 +290,27 @@
     @Override
     public IIndexAccessor createAccessor(IModificationOperationCallback modificationCallback,
             ISearchOperationCallback searchCallback) {
-        // TODO: Ignore opcallbacks for now.
-        return new LSMInvertedIndexAccessor(this, lsmHarness, fileManager, createOpContext());
+        return new LSMInvertedIndexAccessor(this, lsmHarness, fileManager, createOpContext(modificationCallback,
+                searchCallback));
     }
-    
-    private LSMInvertedIndexOpContext createOpContext() {
-        return new LSMInvertedIndexOpContext(memComponent.getInvIndex(), memComponent.getDeletedKeysBTree());
+
+    private LSMInvertedIndexOpContext createOpContext(IModificationOperationCallback modificationCallback,
+            ISearchOperationCallback searchCallback) {
+        return new LSMInvertedIndexOpContext(memComponent.getInvIndex(), memComponent.getDeletedKeysBTree(),
+                modificationCallback, searchCallback);
     }
-    
+
     @Override
     public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput) throws IndexException {
         return new LSMInvertedIndexBulkLoader(fillFactor, verifyInput);
     }
         
     /**
-     * The keys in the in-memory deleted-keys BTree only refers to on-disk components.
+     * The keys in the in-memory deleted-keys BTree only refer to on-disk components.
      * We delete documents from the in-memory inverted index by deleting its entries directly,
      * and do NOT add the deleted key to the deleted-keys BTree.
      * Otherwise, inserts would have to remove keys from the in-memory deleted-keys BTree which 
-     * may cause incorrect behavior in the following pathological case:
+     * may cause incorrect behavior (lost deletes) in the following pathological case:
      * Insert doc 1, flush, delete doc 1, insert doc 1
      * After the sequence above doc 1 will now appear twice because the delete of the on-disk doc 1 has been lost.
      * Insert:
@@ -321,13 +323,15 @@
     public boolean insertUpdateOrDelete(ITupleReference tuple, IIndexOpContext ictx) throws HyracksDataException,
             IndexException {
         LSMInvertedIndexOpContext ctx = (LSMInvertedIndexOpContext) ictx;
+        ctx.modificationCallback.before(tuple);
         switch (ctx.getIndexOp()) {
-            case INSERT: {
-                // Insert into the in-memory inverted index.
+            case INSERT: {                
+                // Insert into the in-memory inverted index.                
                 ctx.memInvIndexAccessor.insert(tuple);
                 break;
             }
             case DELETE: {
+                ctx.modificationCallback.before(tuple);
                 // First remove all entries in the in-memory inverted index (if any).
                 ctx.memInvIndexAccessor.delete(tuple);
                 // Insert key into the deleted-keys BTree.
@@ -339,11 +343,6 @@
                 }
                 break;
             }
-            case PHYSICALDELETE: {
-                // TODO: Think about whether we actually need this, since this is a secondary index.
-                ctx.memInvIndexAccessor.delete(tuple);
-                break;
-            }
             default: {
                 throw new UnsupportedOperationException("Operation " + ctx.getIndexOp() + " not supported.");
             }
@@ -446,7 +445,7 @@
     @Override
     public ILSMIOOperation createMergeOperation(ILSMIOOperationCallback callback) throws HyracksDataException,
             IndexException {
-        LSMInvertedIndexOpContext ctx = createOpContext();
+        LSMInvertedIndexOpContext ctx = createOpContext(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
         ctx.reset(IndexOp.SEARCH);
         IIndexCursor cursor = new LSMInvertedIndexRangeSearchCursor();
         RangePredicate mergePred = new RangePredicate(null, null, true, true, null, null);
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
index 6046943..7c04979 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
@@ -34,12 +34,13 @@
 
 public class LSMInvertedIndexAccessor implements ILSMIndexAccessor, IInvertedIndexAccessor {
 
-    protected final LSMHarness lsmHarness;    
+    protected final LSMHarness lsmHarness;
     protected final ILSMIndexFileManager fileManager;
     protected final IIndexOpContext ctx;
     protected final LSMInvertedIndex invIndex;
-    
-    public LSMInvertedIndexAccessor(LSMInvertedIndex invIndex, LSMHarness lsmHarness, ILSMIndexFileManager fileManager, IIndexOpContext ctx) {
+
+    public LSMInvertedIndexAccessor(LSMInvertedIndex invIndex, LSMHarness lsmHarness, ILSMIndexFileManager fileManager,
+            IIndexOpContext ctx) {
         this.lsmHarness = lsmHarness;
         this.fileManager = fileManager;
         this.ctx = ctx;
@@ -54,13 +55,7 @@
 
     @Override
     public void delete(ITupleReference tuple) throws HyracksDataException, IndexException {
-        ctx.reset(IndexOp.DELETE);
-        lsmHarness.insertUpdateOrDelete(tuple, ctx);
-    }
-    
-    @Override
-    public void physicalDelete(ITupleReference tuple) throws HyracksDataException, IndexException {
-        ctx.reset(IndexOp.PHYSICALDELETE);
+        ctx.reset(IndexOp.DELETE);        
         lsmHarness.insertUpdateOrDelete(tuple, ctx);
     }
     
@@ -110,19 +105,22 @@
             HyracksDataException {
         search(cursor, searchPred);
     }
-    
+
+    @Override
+    public void physicalDelete(ITupleReference tuple) throws HyracksDataException, IndexException {
+        throw new UnsupportedOperationException("Physical delete not supported by lsm inverted index.");
+    }
+
     @Override
     public void update(ITupleReference tuple) throws HyracksDataException, IndexException {
-        // TODO Auto-generated method stub
-        
+        throw new UnsupportedOperationException("Update not supported by lsm inverted index.");
     }
 
     @Override
     public void upsert(ITupleReference tuple) throws HyracksDataException, IndexException {
-        // TODO Auto-generated method stub
-        
+        throw new UnsupportedOperationException("Upsert not supported by lsm inverted index.");
     }
-    
+
     @Override
     public IInvertedListCursor createInvertedListCursor() {
         throw new UnsupportedOperationException("Cannot create inverted list cursor on lsm inverted index.");
@@ -131,5 +129,6 @@
     @Override
     public void openInvertedListCursor(IInvertedListCursor listCursor, ITupleReference searchKey)
             throws HyracksDataException, IndexException {
-        throw new UnsupportedOperationException("Cannot open inverted list cursor on lsm inverted index.");}
+        throw new UnsupportedOperationException("Cannot open inverted list cursor on lsm inverted index.");
+    }
 }
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java
index 2ff02cc..cdf8cf7 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java
@@ -17,6 +17,8 @@
 
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexOpContext;
+import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
@@ -30,6 +32,9 @@
     private final IInvertedIndex memInvIndex;
     private final IIndex memDeletedKeysBTree;
 
+    public final IModificationOperationCallback modificationCallback;
+    public final ISearchOperationCallback searchCallback;
+    
     // Tuple that only has the inverted-index elements (aka keys), projecting away the document fields.
     public PermutingTupleReference keysOnlyTuple;
     
@@ -38,9 +43,12 @@
     // Accessor to the deleted-keys BTree.
     public IIndexAccessor deletedKeysBTreeAccessor;
 
-    public LSMInvertedIndexOpContext(IInvertedIndex memInvIndex, IIndex memDeletedKeysBTree) {
+    public LSMInvertedIndexOpContext(IInvertedIndex memInvIndex, IIndex memDeletedKeysBTree,
+            IModificationOperationCallback modificationCallback, ISearchOperationCallback searchCallback) {
         this.memInvIndex = memInvIndex;
         this.memDeletedKeysBTree = memDeletedKeysBTree;
+        this.modificationCallback = modificationCallback;
+        this.searchCallback = searchCallback;
     }
 
     @Override
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursor.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursor.java
index d10cbe9..bd8bc93 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursor.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexRangeSearchCursor.java
@@ -38,6 +38,11 @@
     protected RangePredicate keySearchPred;
     
     @Override
+    public void next() throws HyracksDataException {
+        super.next();
+    }
+    
+    @Override
     public void open(ICursorInitialState initState, ISearchPredicate searchPred) throws IndexException,
             HyracksDataException {
         LSMInvertedIndexRangeSearchCursorInitialState lsmInitState = (LSMInvertedIndexRangeSearchCursorInitialState) initState;
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursor.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursor.java
index 37e7f51..9bb52fb 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursor.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursor.java
@@ -23,6 +23,7 @@
 import edu.uci.ics.hyracks.storage.am.common.api.ICursorInitialState;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
+import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.api.ISearchPredicate;
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
@@ -44,11 +45,12 @@
     private AtomicInteger searcherRefCount;
     private List<IIndexAccessor> indexAccessors;
     private ISearchPredicate searchPred;
+    private ISearchOperationCallback searchCallback;
     
     // Assuming the cursor for all deleted-keys indexes are of the same type.
-    protected IIndexCursor deletedKeysBTreeCursor;
-    protected List<IIndexAccessor> deletedKeysBTreeAccessors;
-    protected RangePredicate keySearchPred;
+    private IIndexCursor deletedKeysBTreeCursor;
+    private List<IIndexAccessor> deletedKeysBTreeAccessors;
+    private RangePredicate keySearchPred;    
 
     @Override
     public void open(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException {
@@ -59,6 +61,7 @@
         indexAccessors = lsmInitState.getIndexAccessors();
         accessorIndex = 0;
         this.searchPred = searchPred;
+        this.searchCallback = lsmInitState.getSearchOperationCallback();
         
         // For searching the deleted-keys BTrees.
         deletedKeysBTreeAccessors = lsmInitState.getDeletedKeysBTreeAccessors();
@@ -134,7 +137,12 @@
     @Override
     public void next() throws HyracksDataException {
         // Mark the tuple as consumed, so hasNext() can move on.
-        tupleConsumed = true;                
+        tupleConsumed = true;
+        // We assume that the underlying cursors materialize their results such that
+        // there is no need to reposition the result cursor after reconciliation.
+        if (!searchCallback.proceed(currentCursor.getTuple())) {
+            searchCallback.reconcile(currentCursor.getTuple());
+        }
     }
 
     @Override
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursorInitialState.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursorInitialState.java
index 6102cee..712014d 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursorInitialState.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursorInitialState.java
@@ -34,13 +34,14 @@
     private final LSMHarness lsmHarness;
     private final List<IIndexAccessor> indexAccessors;
     private final List<IIndexAccessor> deletedKeysBTreeAccessors;
-    private final IIndexOpContext opContext;
+    private final LSMInvertedIndexOpContext ctx;
     private ISearchOperationCallback searchCallback;
     private MultiComparator originalCmp;
     private final MultiComparator keyCmp;
     private final PermutingTupleReference keysOnlyTuple;
 
-    public LSMInvertedIndexSearchCursorInitialState(final MultiComparator keyCmp, PermutingTupleReference keysOnlyTuple, List<IIndexAccessor> indexAccessors,
+    public LSMInvertedIndexSearchCursorInitialState(final MultiComparator keyCmp,
+            PermutingTupleReference keysOnlyTuple, List<IIndexAccessor> indexAccessors,
             List<IIndexAccessor> deletedKeysBTreeAccessors, IIndexOpContext ctx, boolean includeMemComponent,
             AtomicInteger searcherfRefCount, LSMHarness lsmHarness) {
         this.keyCmp = keyCmp;
@@ -50,7 +51,8 @@
         this.includeMemComponent = includeMemComponent;
         this.searcherfRefCount = searcherfRefCount;
         this.lsmHarness = lsmHarness;
-        this.opContext = ctx;
+        this.ctx = (LSMInvertedIndexOpContext) ctx;
+        this.searchCallback = this.ctx.searchCallback;
     }
 
     @Override
@@ -79,7 +81,7 @@
     }
 
     public IIndexOpContext getOpContext() {
-        return opContext;
+        return ctx;
     }
 
     @Override
diff --git a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/config/AccessMethodTestsConfig.java b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/config/AccessMethodTestsConfig.java
index a98e6f3..a09268b 100644
--- a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/config/AccessMethodTestsConfig.java
+++ b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/config/AccessMethodTestsConfig.java
@@ -78,8 +78,8 @@
     // Test parameters.
     public static final int LSM_INVINDEX_NUM_DOCS_TO_INSERT = 10000;
     // Used for full-fledged search test.
-    public static final int LSM_INVINDEX_NUM_DOC_QUERIES = 500;
-    public static final int LSM_INVINDEX_NUM_RANDOM_QUERIES = 500;
+    public static final int LSM_INVINDEX_NUM_DOC_QUERIES = 1000;
+    public static final int LSM_INVINDEX_NUM_RANDOM_QUERIES = 1000;
     // Used for non-search tests to sanity check index searches.
     public static final int LSM_INVINDEX_TINY_NUM_DOC_QUERIES = 200;
     public static final int LSM_INVINDEX_TINY_NUM_RANDOM_QUERIES = 200;
@@ -89,7 +89,7 @@
     public static final int LSM_INVINDEX_NUM_DELETE_ROUNDS = 3;
     // Allocate a generous size to make sure we have enough elements for all tests.
     public static final int LSM_INVINDEX_SCAN_COUNT_ARRAY_SIZE = 1000000;
-    public static final int LSM_INVINDEX_MULTITHREAD_NUM_OPERATIONS = 5000;
+    public static final int LSM_INVINDEX_MULTITHREAD_NUM_OPERATIONS = 10000;
     
 }