Merged master into the variable-log-page-size issue branch
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileBasedBuffer.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileBasedBuffer.java
index 4319b8b..e88d74e 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileBasedBuffer.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileBasedBuffer.java
@@ -19,6 +19,9 @@
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
  * Represent a buffer that is backed by a physical file. Provider custom APIs
@@ -27,22 +30,32 @@
 public class FileBasedBuffer extends Buffer implements IFileBasedBuffer {
 
     private String filePath;
-    private long nextWritePosition;
     private FileChannel fileChannel;
     private RandomAccessFile raf;
-    private int size;
+    private int bufferSize;
 
-    public FileBasedBuffer(String filePath, long offset, int size) throws IOException {
+    private int bufferLastFlushOffset;
+    private int bufferNextWriteOffset;
+    private final int diskSectorSize;
+
+    private final ReadWriteLock latch;
+    private final AtomicInteger referenceCount;
+
+    public FileBasedBuffer(String filePath, long offset, int bufferSize, int diskSectorSize) throws IOException {
         this.filePath = filePath;
-        this.nextWritePosition = offset;
-        buffer = ByteBuffer.allocate(size);
+        buffer = ByteBuffer.allocate(bufferSize);
         raf = new RandomAccessFile(new File(filePath), "rw");
-        raf.seek(offset);
         fileChannel = raf.getChannel();
+        fileChannel.position(offset);
         fileChannel.read(buffer);
         buffer.position(0);
-        this.size = size;
-        buffer.limit(size);
+        this.bufferSize = bufferSize;
+        buffer.limit(bufferSize);
+        bufferLastFlushOffset = 0;
+        bufferNextWriteOffset = 0;
+        this.diskSectorSize = diskSectorSize;
+        latch = new ReentrantReadWriteLock(true);
+        referenceCount = new AtomicInteger(0);
     }
 
     public String getFilePath() {
@@ -53,17 +66,9 @@
         this.filePath = filePath;
     }
 
-    public long getOffset() {
-        return nextWritePosition;
-    }
-
-    public void setOffset(long offset) {
-        this.nextWritePosition = offset;
-    }
-
     @Override
     public int getSize() {
-        return buffer.limit();
+        return bufferSize;
     }
 
     public void clear() {
@@ -72,11 +77,18 @@
 
     @Override
     public void flush() throws IOException {
-        buffer.position(0);
-        buffer.limit(size);
+        //flush
+        int pos = bufferLastFlushOffset;
+        int limit = (((bufferNextWriteOffset - 1) / diskSectorSize) + 1) * diskSectorSize;
+        buffer.position(pos);
+        buffer.limit(limit);
         fileChannel.write(buffer);
         fileChannel.force(true);
-        erase();
+
+        //update variables
+        bufferLastFlushOffset = limit;
+        bufferNextWriteOffset = limit;
+        buffer.limit(bufferSize);
     }
 
     @Override
@@ -124,45 +136,110 @@
      * starting at offset.
      */
     @Override
-    public void reset(String filePath, long nextWritePosition, int size) throws IOException {
+    public void reset(String filePath, long diskNextWriteOffset, int bufferSize) throws IOException {
         if (!filePath.equals(this.filePath)) {
             raf.close();//required?
             fileChannel.close();
             raf = new RandomAccessFile(filePath, "rw");
             this.filePath = filePath;
         }
-        this.nextWritePosition = nextWritePosition;
-        raf.seek(nextWritePosition);
         fileChannel = raf.getChannel();
+        fileChannel.position(diskNextWriteOffset);
         erase();
         buffer.position(0);
-        buffer.limit(size);
-        this.size = size;
+        buffer.limit(bufferSize);
+        this.bufferSize = bufferSize;
+
+        bufferLastFlushOffset = 0;
+        bufferNextWriteOffset = 0;
     }
-    
+
     @Override
     public void close() throws IOException {
         fileChannel.close();
     }
-    
+
     @Override
-    public void open(String filePath, long offset, int size) throws IOException {
+    public void open(String filePath, long offset, int bufferSize) throws IOException {
         raf = new RandomAccessFile(filePath, "rw");
-        this.nextWritePosition = offset;
         fileChannel = raf.getChannel();
         fileChannel.position(offset);
         erase();
         buffer.position(0);
-        buffer.limit(size);
-        this.size = size;
+        buffer.limit(bufferSize);
+        this.bufferSize = bufferSize;
+        bufferLastFlushOffset = 0;
+        bufferNextWriteOffset = 0;
     }
 
-    public long getNextWritePosition() {
-        return nextWritePosition;
+    @Override
+    public long getDiskNextWriteOffset() throws IOException {
+        return fileChannel.position();
     }
 
-    public void setNextWritePosition(long nextWritePosition) {
-        this.nextWritePosition = nextWritePosition;
+    @Override
+    public void setDiskNextWriteOffset(long offset) throws IOException {
+        fileChannel.position(offset);
     }
 
+    @Override
+    public int getBufferLastFlushOffset() {
+        return bufferLastFlushOffset;
+    }
+
+    @Override
+    public void setBufferLastFlushOffset(int offset) {
+        this.bufferLastFlushOffset = offset;
+    }
+
+    @Override
+    public int getBufferNextWriteOffset() {
+        synchronized (fileChannel) {
+            return bufferNextWriteOffset;
+        }
+    }
+
+    @Override
+    public void setBufferNextWriteOffset(int offset) {
+        synchronized (fileChannel) {
+            if (bufferNextWriteOffset < offset) {
+                bufferNextWriteOffset = offset;
+            }
+        }
+    }
+
+    @Override
+    public void acquireWriteLatch() {
+        latch.writeLock().lock();
+    }
+
+    @Override
+    public void releaseWriteLatch() {
+        latch.writeLock().unlock();
+    }
+
+    @Override
+    public void acquireReadLatch() {
+        latch.readLock().lock();
+    }
+
+    @Override
+    public void releaseReadLatch() {
+        latch.readLock().unlock();
+    }
+
+    @Override
+    public void incRefCnt() {
+        referenceCount.incrementAndGet();
+    }
+    
+    @Override
+    public void decRefCnt() {
+        referenceCount.decrementAndGet();
+    }
+    
+    @Override
+    public int getRefCnt() {
+        return referenceCount.get();
+    }
 }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileUtil.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileUtil.java
index f2ffa0e..46e03f1 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileUtil.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileUtil.java
@@ -38,8 +38,8 @@
         return (new File(path)).mkdir();
     }
 
-    public static IFileBasedBuffer getFileBasedBuffer(String filePath, long offset, int size) throws IOException {
-        IFileBasedBuffer fileBasedBuffer = new FileBasedBuffer(filePath, offset, size);
+    public static IFileBasedBuffer getFileBasedBuffer(String filePath, long offset, int bufferSize, int diskSectorSize) throws IOException {
+        IFileBasedBuffer fileBasedBuffer = new FileBasedBuffer(filePath, offset, bufferSize, diskSectorSize);
         return fileBasedBuffer;
     }
 
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IFileBasedBuffer.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IFileBasedBuffer.java
index e1f9f95..a4ea3cb 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IFileBasedBuffer.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IFileBasedBuffer.java
@@ -31,12 +31,34 @@
      */
     public void reset(String filePath, long offset, int size) throws IOException;
 
-    public long getNextWritePosition();
+    public long getDiskNextWriteOffset() throws IOException;
 
-    public void setNextWritePosition(long writePosition);
+    public void setDiskNextWriteOffset(long writePosition) throws IOException;
 
     public void close() throws IOException;
     
     public void open(String filePath, long offset, int size) throws IOException;
 
+    public int getBufferLastFlushOffset();
+
+    public void setBufferLastFlushOffset(int offset);
+
+    public int getBufferNextWriteOffset();
+
+    public void setBufferNextWriteOffset(int offset);
+    
+    public void acquireWriteLatch();
+
+    public void releaseWriteLatch();
+
+    public void acquireReadLatch();
+
+    public void releaseReadLatch();
+    
+    public void incRefCnt();
+    
+    public void decRefCnt();
+    
+    public int getRefCnt();
+
 }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogManager.java
index c629d03..26229a7 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogManager.java
@@ -53,17 +53,6 @@
             ACIDException;
 
     /**
-     * Provides a cursor for retrieving logs that satisfy a given ILogFilter
-     * instance. Log records are retrieved in increasing order of lsn
-     * 
-     * @param logFilter
-     *            specifies the filtering criteria for the retrieved logs
-     * @return LogCursor an iterator for the retrieved logs
-     * @throws ACIDException
-     */
-    public ILogCursor readLog(ILogFilter logFilter) throws ACIDException;
-
-    /**
      * @param logicalLogLocator TODO
      * @param PhysicalLogLocator
      *            specifies the location of the log record to be read
@@ -72,15 +61,6 @@
     public void readLog(long lsnValue, LogicalLogLocator logicalLogLocator) throws ACIDException;
 
     /**
-     * Flushes the log records up to the lsn represented by the
-     * logicalLogLocator
-     * 
-     * @param logicalLogLocator
-     * @throws ACIDException
-     */
-    public void flushLog(LogicalLogLocator logicalLogLocator) throws ACIDException;
-
-    /**
      * Retrieves the configuration parameters of the ILogManager
      * 
      * @return LogManagerProperties: the configuration parameters for the
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogRecordHelper.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogRecordHelper.java
index 80f74cb..0e24f9d 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogRecordHelper.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogRecordHelper.java
@@ -25,41 +25,43 @@
 
 public interface ILogRecordHelper {
 
-    byte getLogType(LogicalLogLocator logicalLogLocator);
+    public byte getLogType(LogicalLogLocator logicalLogLocator);
 
-    int getJobId(LogicalLogLocator logicalLogLocator);
+    public int getJobId(LogicalLogLocator logicalLogLocator);
 
-    int getDatasetId(LogicalLogLocator logicalLogLocator);
+    public int getDatasetId(LogicalLogLocator logicalLogLocator);
 
-    int getPKHashValue(LogicalLogLocator logicalLogLocator);
+    public int getPKHashValue(LogicalLogLocator logicalLogLocator);
 
-    PhysicalLogLocator getPrevLSN(LogicalLogLocator logicalLogLocator);
+    public PhysicalLogLocator getPrevLSN(LogicalLogLocator logicalLogLocator);
 
-    boolean getPrevLSN(PhysicalLogLocator physicalLogLocator, LogicalLogLocator logicalLogLocator);
+    public boolean getPrevLSN(PhysicalLogLocator physicalLogLocator, LogicalLogLocator logicalLogLocator);
     
-    long getResourceId(LogicalLogLocator logicalLogLocator);
+    public long getResourceId(LogicalLogLocator logicalLogLocator);
     
-    byte getResourceMgrId(LogicalLogLocator logicalLogLocater);
+    public byte getResourceMgrId(LogicalLogLocator logicalLogLocater);
 
-    int getLogContentSize(LogicalLogLocator logicalLogLocater);
+    public int getLogContentSize(LogicalLogLocator logicalLogLocater);
 
-    long getLogChecksum(LogicalLogLocator logicalLogLocator);
+    public long getLogChecksum(LogicalLogLocator logicalLogLocator);
 
-    int getLogContentBeginPos(LogicalLogLocator logicalLogLocator);
+    public int getLogContentBeginPos(LogicalLogLocator logicalLogLocator);
 
-    int getLogContentEndPos(LogicalLogLocator logicalLogLocator);
+    public int getLogContentEndPos(LogicalLogLocator logicalLogLocator);
 
-    String getLogRecordForDisplay(LogicalLogLocator logicalLogLocator);
+    public String getLogRecordForDisplay(LogicalLogLocator logicalLogLocator);
 
-    void writeLogHeader(LogicalLogLocator logicalLogLocator, byte logType, TransactionContext context, int datasetId,
+    public void writeLogHeader(LogicalLogLocator logicalLogLocator, byte logType, TransactionContext context, int datasetId,
             int PKHashValue, long prevLogicalLogLocator, long resourceId, byte resourceMgrId, int logRecordSize);
 
-    boolean validateLogRecord(LogicalLogLocator logicalLogLocator);
+    public boolean validateLogRecord(LogicalLogLocator logicalLogLocator);
 
-    int getLogRecordSize(byte logType, int logBodySize);
+    public int getLogRecordSize(byte logType, int logBodySize);
 
-    int getLogHeaderSize(byte logType);
+    public int getLogHeaderSize(byte logType);
 
-    int getLogChecksumSize();
+    public int getLogChecksumSize();
+    
+    public int getCommitLogSize();
 
 }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java
index 8a2b188..7e954d8 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java
@@ -23,27 +23,16 @@
 
     private final LogManager logManager;
     private final ILogFilter logFilter;
+    private final int logPageSize;
     private IBuffer readOnlyBuffer;
     private LogicalLogLocator logicalLogLocator = null;
-    private long bufferIndex = 0;
-    private boolean firstNext = true;
-    private boolean readMemory = false;
-    private long readLSN = 0;
     private boolean needReloadBuffer = true;
 
-    /**
-     * @param logFilter
-     */
-    public LogCursor(final LogManager logManager, ILogFilter logFilter) throws ACIDException {
+    public LogCursor(final LogManager logManager, PhysicalLogLocator startingPhysicalLogLocator, ILogFilter logFilter,
+            int logPageSize) throws IOException, ACIDException {
         this.logFilter = logFilter;
         this.logManager = logManager;
-
-    }
-
-    public LogCursor(final LogManager logManager, PhysicalLogLocator startingPhysicalLogLocator, ILogFilter logFilter)
-            throws IOException, ACIDException {
-        this.logFilter = logFilter;
-        this.logManager = logManager;
+        this.logPageSize = logPageSize;
         initialize(startingPhysicalLogLocator);
     }
 
@@ -57,7 +46,8 @@
         File file = new File(filePath);
         if (file.exists()) {
             return FileUtil.getFileBasedBuffer(filePath, lsn
-                    % logManager.getLogManagerProperties().getLogPartitionSize(), size);
+                    % logManager.getLogManagerProperties().getLogPartitionSize(), size, logManager
+                    .getLogManagerProperties().getDiskSectorSize());
         } else {
             return null;
         }
@@ -87,8 +77,7 @@
             return false;
         }
 
-        //if the lsn to read is greater than the last flushed lsn, then read from memory
-        if (logicalLogLocator.getLsn() > logManager.getLastFlushedLsn().get()) {
+        if (logManager.isMemoryRead(logicalLogLocator.getLsn())) {
             return readFromMemory(currentLogLocator);
         }
 
@@ -96,10 +85,9 @@
         //needReloadBuffer is set to true if the log record is read from the memory log page.
         if (needReloadBuffer) {
             //log page size doesn't exceed integer boundary
-            int offset = (int)(logicalLogLocator.getLsn() % logManager.getLogManagerProperties().getLogPageSize());
+            int offset = (int) (logicalLogLocator.getLsn() % logPageSize);
             long adjustedLSN = logicalLogLocator.getLsn() - offset;
-            readOnlyBuffer = getReadOnlyBuffer(adjustedLSN, logManager.getLogManagerProperties()
-                    .getLogPageSize());
+            readOnlyBuffer = getReadOnlyBuffer(adjustedLSN, logPageSize);
             logicalLogLocator.setBuffer(readOnlyBuffer);
             logicalLogLocator.setMemoryOffset(offset);
             needReloadBuffer = false;
@@ -110,14 +98,14 @@
         while (logicalLogLocator.getMemoryOffset() <= readOnlyBuffer.getSize()
                 - logManager.getLogRecordHelper().getLogHeaderSize(LogType.COMMIT)) {
             integerRead = readOnlyBuffer.readInt(logicalLogLocator.getMemoryOffset());
-            if (integerRead == logManager.getLogManagerProperties().LOG_MAGIC_NUMBER) {
+            if (integerRead == LogManagerProperties.LOG_MAGIC_NUMBER) {
                 logRecordBeginPosFound = true;
                 break;
             }
             logicalLogLocator.increaseMemoryOffset(1);
             logicalLogLocator.incrementLsn();
             bytesSkipped++;
-            if (bytesSkipped > logManager.getLogManagerProperties().getLogPageSize()) {
+            if (bytesSkipped > logPageSize) {
                 return false; // the maximum size of a log record is limited to
                 // a log page size. If we have skipped as many
                 // bytes without finding a log record, it
@@ -133,10 +121,9 @@
             // need to reload the buffer
             // TODO
             // reduce IO by reading more pages(equal to logBufferSize) at a time.
-            long lsnpos = ((logicalLogLocator.getLsn() / logManager.getLogManagerProperties().getLogPageSize()) + 1)
-                    * logManager.getLogManagerProperties().getLogPageSize();
+            long lsnpos = ((logicalLogLocator.getLsn() / logPageSize) + 1) * logPageSize;
 
-            readOnlyBuffer = getReadOnlyBuffer(lsnpos, logManager.getLogManagerProperties().getLogPageSize());
+            readOnlyBuffer = getReadOnlyBuffer(lsnpos, logPageSize);
             if (readOnlyBuffer != null) {
                 logicalLogLocator.setBuffer(readOnlyBuffer);
                 logicalLogLocator.setLsn(lsnpos);
@@ -190,13 +177,12 @@
         IFileBasedBuffer logPage = logManager.getLogPage(pageIndex);
         synchronized (logPage) {
             // need to check again if the log record in the log buffer or has reached the disk
-            if (lsn > logManager.getLastFlushedLsn().get()) {
+            if (logManager.isMemoryRead(lsn)) {
 
                 //find the magic number to identify the start of the log record
                 //----------------------------------------------------------------
                 int readNumber = -1;
-                int logPageSize = logManager.getLogManagerProperties().getLogPageSize();
-                int logMagicNumber = logManager.getLogManagerProperties().LOG_MAGIC_NUMBER;
+                int logMagicNumber = LogManagerProperties.LOG_MAGIC_NUMBER;
                 int bytesSkipped = 0;
                 boolean logRecordBeginPosFound = false;
                 //check whether the currentOffset has enough space to have new log record by comparing
@@ -223,7 +209,8 @@
                     // need to read the next log page
                     readOnlyBuffer = null;
                     logicalLogLocator.setBuffer(null);
-                    logicalLogLocator.setLsn(lsn / logPageSize + 1);
+                    lsn = ((logicalLogLocator.getLsn() / logPageSize) + 1) * logPageSize;
+                    logicalLogLocator.setLsn(lsn);
                     logicalLogLocator.setMemoryOffset(0);
                     return next(currentLogLocator);
                 }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
index 9b8f09c..199fd0f 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
@@ -27,15 +27,12 @@
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
 import edu.uci.ics.asterix.transaction.management.service.logging.IndexLogger.ReusableLogContentObject;
-import edu.uci.ics.asterix.transaction.management.service.logging.LogManager.PageOwnershipStatus;
-import edu.uci.ics.asterix.transaction.management.service.logging.LogManager.PageState;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
@@ -48,6 +45,9 @@
     private final TransactionSubsystem provider;
     private LogManagerProperties logManagerProperties;
     private LogPageFlushThread logPageFlusher;
+    private final int logPageSize;
+    private long statLogSize;
+    private long statLogCount;
 
     /*
      * the array of log pages. The number of log pages is configurable. Pages
@@ -62,47 +62,6 @@
      */
     private int numLogPages;
 
-    /*
-     * Initially all pages have an owner count of 1 that is the LogManager. When
-     * a transaction requests to write in a log page, the owner count is
-     * incremented. The log manager reserves space in the log page and puts in
-     * the log header but leaves the space for the content and the checksum
-     * (covering the whole log record). When the content has been put, the log
-     * manager computes the checksum and puts it after the content. At this
-     * point, the ownership count is decremented as the transaction is done with
-     * using the page. When a page is requested to be flushed, logPageFlusher
-     * set the count to 0(LOG_FLUSHER: meaning that the page is being flushed)
-     * only if the count is 1(LOG_WRITER: meaning that there is no other
-     * transactions who own the page to write logs.) After flushing the page,
-     * logPageFlusher set this count to 1.
-     */
-    private AtomicInteger[] logPageOwnerCount;
-
-    static class PageOwnershipStatus {
-        public static final int LOG_WRITER = 1;
-        public static final int LOG_FLUSHER = 0;
-    }
-
-    /*
-     * LogPageStatus: A page is either ACTIVE or INACTIVE. The status for each
-     * page is maintained in logPageStatus. A page is ACTIVE when the LogManager
-     * can allocate space in the page for writing a log record. Initially all
-     * pages are ACTIVE. As transactions fill up space by writing log records, a
-     * page may not have sufficient space left for serving a request by a
-     * transaction. When this happens, the page is flushed to disk by calling
-     * logPageFlusher.requestFlush(). In the requestFlush(), after
-     * groupCommitWaitTime, the page status is set to INACTIVE. Then, there is
-     * no more writer on the page(meaning the corresponding logPageOwnerCount is
-     * 1), the page is flushed by the logPageFlusher and the status is reset to
-     * ACTIVE by the logPageFlusher.
-     */
-    private AtomicInteger[] logPageStatus;
-
-    static class PageState {
-        public static final int INACTIVE = 0;
-        public static final int ACTIVE = 1;
-    }
-
     private AtomicLong lastFlushedLSN = new AtomicLong(-1);
 
     /*
@@ -129,10 +88,6 @@
         return lastFlushedLSN;
     }
 
-    public AtomicInteger getLogPageStatus(int pageIndex) {
-        return logPageStatus[pageIndex];
-    }
-
     public AtomicLong getCurrentLsn() {
         return lsn;
     }
@@ -144,13 +99,19 @@
     public LogManager(TransactionSubsystem provider) throws ACIDException {
         this.provider = provider;
         initLogManagerProperties(this.provider.getId());
+        logPageSize = logManagerProperties.getLogPageSize();
         initLogManager();
+        statLogSize = 0;
+        statLogCount = 0;
     }
 
     public LogManager(TransactionSubsystem provider, String nodeId) throws ACIDException {
         this.provider = provider;
         initLogManagerProperties(nodeId);
+        logPageSize = logManagerProperties.getLogPageSize();
         initLogManager();
+        statLogSize = 0;
+        statLogCount = 0;
     }
 
     /*
@@ -186,9 +147,6 @@
     private void initLogManager() throws ACIDException {
         logRecordHelper = new LogRecordHelper(this);
         numLogPages = logManagerProperties.getNumLogPages();
-        logPageOwnerCount = new AtomicInteger[numLogPages];
-        logPageStatus = new AtomicInteger[numLogPages];
-
         activeTxnCountMaps = new ArrayList<HashMap<TransactionContext, Integer>>(numLogPages);
         for (int i = 0; i < numLogPages; i++) {
             activeTxnCountMaps.add(new HashMap<TransactionContext, Integer>());
@@ -199,20 +157,12 @@
         /*
          * place the log anchor at the end of the last log record written.
          */
-        PhysicalLogLocator nextPhysicalLsn = initLSN();
-
-        /*
-         * initialize meta data for each log page.
-         */
-        for (int i = 0; i < numLogPages; i++) {
-            logPageOwnerCount[i] = new AtomicInteger(PageOwnershipStatus.LOG_WRITER);
-            logPageStatus[i] = new AtomicInteger(PageState.ACTIVE);
-        }
+        initLSN();
 
         /*
          * initialize the log pages.
          */
-        initializeLogPages(nextPhysicalLsn);
+        initializeLogPages(startingLSN);
 
         /*
          * Instantiate and begin the LogFlusher thread. The Log Flusher thread
@@ -226,7 +176,7 @@
     }
 
     public int getLogPageIndex(long lsnValue) {
-        return (int) (((lsnValue - startingLSN) / logManagerProperties.getLogPageSize()) % numLogPages);
+        return (int) (((lsnValue - startingLSN) / logPageSize) % numLogPages);
     }
 
     /*
@@ -242,28 +192,7 @@
      * record is (to be) placed.
      */
     public int getLogPageOffset(long lsnValue) {
-        return (int) ((lsnValue - startingLSN) % logManagerProperties.getLogPageSize());
-    }
-
-    /*
-     * a transaction thread under certain scenarios is required to wait until
-     * the page where it has to write a log record becomes available for writing
-     * a log record.
-     */
-    private void waitUntillPageIsAvailableForWritingLog(int pageIndex) throws ACIDException {
-        if (logPageStatus[pageIndex].get() == PageState.ACTIVE
-                && logPageOwnerCount[pageIndex].get() >= PageOwnershipStatus.LOG_WRITER) {
-            return;
-        }
-        try {
-            synchronized (logPages[pageIndex]) {
-                while (!(logPageStatus[pageIndex].get() == PageState.ACTIVE && logPageOwnerCount[pageIndex].get() >= PageOwnershipStatus.LOG_WRITER)) {
-                    logPages[pageIndex].wait();
-                }
-            }
-        } catch (InterruptedException e) {
-            throw new ACIDException(" thread interrupted while waiting for page " + pageIndex + " to be available ", e);
-        }
+        return (int) (lsnValue % logPageSize);
     }
 
     /*
@@ -277,7 +206,6 @@
      * @param logType: the type of log record.
      */
     private long getLsn(int entrySize, byte logType) throws ACIDException {
-        long pageSize = logManagerProperties.getLogPageSize();
 
         while (true) {
             boolean forwardPage = false;
@@ -294,9 +222,9 @@
 
             // check if the log record will cross page boundaries, a case that
             // is not allowed.
-            if ((next - 1) / pageSize != old / pageSize || (next % pageSize == 0)) {
+            if ((next - 1) / logPageSize != old / logPageSize || (next % logPageSize == 0)) {
 
-                if ((old != 0 && old % pageSize == 0)) {
+                if ((old != 0 && old % logPageSize == 0)) {
                     // On second thought, this shall never be the case as it
                     // means that the lsn is
                     // currently at the beginning of a page and we still need to
@@ -309,7 +237,7 @@
 
                 } else {
                     // set the lsn to point to the beginning of the next page.
-                    retVal = ((old / pageSize) + 1) * pageSize;
+                    retVal = ((old / logPageSize) + 1) * logPageSize;
                 }
 
                 next = retVal;
@@ -323,20 +251,6 @@
                 pageIndex = getNextPageInSequence(pageIndex);
             }
 
-            /*
-             * we do not want to keep allocating LSNs if the corresponding page
-             * is unavailable. Consider a scenario when the log flusher thread
-             * is incredibly slow in flushing pages. Transaction threads will
-             * acquire an lsn each for writing their next log record. When a
-             * page has been made available, mulltiple transaction threads that
-             * were waiting can continue to write their log record at the
-             * assigned LSNs. Two transaction threads may get LSNs that are on
-             * the same log page but actually differ by the size of the log
-             * buffer. This would be erroneous. Transaction threads are made to
-             * wait upfront for avoiding this situation.
-             */
-            waitUntillPageIsAvailableForWritingLog(pageIndex);
-
             if (!lsn.compareAndSet(old, next)) {
                 // Atomic call -> returns true only when the value represented
                 // by lsn is same as
@@ -345,6 +259,10 @@
             }
 
             if (forwardPage) {
+
+                // forward the nextWriteOffset in the log page
+                logPages[pageIndex].setBufferNextWriteOffset(logPageSize);
+
                 addFlushRequest(prevPage, old, false);
 
                 // The transaction thread that discovers the need to forward a
@@ -352,21 +270,18 @@
                 continue;
 
             } else {
-                // the transaction thread has been given a space in a log page,
-                // but is made to wait until the page is available.
-                // (Is this needed? when does this wait happen?)
-                waitUntillPageIsAvailableForWritingLog(pageIndex);
-
+                logPages[pageIndex].acquireReadLatch();
                 // increment the counter as the transaction thread now holds a
                 // space in the log page and hence is an owner.
-                logPageOwnerCount[pageIndex].incrementAndGet();
+                logPages[pageIndex].incRefCnt();
+                logPages[pageIndex].releaseReadLatch();
 
                 // Before the count is incremented, if the flusher flushed the
                 // allocated page,
                 // then retry to get new LSN. Otherwise, the log with allocated
                 // lsn will be lost.
                 if (lastFlushedLSN.get() >= retVal) {
-                    logPageOwnerCount[pageIndex].decrementAndGet();
+                    logPages[pageIndex].decRefCnt();
                     continue;
                 }
             }
@@ -396,10 +311,10 @@
         int totalLogSize = logRecordHelper.getLogRecordSize(logType, logContentSize);
 
         // check for the total space requirement to be less than a log page.
-        if (totalLogSize > logManagerProperties.getLogPageSize()) {
+        if (totalLogSize > logPageSize) {
             throw new ACIDException(
                     " Maximum Log Content Size is "
-                            + (logManagerProperties.getLogPageSize() - logRecordHelper.getLogHeaderSize(LogType.UPDATE) - logRecordHelper
+                            + (logPageSize - logRecordHelper.getLogHeaderSize(LogType.UPDATE) - logRecordHelper
                                     .getLogChecksumSize()));
         }
 
@@ -482,16 +397,21 @@
             logPages[pageIndex].writeLong(pageOffset + logRecordHelper.getLogHeaderSize(logType) + logContentSize,
                     checksum);
 
-            if (IS_DEBUG_MODE) {
-                System.out.println("--------------> LSN(" + currentLSN + ") is written");
+            // forward the nextWriteOffset in the log page
+            int bufferNextWriteOffset = (int) ((currentLSN + totalLogSize) % logPageSize);
+            if (bufferNextWriteOffset == 0) {
+                bufferNextWriteOffset = logPageSize;
             }
+            logPages[pageIndex].setBufferNextWriteOffset(bufferNextWriteOffset);
 
-            // release the ownership as the log record has been placed in
-            // created space.
-            logPageOwnerCount[pageIndex].decrementAndGet();
+            if (logType != LogType.ENTITY_COMMIT) {
+                // release the ownership as the log record has been placed in
+                // created space.
+                logPages[pageIndex].decRefCnt();
 
-            // indicating that the transaction thread has released ownership
-            decremented = true;
+                // indicating that the transaction thread has released ownership
+                decremented = true;
+            }
 
             if (logType == LogType.ENTITY_COMMIT) {
                 map = activeTxnCountMaps.get(pageIndex);
@@ -502,18 +422,42 @@
                 } else {
                     map.put(txnCtx, 1);
                 }
+                //------------------------------------------------------------------------------
+                // [Notice]
+                // reference count should be decremented 
+                // after activeTxnCount is incremented, but before addFlushRequest() is called. 
+                //------------------------------------------------------------------------------
+                // release the ownership as the log record has been placed in
+                // created space.
+                logPages[pageIndex].decRefCnt();
+
+                // indicating that the transaction thread has released ownership
+                decremented = true;
+                
                 addFlushRequest(pageIndex, currentLSN, false);
             } else if (logType == LogType.COMMIT) {
+
                 addFlushRequest(pageIndex, currentLSN, true);
+                if (IS_DEBUG_MODE) {
+                    System.out.println("Running sum of log size: " + statLogSize + ", log count: " + statLogCount);
+                }
             }
 
+            if (IS_DEBUG_MODE) {
+                System.out.println("--------------> LSN(" + currentLSN + ") is written");
+            }
+
+            //collect statistics
+            statLogSize += totalLogSize;
+            statLogCount++;
+
         } catch (Exception e) {
             e.printStackTrace();
             throw new ACIDException(txnCtx, "Thread: " + Thread.currentThread().getName()
                     + " logger encountered exception", e);
         } finally {
             if (!decremented) {
-                logPageOwnerCount[pageIndex].decrementAndGet();
+                logPages[pageIndex].decRefCnt();
             }
         }
     }
@@ -526,20 +470,13 @@
 
         String filePath = LogUtil.getLogFilePath(logManagerProperties, getLogFileId(lsn));
 
-        logPages[pageIndex].reset(filePath, LogUtil.getFileOffset(this, nextWritePosition),
-                logManagerProperties.getLogPageSize());
-    }
-
-    @Override
-    public ILogCursor readLog(ILogFilter logFilter) throws ACIDException {
-        LogCursor cursor = new LogCursor(this, logFilter);
-        return cursor;
+        logPages[pageIndex].reset(filePath, LogUtil.getFileOffset(this, nextWritePosition), logPageSize);
     }
 
     @Override
     public ILogCursor readLog(PhysicalLogLocator physicalLogLocator, ILogFilter logFilter) throws IOException,
             ACIDException {
-        LogCursor cursor = new LogCursor(this, physicalLogLocator, logFilter);
+        LogCursor cursor = new LogCursor(this, physicalLogLocator, logFilter, logPageSize);
         return cursor;
     }
 
@@ -550,7 +487,7 @@
         String filePath = LogUtil.getLogFilePath(logManagerProperties, LogUtil.getFileId(this, lsnValue));
         long fileOffset = LogUtil.getFileOffset(this, lsnValue);
 
-        ByteBuffer buffer = ByteBuffer.allocate(logManagerProperties.getLogPageSize());
+        ByteBuffer buffer = ByteBuffer.allocate(logPageSize);
         RandomAccessFile raf = null;
         FileChannel fileChannel = null;
         try {
@@ -603,7 +540,7 @@
         }
 
         /* check if the log record in the log buffer or has reached the disk. */
-        if (lsnValue > getLastFlushedLsn().get()) {
+        if (isMemoryRead(lsnValue)) {
             int pageIndex = getLogPageIndex(lsnValue);
             int pageOffset = getLogPageOffset(lsnValue);
 
@@ -611,7 +548,7 @@
             // minimize memory allocation overhead. current code allocates the
             // log page size per reading a log record.
 
-            byte[] pageContent = new byte[logManagerProperties.getLogPageSize()];
+            byte[] pageContent = new byte[logPageSize];
 
             // take a lock on the log page so that the page is not flushed to
             // disk interim
@@ -619,8 +556,8 @@
 
                 // need to check again (this thread may have got de-scheduled
                 // and must refresh!)
-                if (lsnValue > getLastFlushedLsn().get()) {
 
+                if (isMemoryRead(lsnValue)) {
                     // get the log record length
                     logPages[pageIndex].getBytes(pageContent, 0, pageContent.length);
                     byte logType = pageContent[pageOffset + 4];
@@ -656,6 +593,20 @@
         readDiskLog(lsnValue, logicalLogLocator);
     }
 
+    public boolean isMemoryRead(long currentLSN) {
+        long flushLSN = lastFlushedLSN.get();
+        if ((flushLSN + 1) % logPageSize == 0) {
+            return false;
+        }
+        long logPageBeginOffset = flushLSN - (flushLSN % logPageSize);
+        long logPageEndOffset = logPageBeginOffset + logPageSize;
+        if (currentLSN > flushLSN || (currentLSN >= logPageBeginOffset && currentLSN < logPageEndOffset)) {
+            return true;
+        } else {
+            return false;
+        }
+    }
+
     public void renewLogFiles() throws ACIDException {
         List<String> logFileNames = LogUtil.getLogFiles(logManagerProperties);
         for (String name : logFileNames) {
@@ -670,7 +621,7 @@
         logPageFlusher.renew();
     }
 
-    private PhysicalLogLocator initLSN() throws ACIDException {
+    private void initLSN() throws ACIDException {
         PhysicalLogLocator nextPhysicalLsn = LogUtil.initializeLogAnchor(this);
         startingLSN = nextPhysicalLsn.getLsn();
         lastFlushedLSN.set(startingLSN - 1);
@@ -678,7 +629,6 @@
             LOGGER.info(" Starting lsn is : " + startingLSN);
         }
         lsn.set(startingLSN);
-        return nextPhysicalLsn;
     }
 
     private void closeLogPages() throws ACIDException {
@@ -695,9 +645,7 @@
         try {
             String filePath = LogUtil.getLogFilePath(logManagerProperties, LogUtil.getFileId(this, startingLSN));
             for (int i = 0; i < numLogPages; i++) {
-                logPages[i].open(filePath,
-                        LogUtil.getFileOffset(this, startingLSN) + i * logManagerProperties.getLogPageSize(),
-                        logManagerProperties.getLogPageSize());
+                logPages[i].open(filePath, LogUtil.getFileOffset(this, startingLSN) + i * logPageSize, logPageSize);
             }
         } catch (Exception e) {
             throw new ACIDException(Thread.currentThread().getName() + " unable to create log buffer", e);
@@ -710,33 +658,25 @@
     }
 
     /*
-     * This method shall be called by the Buffer manager when it needs to evict
-     * a page from the cache. TODO: Change the implementation from a looping
-     * logic to event based when log manager support is integrated with the
-     * Buffer Manager.
-     */
-    @Override
-    public synchronized void flushLog(LogicalLogLocator logicalLogLocator) throws ACIDException {
-        if (logicalLogLocator.getLsn() > lsn.get()) {
-            throw new ACIDException(" invalid lsn " + logicalLogLocator.getLsn());
-        }
-        while (lastFlushedLSN.get() < logicalLogLocator.getLsn());
-    }
-
-    /*
      * Map each log page to cover a physical byte range over a log file. When a
      * page is flushed, the page contents are put to disk in the corresponding
      * byte range.
      */
-    private void initializeLogPages(PhysicalLogLocator physicalLogLocator) throws ACIDException {
+    private void initializeLogPages(long beginLsn) throws ACIDException {
         try {
-            String filePath = LogUtil.getLogFilePath(logManagerProperties,
-                    LogUtil.getFileId(this, physicalLogLocator.getLsn()));
+            String filePath = LogUtil.getLogFilePath(logManagerProperties, LogUtil.getFileId(this, beginLsn));
+            long nextDiskWriteOffset = LogUtil.getFileOffset(this, beginLsn);
+            long nextBufferWriteOffset = nextDiskWriteOffset % logPageSize;
+            long bufferBeginOffset = nextDiskWriteOffset - nextBufferWriteOffset;
+
             for (int i = 0; i < numLogPages; i++) {
-                logPages[i] = FileUtil.getFileBasedBuffer(
-                        filePath,
-                        LogUtil.getFileOffset(this, physicalLogLocator.getLsn()) + i
-                                * logManagerProperties.getLogPageSize(), logManagerProperties.getLogPageSize());
+                logPages[i] = FileUtil.getFileBasedBuffer(filePath, bufferBeginOffset + i * logPageSize, logPageSize,
+                        logManagerProperties.getDiskSectorSize());
+                if (i == 0) {
+                    logPages[i].setBufferLastFlushOffset((int) nextBufferWriteOffset);
+                    logPages[i].setBufferNextWriteOffset((int) nextBufferWriteOffset);
+                    logPages[i].setDiskNextWriteOffset(nextDiskWriteOffset);
+                }
             }
         } catch (Exception e) {
             e.printStackTrace();
@@ -764,10 +704,6 @@
         return logPages[pageIndex];
     }
 
-    public AtomicInteger getLogPageOwnershipCount(int pageIndex) {
-        return logPageOwnerCount[pageIndex];
-    }
-
     public IFileBasedBuffer[] getLogPages() {
         return logPages;
     }
@@ -829,7 +765,7 @@
      */
     private final LinkedBlockingQueue<Object>[] flushRequestQueue;
     private final Object[] flushRequests;
-    private int pageToFlush;
+    private int flushPageIndex;
     private final long groupCommitWaitPeriod;
     private boolean isRenewRequest;
 
@@ -843,14 +779,14 @@
             flushRequestQueue[i] = new LinkedBlockingQueue<Object>(1);
             flushRequests[i] = new Object();
         }
-        this.pageToFlush = -1;
+        this.flushPageIndex = 0;
         groupCommitWaitPeriod = logManager.getLogManagerProperties().getGroupCommitWaitPeriod();
         isRenewRequest = false;
     }
 
     public void renew() {
         isRenewRequest = true;
-        pageToFlush = -1;
+        flushPageIndex = 0;
         this.interrupt();
         isRenewRequest = false;
     }
@@ -886,15 +822,19 @@
 
     @Override
     public void run() {
+        int logPageSize = logManager.getLogManagerProperties().getLogPageSize();
+        int logBufferSize = logManager.getLogManagerProperties().getLogBufferSize();
+        int beforeFlushOffset = 0;
+        int afterFlushOffset = 0;
+        boolean resetFlushPageIndex = false;
+
         while (true) {
             try {
-                pageToFlush = logManager.getNextPageInSequence(pageToFlush);
-
                 // A wait call on the linkedBLockingQueue. The flusher thread is
                 // notified when an object is added to the queue. Please note
                 // that each page has an associated blocking queue.
                 try {
-                    flushRequestQueue[pageToFlush].take();
+                    flushRequestQueue[flushPageIndex].take();
                 } catch (InterruptedException ie) {
                     while (isRenewRequest) {
                         sleep(1);
@@ -902,58 +842,67 @@
                     continue;
                 }
 
-                synchronized (logManager.getLogPage(pageToFlush)) {
-
-                    // #. sleep during the groupCommitWaitTime
+                // don't wait if the space left in the log page is no larger than one commit log record.
+                if (logManager.getLogPage(flushPageIndex).getBufferNextWriteOffset() < logPageSize
+                        - logManager.getLogRecordHelper().getCommitLogSize()) {
+                    // #. sleep for the groupCommitWaitTime
                     sleep(groupCommitWaitPeriod);
+                }
 
-                    // #. set the logPageStatus to INACTIVE in order to prevent
-                    // other txns from writing on this page.
-                    logManager.getLogPageStatus(pageToFlush).set(PageState.INACTIVE);
+                synchronized (logManager.getLogPage(flushPageIndex)) {
+                    logManager.getLogPage(flushPageIndex).acquireWriteLatch();
+                    try {
 
-                    // #. need to wait until the logPageOwnerCount reaches 1
-                    // (LOG_WRITER)
-                    // meaning every one has finished writing logs on this page.
-                    while (logManager.getLogPageOwnershipCount(pageToFlush).get() != PageOwnershipStatus.LOG_WRITER) {
-                        sleep(0);
+                        // #. need to wait until the reference count reaches 0
+                        while (logManager.getLogPage(flushPageIndex).getRefCnt() != 0) {
+                            sleep(0);
+                        }
+
+                        beforeFlushOffset = logManager.getLogPage(flushPageIndex).getBufferLastFlushOffset();
+
+                        // put the content to disk (the thread still has a lock on the log page)
+                        logManager.getLogPage(flushPageIndex).flush();
+
+                        afterFlushOffset = logManager.getLogPage(flushPageIndex).getBufferLastFlushOffset();
+
+                        // increment the last flushed lsn
+                        logManager.incrementLastFlushedLsn(afterFlushOffset - beforeFlushOffset);
+
+                        // set currentLSN to lastFlushedLSN + 1 if currentLSN has not moved past lastFlushedLSN.
+                        if (logManager.getLastFlushedLsn().get() + 1 > logManager.getCurrentLsn().get()) {
+                            logManager.getCurrentLsn().set(logManager.getLastFlushedLsn().get() + 1);
+                        }
+                        
+                        // Map the log page to a new region in the log file if the flushOffset reached the logPageSize
+                        if (afterFlushOffset == logPageSize) {
+                            long diskNextWriteOffset = logManager.getLogPages()[flushPageIndex].getDiskNextWriteOffset()
+                                    + logBufferSize;
+                            logManager.resetLogPage(logManager.getLastFlushedLsn().get() + 1 + logBufferSize,
+                                    diskNextWriteOffset, flushPageIndex);
+                            resetFlushPageIndex = true;
+                        }
+                        
+                        // decrement activeTxnCountOnIndexes
+                        logManager.decrementActiveTxnCountOnIndexes(flushPageIndex);
+
+                    } finally {
+                        logManager.getLogPage(flushPageIndex).releaseWriteLatch();
                     }
 
-                    // #. set the logPageOwnerCount to 0 (LOG_FLUSHER)
-                    // meaning it is flushing.
-                    logManager.getLogPageOwnershipCount(pageToFlush).set(PageOwnershipStatus.LOG_FLUSHER);
-
-                    // put the content to disk (the thread still has a lock on
-                    // the log page)
-                    logManager.getLogPage(pageToFlush).flush();
-
-                    // Map the log page to a new region in the log file.
-                    long nextWritePosition = logManager.getLogPages()[pageToFlush].getNextWritePosition()
-                            + logManager.getLogManagerProperties().getLogBufferSize();
-
-                    logManager.resetLogPage(logManager.getLastFlushedLsn().get() + 1
-                            + logManager.getLogManagerProperties().getLogBufferSize(), nextWritePosition, pageToFlush);
-
-                    // increment the last flushed lsn and lastFlushedPage
-                    logManager.incrementLastFlushedLsn(logManager.getLogManagerProperties().getLogPageSize());
-
-                    // decrement activeTxnCountOnIndexes
-                    logManager.decrementActiveTxnCountOnIndexes(pageToFlush);
-
-                    // reset the count to 1
-                    logManager.getLogPageOwnershipCount(pageToFlush).set(PageOwnershipStatus.LOG_WRITER);
-
-                    // mark the page as ACTIVE
-                    logManager.getLogPageStatus(pageToFlush).set(LogManager.PageState.ACTIVE);
-
                     // #. checks the queue whether there is another flush
                     // request on the same log buffer
                     // If there is another request, then simply remove it.
-                    if (flushRequestQueue[pageToFlush].peek() != null) {
-                        flushRequestQueue[pageToFlush].take();
+                    if (flushRequestQueue[flushPageIndex].peek() != null) {
+                        flushRequestQueue[flushPageIndex].take();
                     }
 
                     // notify all waiting (transaction) threads.
-                    logManager.getLogPage(pageToFlush).notifyAll();
+                    logManager.getLogPage(flushPageIndex).notifyAll();
+
+                    if (resetFlushPageIndex) {
+                        flushPageIndex = logManager.getNextPageInSequence(flushPageIndex);
+                        resetFlushPageIndex = false;
+                    }
                 }
             } catch (IOException ioe) {
                 ioe.printStackTrace();
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManagerProperties.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManagerProperties.java
index 5040fa9..581ce4c 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManagerProperties.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManagerProperties.java
@@ -28,6 +28,7 @@
     public static final String NUM_LOG_PAGES_KEY = "num_log_pages";
     public static final String LOG_FILE_PREFIX_KEY = "log_file_prefix";
     public static final String GROUP_COMMIT_WAIT_PERIOD_KEY = "group_commit_wait_period";
+    public static final String DISK_SECTOR_SIZE_KEY = "disk_sector_size";
 
     private static final int DEFAULT_LOG_PAGE_SIZE = 128 * 1024; //128KB
     private static final int DEFAULT_NUM_LOG_PAGES = 8;
@@ -35,6 +36,7 @@
     private static final long DEFAULT_GROUP_COMMIT_WAIT_PERIOD = 200; // time in millisec.
     private static final String DEFAULT_LOG_FILE_PREFIX = "asterix_transaction_log";
     private static final String DEFAULT_LOG_DIRECTORY = "asterix_logs/";
+    private static final int DEFAULT_DISK_SECTOR_SIZE = 4096;
 
     // follow the naming convention <logFilePrefix>_<number> where number starts from 0
     private final String logFilePrefix;
@@ -51,6 +53,8 @@
     private final int logBufferSize;
     // maximum size of each log file
     private final long logPartitionSize;
+    // disk sector size, read from the disk_sector_size property (falls back to the default)
+    private final int diskSectorSize;
 
     public LogManagerProperties(Properties properties, String nodeId) {
         this.logDirKey = new String(nodeId + LOG_DIR_SUFFIX_KEY);
@@ -66,6 +70,8 @@
         this.logBufferSize = logPageSize * numLogPages;
         //make sure that the log partition size is the multiple of log buffer size.
         this.logPartitionSize = (logPartitionSize / logBufferSize) * logBufferSize;
+        this.diskSectorSize = Integer.parseInt(properties.getProperty(DISK_SECTOR_SIZE_KEY, ""
+                + DEFAULT_DISK_SECTOR_SIZE));
     }
 
     public long getLogPartitionSize() {
@@ -99,6 +105,10 @@
     public String getLogDirKey() {
         return logDirKey;
     }
+    
+    public int getDiskSectorSize() {
+        return diskSectorSize;
+    }
 
     public String toString() {
         StringBuilder builder = new StringBuilder();
@@ -108,6 +118,7 @@
         builder.append("num_log_pages : " + numLogPages + FileUtil.lineSeparator);
         builder.append("log_partition_size : " + logPartitionSize + FileUtil.lineSeparator);
         builder.append("group_commit_wait_period : " + groupCommitWaitPeriod + FileUtil.lineSeparator);
+        builder.append("disk_sector_size : " + diskSectorSize + FileUtil.lineSeparator);
         return builder.toString();
     }
 }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java
index 6b882ef..1b65d8f 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java
@@ -50,6 +50,9 @@
 public class LogRecordHelper implements ILogRecordHelper {
 
     private final int LOG_CHECKSUM_SIZE = 8;
+    private final int LOG_HEADER_PART1_SIZE = 17;
+    private final int LOG_HEADER_PART2_SIZE = 21;
+    private final int COMMIT_LOG_SIZE = LOG_HEADER_PART1_SIZE + LOG_CHECKSUM_SIZE;
 
     private final int MAGIC_NO_POS = 0;
     private final int LOG_TYPE_POS = 4;
@@ -60,7 +63,9 @@
     private final int RESOURCE_ID_POS = 25;
     private final int RESOURCE_MGR_ID_POS = 33;
     private final int LOG_RECORD_SIZE_POS = 34;
+    
 
+    
     private ILogManager logManager;
 
     public LogRecordHelper(ILogManager logManager) {
@@ -118,7 +123,11 @@
 
     @Override
     public int getLogContentSize(LogicalLogLocator logicalLogLocater) {
-        return logicalLogLocater.getBuffer().readInt(logicalLogLocater.getMemoryOffset() + LOG_RECORD_SIZE_POS);
+        if (getLogType(logicalLogLocater) == LogType.COMMIT || getLogType(logicalLogLocater) == LogType.ENTITY_COMMIT) {
+            return 0;
+        } else {
+            return logicalLogLocater.getBuffer().readInt(logicalLogLocater.getMemoryOffset() + LOG_RECORD_SIZE_POS);
+        }
     }
 
     @Override
@@ -178,7 +187,7 @@
 
         /* magic no */
         (logicalLogLocator.getBuffer()).writeInt(logicalLogLocator.getMemoryOffset() + MAGIC_NO_POS,
-                logManager.getLogManagerProperties().LOG_MAGIC_NUMBER);
+                LogManagerProperties.LOG_MAGIC_NUMBER);
 
         /* log type */
         (logicalLogLocator.getBuffer()).put(logicalLogLocator.getMemoryOffset() + LOG_TYPE_POS, logType);
@@ -230,18 +239,18 @@
     @Override
     public int getLogRecordSize(byte logType, int logBodySize) {
         if (logType == LogType.UPDATE) {
-            return 46 + logBodySize;
+            return LOG_HEADER_PART1_SIZE + LOG_HEADER_PART2_SIZE + LOG_CHECKSUM_SIZE + logBodySize;
         } else {
-            return 25;
+            return COMMIT_LOG_SIZE;
         }
     }
 
     @Override
     public int getLogHeaderSize(byte logType) {
         if (logType == LogType.UPDATE) {
-            return 38;
+            return LOG_HEADER_PART1_SIZE + LOG_HEADER_PART2_SIZE;
         } else {
-            return 17;
+            return LOG_HEADER_PART1_SIZE;
         }
     }
 
@@ -249,4 +258,8 @@
     public int getLogChecksumSize() {
         return LOG_CHECKSUM_SIZE;
     }
+    
+    public int getCommitLogSize() {
+        return COMMIT_LOG_SIZE;
+    }
 }