Implemented Asterix LSMIO operation callbacks to inject LSNs into LSM components.

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_lsm_stabilization@820 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-transactions/pom.xml b/asterix-transactions/pom.xml
index 203c520..2637220 100644
--- a/asterix-transactions/pom.xml
+++ b/asterix-transactions/pom.xml
@@ -37,5 +37,16 @@
 	    	<artifactId>hyracks-storage-am-lsm-btree</artifactId>
 	    	<version>0.2.2-SNAPSHOT</version>
 	    </dependency>
+	    <dependency>
+	    	<groupId>edu.uci.ics.hyracks</groupId>
+	    	<artifactId>hyracks-storage-am-lsm-rtree</artifactId>
+	    	<version>0.2.2-SNAPSHOT</version>
+	    </dependency>
+	    <dependency>
+	    	<groupId>edu.uci.ics.hyracks</groupId>
+	    	<artifactId>hyracks-storage-am-lsm-invertedindex</artifactId>
+	    	<version>0.2.2-SNAPSHOT</version>
+	    </dependency>
 	</dependencies>
+	
 </project>
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/AbstractLSMIOOperationCallback.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/AbstractLSMIOOperationCallback.java
new file mode 100644
index 0000000..061546f
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/AbstractLSMIOOperationCallback.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.transaction.management.ioopcallbacks;
+
+import edu.uci.ics.asterix.transaction.management.opcallbacks.IndexOperationTracker;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrame;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
+import edu.uci.ics.hyracks.storage.common.file.BufferedFileHandle;
+
+public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationCallback {
+
+    // Object on which blocked LSMIndex operations are waiting.
+    private final IndexOperationTracker opTracker;
+
+    public AbstractLSMIOOperationCallback(IndexOperationTracker opTracker) {
+        this.opTracker = opTracker;
+    }
+
+    @Override
+    public void beforeOperation(ILSMIOOperation operation) {
+        // Do nothing.
+    }
+
+    @Override
+    public synchronized void afterFinalize(ILSMIOOperation operation, Object newComponent) {
+        // Wake up all blocked index operations that were waiting for this io operation to complete.
+        opTracker.notifyAll();
+    }
+
+    protected void putLSNIntoMetadata(ITreeIndex treeIndex) throws HyracksDataException {
+        int fileId = treeIndex.getFileId();
+        IBufferCache bufferCache = treeIndex.getBufferCache();
+        ITreeIndexMetaDataFrame metadataFrame = treeIndex.getFreePageManager().getMetaDataFrameFactory().createFrame();
+        // Mark the component as a valid component by flushing the metadata page to disk
+        int metadataPageId = treeIndex.getFreePageManager().getFirstMetadataPage();
+        ICachedPage metadataPage = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, metadataPageId), false);
+        metadataPage.acquireWriteLatch();
+        try {
+            metadataFrame.setPage(metadataPage);
+            metadataFrame.setLSN(opTracker.getLastLSN());
+        } finally {
+            metadataPage.releaseWriteLatch();
+            bufferCache.unpin(metadataPage);
+        }
+    }
+
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMBTreeIOOperationCallback.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMBTreeIOOperationCallback.java
new file mode 100644
index 0000000..74ef056
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMBTreeIOOperationCallback.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.transaction.management.ioopcallbacks;
+
+import edu.uci.ics.asterix.transaction.management.opcallbacks.IndexOperationTracker;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+
+public class LSMBTreeIOOperationCallback extends AbstractLSMIOOperationCallback {
+
+    public LSMBTreeIOOperationCallback(IndexOperationTracker opTracker) {
+        super(opTracker);
+    }
+
+    @Override
+    public void afterOperation(ILSMIOOperation operation, Object newComponent) throws HyracksDataException {
+        BTree btree = (BTree) newComponent;
+        putLSNIntoMetadata(btree);
+    }
+
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
new file mode 100644
index 0000000..8a38526
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.transaction.management.ioopcallbacks;
+
+import edu.uci.ics.asterix.transaction.management.opcallbacks.IndexOperationTracker;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
+
+public class LSMBTreeIOOperationCallbackFactory implements ILSMIOOperationCallbackFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public ILSMIOOperationCallback createIOOperationCallback(Object syncObj) {
+        return new LSMBTreeIOOperationCallback((IndexOperationTracker) syncObj);
+    }
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
new file mode 100644
index 0000000..5975fa1
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.transaction.management.ioopcallbacks;
+
+import edu.uci.ics.asterix.transaction.management.opcallbacks.IndexOperationTracker;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndex.LSMInvertedIndexComponent;
+
+public class LSMInvertedIndexIOOperationCallback extends AbstractLSMIOOperationCallback {
+
+    public LSMInvertedIndexIOOperationCallback(IndexOperationTracker opTracker) {
+        super(opTracker);
+    }
+
+    @Override
+    public void afterOperation(ILSMIOOperation operation, Object newComponent) throws HyracksDataException {
+        LSMInvertedIndexComponent invIndexComponent = (LSMInvertedIndexComponent) newComponent;
+        putLSNIntoMetadata(invIndexComponent.getDeletedKeysBTree());
+    }
+
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
new file mode 100644
index 0000000..d8b796c
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.transaction.management.ioopcallbacks;
+
+import edu.uci.ics.asterix.transaction.management.opcallbacks.IndexOperationTracker;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
+
+public class LSMInvertedIndexIOOperationCallbackFactory implements ILSMIOOperationCallbackFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public ILSMIOOperationCallback createIOOperationCallback(Object syncObj) {
+        return new LSMInvertedIndexIOOperationCallback((IndexOperationTracker) syncObj);
+    }
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMRTreeIOOperationCallback.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMRTreeIOOperationCallback.java
new file mode 100644
index 0000000..e29bbae
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMRTreeIOOperationCallback.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.transaction.management.ioopcallbacks;
+
+import edu.uci.ics.asterix.transaction.management.opcallbacks.IndexOperationTracker;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.AbstractLSMRTree.LSMRTreeComponent;
+
+public class LSMRTreeIOOperationCallback extends AbstractLSMIOOperationCallback {
+
+    public LSMRTreeIOOperationCallback(IndexOperationTracker opTracker) {
+        super(opTracker);
+    }
+
+    @Override
+    public void afterOperation(ILSMIOOperation operation, Object newComponent) throws HyracksDataException {
+        LSMRTreeComponent rtreeComponent = (LSMRTreeComponent) newComponent;
+        putLSNIntoMetadata(rtreeComponent.getRTree());
+        putLSNIntoMetadata(rtreeComponent.getBTree());
+    }
+
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
new file mode 100644
index 0000000..2a8af33
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.transaction.management.ioopcallbacks;
+
+import edu.uci.ics.asterix.transaction.management.opcallbacks.IndexOperationTracker;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
+
+public class LSMRTreeIOOperationCallbackFactory implements ILSMIOOperationCallbackFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public ILSMIOOperationCallback createIOOperationCallback(Object syncObj) {
+        return new LSMRTreeIOOperationCallback((IndexOperationTracker) syncObj);
+    }
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/IndexOperationTracker.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/IndexOperationTracker.java
index 182079f..b2391ae 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/IndexOperationTracker.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/IndexOperationTracker.java
@@ -19,6 +19,7 @@
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
@@ -30,13 +31,14 @@
     private int numActiveOperations = 0;
     private long lastLsn;
     private final ILSMIndex index;
-    private final ILSMIndexAccessor accessor;    
-    private final FlushOperationCallback FLUSHCALLBACK_INSTANCE = new FlushOperationCallback();
+    private final ILSMIndexAccessor accessor;
+    private final ILSMIOOperationCallback ioOpCallback;
     
-    public IndexOperationTracker(ILSMIndex index) {
+    public IndexOperationTracker(ILSMIndex index, ILSMIOOperationCallbackFactory ioOpCallbackFactory) {
         this.index = index;
         accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
                 NoOpOperationCallback.INSTANCE);
+        ioOpCallback = ioOpCallbackFactory.createIOOperationCallback(this);
     }     
 
     @Override
@@ -78,7 +80,7 @@
         // If we need a flush, and this is the last completing operation, then schedule the flush.
         // Once the flush has completed notify all waiting operations.
         if (index.getFlushController().getFlushStatus(index) && numActiveOperations == 0) {
-            index.getIOScheduler().scheduleOperation(accessor.createFlushOperation(FLUSHCALLBACK_INSTANCE));
+            index.getIOScheduler().scheduleOperation(accessor.createFlushOperation(ioOpCallback));
         }
     }
     
@@ -98,11 +100,4 @@
     public void setLastLSN(long lastLsn) {
         this.lastLsn = lastLsn;
     }
-    
-    private class FlushOperationCallback implements ILSMIOOperationCallback {
-        @Override
-        public void callback() {
-            IndexOperationTracker.this.notifyAll();
-        }
-    }
 }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/IndexOperationTrackerFactory.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/IndexOperationTrackerFactory.java
index 1cbc110..032a4f9 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/IndexOperationTrackerFactory.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/IndexOperationTrackerFactory.java
@@ -15,6 +15,7 @@
 
 package edu.uci.ics.asterix.transaction.management.opcallbacks;
 
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
@@ -23,9 +24,15 @@
 
     private static final long serialVersionUID = 1L;
 
+    private final ILSMIOOperationCallbackFactory ioOpCallbackFactory;
+    
+    public IndexOperationTrackerFactory(ILSMIOOperationCallbackFactory ioOpCallbackFactory) {
+        this.ioOpCallbackFactory = ioOpCallbackFactory;
+    }
+    
     @Override
     public ILSMOperationTracker createOperationTracker(ILSMIndex index) {
-        return new IndexOperationTracker(index);
+        return new IndexOperationTracker(index, ioOpCallbackFactory);
     }
 
 }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
index 103c448..a11d00e 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
@@ -56,8 +56,7 @@
             IModificationOperationCallback modCallback = new PrimaryIndexModificationOperationCallback(datasetId,
                     primaryKeyFields, primaryKeyHashFunctions, txnCtx, txnSubsystem.getLockManager(), txnSubsystem,
                     resourceId, indexOp);
-            txnCtx.registerIndex(index);
-            txnCtx.registerCallback((AbstractOperationCallback) modCallback);
+            txnCtx.registerIndexAndCallback(index, (AbstractOperationCallback) modCallback);
             return modCallback;
         } catch (ACIDException e) {
             throw new HyracksDataException(e);