another bug fix(issue437: lost log record) and code clean up
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileBasedBuffer.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileBasedBuffer.java
index c08462f..354a745 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileBasedBuffer.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileBasedBuffer.java
@@ -19,6 +19,9 @@
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
  * Represent a buffer that is backed by a physical file. Provider custom APIs
@@ -35,6 +38,9 @@
     private int bufferNextWriteOffset;
     private final int diskSectorSize;
 
+    private final ReadWriteLock latch;
+    private final AtomicInteger referenceCount;
+
     public FileBasedBuffer(String filePath, long offset, int bufferSize, int diskSectorSize) throws IOException {
         this.filePath = filePath;
         buffer = ByteBuffer.allocate(bufferSize);
@@ -48,6 +54,8 @@
         bufferLastFlushOffset = 0;
         bufferNextWriteOffset = 0;
         this.diskSectorSize = diskSectorSize;
+        this.latch = new ReentrantReadWriteLock(true);
+        referenceCount = new AtomicInteger(0);
     }
 
     public String getFilePath() {
@@ -200,4 +208,38 @@
         }
     }
 
+    @Override
+    public void acquireWriteLatch() {
+        latch.writeLock().lock();
+    }
+
+    @Override
+    public void releaseWriteLatch() {
+        latch.writeLock().unlock();
+    }
+
+    @Override
+    public void acquireReadLatch() {
+        latch.readLock().lock();
+    }
+
+    @Override
+    public void releaseReadLatch() {
+        latch.readLock().unlock();
+    }
+
+    @Override
+    public void incRefCnt() {
+        referenceCount.incrementAndGet();
+    }
+    
+    @Override
+    public void decRefCnt() {
+        referenceCount.decrementAndGet();
+    }
+    
+    @Override
+    public int getRefCnt() {
+        return referenceCount.get();
+    }
 }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IFileBasedBuffer.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IFileBasedBuffer.java
index ce13df4..4f6d2b8 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IFileBasedBuffer.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IFileBasedBuffer.java
@@ -46,5 +46,19 @@
     int getBufferNextWriteOffset();
 
     void setBufferNextWriteOffset(int offset);
+    
+    public void acquireWriteLatch();
+
+    public void releaseWriteLatch();
+
+    public void acquireReadLatch();
+
+    public void releaseReadLatch();
+    
+    public void incRefCnt();
+    
+    public void decRefCnt();
+    
+    public int getRefCnt();
 
 }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogManager.java
index 9f20b5d..26229a7 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogManager.java
@@ -61,15 +61,6 @@
     public void readLog(long lsnValue, LogicalLogLocator logicalLogLocator) throws ACIDException;
 
     /**
-     * Flushes the log records up to the lsn represented by the
-     * logicalLogLocator
-     * 
-     * @param logicalLogLocator
-     * @throws ACIDException
-     */
-    public void flushLog(LogicalLogLocator logicalLogLocator) throws ACIDException;
-
-    /**
      * Retrieves the configuration parameters of the ILogManager
      * 
      * @return LogManagerProperties: the configuration parameters for the
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogRecordHelper.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogRecordHelper.java
index 80f74cb..ce5649a 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogRecordHelper.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogRecordHelper.java
@@ -61,5 +61,7 @@
     int getLogHeaderSize(byte logType);
 
     int getLogChecksumSize();
+    
+    int getCommitLogSize();
 
 }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
index 36d052d..333a661 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
@@ -27,15 +27,12 @@
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
 import edu.uci.ics.asterix.transaction.management.service.logging.IndexLogger.ReusableLogContentObject;
-import edu.uci.ics.asterix.transaction.management.service.logging.LogManager.PageOwnershipStatus;
-import edu.uci.ics.asterix.transaction.management.service.logging.LogManager.PageState;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
@@ -48,6 +45,9 @@
     private final TransactionSubsystem provider;
     private LogManagerProperties logManagerProperties;
     private LogPageFlushThread logPageFlusher;
+    private final int logPageSize;
+    private long statLogSize;
+    private long statLogCount;
 
     /*
      * the array of log pages. The number of log pages is configurable. Pages
@@ -62,47 +62,6 @@
      */
     private int numLogPages;
 
