Properly setting LSN in merged components through IO operation callback.

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_lsm_stabilization@823 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/AbstractLSMIOOperationCallback.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/AbstractLSMIOOperationCallback.java
index 061546f..e31fea3 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/AbstractLSMIOOperationCallback.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/AbstractLSMIOOperationCallback.java
@@ -15,6 +15,8 @@
 
 package edu.uci.ics.asterix.transaction.management.ioopcallbacks;
 
+import java.util.List;
+
 import edu.uci.ics.asterix.transaction.management.opcallbacks.IndexOperationTracker;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
@@ -28,7 +30,7 @@
 public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationCallback {
 
     // Object on which blocked LSMIndex operations are waiting.
-    private final IndexOperationTracker opTracker;
+    protected final IndexOperationTracker opTracker;
 
     public AbstractLSMIOOperationCallback(IndexOperationTracker opTracker) {
         this.opTracker = opTracker;
@@ -45,21 +47,38 @@
         opTracker.notifyAll();
     }
 
-    protected void putLSNIntoMetadata(ITreeIndex treeIndex) throws HyracksDataException {
+    protected abstract long getComponentLSN(List<Object> oldComponents) throws HyracksDataException;
+    
+    protected void putLSNIntoMetadata(ITreeIndex treeIndex, List<Object> oldComponents) throws HyracksDataException {
+        long componentLSN = getComponentLSN(oldComponents);
         int fileId = treeIndex.getFileId();
         IBufferCache bufferCache = treeIndex.getBufferCache();
         ITreeIndexMetaDataFrame metadataFrame = treeIndex.getFreePageManager().getMetaDataFrameFactory().createFrame();
-        // Mark the component as a valid component by flushing the metadata page to disk
         int metadataPageId = treeIndex.getFreePageManager().getFirstMetadataPage();
         ICachedPage metadataPage = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, metadataPageId), false);
         metadataPage.acquireWriteLatch();
         try {
             metadataFrame.setPage(metadataPage);
-            metadataFrame.setLSN(opTracker.getLastLSN());
+            metadataFrame.setLSN(componentLSN);
         } finally {
             metadataPage.releaseWriteLatch();
             bufferCache.unpin(metadataPage);
         }
     }
 
+    protected long getTreeIndexLSN(ITreeIndex treeIndex) throws HyracksDataException {
+        int fileId = treeIndex.getFileId();
+        IBufferCache bufferCache = treeIndex.getBufferCache();
+        ITreeIndexMetaDataFrame metadataFrame = treeIndex.getFreePageManager().getMetaDataFrameFactory().createFrame();
+        int metadataPageId = treeIndex.getFreePageManager().getFirstMetadataPage();
+        ICachedPage metadataPage = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, metadataPageId), false);
+        metadataPage.acquireReadLatch();
+        try {
+            metadataFrame.setPage(metadataPage);
+            return metadataFrame.getLSN();
+        } finally {
+            metadataPage.releaseReadLatch();
+            bufferCache.unpin(metadataPage);
+        }
+    }
 }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMBTreeIOOperationCallback.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMBTreeIOOperationCallback.java
index 74ef056..a29b20c 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMBTreeIOOperationCallback.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMBTreeIOOperationCallback.java
@@ -15,6 +15,8 @@
 
 package edu.uci.ics.asterix.transaction.management.ioopcallbacks;
 
+import java.util.List;
+
 import edu.uci.ics.asterix.transaction.management.opcallbacks.IndexOperationTracker;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
@@ -27,9 +29,23 @@
     }
 
     @Override
