changes to use the Interfaces to get correct LSN of indexes for checkpoint and recovery.
git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_lsm_stabilization@1231 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
index 7fd79c2..e048b3a 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -30,15 +30,13 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.IndexOperationTracker;
import edu.uci.ics.asterix.transaction.management.resource.ILocalResourceMetadata;
-import edu.uci.ics.asterix.transaction.management.resource.LSMBTreeLocalResourceMetadata;
-import edu.uci.ics.asterix.transaction.management.resource.LSMInvertedIndexLocalResourceMetadata;
-import edu.uci.ics.asterix.transaction.management.resource.LSMRTreeLocalResourceMetadata;
-import edu.uci.ics.asterix.transaction.management.resource.TransactionalResourceManagerRepository;
import edu.uci.ics.asterix.transaction.management.service.logging.IBuffer;
import edu.uci.ics.asterix.transaction.management.service.logging.ILogCursor;
import edu.uci.ics.asterix.transaction.management.service.logging.ILogFilter;
@@ -52,15 +50,26 @@
import edu.uci.ics.asterix.transaction.management.service.logging.PhysicalLogLocator;
import edu.uci.ics.asterix.transaction.management.service.transaction.IResourceManager;
import edu.uci.ics.asterix.transaction.management.service.transaction.IResourceManager.ResourceType;
-import edu.uci.ics.asterix.transaction.management.service.transaction.ITransactionManager;
import edu.uci.ics.asterix.transaction.management.service.transaction.JobIdFactory;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManager;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexLifecycleManager;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrame;
+import edu.uci.ics.hyracks.storage.am.lsm.btree.impls.LSMBTreeImmutableComponent;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndexImmutableComponent;
+import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTreeImmutableComponent;
+import edu.uci.ics.hyracks.storage.am.rtree.impls.RTree;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
+import edu.uci.ics.hyracks.storage.common.file.BufferedFileHandle;
import edu.uci.ics.hyracks.storage.common.file.ILocalResourceRepository;
import edu.uci.ics.hyracks.storage.common.file.LocalResource;
@@ -132,9 +141,6 @@
ILogManager logManager = txnSubsystem.getLogManager();
ILogRecordHelper logRecordHelper = logManager.getLogRecordHelper();
- ITransactionManager txnManager = txnSubsystem.getTransactionManager();
- TransactionalResourceManagerRepository txnResourceRepository = txnSubsystem
- .getTransactionalResourceRepository();
//winnerTxnTable is used to add pairs, <committed TxnId, the most recent commit LSN of the TxnId>
Map<TxnId, Long> winnerTxnTable = new HashMap<TxnId, Long>();
@@ -216,10 +222,12 @@
byte resourceMgrId;
long maxDiskLastLSN;
long currentLSN;
- IIndex index = null;
+ int resourceType;
+ ILSMIndex index = null;
LocalResource localResource = null;
ILocalResourceMetadata localResourceMetadata = null;
- List<Long> resourceIdList = new ArrayList<Long>();
+ Map<Long, Long> resourceId2MaxLSNMap = new HashMap<Long, Long>();
+ List<ILSMComponent> immutableDiskIndexList = null;
//#. get indexLifeCycleManager
IAsterixAppRuntimeContextProvider appRuntimeContext = txnSubsystem.getAsterixAppRuntimeContextProvider();
@@ -244,13 +252,13 @@
if (winnerTxnTable.containsKey(tempKeyTxnId)) {
currentLSN = winnerTxnTable.get(tempKeyTxnId);
resourceId = logRecordHelper.getResourceId(currentLogLocator);
+ localResource = localResourceRepository.getResourceById(resourceId);
//get index instance from IndexLifeCycleManager
//if index is not registered into IndexLifeCycleManager,
//create the index using LocalMetadata stored in LocalResourceRepository
- index = indexLifecycleManager.getIndex(resourceId);
+ index = (ILSMIndex) indexLifecycleManager.getIndex(resourceId);
if (index == null) {
- localResource = localResourceRepository.getResourceById(resourceId);
/*******************************************************************
* [Notice]
@@ -270,19 +278,51 @@
}
/*******************************************************************/
+ //#. create index instance and register to indexLifeCycleManager
localResourceMetadata = (ILocalResourceMetadata) localResource.getResourceObject();
index = localResourceMetadata.createIndexInstance(appRuntimeContext,
localResource.getResourceName(), localResource.getPartition());
indexLifecycleManager.register(resourceId, index);
indexLifecycleManager.open(resourceId);
- resourceIdList.add(resourceId);
- }
- /***************************************************/
- //TODO
- //get the right maxDiskLastLSN with the resourceId from the Zach. :)
- maxDiskLastLSN = 100;
- /***************************************************/
+ //#. get maxDiskLastLSN
+ resourceType = localResource.getResourceType();
+ immutableDiskIndexList = index.getImmutableComponents();
+
+ maxDiskLastLSN = -1;
+ switch (resourceType) {
+
+ case ResourceType.LSM_BTREE:
+ for (ILSMComponent c : immutableDiskIndexList) {
+ BTree btree = ((LSMBTreeImmutableComponent) c).getBTree();
+ maxDiskLastLSN = Math.max(getTreeIndexLSN(btree), maxDiskLastLSN);
+ }
+ break;
+
+ case ResourceType.LSM_RTREE:
+ for (ILSMComponent c : immutableDiskIndexList) {
+ RTree rtree = ((LSMRTreeImmutableComponent) c).getRTree();
+ maxDiskLastLSN = Math.max(getTreeIndexLSN(rtree), maxDiskLastLSN);
+ }
+ break;
+
+ case ResourceType.LSM_INVERTED_INDEX:
+ for (ILSMComponent c : immutableDiskIndexList) {
+ BTree delKeyBtree = ((LSMInvertedIndexImmutableComponent) c)
+ .getDeletedKeysBTree();
+ maxDiskLastLSN = Math.max(getTreeIndexLSN(delKeyBtree), maxDiskLastLSN);
+ }
+ break;
+
+ default:
+ throw new ACIDException("Unsupported resouce type");
+ }
+
+ //#. set resourceId and maxDiskLastLSN to the map
+ resourceId2MaxLSNMap.put(resourceId, maxDiskLastLSN);
+ } else {
+ maxDiskLastLSN = resourceId2MaxLSNMap.get(resourceId);
+ }
if (currentLSN > maxDiskLastLSN) {
resourceMgrId = logRecordHelper.getResourceMgrId(currentLogLocator);
@@ -312,15 +352,35 @@
throw new ACIDException("Unsupported LogType: " + logType);
}
}
-
+
//close all indexes
+ Set<Long> resourceIdList = resourceId2MaxLSNMap.keySet();
for (long r : resourceIdList) {
indexLifecycleManager.close(r);
}
-
+
JobIdFactory.initJobId(maxJobId);
}
+ //TODO
+ //This function came from the AbstractLSMIOOperationCallback class.
+ //We'd better factor out this function into a component of reading/writing the local metadata of indexes.
+ private long getTreeIndexLSN(ITreeIndex treeIndex) throws HyracksDataException {
+ int fileId = treeIndex.getFileId();
+ IBufferCache bufferCache = treeIndex.getBufferCache();
+ ITreeIndexMetaDataFrame metadataFrame = treeIndex.getFreePageManager().getMetaDataFrameFactory().createFrame();
+ int metadataPageId = treeIndex.getFreePageManager().getFirstMetadataPage();
+ ICachedPage metadataPage = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, metadataPageId), false);
+ metadataPage.acquireReadLatch();
+ try {
+ metadataFrame.setPage(metadataPage);
+ return metadataFrame.getLSN();
+ } finally {
+ metadataPage.releaseReadLatch();
+ bufferCache.unpin(metadataPage);
+ }
+ }
+
@Override
public void checkpoint() throws ACIDException {
@@ -333,9 +393,18 @@
File[] prevCheckpointFiles = getPreviousCheckpointFiles();
//#. create and store the checkpointObject into the new checkpoint file
- //TODO
- //put the correct minMCTFirstLSN by getting from Zach. :)
- long minMCTFirstLSM = -1;
+ long minMCTFirstLSM = Long.MAX_VALUE;
+
+ //#. get indexLifeCycleManager
+ IIndexLifecycleManager indexLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
+ .getIndexLifecycleManager();
+ List<IIndex> openIndexList = indexLifecycleManager.getOpenIndexes();
+ long firstLSN;
+ for (IIndex index : openIndexList) {
+ firstLSN = ((IndexOperationTracker) ((ILSMIndex) index).getOperationTracker()).getFirstLSN();
+ minMCTFirstLSM = Math.min(minMCTFirstLSM, firstLSN);
+ }
+
CheckpointObject checkpointObject = new CheckpointObject(logMgr.getCurrentLsn().get(), minMCTFirstLSM,
txnMgr.getMaxJobId(), System.currentTimeMillis());