changes to use the Interfaces to get correct LSN of indexes for checkpoint and recovery.

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_lsm_stabilization@1231 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
index 7fd79c2..e048b3a 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -30,15 +30,13 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.IndexOperationTracker;
 import edu.uci.ics.asterix.transaction.management.resource.ILocalResourceMetadata;
-import edu.uci.ics.asterix.transaction.management.resource.LSMBTreeLocalResourceMetadata;
-import edu.uci.ics.asterix.transaction.management.resource.LSMInvertedIndexLocalResourceMetadata;
-import edu.uci.ics.asterix.transaction.management.resource.LSMRTreeLocalResourceMetadata;
-import edu.uci.ics.asterix.transaction.management.resource.TransactionalResourceManagerRepository;
 import edu.uci.ics.asterix.transaction.management.service.logging.IBuffer;
 import edu.uci.ics.asterix.transaction.management.service.logging.ILogCursor;
 import edu.uci.ics.asterix.transaction.management.service.logging.ILogFilter;
@@ -52,15 +50,26 @@
 import edu.uci.ics.asterix.transaction.management.service.logging.PhysicalLogLocator;
 import edu.uci.ics.asterix.transaction.management.service.transaction.IResourceManager;
 import edu.uci.ics.asterix.transaction.management.service.transaction.IResourceManager.ResourceType;
-import edu.uci.ics.asterix.transaction.management.service.transaction.ITransactionManager;
 import edu.uci.ics.asterix.transaction.management.service.transaction.JobIdFactory;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManager;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexLifecycleManager;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrame;
+import edu.uci.ics.hyracks.storage.am.lsm.btree.impls.LSMBTreeImmutableComponent;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndexImmutableComponent;
+import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTreeImmutableComponent;
+import edu.uci.ics.hyracks.storage.am.rtree.impls.RTree;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
+import edu.uci.ics.hyracks.storage.common.file.BufferedFileHandle;
 import edu.uci.ics.hyracks.storage.common.file.ILocalResourceRepository;
 import edu.uci.ics.hyracks.storage.common.file.LocalResource;
 
@@ -132,9 +141,6 @@
 
         ILogManager logManager = txnSubsystem.getLogManager();
         ILogRecordHelper logRecordHelper = logManager.getLogRecordHelper();
-        ITransactionManager txnManager = txnSubsystem.getTransactionManager();
-        TransactionalResourceManagerRepository txnResourceRepository = txnSubsystem
-                .getTransactionalResourceRepository();
 
         //winnerTxnTable is used to add pairs, <committed TxnId, the most recent commit LSN of the TxnId>
         Map<TxnId, Long> winnerTxnTable = new HashMap<TxnId, Long>();
@@ -216,10 +222,12 @@
         byte resourceMgrId;
         long maxDiskLastLSN;
         long currentLSN;
-        IIndex index = null;
+        int resourceType;
+        ILSMIndex index = null;
         LocalResource localResource = null;
         ILocalResourceMetadata localResourceMetadata = null;
-        List<Long> resourceIdList = new ArrayList<Long>();
+        Map<Long, Long> resourceId2MaxLSNMap = new HashMap<Long, Long>();
+        List<ILSMComponent> immutableDiskIndexList = null;
 
         //#. get indexLifeCycleManager 
         IAsterixAppRuntimeContextProvider appRuntimeContext = txnSubsystem.getAsterixAppRuntimeContextProvider();
