Changed lsm io operation callback such that actions can be taken before and after executing the operation. This mechanism is going to be used by Asterix to inject the last lsn into components.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@2412 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/ITreeIndexMetaDataFrame.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/ITreeIndexMetaDataFrame.java
index 9e95970..b48ded4 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/ITreeIndexMetaDataFrame.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/api/ITreeIndexMetaDataFrame.java
@@ -47,4 +47,9 @@
 	
 	// Set special validity flag.
 	public void setValid(boolean isValid);
+	
+	// Special placeholder for LSN information. Used for transactional LSM indexes.
+	public long getLSN();
+	
+	public void setLSN(long lsn);
 }
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/frames/LIFOMetaDataFrame.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/frames/LIFOMetaDataFrame.java
index 31c674d..1b8bc15 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/frames/LIFOMetaDataFrame.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/frames/LIFOMetaDataFrame.java
@@ -36,6 +36,7 @@
 	protected static final int levelOff = maxPageOff + 12; //20
 	protected static final int nextPageOff = levelOff + 1; // 21
 	protected static final int validOff = nextPageOff + 4; // 25
+	protected static final int lsnOff = validOff + 4; // 29
 
 	protected ICachedPage page = null;
 	protected ByteBuffer buf = null;
@@ -101,7 +102,7 @@
 	@Override
 	public void initBuffer(byte level) {
 		buf.putInt(tupleCountOff, 0);
-		buf.putInt(freeSpaceOff, validOff + 4);
+		buf.putInt(freeSpaceOff, lsnOff + 4);
 		//buf.putInt(maxPageOff, -1);
 		buf.put(levelOff, level);
 		buf.putInt(nextPageOff, -1);
@@ -131,4 +132,14 @@
             buf.putInt(validOff, 0);
         }
     }
+
+    @Override
+    public long getLSN() {
+        return buf.getLong(lsnOff);
+    }
+
+    @Override
+    public void setLSN(long lsn) {
+        buf.putLong(lsnOff, lsn);
+    }
 }
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java
index edc9b4d..b7ed45b 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java
@@ -1,5 +1,11 @@
 package edu.uci.ics.hyracks.storage.am.lsm.common.api;
 
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
 public interface ILSMIOOperationCallback {
-    public void callback();
+    public void beforeOperation(ILSMIOOperation operation) throws HyracksDataException;
+    
+    public void afterOperation(ILSMIOOperation operation, Object newComponent) throws HyracksDataException;
+    
+    public void afterFinalize(ILSMIOOperation operation, Object newComponent) throws HyracksDataException;
 }
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallbackFactory.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallbackFactory.java
new file mode 100644
index 0000000..52361ee
--- /dev/null
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallbackFactory.java
@@ -0,0 +1,7 @@
+package edu.uci.ics.hyracks.storage.am.lsm.common.api;
+
+import java.io.Serializable;
+
+public interface ILSMIOOperationCallbackFactory extends Serializable {
+    public ILSMIOOperationCallback createIOOperationCallback(Object syncObj);
+}
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallback.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallback.java
index 47f556d..53e7224 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallback.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/BlockingIOOperationCallback.java
@@ -1,21 +1,33 @@
 package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
 
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 
 public class BlockingIOOperationCallback implements ILSMIOOperationCallback {
 
     private boolean notified = false;
 
-    @Override
-    public synchronized void callback() {
-        this.notifyAll();
-        notified = true;
-    }
-
     public synchronized void waitForIO() throws InterruptedException {
         if (!notified) {
             this.wait();
         }
         notified = false;
     }
+
+    @Override
+    public void beforeOperation(ILSMIOOperation operation) throws HyracksDataException {
+        // Do nothing.
+    }
+
+    @Override
+    public void afterOperation(ILSMIOOperation operation, Object newComponent) throws HyracksDataException {
+        // Do nothing.
+    }
+
+    @Override
+    public synchronized void afterFinalize(ILSMIOOperation operation, Object newComponent) throws HyracksDataException {
+        this.notifyAll();
+        notified = true;
+    }
 }
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ImmediateScheduler.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ImmediateScheduler.java
index bd690ac..c84a852 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ImmediateScheduler.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ImmediateScheduler.java
@@ -16,8 +16,6 @@
             e.printStackTrace();
         } catch (IndexException e) {
             e.printStackTrace();
-        } finally {
-            operation.getCallback().callback();
         }
     }
 
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 837616a..634800a 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -106,14 +106,20 @@
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Flushing LSM-Index: " + lsmIndex);
         }
