Added flush logs and enabled non-sharp checkpoints

The following commits from your working branch will be included:

commit c11a22cfd916041c806409c6d8e66dd6ea162b7b
Author: hubailmor@gmail.com <mhubail@uci.edu>
Date:   Tue Nov 18 23:12:19 2014 -0800

    Defined Startup LSN

commit 95a8752072e53ef62a11c39799f296209a3c622c
Author: hubailmor@gmail.com <mhubail@uci.edu>
Date:   Tue Nov 18 20:22:08 2014 -0800

    Revised Checkpoint Logic

commit ea4baa22089ba5d3eaff3d5c787828efcbff8ad9
Author: hubailmor@gmail.com <mhubail@uci.edu>
Date:   Thu Nov 13 10:25:45 2014 -0800

    added comments

commit 8991554f840f50d800a619fc2ac327f1c024e838
Author: hubailmor@gmail.com <mhubail@uci.edu>
Date:   Wed Nov 12 23:34:25 2014 -0800

    Added a method to the interface

commit a31e36b0b8234fe876b42e0cdc4543e4e73314a1
Author: hubailmor@gmail.com <mhubail@uci.edu>
Date:   Wed Nov 12 22:57:51 2014 -0800

    Refactored getMinFirstLSN code

commit 25f62d585b2f279e78711efd2d327c777919f62c
Author: hubailmor@gmail.com <mhubail@uci.edu>
Date:   Tue Nov 11 14:05:34 2014 -0800

    Added flush logs

Change-Id: I13fb61c04b6b510b6e8af68d2a0e7e14859f519d
Reviewed-on: http://fulliautomatix.ics.uci.edu:8443/182
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Young-Seok Kim <kisskys@gmail.com>
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java
index 32bfa58..af83b83 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java
@@ -23,7 +23,7 @@
 import edu.uci.ics.asterix.common.transactions.ITransactionContext;
 import edu.uci.ics.asterix.common.transactions.ITransactionManager;
 import edu.uci.ics.asterix.common.transactions.JobId;
-import edu.uci.ics.asterix.transaction.management.service.logging.LogRecord;
+import edu.uci.ics.asterix.common.transactions.LogRecord;
 import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
index 8c6ee8f..6409a13 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
@@ -38,6 +38,7 @@
 import edu.uci.ics.asterix.metadata.feeds.FeedManager;
 import edu.uci.ics.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import edu.uci.ics.asterix.transaction.management.resource.PersistentLocalResourceRepositoryFactory;
+import edu.uci.ics.asterix.transaction.management.service.logging.LogManager;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
 import edu.uci.ics.hyracks.api.application.INCApplicationContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -135,12 +136,15 @@
         localResourceRepository = (PersistentLocalResourceRepository) persistentLocalResourceRepositoryFactory
                 .createRepository();
         resourceIdFactory = (new ResourceIdFactoryProvider(localResourceRepository)).createResourceIdFactory();
-        indexLifecycleManager = new DatasetLifecycleManager(storageProperties, localResourceRepository,
-                MetadataPrimaryIndexes.FIRST_AVAILABLE_USER_DATASET_ID);
+
         IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider = new AsterixAppRuntimeContextProdiverForRecovery(
                 this);
         txnSubsystem = new TransactionSubsystem(ncApplicationContext.getNodeId(), asterixAppRuntimeContextProvider,
                 txnProperties);
+        
+        indexLifecycleManager = new DatasetLifecycleManager(storageProperties, localResourceRepository,
+                MetadataPrimaryIndexes.FIRST_AVAILABLE_USER_DATASET_ID,(LogManager)txnSubsystem.getLogManager());
+        
         isShuttingdown = false;
 
         feedManager = new FeedManager(ncApplicationContext.getNodeId());
@@ -148,9 +152,9 @@
         // The order of registration is important. The buffer cache must registered before recovery and transaction managers.
         ILifeCycleComponentManager lccm = ncApplicationContext.getLifeCycleComponentManager();
         lccm.register((ILifeCycleComponent) bufferCache);
-        lccm.register((ILifeCycleComponent) indexLifecycleManager);
         lccm.register((ILifeCycleComponent) txnSubsystem.getTransactionManager());
         lccm.register((ILifeCycleComponent) txnSubsystem.getLogManager());
+        lccm.register((ILifeCycleComponent) indexLifecycleManager);
         lccm.register((ILifeCycleComponent) txnSubsystem.getLockManager());
         lccm.register((ILifeCycleComponent) txnSubsystem.getRecoveryManager());
     }
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index 1f0c91d..295fac7 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -51,7 +51,8 @@
     private boolean isMetadataNode = false;
     private boolean stopInitiated = false;
     private SystemState systemState = SystemState.NEW_UNIVERSE;
-
+    private final long NON_SHARP_CHECKPOINT_TARGET_LSN = -1;
+    
     @Override
     public void start(INCApplicationContext ncAppCtx, String[] args) throws Exception {
         ncAppCtx.setThreadFactory(new AsterixThreadFactory(ncAppCtx.getLifeCycleComponentManager()));
@@ -101,7 +102,7 @@
             }
 
             IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager();