@@ -244,13 +252,13 @@
                     if (winnerTxnTable.containsKey(tempKeyTxnId)) {
                         currentLSN = winnerTxnTable.get(tempKeyTxnId);
                         resourceId = logRecordHelper.getResourceId(currentLogLocator);
+                        localResource = localResourceRepository.getResourceById(resourceId);
 
                         //get index instance from IndexLifeCycleManager
                         //if index is not registered into IndexLifeCycleManager,
                         //create the index using LocalMetadata stored in LocalResourceRepository
-                        index = indexLifecycleManager.getIndex(resourceId);
+                        index = (ILSMIndex) indexLifecycleManager.getIndex(resourceId);
                         if (index == null) {
-                            localResource = localResourceRepository.getResourceById(resourceId);
 
                             /*******************************************************************
                              * [Notice]
@@ -270,19 +278,51 @@
                             }
                             /*******************************************************************/
 
+                            //#. create index instance and register to indexLifeCycleManager
                             localResourceMetadata = (ILocalResourceMetadata) localResource.getResourceObject();
                             index = localResourceMetadata.createIndexInstance(appRuntimeContext,
                                     localResource.getResourceName(), localResource.getPartition());
                             indexLifecycleManager.register(resourceId, index);
                             indexLifecycleManager.open(resourceId);
-                            resourceIdList.add(resourceId);
-                        }
 
-                        /***************************************************/
-                        //TODO
-                        //get the right maxDiskLastLSN with the resourceId from the Zach. :) 
-                        maxDiskLastLSN = 100;
-                        /***************************************************/
+                            //#. get maxDiskLastLSN
+                            resourceType = localResource.getResourceType();
+                            immutableDiskIndexList = index.getImmutableComponents();
+
+                            maxDiskLastLSN = -1;
+                            switch (resourceType) {
+
+                                case ResourceType.LSM_BTREE:
+                                    for (ILSMComponent c : immutableDiskIndexList) {
+                                        BTree btree = ((LSMBTreeImmutableComponent) c).getBTree();
+                                        maxDiskLastLSN = Math.max(getTreeIndexLSN(btree), maxDiskLastLSN);
+                                    }
+                                    break;
+
+                                case ResourceType.LSM_RTREE:
+                                    for (ILSMComponent c : immutableDiskIndexList) {
+                                        RTree rtree = ((LSMRTreeImmutableComponent) c).getRTree();
+                                        maxDiskLastLSN = Math.max(getTreeIndexLSN(rtree), maxDiskLastLSN);
+                                    }
+                                    break;
+
+                                case ResourceType.LSM_INVERTED_INDEX:
+                                    for (ILSMComponent c : immutableDiskIndexList) {
+                                        BTree delKeyBtree = ((LSMInvertedIndexImmutableComponent) c)
+                                                .getDeletedKeysBTree();
+                                        maxDiskLastLSN = Math.max(getTreeIndexLSN(delKeyBtree), maxDiskLastLSN);
+                                    }
+                                    break;
+
+                                default:
+                                    throw new ACIDException("Unsupported resouce type");
+                            }
+
+                            //#. set resourceId and maxDiskLastLSN to the map
+                            resourceId2MaxLSNMap.put(resourceId, maxDiskLastLSN);
+                        } else {
+                            maxDiskLastLSN = resourceId2MaxLSNMap.get(resourceId);
+                        }
 
                         if (currentLSN > maxDiskLastLSN) {
                             resourceMgrId = logRecordHelper.getResourceMgrId(currentLogLocator);
@@ -312,15 +352,35 @@
                     throw new ACIDException("Unsupported LogType: " + logType);
             }
         }
-        
+
         //close all indexes
+        Set<Long> resourceIdList = resourceId2MaxLSNMap.keySet();
         for (long r : resourceIdList) {
             indexLifecycleManager.close(r);
         }
-        
+
         JobIdFactory.initJobId(maxJobId);
     }
 
+    //TODO
+    //This function came from the AbstractLSMIOOperationCallback class. 
+    //We'd better factor out this function into a component of reading/writing the local metadata of indexes.
+    private long getTreeIndexLSN(ITreeIndex treeIndex) throws HyracksDataException {
+        int fileId = treeIndex.getFileId();
+        IBufferCache bufferCache = treeIndex.getBufferCache();
+        ITreeIndexMetaDataFrame metadataFrame = treeIndex.getFreePageManager().getMetaDataFrameFactory().createFrame();
+        int metadataPageId = treeIndex.getFreePageManager().getFirstMetadataPage();
+        ICachedPage metadataPage = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, metadataPageId), false);
+        metadataPage.acquireReadLatch();
+        try {
+            metadataFrame.setPage(metadataPage);
+            return metadataFrame.getLSN();
+        } finally {
+            metadataPage.releaseReadLatch();
+            bufferCache.unpin(metadataPage);
+        }
+    }
+
     @Override
     public void checkpoint() throws ACIDException {
 
@@ -333,9 +393,18 @@
         File[] prevCheckpointFiles = getPreviousCheckpointFiles();
 
         //#. create and store the checkpointObject into the new checkpoint file
-        //TODO
-        //put the correct minMCTFirstLSN by getting from Zach. :)
-        long minMCTFirstLSM = -1;
+        long minMCTFirstLSM = Long.MAX_VALUE;
+
+        //#. get indexLifeCycleManager
+        IIndexLifecycleManager indexLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
+                .getIndexLifecycleManager();
+        List<IIndex> openIndexList = indexLifecycleManager.getOpenIndexes();
+        long firstLSN;
+        for (IIndex index : openIndexList) {
+            firstLSN = ((IndexOperationTracker) ((ILSMIndex) index).getOperationTracker()).getFirstLSN();
+            minMCTFirstLSM = Math.min(minMCTFirstLSM, firstLSN);
+        }
+
         CheckpointObject checkpointObject = new CheckpointObject(logMgr.getCurrentLsn().get(), minMCTFirstLSM,
                 txnMgr.getMaxJobId(), System.currentTimeMillis());