-        Object newComponent = lsmIndex.flush(operation);
-
-        // The implementation of this call must take any necessary steps to make
-        // the new component permanent, and mark it as valid (usually this means
-        // forcing all pages of the tree to disk, possibly with some extra
-        // information to mark the tree as valid).
-        lsmIndex.getComponentFinalizer().finalize(newComponent);
-
+        Object newComponent = null;
+        try {
+            operation.getCallback().beforeOperation(operation);
+            newComponent = lsmIndex.flush(operation);
+            operation.getCallback().afterOperation(operation, newComponent);
+            
+            // The implementation of this call must take any necessary steps to make
+            // the new component permanent, and mark it as valid (usually this means
+            // forcing all pages of the tree to disk, possibly with some extra
+            // information to mark the tree as valid).
+            lsmIndex.getComponentFinalizer().finalize(newComponent);
+        } finally {
+            operation.getCallback().afterFinalize(operation, newComponent);
+        }
         lsmIndex.resetInMemoryComponent();
         synchronized (diskComponentsSync) {
             lsmIndex.addFlushedComponent(newComponent);
@@ -175,21 +181,28 @@
         AtomicInteger localSearcherRefCount = searcherRefCount;
 
         List<Object> mergedComponents = new ArrayList<Object>();
-        Object newComponent = lsmIndex.merge(mergedComponents, operation);
+        Object newComponent = null;
+        try {
+            operation.getCallback().beforeOperation(operation);
+            newComponent = lsmIndex.merge(mergedComponents, operation);
+            operation.getCallback().afterOperation(operation, newComponent);
+            
+            // No merge happened.
+            if (newComponent == null) {
+                isMerging.set(false);
+                return;
+            }
 
-        // No merge happened.
-        if (newComponent == null) {
-            isMerging.set(false);
-            return;
+            // TODO: Move this to just before the merge cleanup and remove latching on disk components
+            // The implementation of this call must take any necessary steps to make
+            // the new component permanent, and mark it as valid (usually this means
+            // forcing all pages of the tree to disk, possibly with some extra
+            // information to mark the tree as valid).
+            lsmIndex.getComponentFinalizer().finalize(newComponent);
+        } finally {
+            operation.getCallback().afterFinalize(operation, newComponent);
         }
-
-        // TODO: Move this to just before the merge cleanup and remove latching on disk components
-        // The implementation of this call must take any necessary steps to make
-        // the new component permanent, and mark it as valid (usually this means
-        // forcing all pages of the tree to disk, possibly with some extra
-        // information to mark the tree as valid).
-        lsmIndex.getComponentFinalizer().finalize(newComponent);
-
+        
         // Remove the old Trees from the list, and add the new merged Tree(s).
         // Also, swap the searchRefCount.
         synchronized (diskComponentsSync) {
@@ -202,6 +215,7 @@
             }
             searcherRefCount.set(0);
         }
+        
 
         // Wait for all searchers that are still accessing the old on-disk
         // Trees, then perform the final cleanup of the old Trees.
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallback.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallback.java
index 1e51b41..24bd16c 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallback.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/NoOpIOOperationCallback.java
@@ -1,12 +1,24 @@
 package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
 
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 
 public enum NoOpIOOperationCallback implements ILSMIOOperationCallback {
     INSTANCE;
 
     @Override
-    public void callback() {
+    public void beforeOperation(ILSMIOOperation operation) throws HyracksDataException {
+        // Do nothing.
+    }
+
+    @Override
+    public void afterOperation(ILSMIOOperation operation, Object newComponent) throws HyracksDataException {
+        // Do nothing.
+    }
+
+    @Override
+    public void afterFinalize(ILSMIOOperation operation, Object newComponent) throws HyracksDataException {
         // Do nothing.
     }
 }
diff --git a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ReferenceCountingOperationTracker.java b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ReferenceCountingOperationTracker.java
index 71a178c..5b10ba3 100644
--- a/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ReferenceCountingOperationTracker.java
+++ b/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/ReferenceCountingOperationTracker.java
@@ -2,6 +2,7 @@
 
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
@@ -53,7 +54,17 @@
 
     private class FlushOperationCallback implements ILSMIOOperationCallback {
         @Override
-        public void callback() {
+        public void beforeOperation(ILSMIOOperation operation) throws HyracksDataException {
+            // Do nothing.
+        }
+
+        @Override
+        public void afterOperation(ILSMIOOperation operation, Object newComponent) throws HyracksDataException {
+            // Do nothing.
+        }
+
+        @Override
+        public void afterFinalize(ILSMIOOperation operation, Object newComponent) throws HyracksDataException {
             ReferenceCountingOperationTracker.this.notifyAll();
         }
     }
diff --git a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
index 7711a89..57fd2db 100644
--- a/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
+++ b/hyracks-storage-am-lsm-rtree/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
@@ -60,7 +60,7 @@
 
 public abstract class AbstractLSMRTree implements ILSMIndexInternal, ITreeIndex {
 
-    public class LSMRTreeComponent {
+    public static class LSMRTreeComponent {
         private final RTree rtree;
         private final BTree btree;