Modified locking on DatasetLifeCycleManager

Change-Id: Ia5ab435f53879ba1d08b6dee24eb4969c5ad16e3
Reviewed-on: http://fulliautomatix.ics.uci.edu:8443/206
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Young-Seok Kim <kisskys@gmail.com>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/BaseOperationTracker.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/BaseOperationTracker.java
index 7e7ffd9..f69def8 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/BaseOperationTracker.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/BaseOperationTracker.java
@@ -14,6 +14,7 @@
  */
 package edu.uci.ics.asterix.common.context;
 
+import edu.uci.ics.asterix.common.context.DatasetLifecycleManager.DatasetInfo;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
@@ -25,17 +26,19 @@
 
     protected final DatasetLifecycleManager datasetLifecycleManager;
     protected final int datasetID;
-
-    public BaseOperationTracker(DatasetLifecycleManager datasetLifecycleManager, int datasetID) {
+    protected DatasetInfo dsInfo;
+    
+    public BaseOperationTracker(DatasetLifecycleManager datasetLifecycleManager, int datasetID, DatasetInfo dsInfo) {
         this.datasetLifecycleManager = datasetLifecycleManager;
         this.datasetID = datasetID;
+        this.dsInfo = dsInfo;
     }
 
     @Override
     public void beforeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
             IModificationOperationCallback modificationCallback) throws HyracksDataException {
         if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE) {
-            datasetLifecycleManager.declareActiveIOOperation(datasetID);
+            dsInfo.declareActiveIOOperation();
         }
     }
 
@@ -43,7 +46,7 @@
     public void afterOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
             IModificationOperationCallback modificationCallback) throws HyracksDataException {
         if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE) {
-            datasetLifecycleManager.undeclareActiveIOOperation(datasetID);
+            dsInfo.undeclareActiveIOOperation();
         }
     }
 
@@ -52,6 +55,10 @@
             IModificationOperationCallback modificationCallback) throws HyracksDataException {
     }
 
+    public void setDatasetInfo(DatasetInfo dsInfo){
+        this.dsInfo = dsInfo;
+    }
+    
     public void exclusiveJobCommitted() throws HyracksDataException {
     }
 }
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/CorrelatedPrefixMergePolicy.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/CorrelatedPrefixMergePolicy.java
index db9bfe9..0d81c17 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/CorrelatedPrefixMergePolicy.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/CorrelatedPrefixMergePolicy.java
@@ -21,6 +21,7 @@
 import java.util.Map;
 import java.util.Set;
 
+import edu.uci.ics.asterix.common.context.DatasetLifecycleManager.DatasetInfo;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
 import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
@@ -39,7 +40,7 @@
 
     private final DatasetLifecycleManager datasetLifecycleManager;
     private final int datasetID;
-
+    
     public CorrelatedPrefixMergePolicy(IIndexLifecycleManager datasetLifecycleManager, int datasetID) {
         this.datasetLifecycleManager = (DatasetLifecycleManager) datasetLifecycleManager;
         this.datasetID = datasetID;
@@ -77,7 +78,7 @@
         int startIndex = -1;
         int minNumComponents = Integer.MAX_VALUE;
 
-        Set<ILSMIndex> indexes = datasetLifecycleManager.getDatasetIndexes(datasetID);
+        Set<ILSMIndex> indexes = datasetLifecycleManager.getDatasetInfo(datasetID).getDatasetIndexes();
         for (ILSMIndex lsmIndex : indexes) {
             minNumComponents = Math.min(minNumComponents, lsmIndex.getImmutableComponents().size());
         }
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java
index 14db1d2..b7064d3 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java
@@ -91,6 +91,11 @@
         DatasetInfo dsInfo = datasetInfos.get(did);
         if (dsInfo == null) {
             dsInfo = new DatasetInfo(did, !index.hasMemoryComponents());
+            PrimaryIndexOperationTracker opTracker = (PrimaryIndexOperationTracker) datasetOpTrackers
+                    .get(dsInfo.datasetID);
+            if (opTracker != null) {
+                opTracker.setDatasetInfo(dsInfo);
+            }
         } else if (dsInfo.indexes.containsKey(resourceID)) {
             throw new HyracksDataException("Index with resource ID " + resourceID + " already exists.");
         }
@@ -124,11 +129,14 @@
         // TODO: use fine-grained counters, one for each index instead of a single counter per dataset.
 
         // First wait for any ongoing IO operations
-        while (dsInfo.numActiveIOOps > 0) {
-            try {
-                wait();
-            } catch (InterruptedException e) {
-                throw new HyracksDataException(e);
+        synchronized (dsInfo) {
+            while (dsInfo.numActiveIOOps > 0) {
+                try {
+                    //notification will come from DatasetInfo class (undeclareActiveIOOperation)
+                    dsInfo.wait();
+                } catch (InterruptedException e) {
+                    throw new HyracksDataException(e);
+                }
             }
         }
 
@@ -153,23 +161,6 @@
         }
     }
 
-    public synchronized void declareActiveIOOperation(int datasetID) throws HyracksDataException {
-        DatasetInfo dsInfo = datasetInfos.get(datasetID);
-        if (dsInfo == null) {
-            throw new HyracksDataException("Failed to find a dataset with ID " + datasetID);
-        }
-        dsInfo.incrementActiveIOOps();
-    }
-
-    public synchronized void undeclareActiveIOOperation(int datasetID) throws HyracksDataException {
-        DatasetInfo dsInfo = datasetInfos.get(datasetID);
-        if (dsInfo == null) {
-            throw new HyracksDataException("Failed to find a dataset with ID " + datasetID);
-        }
-        dsInfo.decrementActiveIOOps();
-        notifyAll();
-    }
-
     @Override
     public synchronized void open(long resourceID) throws HyracksDataException {
         int did = getDIDfromRID(resourceID);
@@ -235,16 +226,27 @@
                     NoOpOperationCallback.INSTANCE);
             accessor.scheduleFlush(iInfo.index.getIOOperationCallback());
         }