-    /*
-     * Initially all pages have an owner count of 1 that is the LogManager. When
-     * a transaction requests to write in a log page, the owner count is
-     * incremented. The log manager reserves space in the log page and puts in
-     * the log header but leaves the space for the content and the checksum
-     * (covering the whole log record). When the content has been put, the log
-     * manager computes the checksum and puts it after the content. At this
-     * point, the ownership count is decremented as the transaction is done with
-     * using the page. When a page is requested to be flushed, logPageFlusher
-     * set the count to 0(LOG_FLUSHER: meaning that the page is being flushed)
-     * only if the count is 1(LOG_WRITER: meaning that there is no other
-     * transactions who own the page to write logs.) After flushing the page,
-     * logPageFlusher set this count to 1.
-     */
-    private AtomicInteger[] logPageOwnerCount;
-
-    static class PageOwnershipStatus {
-        public static final int LOG_WRITER = 1;
-        public static final int LOG_FLUSHER = 0;
-    }
-
-    /*
-     * LogPageStatus: A page is either ACTIVE or INACTIVE. The status for each
-     * page is maintained in logPageStatus. A page is ACTIVE when the LogManager
-     * can allocate space in the page for writing a log record. Initially all
-     * pages are ACTIVE. As transactions fill up space by writing log records, a
-     * page may not have sufficient space left for serving a request by a
-     * transaction. When this happens, the page is flushed to disk by calling
-     * logPageFlusher.requestFlush(). In the requestFlush(), after
-     * groupCommitWaitTime, the page status is set to INACTIVE. Then, there is
-     * no more writer on the page(meaning the corresponding logPageOwnerCount is
-     * 1), the page is flushed by the logPageFlusher and the status is reset to
-     * ACTIVE by the logPageFlusher.
-     */
-    private AtomicInteger[] logPageStatus;
-
-    static class PageState {
-        public static final int INACTIVE = 0;
-        public static final int ACTIVE = 1;
-    }
-
     private AtomicLong lastFlushedLSN = new AtomicLong(-1);
 
     /*
@@ -129,10 +88,6 @@
         return lastFlushedLSN;
     }
 
-    public AtomicInteger getLogPageStatus(int pageIndex) {
-        return logPageStatus[pageIndex];
-    }
-
     public AtomicLong getCurrentLsn() {
         return lsn;
     }
@@ -144,13 +99,19 @@
     public LogManager(TransactionSubsystem provider) throws ACIDException {
         this.provider = provider;
         initLogManagerProperties(this.provider.getId());
+        logPageSize = logManagerProperties.getLogPageSize();
         initLogManager();
+        statLogSize = 0;
+        statLogCount = 0;
     }
 
     public LogManager(TransactionSubsystem provider, String nodeId) throws ACIDException {
         this.provider = provider;
         initLogManagerProperties(nodeId);
+        logPageSize = logManagerProperties.getLogPageSize();
         initLogManager();
+        statLogSize = 0;
+        statLogCount = 0;
     }
 
     /*
@@ -186,9 +147,6 @@
     private void initLogManager() throws ACIDException {
         logRecordHelper = new LogRecordHelper(this);
         numLogPages = logManagerProperties.getNumLogPages();
-        logPageOwnerCount = new AtomicInteger[numLogPages];
-        logPageStatus = new AtomicInteger[numLogPages];
-
         activeTxnCountMaps = new ArrayList<HashMap<TransactionContext, Integer>>(numLogPages);
         for (int i = 0; i < numLogPages; i++) {
             activeTxnCountMaps.add(new HashMap<TransactionContext, Integer>());
@@ -202,14 +160,6 @@
         initLSN();
 
         /*
-         * initialize meta data for each log page.
-         */
-        for (int i = 0; i < numLogPages; i++) {
-            logPageOwnerCount[i] = new AtomicInteger(PageOwnershipStatus.LOG_WRITER);
-            logPageStatus[i] = new AtomicInteger(PageState.ACTIVE);
-        }
-
-        /*
          * initialize the log pages.
          */
         initializeLogPages(startingLSN);
