added a checkpoint thread which does fuzzy checkpoint periodically
git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_lsm_stabilization@1271 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointThread.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointThread.java
new file mode 100644
index 0000000..3eb87bc
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointThread.java
@@ -0,0 +1,66 @@
+package edu.uci.ics.asterix.transaction.management.service.recovery;
+
+import java.util.List;
+
+import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.IndexOperationTracker;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+
+public class CheckpointThread extends Thread {
+
+ private static final long LSN_THRESHOLD = 64 * 1024 * 1024;
+ private long checkpointTermInSecs = 120; //seconds.
+
+ private long lastMinMCTFirstLSN = 0;
+
+ private final IRecoveryManager recoveryMgr;
+ private final IIndexLifecycleManager indexLifecycleManager;
+
+ public CheckpointThread(IRecoveryManager recoveryMgr, IIndexLifecycleManager indexLifecycleManager,
+ long checkpointTermInSecs) {
+ this.recoveryMgr = recoveryMgr;
+ this.indexLifecycleManager = indexLifecycleManager;
+ if (this.checkpointTermInSecs < checkpointTermInSecs) {
+ this.checkpointTermInSecs = checkpointTermInSecs;
+ }
+ }
+
+ @Override
+ public void run() {
+ long currentMinMCTFirstLSN = 0;
+ while (true) {
+ try {
+ sleep(checkpointTermInSecs * 1000);
+ } catch (InterruptedException e) {
+ //ignore
+ }
+
+ currentMinMCTFirstLSN = getMinMCTFirstLSN();
+ if (currentMinMCTFirstLSN - lastMinMCTFirstLSN > LSN_THRESHOLD) {
+ try {
+ recoveryMgr.checkpoint(false);
+ lastMinMCTFirstLSN = currentMinMCTFirstLSN;
+ } catch (ACIDException e) {
+ throw new Error("failed to checkpoint", e);
+ }
+ }
+ }
+ }
+
+ private long getMinMCTFirstLSN() {
+ List<IIndex> openIndexList = indexLifecycleManager.getOpenIndexes();
+ long minMCTFirstLSN = Long.MAX_VALUE;
+ long firstLSN;
+ if (openIndexList.size() > 0) {
+ for (IIndex index : openIndexList) {
+ firstLSN = ((IndexOperationTracker) ((ILSMIndex) index).getOperationTracker()).getFirstLSN();
+ minMCTFirstLSN = Math.min(minMCTFirstLSN, firstLSN);
+ }
+ } else {
+ minMCTFirstLSN = -1;
+ }
+ return minMCTFirstLSN;
+ }
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
index fb37202..fdd3f07 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -87,7 +87,7 @@
public static final boolean IS_DEBUG_MODE = false;//true
private static final Logger LOGGER = Logger.getLogger(RecoveryManager.class.getName());
- private TransactionSubsystem txnSubsystem;
+ private final TransactionSubsystem txnSubsystem;
/**
* A file at a known location that contains the LSN of the last log record
@@ -382,7 +382,7 @@
}
@Override
- public void checkpoint(boolean isSharpCheckpoint) throws ACIDException {
+ public synchronized void checkpoint(boolean isSharpCheckpoint) throws ACIDException {
LogManager logMgr = (LogManager) txnSubsystem.getLogManager();
TransactionManager txnMgr = (TransactionManager) txnSubsystem.getTransactionManager();
@@ -471,7 +471,7 @@
file.delete();
}
}
-
+
if (isSharpCheckpoint) {
logMgr.renewLogFiles();
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java
index 1fc2765..35d1c9b 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java
@@ -21,6 +21,7 @@
import edu.uci.ics.asterix.transaction.management.service.logging.ILogManager;
import edu.uci.ics.asterix.transaction.management.service.logging.IndexLoggerRepository;
import edu.uci.ics.asterix.transaction.management.service.logging.LogManager;
+import edu.uci.ics.asterix.transaction.management.service.recovery.CheckpointThread;
import edu.uci.ics.asterix.transaction.management.service.recovery.IAsterixAppRuntimeContextProvider;
import edu.uci.ics.asterix.transaction.management.service.recovery.IRecoveryManager;
import edu.uci.ics.asterix.transaction.management.service.recovery.RecoveryManager;
@@ -38,6 +39,7 @@
private final TransactionalResourceManagerRepository resourceRepository;
private final IndexLoggerRepository loggerRepository;
private final IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider;
+ private final CheckpointThread checkpointThread;
public TransactionSubsystem(String id, IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider)
throws ACIDException {
@@ -49,6 +51,8 @@
this.loggerRepository = new IndexLoggerRepository(this);
this.resourceRepository = new TransactionalResourceManagerRepository();
this.asterixAppRuntimeContextProvider = asterixAppRuntimeContextProvider;
+ this.checkpointThread = new CheckpointThread(recoveryManager,
+ asterixAppRuntimeContextProvider.getIndexLifecycleManager(), 0);
}
public ILogManager getLogManager() {