added a checkpoint thread which does fuzzy checkpoint periodically

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_lsm_stabilization@1271 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointThread.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointThread.java
new file mode 100644
index 0000000..3eb87bc
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointThread.java
@@ -0,0 +1,66 @@
+package edu.uci.ics.asterix.transaction.management.service.recovery;
+
+import java.util.List;
+
+import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.IndexOperationTracker;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+
+public class CheckpointThread extends Thread {
+
+    private static final long LSN_THRESHOLD = 64 * 1024 * 1024;
+    private long checkpointTermInSecs = 120; //seconds.
+
+    private long lastMinMCTFirstLSN = 0;
+
+    private final IRecoveryManager recoveryMgr;
+    private final IIndexLifecycleManager indexLifecycleManager;
+
+    public CheckpointThread(IRecoveryManager recoveryMgr, IIndexLifecycleManager indexLifecycleManager,
+            long checkpointTermInSecs) {
+        this.recoveryMgr = recoveryMgr;
+        this.indexLifecycleManager = indexLifecycleManager;
+        if (this.checkpointTermInSecs < checkpointTermInSecs) {
+            this.checkpointTermInSecs = checkpointTermInSecs;
+        }
+    }
+
+    @Override
+    public void run() {
+        long currentMinMCTFirstLSN = 0;
+        while (true) {
+            try {
+                sleep(checkpointTermInSecs * 1000);
+            } catch (InterruptedException e) {
+                //ignore
+            }
+
+            currentMinMCTFirstLSN = getMinMCTFirstLSN();
+            if (currentMinMCTFirstLSN - lastMinMCTFirstLSN > LSN_THRESHOLD) {
+                try {
+                    recoveryMgr.checkpoint(false);
+                    lastMinMCTFirstLSN = currentMinMCTFirstLSN;
+                } catch (ACIDException e) {
+                    throw new Error("failed to checkpoint", e);
+                }
+            }
+        }
+    }
+
+    private long getMinMCTFirstLSN() {
+        List<IIndex> openIndexList = indexLifecycleManager.getOpenIndexes();
+        long minMCTFirstLSN = Long.MAX_VALUE;
+        long firstLSN;
+        if (openIndexList.size() > 0) {
+            for (IIndex index : openIndexList) {
+                firstLSN = ((IndexOperationTracker) ((ILSMIndex) index).getOperationTracker()).getFirstLSN();
+                minMCTFirstLSN = Math.min(minMCTFirstLSN, firstLSN);
+            }
+        } else {
+            minMCTFirstLSN = -1;
+        }
+        return minMCTFirstLSN;
+    }
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
index fb37202..fdd3f07 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -87,7 +87,7 @@
 
     public static final boolean IS_DEBUG_MODE = false;//true
     private static final Logger LOGGER = Logger.getLogger(RecoveryManager.class.getName());
-    private TransactionSubsystem txnSubsystem;
+    private final TransactionSubsystem txnSubsystem;
 
     /**
      * A file at a known location that contains the LSN of the last log record
@@ -382,7 +382,7 @@
     }
 
     @Override
-    public void checkpoint(boolean isSharpCheckpoint) throws ACIDException {
+    public synchronized void checkpoint(boolean isSharpCheckpoint) throws ACIDException {
 
         LogManager logMgr = (LogManager) txnSubsystem.getLogManager();
         TransactionManager txnMgr = (TransactionManager) txnSubsystem.getTransactionManager();
@@ -471,7 +471,7 @@
                 file.delete();
             }
         }
-        
+
         if (isSharpCheckpoint) {
             logMgr.renewLogFiles();
         }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java
index 1fc2765..35d1c9b 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java
@@ -21,6 +21,7 @@
 import edu.uci.ics.asterix.transaction.management.service.logging.ILogManager;
 import edu.uci.ics.asterix.transaction.management.service.logging.IndexLoggerRepository;
 import edu.uci.ics.asterix.transaction.management.service.logging.LogManager;
+import edu.uci.ics.asterix.transaction.management.service.recovery.CheckpointThread;
 import edu.uci.ics.asterix.transaction.management.service.recovery.IAsterixAppRuntimeContextProvider;
 import edu.uci.ics.asterix.transaction.management.service.recovery.IRecoveryManager;
 import edu.uci.ics.asterix.transaction.management.service.recovery.RecoveryManager;
@@ -38,6 +39,7 @@
     private final TransactionalResourceManagerRepository resourceRepository;
     private final IndexLoggerRepository loggerRepository;
     private final IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider;
+    private final CheckpointThread checkpointThread;
 
     public TransactionSubsystem(String id, IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider)
             throws ACIDException {
@@ -49,6 +51,8 @@
         this.loggerRepository = new IndexLoggerRepository(this);
         this.resourceRepository = new TransactionalResourceManagerRepository();
         this.asterixAppRuntimeContextProvider = asterixAppRuntimeContextProvider;
+        this.checkpointThread = new CheckpointThread(recoveryManager,
+                asterixAppRuntimeContextProvider.getIndexLifecycleManager(), 0);
     }
 
     public ILogManager getLogManager() {