implemented recovery mechanism, but still need to wire up interface of getting LSNs from LSM Index

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_lsm_stabilization@1115 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCBootstrapImpl.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCBootstrapImpl.java
index 24cf3a8..100c837 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCBootstrapImpl.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCBootstrapImpl.java
@@ -27,6 +27,8 @@
 import edu.uci.ics.asterix.metadata.api.IAsterixStateProxy;
 import edu.uci.ics.asterix.metadata.api.IMetadataNode;
 import edu.uci.ics.asterix.metadata.bootstrap.MetadataBootstrap;
+import edu.uci.ics.asterix.transaction.management.service.recovery.IRecoveryManager;
+import edu.uci.ics.asterix.transaction.management.service.recovery.IRecoveryManager.SystemState;
 import edu.uci.ics.hyracks.api.application.INCApplicationContext;
 import edu.uci.ics.hyracks.api.application.INCBootstrap;
 
@@ -64,7 +66,18 @@
             MetadataManager.INSTANCE = new MetadataManager(proxy);
             MetadataManager.INSTANCE.init();
             MetadataBootstrap.startUniverse(proxy.getAsterixProperties(), ncApplicationContext);
-
+        }
+        
+        //#. recover if the system is corrupted by checking system state.
+        IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager();
+        if (!proxy.getAsterixProperties().isNewUniverse()) {
+            if (recoveryMgr.getSystemState() == SystemState.CORRUPTED) {
+                recoveryMgr.startRecovery(true);
+            }
+        }
+        recoveryMgr.checkpoint();
+        
+        if (isMetadataNode) {
             // Start a sub-component for the API server. This server is only connected to by the 
             // API server that lives on the CC and never by a client wishing to execute AQL.
             // TODO: The API sub-system will change dramatically in the future and this code will go away, 
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
index b1a3c10..3e75a84 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -163,13 +163,15 @@
         fileMapProvider = runtimeContext.getFileMapManager();
         ioManager = ncApplicationContext.getRootContext().getIOManager();
 
-        // Begin a transaction against the metadata.
-        // Lock the metadata in X mode.
-        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-        MetadataManager.INSTANCE.lock(mdTxnCtx, LockMode.X);
-
-        try {
-            if (isNewUniverse) {
+        if (isNewUniverse) {
+            //Do checkpoint only if it is new universe
+            runtimeContext.getTransactionSubsystem().getRecoveryManager().checkpoint();
+            MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+            try {
+                // Begin a transaction against the metadata.
+                // Lock the metadata in X mode.
+                MetadataManager.INSTANCE.lock(mdTxnCtx, LockMode.X);
+                
                 for (int i = 0; i < primaryIndexes.length; i++) {
                     enlistMetadataDataset(primaryIndexes[i], true);
                     registerTransactionalResource(primaryIndexes[i], resourceRepository);
@@ -185,24 +187,26 @@
                 insertNodes(mdTxnCtx);
                 insertInitialGroups(mdTxnCtx);
                 insertInitialAdapters(mdTxnCtx);
-                LOGGER.info("FINISHED CREATING METADATA B-TREES.");
-            } else {
-                for (int i = 0; i < primaryIndexes.length; i++) {
-                    enlistMetadataDataset(primaryIndexes[i], false);
-                    registerTransactionalResource(primaryIndexes[i], resourceRepository);
-                }
-                for (int i = 0; i < secondaryIndexes.length; i++) {
-                    enlistMetadataDataset(secondaryIndexes[i], false);
-                    registerTransactionalResource(secondaryIndexes[i], resourceRepository);
-                }
-                LOGGER.info("FINISHED ENLISTMENT OF METADATA B-TREES.");
+                
+                MetadataManager.INSTANCE.initializeDatasetIdFactory(mdTxnCtx);
+                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            } catch (Exception e) {
+                MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+                throw e;
             }
-            MetadataManager.INSTANCE.initializeDatasetIdFactory(mdTxnCtx);
-            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-        } catch (Exception e) {
-            MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
-            throw e;
+            LOGGER.info("FINISHED CREATING METADATA B-TREES.");
+        } else {
+            for (int i = 0; i < primaryIndexes.length; i++) {
+                enlistMetadataDataset(primaryIndexes[i], false);
+                registerTransactionalResource(primaryIndexes[i], resourceRepository);
+            }
+            for (int i = 0; i < secondaryIndexes.length; i++) {
+                enlistMetadataDataset(secondaryIndexes[i], false);
+                registerTransactionalResource(secondaryIndexes[i], resourceRepository);
+            }
+            LOGGER.info("FINISHED ENLISTMENT OF METADATA B-TREES.");
         }
+
     }
 
     public static void stopUniverse() throws HyracksDataException {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexResourceManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexResourceManager.java
index 703e3e6..ca72204 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexResourceManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexResourceManager.java
@@ -108,8 +108,46 @@
         }
     }
 
-    public void redo(ILogRecordHelper logRecordHelper, LogicalLogLocator logicalLogLocator) throws ACIDException {
-        throw new UnsupportedOperationException(" Redo logic will be implemented as part of crash recovery feature");
+    public void redo(ILogRecordHelper logRecordHelper, LogicalLogLocator logLocator) throws ACIDException {
+        long resourceId = logRecordHelper.getResourceId(logLocator);
+        int offset = logRecordHelper.getLogContentBeginPos(logLocator);
+
+        IIndex index = (IIndex) provider.getAsterixAppRuntimeContextProvider().getIndexLifecycleManager()
+                .getIndex(resourceId);
+
+        /* field count */
+        int fieldCount = logLocator.getBuffer().readInt(logLocator.getMemoryOffset() + offset);
+        offset += 4;
+
+        /* new operation */
+        byte newOperation = logLocator.getBuffer().getByte(logLocator.getMemoryOffset() + offset);
+        offset += 1;
+
+        /* new value size */
+        int newValueSize = logLocator.getBuffer().readInt(logLocator.getMemoryOffset() + offset);
+        offset += 4;
+
+        /* new value */
+        SimpleTupleWriter tupleWriter = new SimpleTupleWriter();
+        SimpleTupleReference newTuple = (SimpleTupleReference) tupleWriter.createTupleReference();
+        newTuple.setFieldCount(fieldCount);
+        newTuple.resetByTupleOffset(logLocator.getBuffer().getByteBuffer(), offset);
+        offset += newValueSize;
+
+        ILSMIndexAccessor indexAccessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
+                NoOpOperationCallback.INSTANCE);
+
+        try {
+            if (newOperation == IndexOperation.INSERT.ordinal()) {
+                indexAccessor.insert(newTuple);
+            } else if (newOperation == IndexOperation.DELETE.ordinal()) {
+                indexAccessor.delete(newTuple);
+            } else {
+                new ACIDException("Unsupported operation type for undo operation : " + newOperation);
+            }
+        } catch (Exception e) {
+            throw new ACIDException("Redo failed", e);
+        }
     }
 
 }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointObject.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointObject.java
new file mode 100644
index 0000000..b9dd59f
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointObject.java
@@ -0,0 +1,54 @@
+package edu.uci.ics.asterix.transaction.management.service.recovery;
+
+import java.io.Serializable;
+
+public class CheckpointObject implements Serializable, Comparable<CheckpointObject> {
+
+    private static final long serialVersionUID = 1L;
+    
+    private final long checkpointLSN;
+    private final long minMCTFirstLSN;
+    private final int maxJobId;
+    private final long timeStamp;
+
+    public CheckpointObject(long checkpointLSN, long minMCTFirstLSN, int maxJobId, long timeStamp) {
+        this.checkpointLSN = checkpointLSN;
+        this.minMCTFirstLSN = minMCTFirstLSN;
+        this.maxJobId = maxJobId;
+        this.timeStamp = timeStamp;
+    }
+    
+    public long getCheckpointLSN() {
+        return checkpointLSN;
+    }
+
+    public long getMinMCTFirstLSN() {
+        return minMCTFirstLSN;
+    }
+
+    public int getMaxJobId() {
+        return maxJobId;
+    }
+
+    public long getTimeStamp() {
+        return timeStamp;
+    }
+
+    @Override
+    public int compareTo(CheckpointObject checkpointObject) {
+        long compareTimeStamp = checkpointObject.getTimeStamp(); 
+        
+        //decending order
+        long diff = compareTimeStamp - this.timeStamp;
+        if (diff > 0) {
+            return 1;
+        } else if (diff == 0){
+            return 0;
+        } else {
+            return -1;
+        }
+ 
+        //ascending order
+        //return this.timeStamp - compareTimeStamp;
+    }
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/IRecoveryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/IRecoveryManager.java
index 2e0fd47..5328091 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/IRecoveryManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/IRecoveryManager.java
@@ -57,7 +57,7 @@
      *         recovery.
      * @throws ACIDException
      */
-    public SystemState startRecovery(boolean synchronous) throws IOException, ACIDException;
+    public void startRecovery(boolean synchronous) throws IOException, ACIDException;
 
     /**
      * Rolls back a transaction.
@@ -67,4 +67,6 @@
      * @throws ACIDException
      */
     public void rollbackTransaction(TransactionContext txnContext) throws ACIDException;
+
+    public void checkpoint() throws ACIDException;
 }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
index 0905907..18e7be1 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -14,12 +14,14 @@
  */
 package edu.uci.ics.asterix.transaction.management.service.recovery;
 
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
 import java.io.File;
-import java.io.FileReader;
-import java.io.FileWriter;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
 import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -32,7 +34,11 @@
 import java.util.logging.Logger;
 
 import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
-import edu.uci.ics.asterix.transaction.management.service.logging.FileUtil;
+import edu.uci.ics.asterix.transaction.management.resource.ILocalResourceMetadata;
+import edu.uci.ics.asterix.transaction.management.resource.LSMBTreeLocalResourceMetadata;
+import edu.uci.ics.asterix.transaction.management.resource.LSMInvertedIndexLocalResourceMetadata;
+import edu.uci.ics.asterix.transaction.management.resource.LSMRTreeLocalResourceMetadata;
+import edu.uci.ics.asterix.transaction.management.resource.TransactionalResourceRepository;
 import edu.uci.ics.asterix.transaction.management.service.logging.IBuffer;
 import edu.uci.ics.asterix.transaction.management.service.logging.ILogCursor;
 import edu.uci.ics.asterix.transaction.management.service.logging.ILogFilter;
@@ -40,15 +46,22 @@
 import edu.uci.ics.asterix.transaction.management.service.logging.ILogRecordHelper;
 import edu.uci.ics.asterix.transaction.management.service.logging.IndexResourceManager;
 import edu.uci.ics.asterix.transaction.management.service.logging.LogManager;
-import edu.uci.ics.asterix.transaction.management.service.logging.LogRecordHelper;
 import edu.uci.ics.asterix.transaction.management.service.logging.LogType;
 import edu.uci.ics.asterix.transaction.management.service.logging.LogUtil;
 import edu.uci.ics.asterix.transaction.management.service.logging.LogicalLogLocator;
 import edu.uci.ics.asterix.transaction.management.service.logging.PhysicalLogLocator;
 import edu.uci.ics.asterix.transaction.management.service.transaction.IResourceManager;
+import edu.uci.ics.asterix.transaction.management.service.transaction.IResourceManager.ResourceType;
+import edu.uci.ics.asterix.transaction.management.service.transaction.ITransactionManager;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants;
+import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManager;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexLifecycleManager;
+import edu.uci.ics.hyracks.storage.common.file.ILocalResourceRepository;
+import edu.uci.ics.hyracks.storage.common.file.LocalResource;
 
 /**
  * This is the Recovery Manager and is responsible for rolling back a
@@ -66,102 +79,350 @@
      * A file at a known location that contains the LSN of the last log record
      * traversed doing a successful checkpoint.
      */
-    private static final String checkpoint_record_file = "last_checkpoint_lsn";
+    private static final String CHECKPOINT_FILENAME_PREFIX = "checkpoint_";
     private SystemState state;
-    private Map<Long, TransactionTableEntry> transactionTable;
-    private Map<Long, List<PhysicalLogLocator>> dirtyPagesTable;
 
     public RecoveryManager(TransactionSubsystem TransactionProvider) throws ACIDException {
         this.txnSubsystem = TransactionProvider;
-        try {
-            FileUtil.createFileIfNotExists(checkpoint_record_file);
-        } catch (IOException ioe) {
-            throw new ACIDException(" unable to create checkpoint record file " + checkpoint_record_file, ioe);
-        }
+        /**********
+         * try {
+         * FileUtil.createFileIfNotExists(checkpoint_record_file);
+         * } catch (IOException ioe) {
+         * throw new ACIDException(" unable to create checkpoint record file " + checkpoint_record_file, ioe);
+         * }
+         ***********/
     }
 
+    /**
+     * returns system state which could be one of the three states: HEALTHY, RECOVERING, CORRUPTED.
+     * This state information could be used in a case where more than one thread is running
+     * in the bootstrap process to provide higher availability. In other words, while the system
+     * is recovered, another thread may start a new transaction with understanding the side effect
+     * of the operation, or the system can be recovered concurrently. This kind of concurrency is
+     * not supported, yet.
+     */
     public SystemState getSystemState() throws ACIDException {
-        return state;
+
+        //#. read checkpoint file
+        CheckpointObject checkpointObject = null;
+        try {
+            checkpointObject = readCheckpoint();
+        } catch (FileNotFoundException e) {
+            //This is initial bootstrap. 
+            //Otherwise, the checkpoint file is deleted unfortunately. What we can do in this case?
+            state = SystemState.CORRUPTED;
+            new ACIDException("Checkpoint file doesn't exist", e);
+        }
+
+        //#. if checkpointLSN in the checkpoint file is equal to the lastLSN in the log file, 
+        //   then return healthy state. Otherwise, return corrupted.
+        LogManager logMgr = (LogManager) txnSubsystem.getLogManager();
+        if (checkpointObject.getCheckpointLSN() == logMgr.getCurrentLsn().get()) {
+            state = SystemState.HEALTHY;
+            return state;
+        } else {
+            state = SystemState.CORRUPTED;
+            return state;
+        }
     }
 
     private PhysicalLogLocator getBeginRecoveryLSN() throws ACIDException {
         return new PhysicalLogLocator(0, txnSubsystem.getLogManager());
     }
 
-    /**
-     * TODO:This method is currently not implemented completely.
-     */
-    public SystemState startRecovery(boolean synchronous) throws IOException, ACIDException {
-        ILogManager logManager = txnSubsystem.getLogManager();
-        state = SystemState.RECOVERING;
-        transactionTable = new HashMap<Long, TransactionTableEntry>();
-        dirtyPagesTable = new HashMap<Long, List<PhysicalLogLocator>>();
+    public void startRecovery(boolean synchronous) throws IOException, ACIDException {
 
-        PhysicalLogLocator beginLSN = getBeginRecoveryLSN();
-        ILogCursor cursor = logManager.readLog(beginLSN, new ILogFilter() {
+        state = SystemState.RECOVERING;
+
+        ILogManager logManager = txnSubsystem.getLogManager();
+        ILogRecordHelper logRecordHelper = logManager.getLogRecordHelper();
+        ITransactionManager txnManager = txnSubsystem.getTransactionManager();
+        TransactionalResourceRepository txnResourceRepository = txnSubsystem.getTransactionalResourceRepository();
+
+        //winnerTxnTable is used to add pairs, <committed TxnId, the most recent commit LSN of the TxnId>
+        Map<TxnId, Long> winnerTxnTable = new HashMap<TxnId, Long>();
+        TxnId tempKeyTxnId = new TxnId(-1, -1, -1);
+        byte logType;
+
+        //#. read checkpoint file and set lowWaterMark where anaylsis and redo start
+        CheckpointObject checkpointObject = readCheckpoint();
+        long lowWaterMarkLSN = checkpointObject.getMinMCTFirstLSN();
+        int maxJobId = checkpointObject.getMaxJobId();
+        int currentJobId;
+
+        //-------------------------------------------------------------------------
+        //  [ analysis phase ]
+        //  - collect all committed LSN 
+        //  - if there are duplicate commits for the same TxnId, 
+        //    keep only the mostRecentCommitLSN among the duplicates.
+        //-------------------------------------------------------------------------
+
+        //#. set log cursor to the lowWaterMarkLSN
+        ILogCursor logCursor = logManager.readLog(new PhysicalLogLocator(lowWaterMarkLSN, logManager),
+                new ILogFilter() {
+                    public boolean accept(IBuffer logs, long startOffset, int endOffset) {
+                        return true;
+                    }
+                });
+        LogicalLogLocator currentLogLocator = LogUtil.getDummyLogicalLogLocator(logManager);
+
+        //#. collect all committed txn's pairs,<TxnId, LSN>
+        while (logCursor.next(currentLogLocator)) {
+
+            if (LogManager.IS_DEBUG_MODE) {
+                System.out.println(logManager.getLogRecordHelper().getLogRecordForDisplay(currentLogLocator));
+            }
+
+            logType = logRecordHelper.getLogType(currentLogLocator);
+
+            //update max jobId
+            currentJobId = logRecordHelper.getJobId(currentLogLocator);
+            if (currentJobId > maxJobId) {
+                maxJobId = currentJobId;
+            }
+
+            switch (logType) {
+                case LogType.UPDATE:
+                    //do nothing                    
+                    break;
+
+                case LogType.COMMIT:
+                    tempKeyTxnId.setTxnId(logRecordHelper.getJobId(currentLogLocator),
+                            logRecordHelper.getDatasetId(currentLogLocator),
+                            logRecordHelper.getPKHashValue(currentLogLocator));
+                    winnerTxnTable.put(tempKeyTxnId, currentLogLocator.getLsn());
+                    break;
+
+                default:
+                    throw new ACIDException("Unsupported LogType: " + logType);
+            }
+        }
+
+        //-------------------------------------------------------------------------
+        //  [ redo phase ]
+        //  - redo if
+        //    1) The TxnId is committed --> gurantee durability
+        //      &&  
+        //    2) the currentLSN > maxDiskLastLSN of the index --> guarantee idempotance
+        //-------------------------------------------------------------------------
+
+        //#. set log cursor to the lowWaterMarkLSN again.
+        logCursor = logManager.readLog(new PhysicalLogLocator(lowWaterMarkLSN, logManager), new ILogFilter() {
             public boolean accept(IBuffer logs, long startOffset, int endOffset) {
                 return true;
             }
         });
-        LogicalLogLocator memLSN = new LogicalLogLocator(beginLSN.getLsn(), null, -1, logManager);
-        boolean logValidity = true;
-        LogRecordHelper parser = new LogRecordHelper(logManager);
-        try {
-            while (logValidity) {
-                logValidity = cursor.next(memLSN);
-                if (!logValidity) {
-                    if (LOGGER.isLoggable(Level.INFO)) {
-                        LOGGER.info("reached end of log !");
+        currentLogLocator = LogUtil.getDummyLogicalLogLocator(logManager);
+
+        long resourceId;
+        byte resourceMgrId;
+        long maxDiskLastLSN;
+        long currentLSN;
+        IIndex index = null;
+        LocalResource localResource = null;
+        ILocalResourceMetadata localResourceMetadata = null;
+
+        //#. get indexLifeCycleManager 
+        IAsterixAppRuntimeContextProvider appRuntimeContext = txnSubsystem.getAsterixAppRuntimeContextProvider();
+        IIndexLifecycleManager indexLifecycleManager = appRuntimeContext.getIndexLifecycleManager();
+        ILocalResourceRepository localResourceRepository = appRuntimeContext.getLocalResourceRepository();
+
+        //#. redo
+        while (logCursor.next(currentLogLocator)) {
+
+            if (LogManager.IS_DEBUG_MODE) {
+                System.out.println(logManager.getLogRecordHelper().getLogRecordForDisplay(currentLogLocator));
+            }
+
+            logType = logRecordHelper.getLogType(currentLogLocator);
+
+            switch (logType) {
+                case LogType.UPDATE:
+                    tempKeyTxnId.setTxnId(logRecordHelper.getJobId(currentLogLocator),
+                            logRecordHelper.getDatasetId(currentLogLocator),
+                            logRecordHelper.getPKHashValue(currentLogLocator));
+
+                    if (winnerTxnTable.containsKey(tempKeyTxnId)) {
+                        currentLSN = winnerTxnTable.get(tempKeyTxnId);
+                        resourceId = logRecordHelper.getResourceId(currentLogLocator);
+
+                        //get index instance from IndexLifeCycleManager
+                        //if index is not registered into IndexLifeCycleManager,
+                        //create the index using LocalMetadata stored in LocalResourceRepository
+                        index = indexLifecycleManager.getIndex(resourceId);
+                        if (index == null) {
+                            localResource = localResourceRepository.getResourceById(resourceId);
+                            localResourceMetadata = (ILocalResourceMetadata) localResource.getResourceObject();
+                            index = localResourceMetadata.createIndexInstance(appRuntimeContext,
+                                    localResource.getResourceName(), localResource.getPartition());
+                            indexLifecycleManager.register(resourceId, index);
+                        }
+
+                        /***************************************************/
+                        //TODO
+                        //get the right maxDiskLastLSN with the resourceId from the Zach. :) 
+                        maxDiskLastLSN = 100;
+                        /***************************************************/
+
+                        if (currentLSN > maxDiskLastLSN) {
+                            resourceMgrId = logRecordHelper.getResourceMgrId(currentLogLocator);
+
+                            // look up the repository to get the resource manager
+                            // register resourceMgr if it is not registered. 
+                            IResourceManager resourceMgr = txnSubsystem.getTransactionalResourceRepository()
+                                    .getTransactionalResourceMgr(resourceMgrId);
+                            if (resourceMgr == null) {
+                                resourceMgr = new IndexResourceManager(resourceMgrId, txnSubsystem);
+                                txnSubsystem.getTransactionalResourceRepository().registerTransactionalResourceManager(
+                                        resourceMgrId, resourceMgr);
+                            }
+                            
+                            //redo finally.
+                            resourceMgr.redo(logRecordHelper, currentLogLocator);
+                        }
                     }
                     break;
-                }
-                byte resourceMgrId = parser.getResourceMgrId(memLSN);
-                IResourceManager resourceMgr = txnSubsystem.getTransactionalResourceRepository()
-                        .getTransactionalResourceMgr(resourceMgrId);
-                //register resourceMgr if it is not registered.
-                if (resourceMgr == null) {
-                    resourceMgr = new IndexResourceManager(resourceMgrId, txnSubsystem);
-                    txnSubsystem.getTransactionalResourceRepository().registerTransactionalResourceManager(
-                            resourceMgrId, resourceMgr);
-                }
-                resourceMgr.redo(parser, memLSN);
 
-                writeCheckpointRecord(memLSN.getLsn());
+                case LogType.COMMIT:
+                    //do nothing
+                    break;
+
+                default:
+                    throw new ACIDException("Unsupported LogType: " + logType);
             }
-            state = SystemState.HEALTHY;
-        } catch (Exception e) {
-            state = SystemState.CORRUPTED;
-            throw new ACIDException(" could not recover , corrputed log !", e);
         }
-        return state;
     }
 
-    private void writeCheckpointRecord(long lsn) throws ACIDException {
+    @Override
+    public void checkpoint() throws ACIDException {
+
+        LogManager logMgr = (LogManager) txnSubsystem.getLogManager();
+        TransactionManager txnMgr = (TransactionManager) txnSubsystem.getTransactionManager();
+        String logDir = logMgr.getLogManagerProperties().getLogDir();
+
+        //#. get the filename of the previous checkpoint files which are about to be deleted 
+        //   right after the new checkpoint file is written.
+        File[] prevCheckpointFiles = getPreviousCheckpointFiles();
+
+        //#. create and store the checkpointObject into the new checkpoint file
+        //TODO
+        //put the correct minMCTFirstLSN by getting from Zach. :)
+        long minMCTFirstLSM = 0;
+        CheckpointObject checkpointObject = new CheckpointObject(logMgr.getCurrentLsn().get(), minMCTFirstLSM,
+                txnMgr.getMaxJobId(), System.currentTimeMillis());
+
+        FileOutputStream fos = null;
+        ObjectOutputStream oosToFos = null;
         try {
-            FileWriter writer = new FileWriter(new File(checkpoint_record_file));
-            BufferedWriter buffWriter = new BufferedWriter(writer);
-            buffWriter.write("" + lsn);
-            buffWriter.flush();
-        } catch (IOException ioe) {
-            throw new ACIDException(" unable to create check point record", ioe);
+            String fileName = getFileName(logDir, Long.toString(checkpointObject.getTimeStamp()));
+            fos = new FileOutputStream(fileName);
+            oosToFos = new ObjectOutputStream(fos);
+            oosToFos.writeObject(checkpointObject);
+            oosToFos.flush();
+        } catch (IOException e) {
+            throw new ACIDException("Failed to checkpoint", e);
+        } finally {
+            if (oosToFos != null) {
+                try {
+                    oosToFos.close();
+                } catch (IOException e) {
+                    throw new ACIDException("Failed to checkpoint", e);
+                }
+            }
+            if (oosToFos == null && fos != null) {
+                try {
+                    fos.close();
+                } catch (IOException e) {
+                    throw new ACIDException("Failed to checkpoint", e);
+                }
+            }
+        }
+
+        //#. delete the previous checkpoint files
+        if (prevCheckpointFiles != null) {
+            for (File file : prevCheckpointFiles) {
+                file.delete();
+            }
         }
     }
 
-    /*
-     * Currently this method is not used, but will be used as part of crash
-     * recovery logic.
-     */
-    private long getLastCheckpointRecordLSN() throws Exception {
-        FileReader reader;
-        BufferedReader buffReader;
-        String content = null;
-        reader = new FileReader(new File(checkpoint_record_file));
-        buffReader = new BufferedReader(reader);
-        content = buffReader.readLine();
-        if (content != null) {
-            return Long.parseLong(content);
+    private CheckpointObject readCheckpoint() throws ACIDException, FileNotFoundException {
+
+        CheckpointObject checkpointObject = null;
+
+        //#. read all checkpointObjects from the existing checkpoint files
+        File[] prevCheckpointFiles = getPreviousCheckpointFiles();
+
+        if (prevCheckpointFiles == null || prevCheckpointFiles.length == 0) {
+            throw new FileNotFoundException("Checkpoint file is not found");
         }
-        return -1;
+
+        List<CheckpointObject> checkpointObjectList = new ArrayList<CheckpointObject>();
+
+        for (File file : prevCheckpointFiles) {
+            FileInputStream fis = null;
+            ObjectInputStream oisFromFis = null;
+
+            try {
+                fis = new FileInputStream(file);
+                oisFromFis = new ObjectInputStream(fis);
+                checkpointObject = (CheckpointObject) oisFromFis.readObject();
+                checkpointObjectList.add(checkpointObject);
+            } catch (Exception e) {
+                throw new ACIDException("Failed to read a checkpoint file", e);
+            } finally {
+                if (oisFromFis != null) {
+                    try {
+                        oisFromFis.close();
+                    } catch (IOException e) {
+                        throw new ACIDException("Failed to read a checkpoint file", e);
+                    }
+                }
+                if (oisFromFis == null && fis != null) {
+                    try {
+                        fis.close();
+                    } catch (IOException e) {
+                        throw new ACIDException("Failed to read a checkpoint file", e);
+                    }
+                }
+            }
+        }
+
+        //#. sort checkpointObjects in descending order by timeStamp to find out the most recent one.
+        Collections.sort(checkpointObjectList);
+
+        //#. return the most recent one (the first one in sorted list)
+        return checkpointObjectList.get(0);
+    }
+
+    private File[] getPreviousCheckpointFiles() {
+        String logDir = txnSubsystem.getLogManager().getLogManagerProperties().getLogDir();
+
+        File parentDir = new File(logDir);
+
+        FilenameFilter filter = new FilenameFilter() {
+            @Override
+            public boolean accept(File dir, String name) {
+                if (name.contains(CHECKPOINT_FILENAME_PREFIX)) {
+                    return true;
+                } else {
+                    return false;
+                }
+            }
+        };
+
+        File[] prevCheckpointFiles = parentDir.listFiles(filter);
+
+        return prevCheckpointFiles;
+    }
+
+    private String getFileName(String baseDir, String suffix) {
+
+        if (!baseDir.endsWith(System.getProperty("file.separator"))) {
+            baseDir += System.getProperty("file.separator");
+        }
+
+        return baseDir + CHECKPOINT_FILENAME_PREFIX + suffix;
     }
 
     /**
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
index bafa753..9cd4104 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -16,6 +16,7 @@
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -31,6 +32,7 @@
     private static final Logger LOGGER = Logger.getLogger(TransactionManager.class.getName());
     private final TransactionSubsystem transactionProvider;
     private Map<JobId, TransactionContext> transactionContextRepository = new HashMap<JobId, TransactionContext>();
+    private AtomicInteger maxJobId = new AtomicInteger(0);
 
     public TransactionManager(TransactionSubsystem provider) {
         this.transactionProvider = provider;
@@ -64,6 +66,7 @@
 
     @Override
     public TransactionContext beginTransaction(JobId jobId) throws ACIDException {
+        setMaxJobId(jobId.getId());
         TransactionContext txnContext = new TransactionContext(jobId, transactionProvider);
         synchronized (this) {
             transactionContextRepository.put(jobId, txnContext);
@@ -73,6 +76,7 @@
 
     @Override
     public TransactionContext getTransactionContext(JobId jobId) throws ACIDException {
+        setMaxJobId(jobId.getId());
         synchronized (transactionContextRepository) {
 
             TransactionContext context = transactionContextRepository.get(jobId);
@@ -142,4 +146,12 @@
     public TransactionSubsystem getTransactionProvider() {
         return transactionProvider;
     }
+    
+    public void setMaxJobId(int jobId) {
+        maxJobId.set(Math.max(maxJobId.get(), jobId));
+    }
+    
+    public int getMaxJobId() {
+        return maxJobId.get();
+    }
 }
diff --git a/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/logging/test/RecoverySimulator.java b/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/logging/test/RecoverySimulator.java
index a7bd24c..54ec036 100644
--- a/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/logging/test/RecoverySimulator.java
+++ b/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/logging/test/RecoverySimulator.java
@@ -24,8 +24,8 @@
 
     private static IRecoveryManager recoveryManager;
 
-    public static IRecoveryManager.SystemState startRecovery() throws IOException, ACIDException {
-        return recoveryManager.startRecovery(true);
+    public static void startRecovery() throws IOException, ACIDException {
+        recoveryManager.startRecovery(true);
     }
 
     public static void main(String args[]) throws IOException, ACIDException {
diff --git a/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/TransactionSimulator.java b/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/TransactionSimulator.java
index 3cc8f66..092f2ed 100644
--- a/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/TransactionSimulator.java
+++ b/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/TransactionSimulator.java
@@ -95,10 +95,9 @@
         transactionManager.commitTransaction(context, new DatasetId(-1), -1);
     }
 
-    public SystemState recover() throws ACIDException, IOException {
-        SystemState state = recoveryManager.startRecovery(true);
+    public void recover() throws ACIDException, IOException {
+        recoveryManager.startRecovery(true);
         ((FileResource) resource).sync();
-        return state;
     }
 
     /**
@@ -125,8 +124,7 @@
         }
 
         int finalExpectedValue = existingValue + schedule.getDeltaChange();
-        SystemState state = txnSimulator.recover();
-        System.out.println(" State is " + state);
+        txnSimulator.recover();
         boolean isCorrect = ((FileResource) resource).checkIfValueInSync(finalExpectedValue);
         System.out.println(" Did recovery happen correctly " + isCorrect);
     }