-    public void afterOperation(ILSMIOOperation operation, Object newComponent) throws HyracksDataException {
+    public void afterOperation(ILSMIOOperation operation, List<Object> oldComponents, Object newComponent) throws HyracksDataException {
         BTree btree = (BTree) newComponent;
-        putLSNIntoMetadata(btree);
+        putLSNIntoMetadata(btree, oldComponents);
     }
 
+    @Override
+    protected long getComponentLSN(List<Object> oldComponents) throws HyracksDataException {
+        if (oldComponents == null) {
+            // Implies a flush IO operation.
+            return opTracker.getLastLSN();
+        }
+        // Get max LSN from the oldComponents. Implies a merge IO operation.
+        long maxLSN = -1;
+        for (Object o : oldComponents) {
+            BTree btree = (BTree) o;
+            maxLSN = Math.max(getTreeIndexLSN(btree), maxLSN);
+        }
+        return maxLSN;
+    }
 }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
index 5975fa1..f5f60f9 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
@@ -15,6 +15,8 @@
 
 package edu.uci.ics.asterix.transaction.management.ioopcallbacks;
 
+import java.util.List;
+
 import edu.uci.ics.asterix.transaction.management.opcallbacks.IndexOperationTracker;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
@@ -27,9 +29,24 @@
     }
 
     @Override
-    public void afterOperation(ILSMIOOperation operation, Object newComponent) throws HyracksDataException {
+    public void afterOperation(ILSMIOOperation operation, List<Object> oldComponents, Object newComponent)
+            throws HyracksDataException {
         LSMInvertedIndexComponent invIndexComponent = (LSMInvertedIndexComponent) newComponent;
-        putLSNIntoMetadata(invIndexComponent.getDeletedKeysBTree());
+        putLSNIntoMetadata(invIndexComponent.getDeletedKeysBTree(), oldComponents);
     }
 
+    @Override
+    protected long getComponentLSN(List<Object> oldComponents) throws HyracksDataException {
+        if (oldComponents == null) {
+            // Implies a flush IO operation.
+            return opTracker.getLastLSN();
+        }
+        // Get max LSN from the oldComponents. Implies a merge IO operation.
+        long maxLSN = -1;
+        for (Object o : oldComponents) {
+            LSMInvertedIndexComponent invIndexComponent = (LSMInvertedIndexComponent) o;
+            maxLSN = Math.max(getTreeIndexLSN(invIndexComponent.getDeletedKeysBTree()), maxLSN);
+        }
+        return maxLSN;
+    }
 }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMRTreeIOOperationCallback.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMRTreeIOOperationCallback.java
index e29bbae..f02c49e 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMRTreeIOOperationCallback.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/ioopcallbacks/LSMRTreeIOOperationCallback.java
@@ -15,6 +15,8 @@
 
 package edu.uci.ics.asterix.transaction.management.ioopcallbacks;
 
+import java.util.List;
+
 import edu.uci.ics.asterix.transaction.management.opcallbacks.IndexOperationTracker;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
@@ -27,10 +29,24 @@
     }
 
     @Override
-    public void afterOperation(ILSMIOOperation operation, Object newComponent) throws HyracksDataException {
+    public void afterOperation(ILSMIOOperation operation, List<Object> oldComponents, Object newComponent) throws HyracksDataException {
         LSMRTreeComponent rtreeComponent = (LSMRTreeComponent) newComponent;
-        putLSNIntoMetadata(rtreeComponent.getRTree());
-        putLSNIntoMetadata(rtreeComponent.getBTree());
+        putLSNIntoMetadata(rtreeComponent.getRTree(), oldComponents);
+        putLSNIntoMetadata(rtreeComponent.getBTree(), oldComponents);
     }
 
+    @Override
+    protected long getComponentLSN(List<Object> oldComponents) throws HyracksDataException {
+        if (oldComponents == null) {
+            // Implies a flush IO operation.
+            return opTracker.getLastLSN();
+        }
+        // Get max LSN from the oldComponents. Implies a merge IO operation.
+        long maxLSN = -1;
+        for (Object o : oldComponents) {
+            LSMRTreeComponent rtreeComponent = (LSMRTreeComponent) o;
+            maxLSN = Math.max(getTreeIndexLSN(rtreeComponent.getRTree()), maxLSN);
+        }
+        return maxLSN;
+    }
 }