Implemented the recovery mechanism; the interface for getting LSNs from the LSM index still needs to be wired up.
git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_lsm_stabilization@1115 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCBootstrapImpl.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCBootstrapImpl.java
index 24cf3a8..100c837 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCBootstrapImpl.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCBootstrapImpl.java
@@ -27,6 +27,8 @@
import edu.uci.ics.asterix.metadata.api.IAsterixStateProxy;
import edu.uci.ics.asterix.metadata.api.IMetadataNode;
import edu.uci.ics.asterix.metadata.bootstrap.MetadataBootstrap;
+import edu.uci.ics.asterix.transaction.management.service.recovery.IRecoveryManager;
+import edu.uci.ics.asterix.transaction.management.service.recovery.IRecoveryManager.SystemState;
import edu.uci.ics.hyracks.api.application.INCApplicationContext;
import edu.uci.ics.hyracks.api.application.INCBootstrap;
@@ -64,7 +66,18 @@
MetadataManager.INSTANCE = new MetadataManager(proxy);
MetadataManager.INSTANCE.init();
MetadataBootstrap.startUniverse(proxy.getAsterixProperties(), ncApplicationContext);
-
+ }
+
+ //#. recover if the system is corrupted by checking system state.
+ IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager();
+ if (!proxy.getAsterixProperties().isNewUniverse()) {
+ if (recoveryMgr.getSystemState() == SystemState.CORRUPTED) {
+ recoveryMgr.startRecovery(true);
+ }
+ }
+ recoveryMgr.checkpoint();
+
+ if (isMetadataNode) {
// Start a sub-component for the API server. This server is only connected to by the
// API server that lives on the CC and never by a client wishing to execute AQL.
// TODO: The API sub-system will change dramatically in the future and this code will go away,
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
index b1a3c10..3e75a84 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -163,13 +163,15 @@
fileMapProvider = runtimeContext.getFileMapManager();
ioManager = ncApplicationContext.getRootContext().getIOManager();
- // Begin a transaction against the metadata.
- // Lock the metadata in X mode.
- MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
- MetadataManager.INSTANCE.lock(mdTxnCtx, LockMode.X);
-
- try {
- if (isNewUniverse) {
+ if (isNewUniverse) {
+ //Do checkpoint only if it is new universe
+ runtimeContext.getTransactionSubsystem().getRecoveryManager().checkpoint();
+ MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ try {
+ // Begin a transaction against the metadata.
+ // Lock the metadata in X mode.
+ MetadataManager.INSTANCE.lock(mdTxnCtx, LockMode.X);
+
for (int i = 0; i < primaryIndexes.length; i++) {
enlistMetadataDataset(primaryIndexes[i], true);
registerTransactionalResource(primaryIndexes[i], resourceRepository);
@@ -185,24 +187,26 @@
insertNodes(mdTxnCtx);
insertInitialGroups(mdTxnCtx);
insertInitialAdapters(mdTxnCtx);
- LOGGER.info("FINISHED CREATING METADATA B-TREES.");
- } else {
- for (int i = 0; i < primaryIndexes.length; i++) {
- enlistMetadataDataset(primaryIndexes[i], false);
- registerTransactionalResource(primaryIndexes[i], resourceRepository);
- }
- for (int i = 0; i < secondaryIndexes.length; i++) {
- enlistMetadataDataset(secondaryIndexes[i], false);
- registerTransactionalResource(secondaryIndexes[i], resourceRepository);
- }
- LOGGER.info("FINISHED ENLISTMENT OF METADATA B-TREES.");
+
+ MetadataManager.INSTANCE.initializeDatasetIdFactory(mdTxnCtx);
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ } catch (Exception e) {
+ MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+ throw e;
}
- MetadataManager.INSTANCE.initializeDatasetIdFactory(mdTxnCtx);
- MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
- } catch (Exception e) {
- MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
- throw e;
+ LOGGER.info("FINISHED CREATING METADATA B-TREES.");
+ } else {
+ for (int i = 0; i < primaryIndexes.length; i++) {
+ enlistMetadataDataset(primaryIndexes[i], false);
+ registerTransactionalResource(primaryIndexes[i], resourceRepository);
+ }
+ for (int i = 0; i < secondaryIndexes.length; i++) {
+ enlistMetadataDataset(secondaryIndexes[i], false);
+ registerTransactionalResource(secondaryIndexes[i], resourceRepository);
+ }
+ LOGGER.info("FINISHED ENLISTMENT OF METADATA B-TREES.");
}
+
}
public static void stopUniverse() throws HyracksDataException {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexResourceManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexResourceManager.java
index 703e3e6..ca72204 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexResourceManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexResourceManager.java
@@ -108,8 +108,46 @@
}
}
- public void redo(ILogRecordHelper logRecordHelper, LogicalLogLocator logicalLogLocator) throws ACIDException {
- throw new UnsupportedOperationException(" Redo logic will be implemented as part of crash recovery feature");
+ /**
+ * Re-applies the operation stored in an update log record to the index it belongs to.
+ * <p>
+ * The log content is read in this order: field count (int), new operation (byte),
+ * new value size (int), then the new value itself. The index is looked up by the
+ * resource id of the record and modified through an accessor with no-op callbacks.
+ *
+ * @param logRecordHelper
+ * helper used to read the resource id and the content start position of the record
+ * @param logLocator
+ * locator of the log record to redo
+ * @throws ACIDException
+ * if the recorded operation is neither INSERT nor DELETE, or if applying it to the index fails
+ */
+ public void redo(ILogRecordHelper logRecordHelper, LogicalLogLocator logLocator) throws ACIDException {
+ long resourceId = logRecordHelper.getResourceId(logLocator);
+ int offset = logRecordHelper.getLogContentBeginPos(logLocator);
+
+ IIndex index = (IIndex) provider.getAsterixAppRuntimeContextProvider().getIndexLifecycleManager()
+ .getIndex(resourceId);
+
+ /* field count */
+ int fieldCount = logLocator.getBuffer().readInt(logLocator.getMemoryOffset() + offset);
+ offset += 4;
+
+ /* new operation */
+ byte newOperation = logLocator.getBuffer().getByte(logLocator.getMemoryOffset() + offset);
+ offset += 1;
+
+ /* new value size */
+ int newValueSize = logLocator.getBuffer().readInt(logLocator.getMemoryOffset() + offset);
+ offset += 4;
+
+ /* new value */
+ SimpleTupleWriter tupleWriter = new SimpleTupleWriter();
+ SimpleTupleReference newTuple = (SimpleTupleReference) tupleWriter.createTupleReference();
+ newTuple.setFieldCount(fieldCount);
+ //NOTE(review): the reads above add logLocator.getMemoryOffset() to offset, this call does not - confirm
+ newTuple.resetByTupleOffset(logLocator.getBuffer().getByteBuffer(), offset);
+ offset += newValueSize;
+
+ ILSMIndexAccessor indexAccessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
+ NoOpOperationCallback.INSTANCE);
+
+ //fail loudly on an unknown operation instead of silently skipping the log record;
+ //checked outside the try block so that it is not re-wrapped as "Redo failed"
+ boolean isInsert = newOperation == IndexOperation.INSERT.ordinal();
+ if (!isInsert && newOperation != IndexOperation.DELETE.ordinal()) {
+ throw new ACIDException("Unsupported operation type for redo operation : " + newOperation);
+ }
+
+ try {
+ if (isInsert) {
+ indexAccessor.insert(newTuple);
+ } else {
+ indexAccessor.delete(newTuple);
+ }
+ } catch (Exception e) {
+ throw new ACIDException("Redo failed", e);
+ }
+ }
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointObject.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointObject.java
new file mode 100644
index 0000000..b9dd59f
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointObject.java
@@ -0,0 +1,54 @@
+package edu.uci.ics.asterix.transaction.management.service.recovery;
+
+import java.io.Serializable;
+
+/**
+ * Immutable, serializable snapshot written to a checkpoint file.
+ * <p>
+ * The natural ordering is by time stamp in descending order, so sorting a list of
+ * checkpoint objects places the most recent one first.
+ */
+public class CheckpointObject implements Serializable, Comparable<CheckpointObject> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final long checkpointLSN;
+ private final long minMCTFirstLSN;
+ private final int maxJobId;
+ private final long timeStamp;
+
+ /**
+ * @param checkpointLSN
+ * LSN recorded for this checkpoint
+ * @param minMCTFirstLSN
+ * LSN stored as the minimum first LSN; read back through getMinMCTFirstLSN()
+ * @param maxJobId
+ * largest job id known when the checkpoint was taken
+ * @param timeStamp
+ * time stamp of the checkpoint, used for ordering
+ */
+ public CheckpointObject(long checkpointLSN, long minMCTFirstLSN, int maxJobId, long timeStamp) {
+ this.checkpointLSN = checkpointLSN;
+ this.minMCTFirstLSN = minMCTFirstLSN;
+ this.maxJobId = maxJobId;
+ this.timeStamp = timeStamp;
+ }
+
+ public long getCheckpointLSN() {
+ return checkpointLSN;
+ }
+
+ public long getMinMCTFirstLSN() {
+ return minMCTFirstLSN;
+ }
+
+ public int getMaxJobId() {
+ return maxJobId;
+ }
+
+ public long getTimeStamp() {
+ return timeStamp;
+ }
+
+ /**
+ * Orders checkpoint objects by time stamp, newest first.
+ *
+ * @return 1 if the other object is newer than this one, 0 if both carry the same
+ * time stamp, -1 if the other object is older
+ */
+ @Override
+ public int compareTo(CheckpointObject checkpointObject) {
+ long otherTimeStamp = checkpointObject.getTimeStamp();
+
+ //descending order; compare directly because subtracting two longs can overflow
+ if (otherTimeStamp > this.timeStamp) {
+ return 1;
+ } else if (otherTimeStamp == this.timeStamp) {
+ return 0;
+ } else {
+ return -1;
+ }
+ }
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/IRecoveryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/IRecoveryManager.java
index 2e0fd47..5328091 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/IRecoveryManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/IRecoveryManager.java
@@ -57,7 +57,7 @@
* recovery.
* @throws ACIDException
*/
- public SystemState startRecovery(boolean synchronous) throws IOException, ACIDException;
+ public void startRecovery(boolean synchronous) throws IOException, ACIDException;
/**
* Rolls back a transaction.
@@ -67,4 +67,6 @@
* @throws ACIDException
*/
public void rollbackTransaction(TransactionContext txnContext) throws ACIDException;
+
+ /**
+ * Takes a checkpoint.
+ *
+ * @throws ACIDException
+ * if the checkpoint cannot be taken
+ */
+ public void checkpoint() throws ACIDException;
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
index 0905907..18e7be1 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -14,12 +14,14 @@
*/
package edu.uci.ics.asterix.transaction.management.service.recovery;
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
import java.io.File;
-import java.io.FileReader;
-import java.io.FileWriter;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
@@ -32,7 +34,11 @@
import java.util.logging.Logger;
import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
-import edu.uci.ics.asterix.transaction.management.service.logging.FileUtil;
+import edu.uci.ics.asterix.transaction.management.resource.ILocalResourceMetadata;
+import edu.uci.ics.asterix.transaction.management.resource.LSMBTreeLocalResourceMetadata;
+import edu.uci.ics.asterix.transaction.management.resource.LSMInvertedIndexLocalResourceMetadata;
+import edu.uci.ics.asterix.transaction.management.resource.LSMRTreeLocalResourceMetadata;
+import edu.uci.ics.asterix.transaction.management.resource.TransactionalResourceRepository;
import edu.uci.ics.asterix.transaction.management.service.logging.IBuffer;
import edu.uci.ics.asterix.transaction.management.service.logging.ILogCursor;
import edu.uci.ics.asterix.transaction.management.service.logging.ILogFilter;
@@ -40,15 +46,22 @@
import edu.uci.ics.asterix.transaction.management.service.logging.ILogRecordHelper;
import edu.uci.ics.asterix.transaction.management.service.logging.IndexResourceManager;
import edu.uci.ics.asterix.transaction.management.service.logging.LogManager;
-import edu.uci.ics.asterix.transaction.management.service.logging.LogRecordHelper;
import edu.uci.ics.asterix.transaction.management.service.logging.LogType;
import edu.uci.ics.asterix.transaction.management.service.logging.LogUtil;
import edu.uci.ics.asterix.transaction.management.service.logging.LogicalLogLocator;
import edu.uci.ics.asterix.transaction.management.service.logging.PhysicalLogLocator;
import edu.uci.ics.asterix.transaction.management.service.transaction.IResourceManager;
+import edu.uci.ics.asterix.transaction.management.service.transaction.IResourceManager.ResourceType;
+import edu.uci.ics.asterix.transaction.management.service.transaction.ITransactionManager;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants;
+import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManager;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexLifecycleManager;
+import edu.uci.ics.hyracks.storage.common.file.ILocalResourceRepository;
+import edu.uci.ics.hyracks.storage.common.file.LocalResource;
/**
* This is the Recovery Manager and is responsible for rolling back a
@@ -66,102 +79,350 @@
* A file at a known location that contains the LSN of the last log record
* traversed doing a successful checkpoint.
*/
- private static final String checkpoint_record_file = "last_checkpoint_lsn";
+ private static final String CHECKPOINT_FILENAME_PREFIX = "checkpoint_";
private SystemState state;
- private Map<Long, TransactionTableEntry> transactionTable;
- private Map<Long, List<PhysicalLogLocator>> dirtyPagesTable;
public RecoveryManager(TransactionSubsystem TransactionProvider) throws ACIDException {
this.txnSubsystem = TransactionProvider;
- try {
- FileUtil.createFileIfNotExists(checkpoint_record_file);
- } catch (IOException ioe) {
- throw new ACIDException(" unable to create checkpoint record file " + checkpoint_record_file, ioe);
- }
+ // The legacy checkpoint record file is no longer created here; the old creation code is kept below, commented out.
+ /**********
+ * try {
+ * FileUtil.createFileIfNotExists(checkpoint_record_file);
+ * } catch (IOException ioe) {
+ * throw new ACIDException(" unable to create checkpoint record file " + checkpoint_record_file, ioe);
+ * }
+ ***********/
}
+ /**
+ * Determines the system state by comparing the most recent checkpoint with the log.
+ * <p>
+ * The state is HEALTHY when the checkpoint LSN stored in the most recent checkpoint file
+ * equals the current LSN of the log manager, and CORRUPTED otherwise. The result is also
+ * stored in the state field.
+ * <p>
+ * This state information could be used in a case where more than one thread is running
+ * in the bootstrap process to provide higher availability. In other words, while the system
+ * is being recovered, another thread may start a new transaction, understanding the side effect
+ * of the operation, or the system can be recovered concurrently. This kind of concurrency is
+ * not supported yet.
+ *
+ * @return HEALTHY or CORRUPTED
+ * @throws ACIDException
+ * if no checkpoint file exists (the state is set to CORRUPTED first) or a checkpoint
+ * file cannot be read
+ */
public SystemState getSystemState() throws ACIDException {
- return state;
+
+ //#. read checkpoint file
+ CheckpointObject checkpointObject = null;
+ try {
+ checkpointObject = readCheckpoint();
+ } catch (FileNotFoundException e) {
+ //This is initial bootstrap.
+ //Otherwise, the checkpoint file is deleted unfortunately. What we can do in this case?
+ state = SystemState.CORRUPTED;
+ //throw here: continuing would dereference the null checkpointObject below
+ throw new ACIDException("Checkpoint file doesn't exist", e);
+ }
+
+ //#. if checkpointLSN in the checkpoint file is equal to the lastLSN in the log file,
+ // then return healthy state. Otherwise, return corrupted.
+ LogManager logMgr = (LogManager) txnSubsystem.getLogManager();
+ if (checkpointObject.getCheckpointLSN() == logMgr.getCurrentLsn().get()) {
+ state = SystemState.HEALTHY;
+ return state;
+ } else {
+ state = SystemState.CORRUPTED;
+ return state;
+ }
}
private PhysicalLogLocator getBeginRecoveryLSN() throws ACIDException {
return new PhysicalLogLocator(0, txnSubsystem.getLogManager());
}
- /**
- * TODO:This method is currently not implemented completely.
- */
- public SystemState startRecovery(boolean synchronous) throws IOException, ACIDException {
- ILogManager logManager = txnSubsystem.getLogManager();
- state = SystemState.RECOVERING;
- transactionTable = new HashMap<Long, TransactionTableEntry>();
- dirtyPagesTable = new HashMap<Long, List<PhysicalLogLocator>>();
+ /**
+ * Recovers the system from the log in two passes that both start at the low water mark
+ * (the minMCTFirstLSN of the most recent checkpoint).
+ * <p>
+ * The analysis pass collects, per committed TxnId, the LSN of its most recent commit record.
+ * The redo pass re-applies update records whose TxnId was collected, creating and registering
+ * the index and the resource manager on demand.
+ * <p>
+ * The state field is set to RECOVERING on entry and is not changed again by this method.
+ *
+ * @param synchronous
+ * not consulted by this implementation
+ * @throws IOException
+ * if no checkpoint file is found (FileNotFoundException from readCheckpoint)
+ * @throws ACIDException
+ * if a checkpoint file cannot be read, a log record has an unsupported type, or a redo fails
+ */
+ public void startRecovery(boolean synchronous) throws IOException, ACIDException {
- PhysicalLogLocator beginLSN = getBeginRecoveryLSN();
- ILogCursor cursor = logManager.readLog(beginLSN, new ILogFilter() {
+ state = SystemState.RECOVERING;
+
+ ILogManager logManager = txnSubsystem.getLogManager();
+ ILogRecordHelper logRecordHelper = logManager.getLogRecordHelper();
+ ITransactionManager txnManager = txnSubsystem.getTransactionManager();
+ TransactionalResourceRepository txnResourceRepository = txnSubsystem.getTransactionalResourceRepository();
+
+ //winnerTxnTable is used to add pairs, <committed TxnId, the most recent commit LSN of the TxnId>
+ Map<TxnId, Long> winnerTxnTable = new HashMap<TxnId, Long>();
+ //tempKeyTxnId is a reusable probe key for lookups only; it must never be stored in the table
+ TxnId tempKeyTxnId = new TxnId(-1, -1, -1);
+ byte logType;
+
+ //#. read checkpoint file and set lowWaterMark where analysis and redo start
+ CheckpointObject checkpointObject = readCheckpoint();
+ long lowWaterMarkLSN = checkpointObject.getMinMCTFirstLSN();
+ //NOTE(review): maxJobId is only tracked locally; it is not handed to the transaction manager here
+ int maxJobId = checkpointObject.getMaxJobId();
+ int currentJobId;
+
+ //-------------------------------------------------------------------------
+ // [ analysis phase ]
+ // - collect all committed LSN
+ // - if there are duplicate commits for the same TxnId,
+ // keep only the mostRecentCommitLSN among the duplicates.
+ //-------------------------------------------------------------------------
+
+ //#. set log cursor to the lowWaterMarkLSN
+ ILogCursor logCursor = logManager.readLog(new PhysicalLogLocator(lowWaterMarkLSN, logManager),
+ new ILogFilter() {
+ public boolean accept(IBuffer logs, long startOffset, int endOffset) {
+ return true;
+ }
+ });
+ LogicalLogLocator currentLogLocator = LogUtil.getDummyLogicalLogLocator(logManager);
+
+ //#. collect all committed txn's pairs,<TxnId, LSN>
+ while (logCursor.next(currentLogLocator)) {
+
+ if (LogManager.IS_DEBUG_MODE) {
+ System.out.println(logManager.getLogRecordHelper().getLogRecordForDisplay(currentLogLocator));
+ }
+
+ logType = logRecordHelper.getLogType(currentLogLocator);
+
+ //update max jobId
+ currentJobId = logRecordHelper.getJobId(currentLogLocator);
+ if (currentJobId > maxJobId) {
+ maxJobId = currentJobId;
+ }
+
+ switch (logType) {
+ case LogType.UPDATE:
+ //do nothing
+ break;
+
+ case LogType.COMMIT:
+ //store a fresh key per entry: a map key that is mutated after insertion
+ //can no longer be matched reliably by equals()/hashCode()
+ winnerTxnTable.put(new TxnId(logRecordHelper.getJobId(currentLogLocator),
+ logRecordHelper.getDatasetId(currentLogLocator),
+ logRecordHelper.getPKHashValue(currentLogLocator)), currentLogLocator.getLsn());
+ break;
+
+ default:
+ throw new ACIDException("Unsupported LogType: " + logType);
+ }
+ }
+
+ //-------------------------------------------------------------------------
+ // [ redo phase ]
+ // - redo if
+ // 1) The TxnId is committed --> guarantee durability
+ // &&
+ // 2) the currentLSN > maxDiskLastLSN of the index --> guarantee idempotence
+ //-------------------------------------------------------------------------
+
+ //#. set log cursor to the lowWaterMarkLSN again.
+ logCursor = logManager.readLog(new PhysicalLogLocator(lowWaterMarkLSN, logManager), new ILogFilter() {
public boolean accept(IBuffer logs, long startOffset, int endOffset) {
return true;
}
});
- LogicalLogLocator memLSN = new LogicalLogLocator(beginLSN.getLsn(), null, -1, logManager);
- boolean logValidity = true;
- LogRecordHelper parser = new LogRecordHelper(logManager);
- try {
- while (logValidity) {
- logValidity = cursor.next(memLSN);
- if (!logValidity) {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("reached end of log !");
+ currentLogLocator = LogUtil.getDummyLogicalLogLocator(logManager);
+
+ long resourceId;
+ byte resourceMgrId;
+ long maxDiskLastLSN;
+ long currentLSN;
+ IIndex index = null;
+ LocalResource localResource = null;
+ ILocalResourceMetadata localResourceMetadata = null;
+
+ //#. get indexLifeCycleManager
+ IAsterixAppRuntimeContextProvider appRuntimeContext = txnSubsystem.getAsterixAppRuntimeContextProvider();
+ IIndexLifecycleManager indexLifecycleManager = appRuntimeContext.getIndexLifecycleManager();
+ ILocalResourceRepository localResourceRepository = appRuntimeContext.getLocalResourceRepository();
+
+ //#. redo
+ while (logCursor.next(currentLogLocator)) {
+
+ if (LogManager.IS_DEBUG_MODE) {
+ System.out.println(logManager.getLogRecordHelper().getLogRecordForDisplay(currentLogLocator));
+ }
+
+ logType = logRecordHelper.getLogType(currentLogLocator);
+
+ switch (logType) {
+ case LogType.UPDATE:
+ tempKeyTxnId.setTxnId(logRecordHelper.getJobId(currentLogLocator),
+ logRecordHelper.getDatasetId(currentLogLocator),
+ logRecordHelper.getPKHashValue(currentLogLocator));
+
+ if (winnerTxnTable.containsKey(tempKeyTxnId)) {
+ //NOTE(review): this is the commit LSN stored for the TxnId, not the LSN of this
+ //update record - confirm which one should be compared with maxDiskLastLSN
+ currentLSN = winnerTxnTable.get(tempKeyTxnId);
+ resourceId = logRecordHelper.getResourceId(currentLogLocator);
+
+ //get index instance from IndexLifeCycleManager
+ //if index is not registered into IndexLifeCycleManager,
+ //create the index using LocalMetadata stored in LocalResourceRepository
+ index = indexLifecycleManager.getIndex(resourceId);
+ if (index == null) {
+ localResource = localResourceRepository.getResourceById(resourceId);
+ localResourceMetadata = (ILocalResourceMetadata) localResource.getResourceObject();
+ index = localResourceMetadata.createIndexInstance(appRuntimeContext,
+ localResource.getResourceName(), localResource.getPartition());
+ indexLifecycleManager.register(resourceId, index);
+ }
+
+ /***************************************************/
+ //TODO
+ //get the right maxDiskLastLSN with the resourceId from the Zach. :)
+ maxDiskLastLSN = 100;
+ /***************************************************/
+
+ if (currentLSN > maxDiskLastLSN) {
+ resourceMgrId = logRecordHelper.getResourceMgrId(currentLogLocator);
+
+ // look up the repository to get the resource manager
+ // register resourceMgr if it is not registered.
+ IResourceManager resourceMgr = txnSubsystem.getTransactionalResourceRepository()
+ .getTransactionalResourceMgr(resourceMgrId);
+ if (resourceMgr == null) {
+ resourceMgr = new IndexResourceManager(resourceMgrId, txnSubsystem);
+ txnSubsystem.getTransactionalResourceRepository().registerTransactionalResourceManager(
+ resourceMgrId, resourceMgr);
+ }
+
+ //redo finally.
+ resourceMgr.redo(logRecordHelper, currentLogLocator);
+ }
+ }
break;
- }
- byte resourceMgrId = parser.getResourceMgrId(memLSN);
- IResourceManager resourceMgr = txnSubsystem.getTransactionalResourceRepository()
- .getTransactionalResourceMgr(resourceMgrId);
- //register resourceMgr if it is not registered.
- if (resourceMgr == null) {
- resourceMgr = new IndexResourceManager(resourceMgrId, txnSubsystem);
- txnSubsystem.getTransactionalResourceRepository().registerTransactionalResourceManager(
- resourceMgrId, resourceMgr);
- }
- resourceMgr.redo(parser, memLSN);
- writeCheckpointRecord(memLSN.getLsn());
+ case LogType.COMMIT:
+ //do nothing
+ break;
+
+ default:
+ throw new ACIDException("Unsupported LogType: " + logType);
}
- state = SystemState.HEALTHY;
- } catch (Exception e) {
- state = SystemState.CORRUPTED;
- throw new ACIDException(" could not recover , corrputed log !", e);
}
- return state;
}
- private void writeCheckpointRecord(long lsn) throws ACIDException {
+ /**
+ * Writes a new checkpoint file into the log directory and then deletes the older ones.
+ * <p>
+ * The checkpoint stores the current LSN of the log manager, a placeholder minMCTFirstLSN
+ * of 0 (see TODO below), the maximum job id known to the transaction manager, and the
+ * current time. The file name is CHECKPOINT_FILENAME_PREFIX followed by that time stamp.
+ *
+ * @throws ACIDException
+ * if the checkpoint file cannot be written or closed
+ */
+ @Override
+ public void checkpoint() throws ACIDException {
+
+ LogManager logMgr = (LogManager) txnSubsystem.getLogManager();
+ TransactionManager txnMgr = (TransactionManager) txnSubsystem.getTransactionManager();
+ String logDir = logMgr.getLogManagerProperties().getLogDir();
+
+ //#. get the filename of the previous checkpoint files which are about to be deleted
+ // right after the new checkpoint file is written.
+ File[] prevCheckpointFiles = getPreviousCheckpointFiles();
+
+ //#. create and store the checkpointObject into the new checkpoint file
+ //TODO
+ //put the correct minMCTFirstLSN by getting from Zach. :)
+ long minMCTFirstLSN = 0;
+ CheckpointObject checkpointObject = new CheckpointObject(logMgr.getCurrentLsn().get(), minMCTFirstLSN,
+ txnMgr.getMaxJobId(), System.currentTimeMillis());
+ String fileName = getFileName(logDir, Long.toString(checkpointObject.getTimeStamp()));
+
+ FileOutputStream fos = null;
+ ObjectOutputStream oosToFos = null;
try {
- FileWriter writer = new FileWriter(new File(checkpoint_record_file));
- BufferedWriter buffWriter = new BufferedWriter(writer);
- buffWriter.write("" + lsn);
- buffWriter.flush();
- } catch (IOException ioe) {
- throw new ACIDException(" unable to create check point record", ioe);
+ fos = new FileOutputStream(fileName);
+ oosToFos = new ObjectOutputStream(fos);
+ oosToFos.writeObject(checkpointObject);
+ oosToFos.flush();
+ } catch (IOException e) {
+ throw new ACIDException("Failed to checkpoint", e);
+ } finally {
+ //closing the object stream also closes fos; fos is closed directly only when
+ //the object stream was never created
+ if (oosToFos != null) {
+ try {
+ oosToFos.close();
+ } catch (IOException e) {
+ throw new ACIDException("Failed to checkpoint", e);
+ }
+ }
+ if (oosToFos == null && fos != null) {
+ try {
+ fos.close();
+ } catch (IOException e) {
+ throw new ACIDException("Failed to checkpoint", e);
+ }
+ }
+ }
+
+ //#. delete the previous checkpoint files, but never the file that was just written
+ // (its name equals an old one when two checkpoints share the same time stamp)
+ if (prevCheckpointFiles != null) {
+ String newCheckpointName = new File(fileName).getName();
+ for (File file : prevCheckpointFiles) {
+ if (file.getName().equals(newCheckpointName)) {
+ continue;
+ }
+ if (!file.delete() && LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.warning("Failed to delete the previous checkpoint file " + file.getAbsolutePath());
+ }
+ }
}
}
- /*
- * Currently this method is not used, but will be used as part of crash
- * recovery logic.
- */
- private long getLastCheckpointRecordLSN() throws Exception {
- FileReader reader;
- BufferedReader buffReader;
- String content = null;
- reader = new FileReader(new File(checkpoint_record_file));
- buffReader = new BufferedReader(reader);
- content = buffReader.readLine();
- if (content != null) {
- return Long.parseLong(content);
+ /**
+ * Reads every checkpoint file found in the log directory and returns the most recent one.
+ *
+ * @return the checkpoint object with the largest time stamp
+ * @throws FileNotFoundException
+ * if no checkpoint file exists
+ * @throws ACIDException
+ * if any checkpoint file cannot be read, deserialized or closed
+ */
+ private CheckpointObject readCheckpoint() throws ACIDException, FileNotFoundException {
+
+ CheckpointObject checkpointObject = null;
+
+ //#. read all checkpointObjects from the existing checkpoint files
+ File[] prevCheckpointFiles = getPreviousCheckpointFiles();
+
+ if (prevCheckpointFiles == null || prevCheckpointFiles.length == 0) {
+ throw new FileNotFoundException("Checkpoint file is not found");
}
- return -1;
+
+ List<CheckpointObject> checkpointObjectList = new ArrayList<CheckpointObject>();
+
+ for (File file : prevCheckpointFiles) {
+ FileInputStream fis = null;
+ ObjectInputStream oisFromFis = null;
+
+ try {
+ fis = new FileInputStream(file);
+ oisFromFis = new ObjectInputStream(fis);
+ checkpointObject = (CheckpointObject) oisFromFis.readObject();
+ checkpointObjectList.add(checkpointObject);
+ } catch (Exception e) {
+ //a single unreadable file aborts the whole read
+ throw new ACIDException("Failed to read a checkpoint file", e);
+ } finally {
+ //closing the object stream also closes fis; fis is closed directly only when
+ //the object stream was never created
+ if (oisFromFis != null) {
+ try {
+ oisFromFis.close();
+ } catch (IOException e) {
+ throw new ACIDException("Failed to read a checkpoint file", e);
+ }
+ }
+ if (oisFromFis == null && fis != null) {
+ try {
+ fis.close();
+ } catch (IOException e) {
+ throw new ACIDException("Failed to read a checkpoint file", e);
+ }
+ }
+ }
+ }
+
+ //#. sort checkpointObjects in descending order by timeStamp to find out the most recent one.
+ Collections.sort(checkpointObjectList);
+
+ //#. return the most recent one (the first one in sorted list)
+ return checkpointObjectList.get(0);
+ }
+
+ /**
+ * Lists the checkpoint files that currently exist in the log directory.
+ *
+ * @return the entries of the log directory whose names start with CHECKPOINT_FILENAME_PREFIX,
+ * or null when File.listFiles returns null for that directory
+ */
+ private File[] getPreviousCheckpointFiles() {
+ String logDir = txnSubsystem.getLogManager().getLogManagerProperties().getLogDir();
+
+ File parentDir = new File(logDir);
+
+ FilenameFilter filter = new FilenameFilter() {
+ @Override
+ public boolean accept(File dir, String name) {
+ //match the prefix only, so unrelated files that merely contain the token
+ //are neither deserialized nor deleted as checkpoints
+ return name.startsWith(CHECKPOINT_FILENAME_PREFIX);
+ }
+ };
+
+ return parentDir.listFiles(filter);
+ }
+
+ /**
+ * Builds the path of a checkpoint file.
+ *
+ * @param baseDir
+ * directory that holds the checkpoint files, with or without a trailing separator
+ * @param suffix
+ * text appended after CHECKPOINT_FILENAME_PREFIX
+ * @return baseDir, a file separator if baseDir does not already end with one,
+ * CHECKPOINT_FILENAME_PREFIX and suffix, concatenated
+ */
+ private String getFileName(String baseDir, String suffix) {
+ final String separator = System.getProperty("file.separator");
+ final String dirWithSeparator = baseDir.endsWith(separator) ? baseDir : baseDir + separator;
+ return dirWithSeparator + CHECKPOINT_FILENAME_PREFIX + suffix;
}
/**
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
index bafa753..9cd4104 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -16,6 +16,7 @@
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -31,6 +32,7 @@
private static final Logger LOGGER = Logger.getLogger(TransactionManager.class.getName());
private final TransactionSubsystem transactionProvider;
private Map<JobId, TransactionContext> transactionContextRepository = new HashMap<JobId, TransactionContext>();
+ private AtomicInteger maxJobId = new AtomicInteger(0);
public TransactionManager(TransactionSubsystem provider) {
this.transactionProvider = provider;
@@ -64,6 +66,7 @@
@Override
public TransactionContext beginTransaction(JobId jobId) throws ACIDException {
+ setMaxJobId(jobId.getId());
TransactionContext txnContext = new TransactionContext(jobId, transactionProvider);
synchronized (this) {
transactionContextRepository.put(jobId, txnContext);
@@ -73,6 +76,7 @@
@Override
public TransactionContext getTransactionContext(JobId jobId) throws ACIDException {
+ setMaxJobId(jobId.getId());
synchronized (transactionContextRepository) {
TransactionContext context = transactionContextRepository.get(jobId);
@@ -142,4 +146,12 @@
public TransactionSubsystem getTransactionProvider() {
return transactionProvider;
}
+
+ /**
+ * Raises the recorded maximum job id to the given value if that value is larger.
+ *
+ * @param jobId
+ * candidate job id; ignored when it does not exceed the current maximum
+ */
+ public void setMaxJobId(int jobId) {
+ //compare-and-set loop: a separate get() followed by set() could overwrite
+ //a larger id that another thread stored in between
+ int current = maxJobId.get();
+ while (jobId > current && !maxJobId.compareAndSet(current, jobId)) {
+ current = maxJobId.get();
+ }
+ }
+
+ /**
+ * @return the largest job id recorded so far through setMaxJobId, or 0 if none was recorded
+ */
+ public int getMaxJobId() {
+ return maxJobId.get();
+ }
}
diff --git a/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/logging/test/RecoverySimulator.java b/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/logging/test/RecoverySimulator.java
index a7bd24c..54ec036 100644
--- a/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/logging/test/RecoverySimulator.java
+++ b/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/logging/test/RecoverySimulator.java
@@ -24,8 +24,8 @@
private static IRecoveryManager recoveryManager;
- public static IRecoveryManager.SystemState startRecovery() throws IOException, ACIDException {
- return recoveryManager.startRecovery(true);
+ /**
+ * Starts recovery on the shared recovery manager, passing true for its synchronous flag.
+ */
+ public static void startRecovery() throws IOException, ACIDException {
+ recoveryManager.startRecovery(true);
}
public static void main(String args[]) throws IOException, ACIDException {
diff --git a/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/TransactionSimulator.java b/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/TransactionSimulator.java
index 3cc8f66..092f2ed 100644
--- a/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/TransactionSimulator.java
+++ b/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/TransactionSimulator.java
@@ -95,10 +95,9 @@
transactionManager.commitTransaction(context, new DatasetId(-1), -1);
}
- public SystemState recover() throws ACIDException, IOException {
- SystemState state = recoveryManager.startRecovery(true);
+ /**
+ * Runs recovery (synchronous flag set to true) and then syncs the file resource.
+ */
+ public void recover() throws ACIDException, IOException {
+ recoveryManager.startRecovery(true);
((FileResource) resource).sync();
- return state;
}
/**
@@ -125,8 +124,7 @@
}
int finalExpectedValue = existingValue + schedule.getDeltaChange();
- SystemState state = txnSimulator.recover();
- System.out.println(" State is " + state);
+ txnSimulator.recover();
boolean isCorrect = ((FileResource) resource).checkIfValueInSync(finalExpectedValue);
System.out.println(" Did recovery happen correctly " + isCorrect);
}