+
         // Wait for the above flush op.
-        while (dsInfo.numActiveIOOps > 0) {
-            try {
-                wait();
-            } catch (InterruptedException e) {
-                throw new HyracksDataException(e);
+        synchronized (dsInfo) {
+            while (dsInfo.numActiveIOOps > 0) {
+                try {
+                    //notification will come from DatasetInfo class (undeclareActiveIOOperation)
+                    dsInfo.wait();
+                } catch (InterruptedException e) {
+                    throw new HyracksDataException(e);
+                }
             }
         }
     }
 
+    public DatasetInfo getDatasetInfo(int datasetID) {
+
+        synchronized (datasetInfos) {
+            return datasetInfos.get(datasetID);
+        }
+    }
+
     @Override
     public synchronized void close(long resourceID) throws HyracksDataException {
         int did = getDIDfromRID(resourceID);
@@ -296,28 +298,14 @@
         synchronized (datasetOpTrackers) {
             ILSMOperationTracker opTracker = datasetOpTrackers.get(datasetID);
             if (opTracker == null) {
-                opTracker = new PrimaryIndexOperationTracker(this, datasetID, logManager);
+                opTracker = new PrimaryIndexOperationTracker(this, datasetID, logManager, getDatasetInfo(datasetID));
                 datasetOpTrackers.put(datasetID, opTracker);
             }
             return opTracker;
         }
     }
 
-    public synchronized Set<ILSMIndex> getDatasetIndexes(int datasetID) throws HyracksDataException {
-        DatasetInfo dsInfo = datasetInfos.get(datasetID);
-        if (dsInfo == null) {
-            throw new HyracksDataException("No dataset found with datasetID " + datasetID);
-        }
-        Set<ILSMIndex> datasetIndexes = new HashSet<ILSMIndex>();
-        for (IndexInfo iInfo : dsInfo.indexes.values()) {
-            if (iInfo.isOpen) {
-                datasetIndexes.add(iInfo.index);
-            }
-        }
-        return datasetIndexes;
-    }
-
-    private static abstract class Info {
+    private abstract class Info {
         protected int referenceCount;
         protected boolean isOpen;
 
@@ -335,7 +323,7 @@
         }
     }
 
-    private static class IndexInfo extends Info {
+    private class IndexInfo extends Info {
         private final ILSMIndex index;
 
         public IndexInfo(ILSMIndex index) {
@@ -343,7 +331,7 @@
         }
     }
 
-    private static class DatasetInfo extends Info implements Comparable<DatasetInfo> {
+    public class DatasetInfo extends Info implements Comparable<DatasetInfo> {
         private final Map<Long, IndexInfo> indexes;
         private final int datasetID;
         private long lastAccess;
@@ -369,12 +357,25 @@
             lastAccess = System.currentTimeMillis();
         }
 
-        public void incrementActiveIOOps() {
+        public synchronized void declareActiveIOOperation() {
             numActiveIOOps++;
         }
 
-        public void decrementActiveIOOps() {
+        public synchronized void undeclareActiveIOOperation() {
             numActiveIOOps--;
+            //notify threads waiting on this dataset info
+            notifyAll();
+        }
+
+        public synchronized Set<ILSMIndex> getDatasetIndexes() throws HyracksDataException {
+            Set<ILSMIndex> datasetIndexes = new HashSet<ILSMIndex>();
+            for (IndexInfo iInfo : indexes.values()) {
+                if (iInfo.isOpen) {
+                    datasetIndexes.add(iInfo.index);
+                }
+            }
+
+            return datasetIndexes;
         }
 
         @Override
@@ -437,32 +438,33 @@
     }
 
     public synchronized void scheduleAsyncFlushForLaggingDatasets(long targetLSN) throws HyracksDataException {
-
-        List<DatasetInfo> laggingDatasets = new ArrayList<DatasetInfo>();
-        long firstLSN;
-        //find dataset with min lsn < targetLSN
+        //schedule flush for datasets with min LSN (Log Serial Number) < targetLSN
         for (DatasetInfo dsInfo : datasetInfos.values()) {
-            for (IndexInfo iInfo : dsInfo.indexes.values()) {
-                AbstractLSMIOOperationCallback ioCallback = (AbstractLSMIOOperationCallback) iInfo.index
-                        .getIOOperationCallback();
-                if (!((AbstractLSMIndex) iInfo.index).isCurrentMutableComponentEmpty() || ioCallback.hasPendingFlush()) {
-                    firstLSN = ioCallback.getFirstLSN();
-
-                    if (firstLSN < targetLSN) {
-                        laggingDatasets.add(dsInfo);
-                        break;
+            PrimaryIndexOperationTracker opTracker = (PrimaryIndexOperationTracker) getOperationTracker(dsInfo.datasetID);
+            synchronized (opTracker) {
+                for (IndexInfo iInfo : dsInfo.indexes.values()) {
+                    AbstractLSMIOOperationCallback ioCallback = (AbstractLSMIOOperationCallback) iInfo.index
+                            .getIOOperationCallback();
+                    if (!(((AbstractLSMIndex) iInfo.index).isCurrentMutableComponentEmpty()
+                            || ioCallback.hasPendingFlush() || opTracker.isFlushLogCreated() || opTracker.isFlushOnExit())) {
+                        long firstLSN = ioCallback.getFirstLSN();
+                        if (firstLSN < targetLSN) {
+                            opTracker.setFlushOnExit(true);
+                            if (opTracker.getNumActiveOperations() == 0) {
+                                // No Modify operations currently, we need to trigger the flush and we can do so safely
+                                opTracker.flushIfRequested();
+                            }
+                            break;
+                        }
                     }
                 }
             }
         }
-
-        //schedule a sync flush
-        for (DatasetInfo dsInfo : laggingDatasets) {
-            flushDatasetOpenIndexes(dsInfo, true);
-        }
-
     }
 
+    /*
+     * This method can only be called asynchronously safely if we're sure no modify operation will take place until the flush is scheduled
+     */
     private void flushDatasetOpenIndexes(DatasetInfo dsInfo, boolean asyncFlush) throws HyracksDataException {
         if (!dsInfo.isExternal) {
             synchronized (logRecord) {
@@ -472,7 +474,9 @@
                 } catch (ACIDException e) {
                     throw new HyracksDataException("could not write flush log while closing dataset", e);
                 }
+
                 try {
+                    //notification will come from LogPage class (notifyFlushTerminator)
                     logRecord.wait();
                 } catch (InterruptedException e) {
                     throw new HyracksDataException(e);
@@ -507,14 +511,16 @@
 
     private void closeDataset(DatasetInfo dsInfo) throws HyracksDataException {
         // First wait for any ongoing IO operations
-        while (dsInfo.numActiveIOOps > 0) {
-            try {
-                wait();
-            } catch (InterruptedException e) {
-                throw new HyracksDataException(e);
+        synchronized (dsInfo) {
+            while (dsInfo.numActiveIOOps > 0) {
+                try {
+                    dsInfo.wait();
+                } catch (InterruptedException e) {
+                    throw new HyracksDataException(e);
+                }
             }
-        }
 
+        }
         try {
             flushDatasetOpenIndexes(dsInfo, false);
         } catch (Exception e) {
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
index 744c208..f94d6f5 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -18,6 +18,7 @@
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import edu.uci.ics.asterix.common.context.DatasetLifecycleManager.DatasetInfo;
 import edu.uci.ics.asterix.common.exceptions.ACIDException;
 import edu.uci.ics.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
 import edu.uci.ics.asterix.common.transactions.AbstractOperationCallback;
@@ -30,6 +31,7 @@
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexInternal;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMOperationType;
 
 public class PrimaryIndexOperationTracker extends BaseOperationTracker {
@@ -37,10 +39,12 @@
     // Number of active operations on an ILSMIndex instance.
     private final AtomicInteger numActiveOperations;
     private final ILogManager logManager;
+    private boolean flushOnExit = false;
+    private boolean flushLogCreated = false;
 
     public PrimaryIndexOperationTracker(DatasetLifecycleManager datasetLifecycleManager, int datasetID,
-            ILogManager logManager) {
-        super(datasetLifecycleManager, datasetID);
+            ILogManager logManager, DatasetInfo dsInfo) {
+        super(datasetLifecycleManager, datasetID, dsInfo);
         this.logManager = logManager;
         this.numActiveOperations = new AtomicInteger();
     }
@@ -51,7 +55,7 @@
         if (opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION) {
             incrementNumActiveOperations(modificationCallback);
         } else if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE) {
-            datasetLifecycleManager.declareActiveIOOperation(datasetID);
+            dsInfo.declareActiveIOOperation();
         }
     }
 
@@ -70,27 +74,30 @@
         if (opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION) {
             decrementNumActiveOperations(modificationCallback);
             if (numActiveOperations.get() == 0) {
-                flushIfFull();
+                flushIfRequested();
             }
         } else if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE) {
-            datasetLifecycleManager.undeclareActiveIOOperation(datasetID);
+            dsInfo.undeclareActiveIOOperation();
         }
     }
 
-    private void flushIfFull() throws HyracksDataException {
-        // If we need a flush, and this is the last completing operation, then schedule the flush. 
-        // TODO: Is there a better way to check if we need to flush instead of communicating with the datasetLifecycleManager each time?
-        // Maybe we could keep the set of indexes here instead of consulting the datasetLifecycleManager each time?
-        Set<ILSMIndex> indexes = datasetLifecycleManager.getDatasetIndexes(datasetID);
+    public void flushIfRequested() throws HyracksDataException {
+        // If we need a flush, and this is the last completing operation, then schedule the flush,  
+        // or if there is a flush scheduled by the checkpoint (flushOnExit), then schedule it
+
         boolean needsFlush = false;
-        for (ILSMIndex lsmIndex : indexes) {
-            if (((ILSMIndexInternal) lsmIndex).hasFlushRequestForCurrentMutableComponent()) {
-                needsFlush = true;
-                break;
+        Set<ILSMIndex> indexes = dsInfo.getDatasetIndexes();
+
+        if (!flushOnExit) {
+            for (ILSMIndex lsmIndex : indexes) {
+                if (((ILSMIndexInternal) lsmIndex).hasFlushRequestForCurrentMutableComponent()) {
+                    needsFlush = true;
+                    break;
+                }
             }
         }
 
-        if (needsFlush) {
+        if (needsFlush || flushOnExit) {
             LogRecord logRecord = new LogRecord();
             logRecord.formFlushLogRecord(datasetID, this);
             try {
@@ -98,13 +105,25 @@
             } catch (ACIDException e) {
                 throw new HyracksDataException("could not write flush log", e);
             }
+            flushLogCreated = true;
+        }
+        
+        if (flushOnExit) {
+            //Make the current mutable components (if have been modified) UnWritable to stop coming modify operations from entering them
+            for (ILSMIndex lsmIndex : indexes) {
+                if (!((AbstractLSMIndex) lsmIndex).isCurrentMutableComponentEmpty()) {
+                    ((AbstractLSMIndex) lsmIndex).makeCurrentMutableComponentUnWritable();
+                }
+            }
+            flushOnExit = false;
         }
     }
 
     public void triggerScheduleFlush(LogRecord logRecord) throws HyracksDataException {
         // why synchronized ??
         synchronized (this) {
-            for (ILSMIndex lsmIndex : datasetLifecycleManager.getDatasetIndexes(datasetID)) {
+            for (ILSMIndex lsmIndex : dsInfo.getDatasetIndexes()) {
+
                 //get resource
                 ILSMIndexAccessor accessor = lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE,
                         NoOpOperationCallback.INSTANCE);
@@ -117,13 +136,15 @@
                 //schedule flush after update
                 accessor.scheduleFlush(lsmIndex.getIOOperationCallback());
             }
+            
+            flushLogCreated = false;
         }
     }
 
     @Override
     public void exclusiveJobCommitted() throws HyracksDataException {
         numActiveOperations.set(0);
-        flushIfFull();
+        flushIfRequested();
     }
 
     public int getNumActiveOperations() {
@@ -152,4 +173,16 @@
         callback.resetLocalNumActiveOperations();
     }
 
+    public boolean isFlushOnExit() {
+        return flushOnExit;
+    }
+
+    public void setFlushOnExit(boolean flushOnExit) {
+        this.flushOnExit = flushOnExit;
+    }
+
+    public boolean isFlushLogCreated() {
+        return flushLogCreated;
+    }
+
 }
\ No newline at end of file
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
index 11ee638..1df7121 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -387,7 +387,8 @@
         long resourceID = -1;
         ILSMOperationTracker opTracker = index.isPrimaryIndex() ? runtimeContext.getLSMBTreeOperationTracker(index
                 .getDatasetId().getId()) : new BaseOperationTracker((DatasetLifecycleManager) indexLifecycleManager,
-                index.getDatasetId().getId());
+                index.getDatasetId().getId(), ((DatasetLifecycleManager) indexLifecycleManager).getDatasetInfo(index
+                .getDatasetId().getId()));
         final String path = file.getFile().getPath();
         if (create) {
             lsmBtree = LSMBTreeUtils.createLSMTree(
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerProvider.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerProvider.java
index 140a8dd..f4368a0 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerProvider.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerProvider.java
@@ -35,7 +35,7 @@
     public ILSMOperationTracker getOperationTracker(IHyracksTaskContext ctx) {
         DatasetLifecycleManager dslcManager = (DatasetLifecycleManager) ((IAsterixAppRuntimeContext) ctx
                 .getJobletContext().getApplicationContext().getApplicationObject()).getIndexLifecycleManager();
-        return new BaseOperationTracker(dslcManager, datasetID);
+        return new BaseOperationTracker(dslcManager, datasetID, dslcManager.getDatasetInfo(datasetID));
     }
 
 }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/ExternalBTreeLocalResourceMetadata.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/ExternalBTreeLocalResourceMetadata.java
index 66906cb..7d699b3 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/ExternalBTreeLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/ExternalBTreeLocalResourceMetadata.java
@@ -49,7 +49,7 @@
                 runtimeContextProvider.getBloomFilterFalsePositiveRate(), mergePolicyFactory.createMergePolicy(
                         mergePolicyProperties, runtimeContextProvider.getIndexLifecycleManager()),
                 new BaseOperationTracker((DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager(),
-                        datasetID), runtimeContextProvider.getLSMIOScheduler(),
+                        datasetID, ((DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager()).getDatasetInfo(datasetID)), runtimeContextProvider.getLSMIOScheduler(),
                 LSMBTreeIOOperationCallbackFactory.INSTANCE.createIOOperationCallback(), -1);
         return lsmBTree;
     }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/ExternalBTreeWithBuddyLocalResourceMetadata.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/ExternalBTreeWithBuddyLocalResourceMetadata.java
index ec9724d..ad4ff0c 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/ExternalBTreeWithBuddyLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/ExternalBTreeWithBuddyLocalResourceMetadata.java
@@ -61,7 +61,7 @@
                 runtimeContextProvider.getFileMapManager(), typeTraits, btreeCmpFactories, runtimeContextProvider
                         .getBloomFilterFalsePositiveRate(), mergePolicyFactory.createMergePolicy(mergePolicyProperties,
                         runtimeContextProvider.getIndexLifecycleManager()), new BaseOperationTracker(
-                        (DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager(), datasetID),
+                        (DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager(), datasetID, ((DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager()).getDatasetInfo(datasetID)),
                 runtimeContextProvider.getLSMIOScheduler(), LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE
                         .createIOOperationCallback(), buddyBtreeFields, -1);
     }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/ExternalRTreeLocalResourceMetadata.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/ExternalRTreeLocalResourceMetadata.java
index 5b9d27a..7c4a437 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/ExternalRTreeLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/ExternalRTreeLocalResourceMetadata.java
@@ -59,7 +59,7 @@
                     valueProviderFactories, rtreePolicyType, runtimeContextProvider.getBloomFilterFalsePositiveRate(),
                     mergePolicyFactory.createMergePolicy(mergePolicyProperties,
                             runtimeContextProvider.getIndexLifecycleManager()), new BaseOperationTracker(
-                            (DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager(), datasetID),
+                            (DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager(), datasetID, ((DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager()).getDatasetInfo(datasetID)),
                     runtimeContextProvider.getLSMIOScheduler(), LSMRTreeIOOperationCallbackFactory.INSTANCE
                             .createIOOperationCallback(), linearizeCmpFactory, btreeFields, -1);
         } catch (TreeIndexException e) {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
index 7c7c90e..76dd03a 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
@@ -67,7 +67,7 @@
                 bloomFilterKeyFields, runtimeContextProvider.getBloomFilterFalsePositiveRate(), mergePolicyFactory
                         .createMergePolicy(mergePolicyProperties, runtimeContextProvider.getIndexLifecycleManager()),
                 isPrimary ? runtimeContextProvider.getLSMBTreeOperationTracker(datasetID) : new BaseOperationTracker(
-                        (DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager(), datasetID),
+                        (DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager(), datasetID, ((DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager()).getDatasetInfo(datasetID)),
                 runtimeContextProvider.getLSMIOScheduler(), LSMBTreeIOOperationCallbackFactory.INSTANCE
                         .createIOOperationCallback(), isPrimary, filterTypeTraits, filterCmpFactories, btreeFields,
                 filterFields);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
index b7fff02..7d43626 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
@@ -88,7 +88,7 @@
                         mergePolicyFactory.createMergePolicy(mergePolicyProperties,
                                 runtimeContextProvider.getIndexLifecycleManager()),
                         new BaseOperationTracker((DatasetLifecycleManager) runtimeContextProvider
-                                .getIndexLifecycleManager(), datasetID), runtimeContextProvider.getLSMIOScheduler(),
+                                .getIndexLifecycleManager(), datasetID, ((DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager()).getDatasetInfo(datasetID)), runtimeContextProvider.getLSMIOScheduler(),
                         LSMInvertedIndexIOOperationCallbackFactory.INSTANCE.createIOOperationCallback(),
                         invertedIndexFields, filterTypeTraits, filterCmpFactories, filterFields,
                         filterFieldsForNonBulkLoadOps, invertedIndexFieldsForNonBulkLoadOps);
@@ -107,7 +107,7 @@
                         mergePolicyFactory.createMergePolicy(mergePolicyProperties,
                                 runtimeContextProvider.getIndexLifecycleManager()),
                         new BaseOperationTracker((DatasetLifecycleManager) runtimeContextProvider
-                                .getIndexLifecycleManager(), datasetID), runtimeContextProvider.getLSMIOScheduler(),
+                                .getIndexLifecycleManager(), datasetID, ((DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager()).getDatasetInfo(datasetID)), runtimeContextProvider.getLSMIOScheduler(),
                         LSMInvertedIndexIOOperationCallbackFactory.INSTANCE.createIOOperationCallback(),
                         invertedIndexFields, filterTypeTraits, filterCmpFactories, filterFields,
                         filterFieldsForNonBulkLoadOps, invertedIndexFieldsForNonBulkLoadOps);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
index e7b0fa9..6b0597d 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
@@ -80,7 +80,7 @@
                     valueProviderFactories, rtreePolicyType, runtimeContextProvider.getBloomFilterFalsePositiveRate(),
                     mergePolicyFactory.createMergePolicy(mergePolicyProperties,
                             runtimeContextProvider.getIndexLifecycleManager()), new BaseOperationTracker(
-                            (DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager(), datasetID),
+                            (DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager(), datasetID, ((DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager()).getDatasetInfo(datasetID)),
                     runtimeContextProvider.getLSMIOScheduler(), LSMRTreeIOOperationCallbackFactory.INSTANCE
                             .createIOOperationCallback(), linearizeCmpFactory, rtreeFields, btreeFields,
                     filterTypeTraits, filterCmpFactories, filterFields);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointThread.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointThread.java
index a9d1cf2..0e21746 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointThread.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointThread.java
@@ -38,6 +38,9 @@
 
     @Override
     public void run() {
+        
+        Thread.currentThread().setName("Checkpoint Thread");
+
         long currentCheckpointAttemptMinLSN = -1;
         long lastCheckpointLSN = -1;
         long currentLogLSN = 0;
@@ -87,4 +90,4 @@
         }
     }
 
-}
+}
\ No newline at end of file
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
index 492f020..ea30846 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -848,4 +848,4 @@
         }
         return true;
     }
-}
+}
\ No newline at end of file