-            recoveryMgr.checkpoint(true);
+            recoveryMgr.checkpoint(true, NON_SHARP_CHECKPOINT_TARGET_LSN);
 
             if (isMetadataNode) {
                 MetadataBootstrap.stopUniverse();
@@ -183,7 +184,7 @@
         lccm.startAll();
 
         IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager();
-        recoveryMgr.checkpoint(true);
+        recoveryMgr.checkpoint(true, NON_SHARP_CHECKPOINT_TARGET_LSN);
 
         if (isMetadataNode) {
             IMetadataNode stub = null;
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java
index 63f4c3c..379a7eb 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java
@@ -23,8 +23,13 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+
 import edu.uci.ics.asterix.common.api.ILocalResourceMetadata;
 import edu.uci.ics.asterix.common.config.AsterixStorageProperties;
+import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
+import edu.uci.ics.asterix.common.transactions.ILogManager;
+import edu.uci.ics.asterix.common.transactions.LogRecord;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
@@ -34,6 +39,7 @@
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.MultitenantVirtualBufferCache;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.VirtualBufferCache;
 import edu.uci.ics.hyracks.storage.common.buffercache.HeapBufferAllocator;
@@ -49,9 +55,12 @@
     private final int firstAvilableUserDatasetID;
     private final long capacity;
     private long used;
+    private final ILogManager logManager;
+    private LogRecord logRecord;
 
     public DatasetLifecycleManager(AsterixStorageProperties storageProperties,
-            ILocalResourceRepository resourceRepository, int firstAvilableUserDatasetID) {
+            ILocalResourceRepository resourceRepository, int firstAvilableUserDatasetID, ILogManager logManager) {
+        this.logManager = logManager;
         this.storageProperties = storageProperties;
         this.resourceRepository = resourceRepository;
         this.firstAvilableUserDatasetID = firstAvilableUserDatasetID;
@@ -60,6 +69,7 @@
         datasetInfos = new HashMap<Integer, DatasetInfo>();
         capacity = storageProperties.getMemoryComponentGlobalBudget();
         used = 0;
+        logRecord = new LogRecord();
     }
 
     @Override
@@ -80,7 +90,7 @@
         int did = getDIDfromRID(resourceID);
         DatasetInfo dsInfo = datasetInfos.get(did);
         if (dsInfo == null) {
-            dsInfo = new DatasetInfo(did,!index.hasMemoryComponents());
+            dsInfo = new DatasetInfo(did, !index.hasMemoryComponents());
         } else if (dsInfo.indexes.containsKey(resourceID)) {
             throw new HyracksDataException("Index with resource ID " + resourceID + " already exists.");
         }
@@ -286,7 +296,7 @@
         synchronized (datasetOpTrackers) {
             ILSMOperationTracker opTracker = datasetOpTrackers.get(datasetID);
             if (opTracker == null) {
-                opTracker = new PrimaryIndexOperationTracker(this, datasetID);
+                opTracker = new PrimaryIndexOperationTracker(this, datasetID, logManager);
                 datasetOpTrackers.put(datasetID, opTracker);
             }
             return opTracker;
@@ -338,7 +348,7 @@
         private final int datasetID;
         private long lastAccess;
         private int numActiveIOOps;
-        private final boolean isExternal; 
+        private final boolean isExternal;
 
         public DatasetInfo(int datasetID, boolean isExternal) {
             this.indexes = new HashMap<Long, IndexInfo>();
@@ -410,6 +420,81 @@
         used = 0;
     }
 
+    public synchronized void flushAllDatasets() throws HyracksDataException {
+        for (DatasetInfo dsInfo : datasetInfos.values()) {
+            flushDatasetOpenIndexes(dsInfo, false);
+        }
+    }
+
+    public synchronized void scheduleAsyncFlushForLaggingDatasets(long targetLSN) throws HyracksDataException {
+
+        List<DatasetInfo> laggingDatasets = new ArrayList<DatasetInfo>();
+        long firstLSN;
+        //find dataset with min lsn < targetLSN
+        for (DatasetInfo dsInfo : datasetInfos.values()) {
+            for (IndexInfo iInfo : dsInfo.indexes.values()) {
+                AbstractLSMIOOperationCallback ioCallback = (AbstractLSMIOOperationCallback) ((ILSMIndex) iInfo.index)
+                        .getIOOperationCallback();
+                if (!((AbstractLSMIndex) iInfo.index).isCurrentMutableComponentEmpty() || ioCallback.hasPendingFlush()) {
+                    firstLSN = ioCallback.getFirstLSN();
+
+                    if (firstLSN < targetLSN) {
+                        laggingDatasets.add(dsInfo);
+                        break;
+                    }
+                }
+            }
+        }
+
+        //schedule a sync flush
+        for (DatasetInfo dsInfo : laggingDatasets) {
+            flushDatasetOpenIndexes(dsInfo, true);
+        }
+
+    }
+
+    private void flushDatasetOpenIndexes(DatasetInfo dsInfo, boolean asyncFlush) throws HyracksDataException {
+        if (!dsInfo.isExternal) {
+            synchronized (logRecord) {
+                logRecord.formFlushLogRecord(dsInfo.datasetID, null);
+                try {
+                    logManager.log(logRecord);
+                } catch (ACIDException e) {
+                    throw new HyracksDataException("could not write flush log while closing dataset", e);
+                }
+                try {
+                    logRecord.wait();
+                } catch (InterruptedException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+            for (IndexInfo iInfo : dsInfo.indexes.values()) {
+                //update resource lsn
+                AbstractLSMIOOperationCallback ioOpCallback = (AbstractLSMIOOperationCallback) iInfo.index
+                        .getIOOperationCallback();
+                ioOpCallback.updateLastLSN(logRecord.getLSN());
+            }
+        }
+
+        if (asyncFlush) {
+
+            for (IndexInfo iInfo : dsInfo.indexes.values()) {
+                ILSMIndexAccessor accessor = (ILSMIndexAccessor) iInfo.index.createAccessor(
+                        NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+                accessor.scheduleFlush(iInfo.index.getIOOperationCallback());
+            }
+        } else {
+
+            for (IndexInfo iInfo : dsInfo.indexes.values()) {
+                // TODO: This is not efficient since we flush the indexes sequentially. 
+                // Think of a way to allow submitting the flush requests concurrently. We don't do them concurrently because this
+                // may lead to a deadlock scenario between the DatasetLifeCycleManager and the PrimaryIndexOperationTracker.
+
+                flushAndWaitForIO(dsInfo, iInfo);
+            }
+        }
+    }
+
     private void closeDataset(DatasetInfo dsInfo) throws HyracksDataException {
         // First wait for any ongoing IO operations
         while (dsInfo.numActiveIOOps > 0) {
@@ -420,11 +505,10 @@
             }
         }
 
-        for (IndexInfo iInfo : dsInfo.indexes.values()) {
-            // TODO: This is not efficient since we flush the indexes sequentially. 
-            // Think of a way to allow submitting the flush requests concurrently. We don't do them concurrently because this
-            // may lead to a deadlock scenario between the DatasetLifeCycleManager and the PrimaryIndexOperationTracker.
-            flushAndWaitForIO(dsInfo, iInfo);
+        try {
+            flushDatasetOpenIndexes(dsInfo, false);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
         }
 
         for (IndexInfo iInfo : dsInfo.indexes.values()) {
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
index ef58937..5f15bf3 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -18,7 +18,11 @@
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
 import edu.uci.ics.asterix.common.transactions.AbstractOperationCallback;
+import edu.uci.ics.asterix.common.transactions.ILogManager;
+import edu.uci.ics.asterix.common.transactions.LogRecord;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
@@ -32,9 +36,12 @@
 
     // Number of active operations on an ILSMIndex instance.
     private final AtomicInteger numActiveOperations;
+    private final ILogManager logManager;
 
-    public PrimaryIndexOperationTracker(DatasetLifecycleManager datasetLifecycleManager, int datasetID) {
+    public PrimaryIndexOperationTracker(DatasetLifecycleManager datasetLifecycleManager, int datasetID,
+            ILogManager logManager) {
         super(datasetLifecycleManager, datasetID);
+        this.logManager = logManager;
         this.numActiveOperations = new AtomicInteger();
     }
 
@@ -62,7 +69,9 @@
             IModificationOperationCallback modificationCallback) throws HyracksDataException {
         if (opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION) {
             decrementNumActiveOperations(modificationCallback);
-            flushIfFull();
+            if (numActiveOperations.get() == 0) {
+                flushIfFull();
+            }
         } else if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE) {
             datasetLifecycleManager.undeclareActiveIOOperation(datasetID);
         }
@@ -71,6 +80,7 @@
     private void flushIfFull() throws HyracksDataException {
         // If we need a flush, and this is the last completing operation, then schedule the flush. 
         // TODO: Is there a better way to check if we need to flush instead of communicating with the datasetLifecycleManager each time?
+        // Maybe we could keep the set of indexes here instead of consulting the datasetLifecycleManager each time?
         Set<ILSMIndex> indexes = datasetLifecycleManager.getDatasetIndexes(datasetID);
         boolean needsFlush = false;
         for (ILSMIndex lsmIndex : indexes) {
@@ -79,13 +89,34 @@
                 break;
             }
         }
+
+        if (needsFlush) {
+
+            LogRecord logRecord = new LogRecord();
+            logRecord.formFlushLogRecord(datasetID, this);
+            try {
+                logManager.log(logRecord);
+            } catch (ACIDException e) {
+                throw new HyracksDataException("could not write flush log", e);
+            }
+        }
+    }
+
+    public void triggerScheduleFlush(LogRecord logRecord) throws HyracksDataException {
+        // why synchronized ??
         synchronized (this) {
-            if (needsFlush && numActiveOperations.get() == 0) {
-                for (ILSMIndex lsmIndex : indexes) {
-                    ILSMIndexAccessor accessor = (ILSMIndexAccessor) lsmIndex.createAccessor(
-                            NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
-                    accessor.scheduleFlush(lsmIndex.getIOOperationCallback());
-                }
+            for (ILSMIndex lsmIndex : datasetLifecycleManager.getDatasetIndexes(datasetID)) {
+                //get resource
+                ILSMIndexAccessor accessor = (ILSMIndexAccessor) lsmIndex.createAccessor(
+                        NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+
+                //update resource lsn
+                AbstractLSMIOOperationCallback ioOpCallback = (AbstractLSMIOOperationCallback) lsmIndex
+                        .getIOOperationCallback();
+                ioOpCallback.updateLastLSN(logRecord.getLSN());
+
+                //schedule flush after update
+                accessor.scheduleFlush(lsmIndex.getIOOperationCallback());
             }
         }
     }
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
index 26a79c3..4673266 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
@@ -16,14 +16,23 @@
 
 import java.nio.ByteBuffer;
 
+import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
+import edu.uci.ics.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
+import edu.uci.ics.asterix.common.transactions.ILogManager;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.ITupleFilterFactory;
 import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
 
 public class AsterixLSMInsertDeleteOperatorNodePushable extends LSMIndexInsertUpdateDeleteOperatorNodePushable {
 
@@ -37,6 +46,43 @@
     }
 
     @Override
+    public void open() throws HyracksDataException {
+        RecordDescriptor inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
+        accessor = new FrameTupleAccessor(ctx.getFrameSize(), inputRecDesc);
+        writeBuffer = ctx.allocateFrame();
+        writer.open();
+        indexHelper.open();
+        AbstractLSMIndex lsmIndex = (AbstractLSMIndex) indexHelper.getIndexInstance();
+        try {
+            modCallback = opDesc.getModificationOpCallbackFactory().createModificationOperationCallback(
+                    indexHelper.getResourceID(), lsmIndex, ctx);
+            indexAccessor = lsmIndex.createAccessor(modCallback, NoOpOperationCallback.INSTANCE);
+            ITupleFilterFactory tupleFilterFactory = opDesc.getTupleFilterFactory();
+            if (tupleFilterFactory != null) {
+                tupleFilter = tupleFilterFactory.createTupleFilter(indexHelper.getTaskContext());
+                frameTuple = new FrameTupleReference();
+            }
+            // If the index has an empty memory component, we need to set its first LSN (For soft checkpoint)
+            if (lsmIndex.isCurrentMutableComponentEmpty()) {
+                //prevent transactions from incorrectly setting the first LSN on a modified component
+                synchronized (lsmIndex.getOperationTracker()) {
+                    if (lsmIndex.isCurrentMutableComponentEmpty()) {
+                        AbstractLSMIOOperationCallback ioOpCallback = (AbstractLSMIOOperationCallback) lsmIndex
+                                .getIOOperationCallback();
+                        IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
+                                .getApplicationContext().getApplicationObject();
+                        ILogManager logManager = runtimeCtx.getTransactionSubsystem().getLogManager();
+                        ioOpCallback.setFirstLSN(logManager.getAppendLSN());
+                    }
+                }
+            }
+        } catch (Exception e) {
+            indexHelper.close();
+            throw new HyracksDataException(e);
+        }
+    }
+
+    @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         boolean first = true;
         accessor.reset(buffer);
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
index d22280e..ec56f3d 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
@@ -17,6 +17,7 @@
 
 import java.util.List;
 
+import edu.uci.ics.asterix.common.exceptions.AsterixException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrame;
@@ -27,21 +28,26 @@
 import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
 import edu.uci.ics.hyracks.storage.common.file.BufferedFileHandle;
 
+// A single LSMIOOperationCallback per LSM index used to perform actions around Flush and Merge operations
 public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationCallback {
 
-    protected long firstLSN;
-    protected long lastLSN;
-    protected long[] immutableLastLSNs;
+    // First LSN per mutable component
+    protected long[] firstLSNs;
+    // A boolean array to keep track of flush operations
+    protected boolean[] flushRequested;
+    // I think this was meant to be mutableLastLSNs
+    // protected long[] immutableLastLSNs;
+    protected long[] mutableLastLSNs;
+    // Index of the currently flushing or next to be flushed component
     protected int readIndex;
+    // Index of the currently being written to component
     protected int writeIndex;
 
-    public AbstractLSMIOOperationCallback() {
-        resetLSNs();
-    }
-
     @Override
     public void setNumOfMutableComponents(int count) {
-        immutableLastLSNs = new long[count];
+        mutableLastLSNs = new long[count];
+        firstLSNs = new long[count];
+        flushRequested = new boolean[count];
         readIndex = 0;
         writeIndex = 0;
     }
@@ -49,17 +55,36 @@
     @Override
     public void beforeOperation(LSMOperationType opType) {
         if (opType == LSMOperationType.FLUSH) {
+            /*
+             * This method was called on the scheduleFlush operation.
+             * We set the lastLSN to the last LSN for the index (the LSN for the flush log)
+             * We mark the component flushing flag
+             * We then move the write pointer to the next component and sets its first LSN to the flush log LSN
+             */
             synchronized (this) {
-                immutableLastLSNs[writeIndex] = lastLSN;
-                writeIndex = (writeIndex + 1) % immutableLastLSNs.length;
-                resetLSNs();
+                flushRequested[writeIndex] = true;
+                writeIndex = (writeIndex + 1) % mutableLastLSNs.length;
+                // Set the firstLSN of the next component unless it is being flushed
+                if (writeIndex != readIndex) {
+                    firstLSNs[writeIndex] = mutableLastLSNs[writeIndex];
+                }
             }
         }
     }
 
     @Override
     public void afterFinalize(LSMOperationType opType, ILSMComponent newComponent) {
-        // Do nothing.
+        // The operation was complete and the next I/O operation for the LSM index didn't start yet
+        if (opType == LSMOperationType.FLUSH && newComponent != null) {
+            synchronized (this) {
+                flushRequested[readIndex] = false;
+                // if the component which just finished flushing is the component that will be modified next, we set its first LSN to its previous LSN
+                if (readIndex == writeIndex) {
+                    firstLSNs[writeIndex] = mutableLastLSNs[writeIndex];
+                }
+                readIndex = (readIndex + 1) % mutableLastLSNs.length;
+            }
+        }
     }
 
     public abstract long getComponentLSN(List<ILSMComponent> oldComponents) throws HyracksDataException;
@@ -98,24 +123,28 @@
         }
     }
 
-    protected void resetLSNs() {
-        firstLSN = -1;
-        lastLSN = -1;
-    }
-
     public void updateLastLSN(long lastLSN) {
-        if (firstLSN == -1) {
-            firstLSN = lastLSN;
+        mutableLastLSNs[writeIndex] = lastLSN;
+    }
+
+    public void setFirstLSN(long firstLSN) throws AsterixException {
+        // We make sure that this method is only called on an empty component so the first LSN is not set incorrectly
+          firstLSNs[writeIndex] = firstLSN;
+    }
+
+    public synchronized long getFirstLSN() {
+        // We make sure that this method is only called on a non-empty component so the returned LSN is meaningful
+        // The firstLSN is always the lsn of the currently being flushed component or the next to be flushed when no flush operation is on going
+        return firstLSNs[readIndex];
+    }
+
+    public synchronized boolean hasPendingFlush(){
+
+        for(int i=0; i<flushRequested.length; i++){
+            if(flushRequested[i]){
+                return true;
+            }
         }
-        this.lastLSN = Math.max(this.lastLSN, lastLSN);
+        return false;
     }
-
-    public long getFirstLSN() {
-        return firstLSN;
-    }
-
-    public long getLastLSN() {
-        return lastLSN;
-    }
-
 }
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
index 8e9b44e..5d21653 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
@@ -41,10 +41,10 @@
     @Override
     public long getComponentLSN(List<ILSMComponent> diskComponents) throws HyracksDataException {
         if (diskComponents == null) {
-            // Implies a flush IO operation.
+            // Implies a flush IO operation. --> moves the flush pointer
+            // Flush operation of an LSM index are executed sequentially.
             synchronized (this) {
-                long lsn = immutableLastLSNs[readIndex];
-                readIndex = (readIndex + 1) % immutableLastLSNs.length;
+                long lsn = mutableLastLSNs[readIndex];
                 return lsn;
             }
         }
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java
index 57f8658..dbad7ef 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java
@@ -36,10 +36,9 @@
     @Override
     public long getComponentLSN(List<ILSMComponent> diskComponents) throws HyracksDataException {
         if (diskComponents == null) {
-            // Implies a flush IO operation.
+            // Implies a flush IO operation <Will never happen currently as Btree with buddy btree is only used with external datasets>
             synchronized (this) {
-                long lsn = immutableLastLSNs[readIndex];
-                readIndex = (readIndex + 1) % immutableLastLSNs.length;
+                long lsn = mutableLastLSNs[readIndex];
                 return lsn;
             }
         }
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
index 5532f97..62b928e 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
@@ -42,8 +42,7 @@
         if (diskComponents == null) {
             // Implies a flush IO operation.
             synchronized (this) {
-                long lsn = immutableLastLSNs[readIndex];
-                readIndex = (readIndex + 1) % immutableLastLSNs.length;
+                long lsn = mutableLastLSNs[readIndex];
                 return lsn;
             }
         }
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
index 1497e17..308460e 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
@@ -43,8 +43,7 @@
         if (diskComponents == null) {
             // Implies a flush IO operation.
             synchronized (this) {
-                long lsn = immutableLastLSNs[readIndex];
-                readIndex = (readIndex + 1) % immutableLastLSNs.length;
+                long lsn = mutableLastLSNs[readIndex];
                 return lsn;
             }
         }
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogManager.java
index 27a91a4..69155ac 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogManager.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogManager.java
@@ -18,8 +18,36 @@
 
 public interface ILogManager {
 
+    /**
+     * Submits a logRecord to log Manager which appends it to the log tail
+     * @param logRecord
+     * @throws ACIDException
+     */
     public void log(ILogRecord logRecord) throws ACIDException;
 
+    /**
+     * 
+     * @param isRecoveryMode
+     * @returnLogReader instance which enables reading the log files
+     */
     public ILogReader getLogReader(boolean isRecoveryMode);
+    
+    /**
+     * 
+     * @return the last LSN the log manager used
+     */
+    public long getAppendLSN(); 
+    
+    /**
+     * Deletes all log partitions which have a maximum LSN less than checkpointLSN
+     * @param checkpointLSN
+     */
+    public void deleteOldLogFiles(long checkpointLSN);
+    
+    /**
+     * 
+     * @return the smallest readable LSN on the current log partitions
+     */
+    public long getReadableSmallestLSN();
 
 }
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogRecord.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogRecord.java
index b9baa3b..89aabd7 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogRecord.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogRecord.java
@@ -23,7 +23,8 @@
     public static final int JOB_TERMINATE_LOG_SIZE = 13; //JOB_COMMIT or ABORT log type
     public static final int ENTITY_COMMIT_LOG_BASE_SIZE = 25;
     public static final int UPDATE_LOG_BASE_SIZE = 54;
-
+    public static final int FLUSH_LOG_SIZE = 17;
+    
     public boolean readLogRecord(ByteBuffer buffer);
 
     public void writeLogRecord(ByteBuffer buffer);
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/IRecoveryManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/IRecoveryManager.java
index 6af5ebf..9c6d0b4 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/IRecoveryManager.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/IRecoveryManager.java
@@ -75,5 +75,15 @@
      */
     public void rollbackTransaction(ITransactionContext txnContext) throws ACIDException;
 
-    public void checkpoint(boolean isSharpCheckpoint) throws ACIDException, HyracksDataException;
+    /**
+     * Makes a system checkpoint.
+     * @param isSharpCheckpoint a flag indicating whether to perform a sharp or non-sharp checkpoint.
+     * @param nonSharpCheckpointTargetLSN if a non-sharp checkpoint to be performed, what is the minimum LSN it should target.
+     * @return the LSN at which the checkpoint was performed.
+     * @throws ACIDException
+     * @throws HyracksDataException
+     */
+    public long checkpoint(boolean isSharpCheckpoint, long nonSharpCheckpointTargetLSN) throws ACIDException, HyracksDataException;
+    
+    
 }
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java
index dc33e69..56bb3e9 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java
@@ -35,7 +35,7 @@
 
     public long getLastLSN();
 
-    public void setLastLSN(long resourceId, long LSN);
+    public void setLastLSN(long LSN);
 
     public boolean isWriteTxn();
 
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionSubsystem.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionSubsystem.java
index 9b1cbb6..6eb01c9 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionSubsystem.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionSubsystem.java
@@ -28,4 +28,5 @@
     public IAsterixAppRuntimeContextProvider getAsterixAppRuntimeContextProvider();
 
     public String getId();
+    
 }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecord.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/LogRecord.java
similarity index 86%
rename from asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecord.java
rename to asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/LogRecord.java
index 1337e33..2a53712 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecord.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/LogRecord.java
@@ -12,18 +12,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.asterix.transaction.management.service.logging;
+package edu.uci.ics.asterix.common.transactions;
 
 import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.zip.CRC32;
 
-import edu.uci.ics.asterix.common.transactions.ILogRecord;
-import edu.uci.ics.asterix.common.transactions.IRecoveryManager.ResourceType;
-import edu.uci.ics.asterix.common.transactions.ITransactionContext;
+import edu.uci.ics.asterix.common.context.PrimaryIndexOperationTracker;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
-import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
 import edu.uci.ics.hyracks.storage.am.common.tuples.SimpleTupleReference;
 import edu.uci.ics.hyracks.storage.am.common.tuples.SimpleTupleWriter;
 
@@ -59,6 +56,7 @@
  * 2) ENTITY_COMMIT: 25 + PKSize (5 + 12 + PKSize + 8)
  *    --> ENTITY_COMMIT_LOG_BASE_SIZE = 25
  * 3) UPDATE: 54 + PKValueSize + NewValueSize (5 + 12 + PKValueSize + 20 + 9 + NewValueSize + 8)
+ * 4) FLUSH: 5 + 8 + DatasetId(4)
  *    --> UPDATE_LOG_BASE_SIZE = 54
  */
 public class LogRecord implements ILogRecord {
@@ -90,6 +88,7 @@
     private final SimpleTupleReference readNewValue;
     private final CRC32 checksumGen;
     private int[] PKFields;
+    private PrimaryIndexOperationTracker opTracker;
 
     public LogRecord() {
         isFlushed = new AtomicBoolean(false);
@@ -122,6 +121,11 @@
             buffer.putInt(newValueSize);
             writeTuple(buffer, newValue, newValueSize);
         }
+        
+        if (logType == LogType.FLUSH) {
+            buffer.putInt(datasetId);
+        }
+        
         checksum = generateChecksum(buffer, beginOffset, logSize - CHECKSUM_SIZE);
         buffer.putLong(checksum);
     }
@@ -151,29 +155,38 @@
         try {
             logType = buffer.get();
             jobId = buffer.getInt();
-            if (logType == LogType.JOB_COMMIT || logType == LogType.ABORT) {
-                datasetId = -1;
-                PKHashValue = -1;
-            } else {
-                datasetId = buffer.getInt();
-                PKHashValue = buffer.getInt();
-                PKValueSize = buffer.getInt();
-                if (PKValueSize <= 0) {
-                    throw new IllegalStateException("Primary Key Size is less than or equal to 0");
+            if(logType != LogType.FLUSH)
+            {
+                if (logType == LogType.JOB_COMMIT || logType == LogType.ABORT) {
+                    datasetId = -1;
+                    PKHashValue = -1;
+                } else {
+                    datasetId = buffer.getInt();
+                    PKHashValue = buffer.getInt();
+                    PKValueSize = buffer.getInt();
+                    if (PKValueSize <= 0) {
+                        throw new IllegalStateException("Primary Key Size is less than or equal to 0");
+                    }
+                    PKValue = readPKValue(buffer);
                 }
-                PKValue = readPKValue(buffer);
+                if (logType == LogType.UPDATE) {
+                    prevLSN = buffer.getLong();
+                    resourceId = buffer.getLong();
+                    logSize = buffer.getInt();
+                    fieldCnt = buffer.getInt();
+                    newOp = buffer.get();
+                    newValueSize = buffer.getInt();
+                    newValue = readTuple(buffer, readNewValue, fieldCnt, newValueSize);
+                } else {
+                    computeAndSetLogSize();
+                }
             }
-            if (logType == LogType.UPDATE) {
-                prevLSN = buffer.getLong();
-                resourceId = buffer.getLong();
-                logSize = buffer.getInt();
-                fieldCnt = buffer.getInt();
-                newOp = buffer.get();
-                newValueSize = buffer.getInt();
-                newValue = readTuple(buffer, readNewValue, fieldCnt, newValueSize);
-            } else {
+            else{
                 computeAndSetLogSize();
+                datasetId = buffer.getInt();
+                resourceId = 0l;
             }
+            
             checksum = buffer.getLong();
             if (checksum != generateChecksum(buffer, beginOffset, logSize - CHECKSUM_SIZE)) {
                 throw new IllegalStateException();
@@ -213,6 +226,14 @@
         this.PKHashValue = -1;
         computeAndSetLogSize();
     }
+    
+    public void formFlushLogRecord(int datasetId, PrimaryIndexOperationTracker opTracker) {
+        this.logType = LogType.FLUSH;
+        this.jobId = -1;
+        this.datasetId = datasetId;
+        this.opTracker = opTracker;
+        computeAndSetLogSize();
+    }
 
     @Override
     public void formEntityCommitLogRecord(ITransactionContext txnCtx, int datasetId, int PKHashValue,
@@ -255,6 +276,9 @@
             case LogType.ENTITY_COMMIT:
                 logSize = ENTITY_COMMIT_LOG_BASE_SIZE + PKValueSize;
                 break;
+            case LogType.FLUSH:
+                logSize = FLUSH_LOG_SIZE;
+                break;
             default:
                 throw new IllegalStateException("Unsupported Log Type");
         }
@@ -445,4 +469,8 @@
     public void setPKValue(ITupleReference PKValue) {
         this.PKValue = PKValue;
     }
+
+    public PrimaryIndexOperationTracker getOpTracker() {
+        return opTracker;
+    }
 }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/LogType.java
similarity index 87%
rename from asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java
rename to asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/LogType.java
index 1716cfb..e548c50 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/LogType.java
@@ -12,7 +12,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.uci.ics.asterix.transaction.management.service.logging;
+package edu.uci.ics.asterix.common.transactions;
 
 public class LogType {
 
@@ -20,10 +20,14 @@
     public static final byte JOB_COMMIT = 1;
     public static final byte ENTITY_COMMIT = 2;
     public static final byte ABORT = 3;
+    public static final byte FLUSH = 4;
+
     private static final String STRING_UPDATE = "UPDATE";
     private static final String STRING_JOB_COMMIT = "JOB_COMMIT";
     private static final String STRING_ENTITY_COMMIT = "ENTITY_COMMIT";
     private static final String STRING_ABORT = "ABORT";
+    private static final String STRING_FLUSH = "FLUSH";
+
     private static final String STRING_INVALID_LOG_TYPE = "INVALID_LOG_TYPE";
 
     public static String toString(byte logType) {
@@ -36,6 +40,8 @@
                 return STRING_ENTITY_COMMIT;
             case LogType.ABORT:
                 return STRING_ABORT;
+            case LogType.FLUSH:
+                return STRING_FLUSH;
             default:
                 return STRING_INVALID_LOG_TYPE;
         }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/PrimaryKeyTupleReference.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/PrimaryKeyTupleReference.java
similarity index 90%
rename from asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/PrimaryKeyTupleReference.java
rename to asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/PrimaryKeyTupleReference.java
index 7acc000..78cfc08 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/PrimaryKeyTupleReference.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/PrimaryKeyTupleReference.java
@@ -1,4 +1,4 @@
-package edu.uci.ics.asterix.transaction.management.service.logging;
+package edu.uci.ics.asterix.common.transactions;
 
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
index cba690d..ef584fe 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
@@ -18,11 +18,11 @@
 import edu.uci.ics.asterix.common.transactions.AbstractOperationCallback;
 import edu.uci.ics.asterix.common.transactions.ILockManager;
 import edu.uci.ics.asterix.common.transactions.ILogRecord;
+import edu.uci.ics.asterix.common.transactions.LogRecord;
+import edu.uci.ics.asterix.common.transactions.LogType;
 import edu.uci.ics.asterix.common.transactions.IRecoveryManager.ResourceType;
 import edu.uci.ics.asterix.common.transactions.ITransactionContext;
 import edu.uci.ics.asterix.common.transactions.ITransactionSubsystem;
-import edu.uci.ics.asterix.transaction.management.service.logging.LogRecord;
-import edu.uci.ics.asterix.transaction.management.service.logging.LogType;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
 import edu.uci.ics.hyracks.storage.am.common.tuples.SimpleTupleWriter;
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
index 1b22a34..3720f08 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
@@ -31,10 +31,10 @@
 import edu.uci.ics.asterix.common.transactions.ITransactionContext;
 import edu.uci.ics.asterix.common.transactions.ITransactionManager;
 import edu.uci.ics.asterix.common.transactions.JobId;
+import edu.uci.ics.asterix.common.transactions.LogRecord;
+import edu.uci.ics.asterix.common.transactions.LogType;
 import edu.uci.ics.asterix.transaction.management.service.logging.LogPage;
 import edu.uci.ics.asterix.transaction.management.service.logging.LogPageReader;
-import edu.uci.ics.asterix.transaction.management.service.logging.LogRecord;
-import edu.uci.ics.asterix.transaction.management.service.logging.LogType;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
 import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
index 7292f82..baaa87a 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
@@ -29,6 +29,7 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -39,6 +40,7 @@
 import edu.uci.ics.asterix.common.transactions.ITransactionContext;
 import edu.uci.ics.asterix.common.transactions.ITransactionManager;
 import edu.uci.ics.asterix.common.transactions.LogManagerProperties;
+import edu.uci.ics.asterix.common.transactions.LogType;
 import edu.uci.ics.asterix.common.transactions.MutableLong;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
 import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
@@ -58,7 +60,7 @@
     private final MutableLong flushLSN;
     private LinkedBlockingQueue<LogPage> emptyQ;
     private LinkedBlockingQueue<LogPage> flushQ;
-    private long appendLSN;
+    private final AtomicLong appendLSN;
     private FileChannel appendChannel;
     private LogPage appendPage;
     private LogFlusher logFlusher;
@@ -74,6 +76,7 @@
         logDir = logManagerProperties.getLogDir();
         logFilePrefix = logManagerProperties.getLogFilePrefix();
         flushLSN = new MutableLong();
+        appendLSN = new AtomicLong();
         initializeLogManager(0);
     }
 
@@ -83,23 +86,25 @@
         for (int i = 0; i < numLogPages; i++) {
             emptyQ.offer(new LogPage(txnSubsystem, logPageSize, flushLSN));
         }
-        appendLSN = initializeLogAnchor(nextLogFileId);
-        flushLSN.set(appendLSN);
+        appendLSN.set(initializeLogAnchor(nextLogFileId));
+        flushLSN.set(appendLSN.get());
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("LogManager starts logging in LSN: " + appendLSN);
         }
-        appendChannel = getFileChannel(appendLSN, false);
+        appendChannel = getFileChannel(appendLSN.get(), false);
         getAndInitNewPage();
         logFlusher = new LogFlusher(this, emptyQ, flushQ);
         futureLogFlusher = txnSubsystem.getAsterixAppRuntimeContextProvider().getThreadExecutor().submit(logFlusher);
     }
-
+    
     @Override
     public void log(ILogRecord logRecord) throws ACIDException {
         if (logRecord.getLogSize() > logPageSize) {
             throw new IllegalStateException();
         }
+
         syncLog(logRecord);
+        
         if ((logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT)
                 && !logRecord.isFlushed()) {
             synchronized (logRecord) {
@@ -115,11 +120,17 @@
     }
 
     private synchronized void syncLog(ILogRecord logRecord) throws ACIDException {
-        ITransactionContext txnCtx = logRecord.getTxnCtx();
-        if (txnCtx.getTxnState() == ITransactionManager.ABORTED && logRecord.getLogType() != LogType.ABORT) {
-            throw new ACIDException("Aborted job(" + txnCtx.getJobId() + ") tried to write non-abort type log record.");
+        ITransactionContext txnCtx = null;
+        
+        if(logRecord.getLogType() != LogType.FLUSH)
+        {       
+            txnCtx = logRecord.getTxnCtx();
+            if (txnCtx.getTxnState() == ITransactionManager.ABORTED && logRecord.getLogType() != LogType.ABORT) {
+                throw new ACIDException("Aborted job(" + txnCtx.getJobId() + ") tried to write non-abort type log record.");
+            }
+
         }
-        if (getLogFileOffset(appendLSN) + logRecord.getLogSize() > logFileSize) {
+        if (getLogFileOffset(appendLSN.get()) + logRecord.getLogSize() > logFileSize) {
             prepareNextLogFile();
             appendPage.isFull(true);
             getAndInitNewPage();
@@ -130,8 +141,13 @@
         if (logRecord.getLogType() == LogType.UPDATE) {
             logRecord.setPrevLSN(txnCtx.getLastLSN());
         }
-        appendPage.append(logRecord, appendLSN);
-        appendLSN += logRecord.getLogSize();
+        appendPage.append(logRecord, appendLSN.get());
+        
+        if(logRecord.getLogType() == LogType.FLUSH)
+        {
+            logRecord.setLSN(appendLSN.get());
+        }
+        appendLSN.addAndGet(logRecord.getLogSize());
     }
 
     private void getAndInitNewPage() {
@@ -149,8 +165,8 @@
     }
 
     private void prepareNextLogFile() {
-        appendLSN += logFileSize - getLogFileOffset(appendLSN);
-        appendChannel = getFileChannel(appendLSN, true);
+        appendLSN.addAndGet(logFileSize - getLogFileOffset(appendLSN.get()));
+        appendChannel = getFileChannel(appendLSN.get(), true);
         appendPage.isLastPage(true);
         //[Notice]
         //the current log file channel is closed if 
@@ -170,8 +186,9 @@
         return txnSubsystem;
     }
 
+    @Override
     public long getAppendLSN() {
-        return appendLSN;
+        return appendLSN.get();
     }
 
     @Override
@@ -275,6 +292,22 @@
         initializeLogManager(lastMaxLogFileId + 1);
     }
 
+    public void deleteOldLogFiles(long checkpointLSN){
+
+        Long checkpointLSNLogFileID = getLogFileId(checkpointLSN);
+        List<Long> logFileIds = getLogFileIds();
+        for (Long id : logFileIds) {
+
+            if(id < checkpointLSNLogFileID)
+            {
+                File file = new File(getLogFilePath(id));
+                if (!file.delete()) {
+                    throw new IllegalStateException("Failed to delete a file: " + file.getAbsolutePath());
+                }
+            }
+        }
+    }
+    
     private void terminateLogFlusher() {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Terminating LogFlusher thread ...");
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
index cec6cd3..f3e8f77 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
@@ -21,16 +21,19 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.logging.Logger;
 
+import edu.uci.ics.asterix.common.context.PrimaryIndexOperationTracker;
 import edu.uci.ics.asterix.common.exceptions.ACIDException;
 import edu.uci.ics.asterix.common.transactions.DatasetId;
 import edu.uci.ics.asterix.common.transactions.ILogPage;
 import edu.uci.ics.asterix.common.transactions.ILogRecord;
 import edu.uci.ics.asterix.common.transactions.ITransactionContext;
 import edu.uci.ics.asterix.common.transactions.JobId;
+import edu.uci.ics.asterix.common.transactions.LogRecord;
+import edu.uci.ics.asterix.common.transactions.LogType;
 import edu.uci.ics.asterix.common.transactions.MutableLong;
-import edu.uci.ics.asterix.transaction.management.service.locking.LockManager;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 
 public class LogPage implements ILogPage {
 
@@ -48,6 +51,8 @@
     private final ByteBuffer unlockBuffer;
     private boolean isLastPage;
     private final LinkedBlockingQueue<ILogRecord> syncCommitQ;
+    private final LinkedBlockingQueue<ILogRecord> flushQ;
+
     private FileChannel fileChannel;
     private boolean stop;
     private DatasetId reusableDsId;
@@ -66,6 +71,8 @@
         flushOffset = 0;
         isLastPage = false;
         syncCommitQ = new LinkedBlockingQueue<ILogRecord>(logPageSize / ILogRecord.JOB_TERMINATE_LOG_SIZE);
+        flushQ = new LinkedBlockingQueue<ILogRecord>();
+
         reusableDsId = new DatasetId(-1);
         reusableJobId = new JobId(-1);
     }
@@ -77,8 +84,10 @@
     @Override
     public void append(ILogRecord logRecord, long appendLSN) {
         logRecord.writeLogRecord(appendBuffer);
-        logRecord.getTxnCtx().setLastLSN(logRecord.getLogType() == LogType.UPDATE ? logRecord.getResourceId() : -1,
-                appendLSN);
+        // mhubail Update impacted resource with the flushed lsn
+        if (logRecord.getLogType() != LogType.FLUSH) {
+            logRecord.getTxnCtx().setLastLSN(appendLSN);
+        }
         synchronized (this) {
             appendOffset += logRecord.getLogSize();
             if (IS_DEBUG_MODE) {
@@ -88,6 +97,10 @@
                 logRecord.isFlushed(false);
                 syncCommitQ.offer(logRecord);
             }
+            if (logRecord.getLogType() == LogType.FLUSH) {
+                logRecord.isFlushed(false);
+                flushQ.offer(logRecord);
+            }
             this.notify();
         }
     }
@@ -205,14 +218,18 @@
                     reusableJobId.setId(logRecord.getJobId());
                     txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(reusableJobId, false);
                     reusableDsId.setId(logRecord.getDatasetId());
-                    txnSubsystem.getLockManager().unlock(reusableDsId, logRecord.getPKHashValue(), LockMode.ANY, txnCtx);
+                    txnSubsystem.getLockManager()
+                            .unlock(reusableDsId, logRecord.getPKHashValue(), LockMode.ANY, txnCtx);
                     txnCtx.notifyOptracker(false);
                 } else if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) {
                     reusableJobId.setId(logRecord.getJobId());
                     txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(reusableJobId, false);
                     txnCtx.notifyOptracker(true);
                     notifyJobTerminator();
+                } else if (logRecord.getLogType() == LogType.FLUSH) {
+                    notifyFlushTerminator();
                 }
+
                 logRecord = logPageReader.next();
             }
         }
@@ -233,6 +250,27 @@
         }
     }
 
+    public void notifyFlushTerminator() throws ACIDException {
+        LogRecord logRecord = null;
+        try {
+            logRecord = (LogRecord) flushQ.take();
+        } catch (InterruptedException e) {
+            //ignore
+        }
+        synchronized (logRecord) {
+            logRecord.isFlushed(true);
+            logRecord.notifyAll();
+        }
+        PrimaryIndexOperationTracker opTracker = logRecord.getOpTracker();
+        if (opTracker != null) {
+            try {
+                opTracker.triggerScheduleFlush(logRecord);
+            } catch (HyracksDataException e) {
+                throw new ACIDException(e);
+            }
+        }
+    }
+
     public boolean isStop() {
         return stop;
     }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPageReader.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPageReader.java
index 9e54abc..9663a8c 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPageReader.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPageReader.java
@@ -16,6 +16,8 @@
 
 import java.nio.ByteBuffer;
 
+import edu.uci.ics.asterix.common.transactions.LogRecord;
+
 public class LogPageReader {
 
     private final ByteBuffer buffer;
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogReader.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogReader.java
index 778d24b..728c694 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogReader.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogReader.java
@@ -22,6 +22,7 @@
 import edu.uci.ics.asterix.common.exceptions.ACIDException;
 import edu.uci.ics.asterix.common.transactions.ILogReader;
 import edu.uci.ics.asterix.common.transactions.ILogRecord;
+import edu.uci.ics.asterix.common.transactions.LogRecord;
 import edu.uci.ics.asterix.common.transactions.MutableLong;
 
 public class LogReader implements ILogReader {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointThread.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointThread.java
index dca14d8..a9d1cf2 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointThread.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointThread.java
@@ -14,49 +14,72 @@
  */
 package edu.uci.ics.asterix.transaction.management.service.recovery;
 
-import java.util.List;
-
 import edu.uci.ics.asterix.common.exceptions.ACIDException;
-import edu.uci.ics.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
+import edu.uci.ics.asterix.common.transactions.ILogManager;
 import edu.uci.ics.asterix.common.transactions.IRecoveryManager;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
 
 public class CheckpointThread extends Thread {
 
     private long lsnThreshold;
     private long checkpointTermInSecs;
 
-    private long lastMinMCTFirstLSN = 0;
-
+    private final ILogManager logManager;
     private final IRecoveryManager recoveryMgr;
-    private final IIndexLifecycleManager indexLifecycleManager;
 
-    public CheckpointThread(IRecoveryManager recoveryMgr, IIndexLifecycleManager indexLifecycleManager,
+    public CheckpointThread(IRecoveryManager recoveryMgr, IIndexLifecycleManager indexLifecycleManager, ILogManager logManager,
             long lsnThreshold, long checkpointTermInSecs) {
         this.recoveryMgr = recoveryMgr;
-        this.indexLifecycleManager = indexLifecycleManager;
+        this.logManager = logManager;
         this.lsnThreshold = lsnThreshold;
         this.checkpointTermInSecs = checkpointTermInSecs;
     }
 
     @Override
     public void run() {
-        long currentMinMCTFirstLSN = 0;
+        long currentCheckpointAttemptMinLSN = -1;
+        long lastCheckpointLSN = -1;
+        long currentLogLSN = 0;
+        long targetCheckpointLSN = 0;
         while (true) {
             try {
                 sleep(checkpointTermInSecs * 1000);
             } catch (InterruptedException e) {
                 //ignore
             }
-
-            currentMinMCTFirstLSN = getMinMCTFirstLSN();
-            if (currentMinMCTFirstLSN - lastMinMCTFirstLSN > lsnThreshold) {
+            
+            
+            if(lastCheckpointLSN == -1)
+            {
                 try {
-                    recoveryMgr.checkpoint(false);
-                    lastMinMCTFirstLSN = currentMinMCTFirstLSN;
+                    //Since the system just started up after sharp checkpoint, last checkpoint LSN is considered as the min LSN of the current log partition
+                    lastCheckpointLSN = logManager.getReadableSmallestLSN();
+                } catch (Exception e) {
+                    lastCheckpointLSN = 0; 
+                }
+            }
+            
+            //1. get current log LSN
+            currentLogLSN = logManager.getAppendLSN();
+            
+            //2. if current log LSN - previous checkpoint > threshold, do checkpoint
+            if (currentLogLSN - lastCheckpointLSN > lsnThreshold) {
+                try {
+                    // in check point:
+                    //1. get minimum first LSN (MFL) from open indexes.
+                    //2. if current MinFirstLSN < targetCheckpointLSN, schedule async flush for any open index witch has first LSN < force flush delta
+                    //3. next time checkpoint comes, it will be able to remove log files which have end range less than current targetCheckpointLSN
+                   
+                    targetCheckpointLSN = lastCheckpointLSN + lsnThreshold;
+                    currentCheckpointAttemptMinLSN = recoveryMgr.checkpoint(false, targetCheckpointLSN);
+                    
+                    //checkpoint was completed at target LSN or above
+                    if(currentCheckpointAttemptMinLSN >= targetCheckpointLSN)
+                    {
+                        lastCheckpointLSN = currentCheckpointAttemptMinLSN; 
+                    }
+                    
                 } catch (ACIDException | HyracksDataException e) {
                     throw new Error("failed to checkpoint", e);
                 }
@@ -64,19 +87,4 @@
         }
     }
 
-    private long getMinMCTFirstLSN() {
-        List<IIndex> openIndexList = indexLifecycleManager.getOpenIndexes();
-        long minMCTFirstLSN = Long.MAX_VALUE;
-        long firstLSN;
-        if (openIndexList.size() > 0) {
-            for (IIndex index : openIndexList) {
-                firstLSN = ((AbstractLSMIOOperationCallback) ((ILSMIndex) index).getIOOperationCallback())
-                        .getFirstLSN();
-                minMCTFirstLSN = Math.min(minMCTFirstLSN, firstLSN);
-            }
-        } else {
-            minMCTFirstLSN = -1;
-        }
-        return minMCTFirstLSN;
-    }
 }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
index a7fb73b..492f020 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -38,6 +38,7 @@
 import java.util.logging.Logger;
 
 import edu.uci.ics.asterix.common.api.ILocalResourceMetadata;
+import edu.uci.ics.asterix.common.context.DatasetLifecycleManager;
 import edu.uci.ics.asterix.common.exceptions.ACIDException;
 import edu.uci.ics.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
 import edu.uci.ics.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
@@ -45,8 +46,8 @@
 import edu.uci.ics.asterix.common.transactions.ILogRecord;
 import edu.uci.ics.asterix.common.transactions.IRecoveryManager;
 import edu.uci.ics.asterix.common.transactions.ITransactionContext;
+import edu.uci.ics.asterix.common.transactions.LogType;
 import edu.uci.ics.asterix.transaction.management.service.logging.LogManager;
-import edu.uci.ics.asterix.transaction.management.service.logging.LogType;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManager;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
@@ -59,7 +60,7 @@
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.BlockingIOOperationCallbackWrapper;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
 import edu.uci.ics.hyracks.storage.common.file.ILocalResourceRepository;
 import edu.uci.ics.hyracks.storage.common.file.LocalResource;
 
@@ -213,6 +214,8 @@
                 case LogType.ABORT:
                     abortLogCount++;
                     break;
+                case LogType.FLUSH:
+                    break;
                 default:
                     throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
             }
@@ -321,6 +324,8 @@
                 case LogType.JOB_COMMIT:
                 case LogType.ENTITY_COMMIT:
                 case LogType.ABORT:
+                case LogType.FLUSH:
+
                     //do nothing
                     break;
 
@@ -347,9 +352,11 @@
     }
 
     @Override
-    public synchronized void checkpoint(boolean isSharpCheckpoint) throws ACIDException, HyracksDataException {
+    public synchronized long checkpoint(boolean isSharpCheckpoint, long nonSharpCheckpointTargetLSN) throws ACIDException, HyracksDataException {
 
         long minMCTFirstLSN;
+        boolean nonSharpCheckpointSucceeded = false;
+        
         if (isSharpCheckpoint && LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Starting sharp checkpoint ... ");
         }
@@ -361,52 +368,26 @@
         //   right after the new checkpoint file is written.
         File[] prevCheckpointFiles = getPreviousCheckpointFiles();
 
-        IIndexLifecycleManager indexLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
-                .getIndexLifecycleManager();
-        List<IIndex> openIndexList = indexLifecycleManager.getOpenIndexes();
-
+        DatasetLifecycleManager datasetLifecycleManager = (DatasetLifecycleManager)txnSubsystem.getAsterixAppRuntimeContextProvider().getIndexLifecycleManager();
         //#. flush all in-memory components if it is the sharp checkpoint
         if (isSharpCheckpoint) {
-            ///////////////////////////////////////////////
-            //TODO : change the code inside the if statement into indexLifeCycleManager.flushAllDatasets()
-            //indexLifeCycleManager.flushAllDatasets();
-            ///////////////////////////////////////////////
-            List<BlockingIOOperationCallbackWrapper> callbackList = new LinkedList<BlockingIOOperationCallbackWrapper>();
-            for (IIndex index : openIndexList) {
-                ILSMIndex lsmIndex = (ILSMIndex) index;
-                ILSMIndexAccessor indexAccessor = lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE,
-                        NoOpOperationCallback.INSTANCE);
-                BlockingIOOperationCallbackWrapper cb = new BlockingIOOperationCallbackWrapper(
-                        lsmIndex.getIOOperationCallback());
-                callbackList.add(cb);
-                try {
-                    indexAccessor.scheduleFlush(cb);
-                } catch (HyracksDataException e) {
-                    throw new ACIDException(e);
-                }
-            }
 
-            for (BlockingIOOperationCallbackWrapper cb : callbackList) {
-                try {
-                    cb.waitForIO();
-                } catch (InterruptedException e) {
-                    throw new ACIDException(e);
-                }
-            }
+            datasetLifecycleManager.flushAllDatasets();
+
             minMCTFirstLSN = SHARP_CHECKPOINT_LSN;
         } else {
-            long firstLSN;
-            minMCTFirstLSN = Long.MAX_VALUE;
-            if (openIndexList.size() > 0) {
-                for (IIndex index : openIndexList) {
-                    firstLSN = ((AbstractLSMIOOperationCallback) ((ILSMIndex) index).getIOOperationCallback())
-                            .getFirstLSN();
-                    minMCTFirstLSN = Math.min(minMCTFirstLSN, firstLSN);
-                }
-            } else {
-                minMCTFirstLSN = SHARP_CHECKPOINT_LSN;
+
+            minMCTFirstLSN = getMinFirstLSN();
+            
+            if(minMCTFirstLSN >= nonSharpCheckpointTargetLSN){
+                nonSharpCheckpointSucceeded = true;
+            }
+            else{
+                //flush datasets with indexes behind target checkpoint LSN
+                datasetLifecycleManager.scheduleAsyncFlushForLaggingDatasets(nonSharpCheckpointTargetLSN);
             }
         }
+
         CheckpointObject checkpointObject = new CheckpointObject(logMgr.getAppendLSN(), minMCTFirstLSN,
                 txnMgr.getMaxJobId(), System.currentTimeMillis());
 
@@ -454,9 +435,39 @@
             }
         }
 
+        if(nonSharpCheckpointSucceeded){
+            logMgr.deleteOldLogFiles(minMCTFirstLSN);
+        }
+        
         if (isSharpCheckpoint && LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Completed sharp checkpoint.");
         }
+
+        //return the min LSN that was recorded in the checkpoint
+        return minMCTFirstLSN;
+    }
+
+    public long getMinFirstLSN() throws HyracksDataException
+    {
+        IIndexLifecycleManager indexLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider().getIndexLifecycleManager();
+        List<IIndex> openIndexList = indexLifecycleManager.getOpenIndexes();
+        long firstLSN;
+        //the min first lsn can only be the current append or smaller
+        long minFirstLSN = logMgr.getAppendLSN();
+
+        if (openIndexList.size() > 0) {
+
+            for (IIndex index : openIndexList) {
+
+                AbstractLSMIOOperationCallback ioCallback =  (AbstractLSMIOOperationCallback)((ILSMIndex) index).getIOOperationCallback();
+                if(!((AbstractLSMIndex)index).isCurrentMutableComponentEmpty() || ioCallback.hasPendingFlush()){
+                    firstLSN = ioCallback.getFirstLSN();
+                    minFirstLSN = Math.min(minFirstLSN, firstLSN);
+                }
+            }
+        }
+
+        return minFirstLSN;
     }
 
     private CheckpointObject readCheckpoint() throws ACIDException, FileNotFoundException {
@@ -628,6 +639,7 @@
                     break;
 
                 case LogType.ABORT:
+                case LogType.FLUSH:
                     //ignore
                     break;
 
@@ -722,7 +734,6 @@
             } else {
                 throw new IllegalStateException("Unsupported OperationType: " + logRecord.getNewOp());
             }
-            ((AbstractLSMIOOperationCallback) index.getIOOperationCallback()).updateLastLSN(logRecord.getLSN());
         } catch (Exception e) {
             throw new IllegalStateException("Failed to redo", e);
         }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
index 664a4f1..b28964a 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
@@ -28,9 +28,9 @@
 import edu.uci.ics.asterix.common.transactions.ITransactionContext;
 import edu.uci.ics.asterix.common.transactions.ITransactionManager;
 import edu.uci.ics.asterix.common.transactions.JobId;
+import edu.uci.ics.asterix.common.transactions.LogRecord;
 import edu.uci.ics.asterix.common.transactions.MutableLong;
 import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallback;
-import edu.uci.ics.asterix.transaction.management.service.logging.LogRecord;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMOperationType;
@@ -87,7 +87,6 @@
     // avoid object creations.
     // Those are used in synchronized methods.
     private MutableLong tempResourceIdForRegister;
-    private MutableLong tempResourceIdForSetLSN;
     private LogRecord logRecord;
 
     // TODO: implement transactionContext pool in order to avoid object
@@ -106,7 +105,6 @@
         indexMap = new HashMap<MutableLong, AbstractLSMIOOperationCallback>();
         primaryIndex = null;
         tempResourceIdForRegister = new MutableLong();
-        tempResourceIdForSetLSN = new MutableLong();
         logRecord = new LogRecord();
     }
 
@@ -128,20 +126,10 @@
 
     // [Notice]
     // This method is called sequentially by the LogAppender threads.
-    // However, the indexMap is concurrently read and modified through this
-    // method and registerIndexAndCallback()
     @Override
-    public void setLastLSN(long resourceId, long LSN) {
-        synchronized (indexMap) {
-            firstLSN.compareAndSet(-1, LSN);
-            lastLSN.set(Math.max(lastLSN.get(), LSN));
-            if (resourceId != -1) {
-                // Non-update log's resourceId is -1.
-                tempResourceIdForSetLSN.set(resourceId);
-                AbstractLSMIOOperationCallback ioOpCallback = indexMap.get(tempResourceIdForSetLSN);
-                ioOpCallback.updateLastLSN(LSN);
-            }
-        }
+    public void setLastLSN(long LSN) {
+        firstLSN.compareAndSet(-1, LSN);
+        lastLSN.set(Math.max(lastLSN.get(), LSN));
     }
 
     @Override
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
index 94be125..ec90cc4 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -27,7 +27,7 @@
 import edu.uci.ics.asterix.common.transactions.ITransactionContext;
 import edu.uci.ics.asterix.common.transactions.ITransactionManager;
 import edu.uci.ics.asterix.common.transactions.JobId;
-import edu.uci.ics.asterix.transaction.management.service.logging.LogRecord;
+import edu.uci.ics.asterix.common.transactions.LogRecord;
 import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
 
 /**
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java
index e6bcc5d..20612ed 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java
@@ -52,8 +52,9 @@
         this.recoveryManager = new RecoveryManager(this);
         if (asterixAppRuntimeContextProvider != null) {
             this.checkpointThread = new CheckpointThread(recoveryManager,
-                    asterixAppRuntimeContextProvider.getIndexLifecycleManager(),
+                    asterixAppRuntimeContextProvider.getIndexLifecycleManager(),logManager,
                     this.txnProperties.getCheckpointLSNThreshold(), this.txnProperties.getCheckpointPollFrequency());
+            this.checkpointThread.start();
         } else {
             this.checkpointThread = null;
         }