@@ -226,7 +176,7 @@
     }
 
     public int getLogPageIndex(long lsnValue) {
-        return (int) (((lsnValue - startingLSN) / logManagerProperties.getLogPageSize()) % numLogPages);
+        return (int) (((lsnValue - startingLSN) / logPageSize) % numLogPages);
     }
 
     /*
@@ -242,28 +192,7 @@
      * record is (to be) placed.
      */
     public int getLogPageOffset(long lsnValue) {
-        return (int) (lsnValue % logManagerProperties.getLogPageSize());
-    }
-
-    /*
-     * a transaction thread under certain scenarios is required to wait until
-     * the page where it has to write a log record becomes available for writing
-     * a log record.
-     */
-    private void waitUntillPageIsAvailableForWritingLog(int pageIndex) throws ACIDException {
-        if (logPageStatus[pageIndex].get() == PageState.ACTIVE
-                && logPageOwnerCount[pageIndex].get() >= PageOwnershipStatus.LOG_WRITER) {
-            return;
-        }
-        try {
-            synchronized (logPages[pageIndex]) {
-                while (!(logPageStatus[pageIndex].get() == PageState.ACTIVE && logPageOwnerCount[pageIndex].get() >= PageOwnershipStatus.LOG_WRITER)) {
-                    logPages[pageIndex].wait();
-                }
-            }
-        } catch (InterruptedException e) {
-            throw new ACIDException(" thread interrupted while waiting for page " + pageIndex + " to be available ", e);
-        }
+        return (int) (lsnValue % logPageSize);
     }
 
     /*
@@ -277,7 +206,6 @@
      * @param logType: the type of log record.
      */
     private long getLsn(int entrySize, byte logType) throws ACIDException {
-        long pageSize = logManagerProperties.getLogPageSize();
 
         while (true) {
             boolean forwardPage = false;
@@ -294,9 +222,9 @@
 
             // check if the log record will cross page boundaries, a case that
             // is not allowed.
-            if ((next - 1) / pageSize != old / pageSize || (next % pageSize == 0)) {
+            if ((next - 1) / logPageSize != old / logPageSize || (next % logPageSize == 0)) {
 
-                if ((old != 0 && old % pageSize == 0)) {
+                if ((old != 0 && old % logPageSize == 0)) {
                     // On second thought, this shall never be the case as it
                     // means that the lsn is
                     // currently at the beginning of a page and we still need to
@@ -309,7 +237,7 @@
 
                 } else {
                     // set the lsn to point to the beginning of the next page.
-                    retVal = ((old / pageSize) + 1) * pageSize;
+                    retVal = ((old / logPageSize) + 1) * logPageSize;
                 }
 
                 next = retVal;
@@ -323,20 +251,6 @@
                 pageIndex = getNextPageInSequence(pageIndex);
             }
 
-            /*
-             * we do not want to keep allocating LSNs if the corresponding page
-             * is unavailable. Consider a scenario when the log flusher thread
-             * is incredibly slow in flushing pages. Transaction threads will
-             * acquire an lsn each for writing their next log record. When a
-             * page has been made available, mulltiple transaction threads that
-             * were waiting can continue to write their log record at the
-             * assigned LSNs. Two transaction threads may get LSNs that are on
-             * the same log page but actually differ by the size of the log
-             * buffer. This would be erroneous. Transaction threads are made to
-             * wait upfront for avoiding this situation.
-             */
-            waitUntillPageIsAvailableForWritingLog(pageIndex);
-
             if (!lsn.compareAndSet(old, next)) {
                 // Atomic call -> returns true only when the value represented
                 // by lsn is same as
@@ -347,7 +261,7 @@
             if (forwardPage) {
 
                 // forward the nextWriteOffset in the log page
-                logPages[pageIndex].setBufferNextWriteOffset(logManagerProperties.getLogPageSize());
+                logPages[pageIndex].setBufferNextWriteOffset(logPageSize);
 
                 addFlushRequest(prevPage, old, false);
 
@@ -356,21 +270,18 @@
                 continue;
 
             } else {
-                // the transaction thread has been given a space in a log page,
-                // but is made to wait until the page is available.
-                // (Is this needed? when does this wait happen?)
-                waitUntillPageIsAvailableForWritingLog(pageIndex);
-
+                logPages[pageIndex].acquireReadLatch();
                 // increment the counter as the transaction thread now holds a
                 // space in the log page and hence is an owner.
-                logPageOwnerCount[pageIndex].incrementAndGet();
+                logPages[pageIndex].incRefCnt();
+                logPages[pageIndex].releaseReadLatch();
 
                 // Before the count is incremented, if the flusher flushed the
                 // allocated page,
                 // then retry to get new LSN. Otherwise, the log with allocated
                 // lsn will be lost.
                 if (lastFlushedLSN.get() >= retVal) {
-                    logPageOwnerCount[pageIndex].decrementAndGet();
+                    logPages[pageIndex].decRefCnt();
                     continue;
                 }
             }
@@ -400,10 +311,10 @@
         int totalLogSize = logRecordHelper.getLogRecordSize(logType, logContentSize);
 
         // check for the total space requirement to be less than a log page.
-        if (totalLogSize > logManagerProperties.getLogPageSize()) {
+        if (totalLogSize > logPageSize) {
             throw new ACIDException(
                     " Maximum Log Content Size is "
-                            + (logManagerProperties.getLogPageSize() - logRecordHelper.getLogHeaderSize(LogType.UPDATE) - logRecordHelper
+                            + (logPageSize - logRecordHelper.getLogHeaderSize(LogType.UPDATE) - logRecordHelper
                                     .getLogChecksumSize()));
         }
 
@@ -487,9 +398,9 @@
                     checksum);
 
             // forward the nextWriteOffset in the log page
-            int bufferNextWriteOffset = (int) ((currentLSN + totalLogSize) % logManagerProperties.getLogPageSize());
+            int bufferNextWriteOffset = (int) ((currentLSN + totalLogSize) % logPageSize);
             if (bufferNextWriteOffset == 0) {
-                bufferNextWriteOffset = logManagerProperties.getLogPageSize();
+                bufferNextWriteOffset = logPageSize;
             }
             logPages[pageIndex].setBufferNextWriteOffset(bufferNextWriteOffset);
 
@@ -499,7 +410,11 @@
 
             // release the ownership as the log record has been placed in
             // created space.
-            logPageOwnerCount[pageIndex].decrementAndGet();
+            logPages[pageIndex].decRefCnt();
+
+            //collect statistics
+            statLogSize += totalLogSize;
+            statLogCount++;
 
             // indicating that the transaction thread has released ownership
             decremented = true;
@@ -516,6 +431,9 @@
                 addFlushRequest(pageIndex, currentLSN, false);
             } else if (logType == LogType.COMMIT) {
                 addFlushRequest(pageIndex, currentLSN, true);
+                if (IS_DEBUG_MODE) {
+                    System.out.println("Running sum of log size: " + statLogSize + ", log count: " + statLogCount);
+                }
             }
 
         } catch (Exception e) {
@@ -524,7 +442,7 @@
                     + " logger encountered exception", e);
         } finally {
             if (!decremented) {
-                logPageOwnerCount[pageIndex].decrementAndGet();
+                logPages[pageIndex].decRefCnt();
             }
         }
     }
@@ -537,14 +455,13 @@
 
         String filePath = LogUtil.getLogFilePath(logManagerProperties, getLogFileId(lsn));
 
-        logPages[pageIndex].reset(filePath, LogUtil.getFileOffset(this, nextWritePosition),
-                logManagerProperties.getLogPageSize());
+        logPages[pageIndex].reset(filePath, LogUtil.getFileOffset(this, nextWritePosition), logPageSize);
     }
 
     @Override
     public ILogCursor readLog(PhysicalLogLocator physicalLogLocator, ILogFilter logFilter) throws IOException,
             ACIDException {
-        LogCursor cursor = new LogCursor(this, physicalLogLocator, logFilter, logManagerProperties.getLogPageSize());
+        LogCursor cursor = new LogCursor(this, physicalLogLocator, logFilter, logPageSize);
         return cursor;
     }
 
@@ -555,7 +472,7 @@
         String filePath = LogUtil.getLogFilePath(logManagerProperties, LogUtil.getFileId(this, lsnValue));
         long fileOffset = LogUtil.getFileOffset(this, lsnValue);
 
-        ByteBuffer buffer = ByteBuffer.allocate(logManagerProperties.getLogPageSize());
+        ByteBuffer buffer = ByteBuffer.allocate(logPageSize);
         RandomAccessFile raf = null;
         FileChannel fileChannel = null;
         try {
@@ -616,7 +533,7 @@
             // minimize memory allocation overhead. current code allocates the
             // log page size per reading a log record.
 
-            byte[] pageContent = new byte[logManagerProperties.getLogPageSize()];
+            byte[] pageContent = new byte[logPageSize];
 
             // take a lock on the log page so that the page is not flushed to
             // disk interim
@@ -663,8 +580,11 @@
 
     public boolean isMemoryRead(long currentLSN) {
         long flushLSN = lastFlushedLSN.get();
-        long logPageBeginOffset = flushLSN - (flushLSN % logManagerProperties.getLogPageSize());
-        long logPageEndOffset = logPageBeginOffset + logManagerProperties.getLogPageSize();
+        if ((flushLSN + 1) % logPageSize == 0) {
+            return false;
+        }
+        long logPageBeginOffset = flushLSN - (flushLSN % logPageSize);
+        long logPageEndOffset = logPageBeginOffset + logPageSize;
         if (currentLSN > flushLSN || (currentLSN >= logPageBeginOffset && currentLSN < logPageEndOffset)) {
             return true;
         } else {
@@ -710,9 +630,7 @@
         try {
             String filePath = LogUtil.getLogFilePath(logManagerProperties, LogUtil.getFileId(this, startingLSN));
             for (int i = 0; i < numLogPages; i++) {
-                logPages[i].open(filePath,
-                        LogUtil.getFileOffset(this, startingLSN) + i * logManagerProperties.getLogPageSize(),
-                        logManagerProperties.getLogPageSize());
+                logPages[i].open(filePath, LogUtil.getFileOffset(this, startingLSN) + i * logPageSize, logPageSize);
             }
         } catch (Exception e) {
             throw new ACIDException(Thread.currentThread().getName() + " unable to create log buffer", e);
@@ -725,39 +643,23 @@
     }
 
     /*
-     * This method shall be called by the Buffer manager when it needs to evict
-     * a page from the cache. TODO: Change the implementation from a looping
-     * logic to event based when log manager support is integrated with the
-     * Buffer Manager.
-     */
-    @Override
-    public synchronized void flushLog(LogicalLogLocator logicalLogLocator) throws ACIDException {
-        if (logicalLogLocator.getLsn() > lsn.get()) {
-            throw new ACIDException(" invalid lsn " + logicalLogLocator.getLsn());
-        }
-        while (lastFlushedLSN.get() < logicalLogLocator.getLsn());
-    }
-
-    /*
      * Map each log page to cover a physical byte range over a log file. When a
      * page is flushed, the page contents are put to disk in the corresponding
      * byte range.
      */
     private void initializeLogPages(long beginLsn) throws ACIDException {
         try {
-            String filePath = LogUtil.getLogFilePath(logManagerProperties,
-                    LogUtil.getFileId(this, beginLsn));
+            String filePath = LogUtil.getLogFilePath(logManagerProperties, LogUtil.getFileId(this, beginLsn));
             long nextDiskWriteOffset = LogUtil.getFileOffset(this, beginLsn);
-            long nextBufferWriteOffset = nextDiskWriteOffset % logManagerProperties.getLogPageSize();
+            long nextBufferWriteOffset = nextDiskWriteOffset % logPageSize;
             long bufferBeginOffset = nextDiskWriteOffset - nextBufferWriteOffset;
 
             for (int i = 0; i < numLogPages; i++) {
-                logPages[i] = FileUtil.getFileBasedBuffer(filePath,
-                        bufferBeginOffset + i * logManagerProperties.getLogPageSize(),
-                        logManagerProperties.getLogPageSize(), logManagerProperties.getDiskSectorSize());
+                logPages[i] = FileUtil.getFileBasedBuffer(filePath, bufferBeginOffset + i * logPageSize, logPageSize,
+                        logManagerProperties.getDiskSectorSize());
                 if (i == 0) {
-                    logPages[i].setBufferLastFlushOffset((int)nextBufferWriteOffset);
-                    logPages[i].setBufferNextWriteOffset((int)nextBufferWriteOffset);
+                    logPages[i].setBufferLastFlushOffset((int) nextBufferWriteOffset);
+                    logPages[i].setBufferNextWriteOffset((int) nextBufferWriteOffset);
                     logPages[i].setDiskNextWriteOffset(nextDiskWriteOffset);
                 }
             }
@@ -787,10 +689,6 @@
         return logPages[pageIndex];
     }
 
-    public AtomicInteger getLogPageOwnershipCount(int pageIndex) {
-        return logPageOwnerCount[pageIndex];
-    }
-
     public IFileBasedBuffer[] getLogPages() {
         return logPages;
     }
@@ -930,64 +828,62 @@
                 }
 
                 synchronized (logManager.getLogPage(flushPageIndex)) {
+                    logManager.getLogPage(flushPageIndex).acquireWriteLatch();
+                    try {
 
-                    // #. sleep for the groupCommitWaitTime
-                    sleep(groupCommitWaitPeriod);
+                        //if the log page is already full, don't wait. 
+                        if (logManager.getLogPage(flushPageIndex).getBufferNextWriteOffset() > logPageSize
+                                - logManager.getLogRecordHelper().getCommitLogSize()) {
+                            // #. sleep for the groupCommitWaitTime
+                            sleep(groupCommitWaitPeriod);
+                        }
 
-                    // #. set the logPageStatus to INACTIVE in order to prevent
-                    // other txns from writing on this page.
-                    logManager.getLogPageStatus(flushPageIndex).set(PageState.INACTIVE);
+                        // #. need to wait until the reference count reaches 0
+                        while (logManager.getLogPage(flushPageIndex).getRefCnt() != 0) {
+                            sleep(0);
+                        }
 
-                    // #. need to wait until the logPageOwnerCount reaches 1
-                    // (LOG_WRITER)
-                    // meaning every one has finished writing logs on this page.
-                    while (logManager.getLogPageOwnershipCount(flushPageIndex).get() != PageOwnershipStatus.LOG_WRITER) {
-                        sleep(0);
+                        beforeFlushOffset = logManager.getLogPage(flushPageIndex).getBufferLastFlushOffset();
+
+                        // put the content to disk (the thread still has a lock on the log page)
+                        logManager.getLogPage(flushPageIndex).flush();
+
+                        afterFlushOffset = logManager.getLogPage(flushPageIndex).getBufferLastFlushOffset();
+
+                        // increment the last flushed lsn
+                        logManager.incrementLastFlushedLsn(afterFlushOffset - beforeFlushOffset);
+
+                        // increment currentLSN if currentLSN is less than flushLSN.
+                        if (logManager.getLastFlushedLsn().get() + 1 > logManager.getCurrentLsn().get()) {
+                            logManager.getCurrentLsn().set(logManager.getLastFlushedLsn().get() + 1);
+                        }
+
+                        // Map the log page to a new region in the log file if the flushOffset reached the logPageSize
+                        if (afterFlushOffset == logPageSize) {
+                            long diskNextWriteOffset = logManager.getLogPages()[flushPageIndex]
+                                    .getDiskNextWriteOffset() + logBufferSize;
+                            logManager.resetLogPage(logManager.getLastFlushedLsn().get() + 1 + logBufferSize,
+                                    diskNextWriteOffset, flushPageIndex);
+                            resetFlushPageIndex = true;
+                        }
+
+                        // decrement activeTxnCountOnIndexes
+                        logManager.decrementActiveTxnCountOnIndexes(flushPageIndex);
+
+                        // #. checks the queue whether there is another flush
+                        // request on the same log buffer
+                        // If there is another request, then simply remove it.
+                        if (flushRequestQueue[flushPageIndex].peek() != null) {
+                            flushRequestQueue[flushPageIndex].take();
+                        }
+
+                        // notify all waiting (transaction) threads.
+                        logManager.getLogPage(flushPageIndex).notifyAll();
+
+                    } finally {
+                        logManager.getLogPage(flushPageIndex).releaseWriteLatch();
                     }
 
-                    // #. set the logPageOwnerCount to 0 (LOG_FLUSHER)
-                    // meaning it is flushing.
-                    logManager.getLogPageOwnershipCount(flushPageIndex).set(PageOwnershipStatus.LOG_FLUSHER);
-
-                    beforeFlushOffset = logManager.getLogPage(flushPageIndex).getBufferLastFlushOffset();
-
-                    // put the content to disk (the thread still has a lock on
-                    // the log page)
-                    logManager.getLogPage(flushPageIndex).flush();
-
-                    afterFlushOffset = logManager.getLogPage(flushPageIndex).getBufferLastFlushOffset();
-
-                    // increment the last flushed lsn
-                    logManager.incrementLastFlushedLsn(afterFlushOffset - beforeFlushOffset);
-
-                    // Map the log page to a new region in the log file if the flushOffset reached the logPageSize
-                    if (afterFlushOffset == logPageSize) {
-                        long diskNextWriteOffset = logManager.getLogPages()[flushPageIndex].getDiskNextWriteOffset()
-                                + logBufferSize;
-                        logManager.resetLogPage(logManager.getLastFlushedLsn().get() + 1 + logBufferSize,
-                                diskNextWriteOffset, flushPageIndex);
-                        resetFlushPageIndex = true;
-                    }
-
-                    // decrement activeTxnCountOnIndexes
-                    logManager.decrementActiveTxnCountOnIndexes(flushPageIndex);
-
-                    // reset the count to 1
-                    logManager.getLogPageOwnershipCount(flushPageIndex).set(PageOwnershipStatus.LOG_WRITER);
-
-                    // mark the page as ACTIVE
-                    logManager.getLogPageStatus(flushPageIndex).set(LogManager.PageState.ACTIVE);
-
-                    // #. checks the queue whether there is another flush
-                    // request on the same log buffer
-                    // If there is another request, then simply remove it.
-                    if (flushRequestQueue[flushPageIndex].peek() != null) {
-                        flushRequestQueue[flushPageIndex].take();
-                    }
-
-                    // notify all waiting (transaction) threads.
-                    logManager.getLogPage(flushPageIndex).notifyAll();
-
                     if (resetFlushPageIndex) {
                         flushPageIndex = logManager.getNextPageInSequence(flushPageIndex);
                         resetFlushPageIndex = false;
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java
index 8eae204..e46b7ac 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java
@@ -61,6 +61,8 @@
     private final int RESOURCE_MGR_ID_POS = 33;
     private final int LOG_RECORD_SIZE_POS = 34;
     
+    public final int COMMIT_LOG_SIZE = PREV_LSN_POS + LOG_CHECKSUM_SIZE;
+    
     private ILogManager logManager;
 
     public LogRecordHelper(ILogManager logManager) {
@@ -236,7 +238,7 @@
         if (logType == LogType.UPDATE) {
             return 46 + logBodySize;
         } else {
-            return 25;
+            return COMMIT_LOG_SIZE;
         }
     }
 
@@ -253,4 +255,8 @@
     public int getLogChecksumSize() {
         return LOG_CHECKSUM_SIZE;
     }
+    
+    public int getCommitLogSize() {
+        return COMMIT_LOG_SIZE;
+    }
 }