changes to fix Issue 409(supporting variable log page size)
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileBasedBuffer.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileBasedBuffer.java
index 4319b8b..c08462f 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileBasedBuffer.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileBasedBuffer.java
@@ -27,22 +27,27 @@
 public class FileBasedBuffer extends Buffer implements IFileBasedBuffer {
 
     private String filePath;
-    private long nextWritePosition;
     private FileChannel fileChannel;
     private RandomAccessFile raf;
-    private int size;
+    private int bufferSize;
 
-    public FileBasedBuffer(String filePath, long offset, int size) throws IOException {
+    private int bufferLastFlushOffset;
+    private int bufferNextWriteOffset;
+    private final int diskSectorSize;
+
+    public FileBasedBuffer(String filePath, long offset, int bufferSize, int diskSectorSize) throws IOException {
         this.filePath = filePath;
-        this.nextWritePosition = offset;
-        buffer = ByteBuffer.allocate(size);
+        buffer = ByteBuffer.allocate(bufferSize);
         raf = new RandomAccessFile(new File(filePath), "rw");
-        raf.seek(offset);
         fileChannel = raf.getChannel();
+        fileChannel.position(offset);
         fileChannel.read(buffer);
         buffer.position(0);
-        this.size = size;
-        buffer.limit(size);
+        this.bufferSize = bufferSize;
+        buffer.limit(bufferSize);
+        bufferLastFlushOffset = 0;
+        bufferNextWriteOffset = 0;
+        this.diskSectorSize = diskSectorSize;
     }
 
     public String getFilePath() {
@@ -53,17 +58,9 @@
         this.filePath = filePath;
     }
 
-    public long getOffset() {
-        return nextWritePosition;
-    }
-
-    public void setOffset(long offset) {
-        this.nextWritePosition = offset;
-    }
-
     @Override
     public int getSize() {
-        return buffer.limit();
+        return bufferSize;
     }
 
     public void clear() {
@@ -72,11 +69,18 @@
 
     @Override
     public void flush() throws IOException {
-        buffer.position(0);
-        buffer.limit(size);
+        //flush
+        int pos = bufferLastFlushOffset;
+        int limit = (((bufferNextWriteOffset - 1) / diskSectorSize) + 1) * diskSectorSize;
+        buffer.position(pos);
+        buffer.limit(limit);
         fileChannel.write(buffer);
         fileChannel.force(true);
-        erase();
+
+        //update variables
+        bufferLastFlushOffset = limit;
+        bufferNextWriteOffset = limit;
+        buffer.limit(bufferSize);
     }
 
     @Override
@@ -124,45 +128,76 @@
      * starting at offset.
      */
     @Override
-    public void reset(String filePath, long nextWritePosition, int size) throws IOException {
+    public void reset(String filePath, long diskNextWriteOffset, int bufferSize) throws IOException {
         if (!filePath.equals(this.filePath)) {
             raf.close();//required?
             fileChannel.close();
             raf = new RandomAccessFile(filePath, "rw");
             this.filePath = filePath;
         }
-        this.nextWritePosition = nextWritePosition;
-        raf.seek(nextWritePosition);
         fileChannel = raf.getChannel();
+        fileChannel.position(diskNextWriteOffset);
         erase();
         buffer.position(0);
-        buffer.limit(size);
-        this.size = size;
+        buffer.limit(bufferSize);
+        this.bufferSize = bufferSize;
+
+        bufferLastFlushOffset = 0;
+        bufferNextWriteOffset = 0;
     }
-    
+
     @Override
     public void close() throws IOException {
         fileChannel.close();
     }
-    
+
     @Override
-    public void open(String filePath, long offset, int size) throws IOException {
+    public void open(String filePath, long offset, int bufferSize) throws IOException {
         raf = new RandomAccessFile(filePath, "rw");
-        this.nextWritePosition = offset;
         fileChannel = raf.getChannel();
         fileChannel.position(offset);
         erase();
         buffer.position(0);
-        buffer.limit(size);
-        this.size = size;
+        buffer.limit(bufferSize);
+        this.bufferSize = bufferSize;
+        bufferLastFlushOffset = 0;
+        bufferNextWriteOffset = 0;
     }
 
-    public long getNextWritePosition() {
-        return nextWritePosition;
+    @Override
+    public long getDiskNextWriteOffset() throws IOException {
+        return fileChannel.position();
     }
 
-    public void setNextWritePosition(long nextWritePosition) {
-        this.nextWritePosition = nextWritePosition;
+    @Override
+    public void setDiskNextWriteOffset(long offset) throws IOException {
+        fileChannel.position(offset);
+    }
+
+    @Override
+    public int getBufferLastFlushOffset() {
+        return bufferLastFlushOffset;
+    }
+
+    @Override
+    public void setBufferLastFlushOffset(int offset) {
+        this.bufferLastFlushOffset = offset;
+    }
+
+    @Override
+    public int getBufferNextWriteOffset() {
+        synchronized (fileChannel) {
+            return bufferNextWriteOffset;
+        }
+    }
+
+    @Override
+    public void setBufferNextWriteOffset(int offset) {
+        synchronized (fileChannel) {
+            if (bufferNextWriteOffset < offset) {
+                bufferNextWriteOffset = offset;
+            }
+        }
     }
 
 }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileUtil.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileUtil.java
index f2ffa0e..46e03f1 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileUtil.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/FileUtil.java
@@ -38,8 +38,8 @@
         return (new File(path)).mkdir();
     }
 
-    public static IFileBasedBuffer getFileBasedBuffer(String filePath, long offset, int size) throws IOException {
-        IFileBasedBuffer fileBasedBuffer = new FileBasedBuffer(filePath, offset, size);
+    public static IFileBasedBuffer getFileBasedBuffer(String filePath, long offset, int bufferSize, int diskSectorSize) throws IOException {
+        IFileBasedBuffer fileBasedBuffer = new FileBasedBuffer(filePath, offset, bufferSize, diskSectorSize);
         return fileBasedBuffer;
     }
 
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IFileBasedBuffer.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IFileBasedBuffer.java
index e1f9f95..ce13df4 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IFileBasedBuffer.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IFileBasedBuffer.java
@@ -31,12 +31,20 @@
      */
     public void reset(String filePath, long offset, int size) throws IOException;
 
-    public long getNextWritePosition();
+    public long getDiskNextWriteOffset() throws IOException;
 
-    public void setNextWritePosition(long writePosition);
+    public void setDiskNextWriteOffset(long writePosition) throws IOException;
 
     public void close() throws IOException;
     
     public void open(String filePath, long offset, int size) throws IOException;
 
+    int getBufferLastFlushOffset();
+
+    void setBufferLastFlushOffset(int offset);
+
+    int getBufferNextWriteOffset();
+
+    void setBufferNextWriteOffset(int offset);
+
 }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogManager.java
index c629d03..9f20b5d 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogManager.java
@@ -53,17 +53,6 @@
             ACIDException;
 
     /**
-     * Provides a cursor for retrieving logs that satisfy a given ILogFilter
-     * instance. Log records are retrieved in increasing order of lsn
-     * 
-     * @param logFilter
-     *            specifies the filtering criteria for the retrieved logs
-     * @return LogCursor an iterator for the retrieved logs
-     * @throws ACIDException
-     */
-    public ILogCursor readLog(ILogFilter logFilter) throws ACIDException;
-
-    /**
      * @param logicalLogLocator TODO
      * @param PhysicalLogLocator
      *            specifies the location of the log record to be read
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java
index 8a2b188..1376e1c 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java
@@ -23,27 +23,16 @@
 
     private final LogManager logManager;
     private final ILogFilter logFilter;
+    private final int logPageSize;
     private IBuffer readOnlyBuffer;
     private LogicalLogLocator logicalLogLocator = null;
-    private long bufferIndex = 0;
-    private boolean firstNext = true;
-    private boolean readMemory = false;
-    private long readLSN = 0;
     private boolean needReloadBuffer = true;
 
-    /**
-     * @param logFilter
-     */
-    public LogCursor(final LogManager logManager, ILogFilter logFilter) throws ACIDException {
+    public LogCursor(final LogManager logManager, PhysicalLogLocator startingPhysicalLogLocator, ILogFilter logFilter,
+            int logPageSize) throws IOException, ACIDException {
         this.logFilter = logFilter;
         this.logManager = logManager;
-
-    }
-
-    public LogCursor(final LogManager logManager, PhysicalLogLocator startingPhysicalLogLocator, ILogFilter logFilter)
-            throws IOException, ACIDException {
-        this.logFilter = logFilter;
-        this.logManager = logManager;
+        this.logPageSize = logPageSize;
         initialize(startingPhysicalLogLocator);
     }
 
@@ -57,7 +46,8 @@
         File file = new File(filePath);
         if (file.exists()) {
             return FileUtil.getFileBasedBuffer(filePath, lsn
-                    % logManager.getLogManagerProperties().getLogPartitionSize(), size);
+                    % logManager.getLogManagerProperties().getLogPartitionSize(), size, logManager
+                    .getLogManagerProperties().getDiskSectorSize());
         } else {
             return null;
         }
@@ -87,8 +77,7 @@
             return false;
         }
 
-        //if the lsn to read is greater than the last flushed lsn, then read from memory
-        if (logicalLogLocator.getLsn() > logManager.getLastFlushedLsn().get()) {
+        if (logManager.isMemoryRead(logicalLogLocator.getLsn())) {
             return readFromMemory(currentLogLocator);
         }
 
@@ -96,10 +85,9 @@
         //needReloadBuffer is set to true if the log record is read from the memory log page.
         if (needReloadBuffer) {
             //log page size doesn't exceed integer boundary
-            int offset = (int)(logicalLogLocator.getLsn() % logManager.getLogManagerProperties().getLogPageSize());
+            int offset = (int) (logicalLogLocator.getLsn() % logPageSize);
             long adjustedLSN = logicalLogLocator.getLsn() - offset;
-            readOnlyBuffer = getReadOnlyBuffer(adjustedLSN, logManager.getLogManagerProperties()
-                    .getLogPageSize());
+            readOnlyBuffer = getReadOnlyBuffer(adjustedLSN, logPageSize);
             logicalLogLocator.setBuffer(readOnlyBuffer);
             logicalLogLocator.setMemoryOffset(offset);
             needReloadBuffer = false;
@@ -117,7 +105,7 @@
             logicalLogLocator.increaseMemoryOffset(1);
             logicalLogLocator.incrementLsn();
             bytesSkipped++;
-            if (bytesSkipped > logManager.getLogManagerProperties().getLogPageSize()) {
+            if (bytesSkipped > logPageSize) {
                 return false; // the maximum size of a log record is limited to
                 // a log page size. If we have skipped as many
                 // bytes without finding a log record, it
@@ -133,10 +121,9 @@
             // need to reload the buffer
             // TODO
             // reduce IO by reading more pages(equal to logBufferSize) at a time.
-            long lsnpos = ((logicalLogLocator.getLsn() / logManager.getLogManagerProperties().getLogPageSize()) + 1)
-                    * logManager.getLogManagerProperties().getLogPageSize();
+            long lsnpos = ((logicalLogLocator.getLsn() / logPageSize) + 1) * logPageSize;
 
-            readOnlyBuffer = getReadOnlyBuffer(lsnpos, logManager.getLogManagerProperties().getLogPageSize());
+            readOnlyBuffer = getReadOnlyBuffer(lsnpos, logPageSize);
             if (readOnlyBuffer != null) {
                 logicalLogLocator.setBuffer(readOnlyBuffer);
                 logicalLogLocator.setLsn(lsnpos);
@@ -190,12 +177,11 @@
         IFileBasedBuffer logPage = logManager.getLogPage(pageIndex);
         synchronized (logPage) {
             // need to check again if the log record in the log buffer or has reached the disk
-            if (lsn > logManager.getLastFlushedLsn().get()) {
+            if (logManager.isMemoryRead(lsn)) {
 
                 //find the magic number to identify the start of the log record
                 //----------------------------------------------------------------
                 int readNumber = -1;
-                int logPageSize = logManager.getLogManagerProperties().getLogPageSize();
                 int logMagicNumber = logManager.getLogManagerProperties().LOG_MAGIC_NUMBER;
                 int bytesSkipped = 0;
                 boolean logRecordBeginPosFound = false;
@@ -223,7 +209,8 @@
                     // need to read the next log page
                     readOnlyBuffer = null;
                     logicalLogLocator.setBuffer(null);
-                    logicalLogLocator.setLsn(lsn / logPageSize + 1);
+                    lsn = ((logicalLogLocator.getLsn() / logPageSize) + 1) * logPageSize;
+                    logicalLogLocator.setLsn(lsn);
                     logicalLogLocator.setMemoryOffset(0);
                     return next(currentLogLocator);
                 }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
index 9b8f09c..36d052d 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
@@ -199,7 +199,7 @@
         /*
          * place the log anchor at the end of the last log record written.
          */
-        PhysicalLogLocator nextPhysicalLsn = initLSN();
+        initLSN();
 
         /*
          * initialize meta data for each log page.
@@ -212,7 +212,7 @@
         /*
          * initialize the log pages.
          */
-        initializeLogPages(nextPhysicalLsn);
+        initializeLogPages(startingLSN);
 
         /*
          * Instantiate and begin the LogFlusher thread. The Log Flusher thread
@@ -242,7 +242,7 @@
      * record is (to be) placed.
      */
     public int getLogPageOffset(long lsnValue) {
-        return (int) ((lsnValue - startingLSN) % logManagerProperties.getLogPageSize());
+        return (int) (lsnValue % logManagerProperties.getLogPageSize());
     }
 
     /*
@@ -345,6 +345,10 @@
             }
 
             if (forwardPage) {
+
+                // forward the nextWriteOffset in the log page
+                logPages[pageIndex].setBufferNextWriteOffset(logManagerProperties.getLogPageSize());
+
                 addFlushRequest(prevPage, old, false);
 
                 // The transaction thread that discovers the need to forward a
@@ -482,6 +486,13 @@
             logPages[pageIndex].writeLong(pageOffset + logRecordHelper.getLogHeaderSize(logType) + logContentSize,
                     checksum);
 
+            // forward the nextWriteOffset in the log page
+            int bufferNextWriteOffset = (int) ((currentLSN + totalLogSize) % logManagerProperties.getLogPageSize());
+            if (bufferNextWriteOffset == 0) {
+                bufferNextWriteOffset = logManagerProperties.getLogPageSize();
+            }
+            logPages[pageIndex].setBufferNextWriteOffset(bufferNextWriteOffset);
+
             if (IS_DEBUG_MODE) {
                 System.out.println("--------------> LSN(" + currentLSN + ") is written");
             }
@@ -531,15 +542,9 @@
     }
 
     @Override
-    public ILogCursor readLog(ILogFilter logFilter) throws ACIDException {
-        LogCursor cursor = new LogCursor(this, logFilter);
-        return cursor;
-    }
-
-    @Override
     public ILogCursor readLog(PhysicalLogLocator physicalLogLocator, ILogFilter logFilter) throws IOException,
             ACIDException {
-        LogCursor cursor = new LogCursor(this, physicalLogLocator, logFilter);
+        LogCursor cursor = new LogCursor(this, physicalLogLocator, logFilter, logManagerProperties.getLogPageSize());
         return cursor;
     }
 
@@ -603,7 +608,7 @@
         }
 
         /* check if the log record in the log buffer or has reached the disk. */
-        if (lsnValue > getLastFlushedLsn().get()) {
+        if (isMemoryRead(lsnValue)) {
             int pageIndex = getLogPageIndex(lsnValue);
             int pageOffset = getLogPageOffset(lsnValue);
 
@@ -619,8 +624,8 @@
 
                 // need to check again (this thread may have got de-scheduled
                 // and must refresh!)
-                if (lsnValue > getLastFlushedLsn().get()) {
 
+                if (isMemoryRead(lsnValue)) {
                     // get the log record length
                     logPages[pageIndex].getBytes(pageContent, 0, pageContent.length);
                     byte logType = pageContent[pageOffset + 4];
@@ -656,6 +661,17 @@
         readDiskLog(lsnValue, logicalLogLocator);
     }
 
+    public boolean isMemoryRead(long currentLSN) {
+        long flushLSN = lastFlushedLSN.get();
+        long logPageBeginOffset = flushLSN - (flushLSN % logManagerProperties.getLogPageSize());
+        long logPageEndOffset = logPageBeginOffset + logManagerProperties.getLogPageSize();
+        if (currentLSN > flushLSN || (currentLSN >= logPageBeginOffset && currentLSN < logPageEndOffset)) {
+            return true;
+        } else {
+            return false;
+        }
+    }
+
     public void renewLogFiles() throws ACIDException {
         List<String> logFileNames = LogUtil.getLogFiles(logManagerProperties);
         for (String name : logFileNames) {
@@ -670,7 +686,7 @@
         logPageFlusher.renew();
     }
 
-    private PhysicalLogLocator initLSN() throws ACIDException {
+    private void initLSN() throws ACIDException {
         PhysicalLogLocator nextPhysicalLsn = LogUtil.initializeLogAnchor(this);
         startingLSN = nextPhysicalLsn.getLsn();
         lastFlushedLSN.set(startingLSN - 1);
@@ -678,7 +694,6 @@
             LOGGER.info(" Starting lsn is : " + startingLSN);
         }
         lsn.set(startingLSN);
-        return nextPhysicalLsn;
     }
 
     private void closeLogPages() throws ACIDException {
@@ -728,15 +743,23 @@
      * page is flushed, the page contents are put to disk in the corresponding
      * byte range.
      */
-    private void initializeLogPages(PhysicalLogLocator physicalLogLocator) throws ACIDException {
+    private void initializeLogPages(long beginLsn) throws ACIDException {
         try {
             String filePath = LogUtil.getLogFilePath(logManagerProperties,
-                    LogUtil.getFileId(this, physicalLogLocator.getLsn()));
+                    LogUtil.getFileId(this, beginLsn));
+            long nextDiskWriteOffset = LogUtil.getFileOffset(this, beginLsn);
+            long nextBufferWriteOffset = nextDiskWriteOffset % logManagerProperties.getLogPageSize();
+            long bufferBeginOffset = nextDiskWriteOffset - nextBufferWriteOffset;
+
             for (int i = 0; i < numLogPages; i++) {
-                logPages[i] = FileUtil.getFileBasedBuffer(
-                        filePath,
-                        LogUtil.getFileOffset(this, physicalLogLocator.getLsn()) + i
-                                * logManagerProperties.getLogPageSize(), logManagerProperties.getLogPageSize());
+                logPages[i] = FileUtil.getFileBasedBuffer(filePath,
+                        bufferBeginOffset + i * logManagerProperties.getLogPageSize(),
+                        logManagerProperties.getLogPageSize(), logManagerProperties.getDiskSectorSize());
+                if (i == 0) {
+                    logPages[i].setBufferLastFlushOffset((int)nextBufferWriteOffset);
+                    logPages[i].setBufferNextWriteOffset((int)nextBufferWriteOffset);
+                    logPages[i].setDiskNextWriteOffset(nextDiskWriteOffset);
+                }
             }
         } catch (Exception e) {
             e.printStackTrace();
@@ -829,7 +852,7 @@
      */
     private final LinkedBlockingQueue<Object>[] flushRequestQueue;
     private final Object[] flushRequests;
-    private int pageToFlush;
+    private int flushPageIndex;
     private final long groupCommitWaitPeriod;
     private boolean isRenewRequest;
 
@@ -843,14 +866,14 @@
             flushRequestQueue[i] = new LinkedBlockingQueue<Object>(1);
             flushRequests[i] = new Object();
         }
-        this.pageToFlush = -1;
+        this.flushPageIndex = 0;
         groupCommitWaitPeriod = logManager.getLogManagerProperties().getGroupCommitWaitPeriod();
         isRenewRequest = false;
     }
 
     public void renew() {
         isRenewRequest = true;
-        pageToFlush = -1;
+        flushPageIndex = 0;
         this.interrupt();
         isRenewRequest = false;
     }
@@ -886,15 +909,19 @@
 
     @Override
     public void run() {
+        int logPageSize = logManager.getLogManagerProperties().getLogPageSize();
+        int logBufferSize = logManager.getLogManagerProperties().getLogBufferSize();
+        int beforeFlushOffset = 0;
+        int afterFlushOffset = 0;
+        boolean resetFlushPageIndex = false;
+
         while (true) {
             try {
-                pageToFlush = logManager.getNextPageInSequence(pageToFlush);
-
                 // A wait call on the linkedBLockingQueue. The flusher thread is
                 // notified when an object is added to the queue. Please note
                 // that each page has an associated blocking queue.
                 try {
-                    flushRequestQueue[pageToFlush].take();
+                    flushRequestQueue[flushPageIndex].take();
                 } catch (InterruptedException ie) {
                     while (isRenewRequest) {
                         sleep(1);
@@ -902,58 +929,69 @@
                     continue;
                 }
 
-                synchronized (logManager.getLogPage(pageToFlush)) {
+                synchronized (logManager.getLogPage(flushPageIndex)) {
 
-                    // #. sleep during the groupCommitWaitTime
+                    // #. sleep for the groupCommitWaitTime
                     sleep(groupCommitWaitPeriod);
 
                     // #. set the logPageStatus to INACTIVE in order to prevent
                     // other txns from writing on this page.
-                    logManager.getLogPageStatus(pageToFlush).set(PageState.INACTIVE);
+                    logManager.getLogPageStatus(flushPageIndex).set(PageState.INACTIVE);
 
                     // #. need to wait until the logPageOwnerCount reaches 1
                     // (LOG_WRITER)
                     // meaning every one has finished writing logs on this page.
-                    while (logManager.getLogPageOwnershipCount(pageToFlush).get() != PageOwnershipStatus.LOG_WRITER) {
+                    while (logManager.getLogPageOwnershipCount(flushPageIndex).get() != PageOwnershipStatus.LOG_WRITER) {
                         sleep(0);
                     }
 
                     // #. set the logPageOwnerCount to 0 (LOG_FLUSHER)
                     // meaning it is flushing.
-                    logManager.getLogPageOwnershipCount(pageToFlush).set(PageOwnershipStatus.LOG_FLUSHER);
+                    logManager.getLogPageOwnershipCount(flushPageIndex).set(PageOwnershipStatus.LOG_FLUSHER);
+
+                    beforeFlushOffset = logManager.getLogPage(flushPageIndex).getBufferLastFlushOffset();
 
                     // put the content to disk (the thread still has a lock on
                     // the log page)
-                    logManager.getLogPage(pageToFlush).flush();
+                    logManager.getLogPage(flushPageIndex).flush();
 
-                    // Map the log page to a new region in the log file.
-                    long nextWritePosition = logManager.getLogPages()[pageToFlush].getNextWritePosition()
-                            + logManager.getLogManagerProperties().getLogBufferSize();
+                    afterFlushOffset = logManager.getLogPage(flushPageIndex).getBufferLastFlushOffset();
 
-                    logManager.resetLogPage(logManager.getLastFlushedLsn().get() + 1
-                            + logManager.getLogManagerProperties().getLogBufferSize(), nextWritePosition, pageToFlush);
+                    // increment the last flushed lsn
+                    logManager.incrementLastFlushedLsn(afterFlushOffset - beforeFlushOffset);
 
-                    // increment the last flushed lsn and lastFlushedPage
-                    logManager.incrementLastFlushedLsn(logManager.getLogManagerProperties().getLogPageSize());
+                    // Map the log page to a new region in the log file if the flushOffset reached the logPageSize
+                    if (afterFlushOffset == logPageSize) {
+                        long diskNextWriteOffset = logManager.getLogPages()[flushPageIndex].getDiskNextWriteOffset()
+                                + logBufferSize;
+                        logManager.resetLogPage(logManager.getLastFlushedLsn().get() + 1 + logBufferSize,
+                                diskNextWriteOffset, flushPageIndex);
+                        resetFlushPageIndex = true;
+                    }
 
                     // decrement activeTxnCountOnIndexes
-                    logManager.decrementActiveTxnCountOnIndexes(pageToFlush);
+                    logManager.decrementActiveTxnCountOnIndexes(flushPageIndex);
 
                     // reset the count to 1
-                    logManager.getLogPageOwnershipCount(pageToFlush).set(PageOwnershipStatus.LOG_WRITER);
+                    logManager.getLogPageOwnershipCount(flushPageIndex).set(PageOwnershipStatus.LOG_WRITER);
 
                     // mark the page as ACTIVE
-                    logManager.getLogPageStatus(pageToFlush).set(LogManager.PageState.ACTIVE);
+                    logManager.getLogPageStatus(flushPageIndex).set(LogManager.PageState.ACTIVE);
 
                     // #. checks the queue whether there is another flush
                     // request on the same log buffer
                     // If there is another request, then simply remove it.
-                    if (flushRequestQueue[pageToFlush].peek() != null) {
-                        flushRequestQueue[pageToFlush].take();
+                    if (flushRequestQueue[flushPageIndex].peek() != null) {
+                        flushRequestQueue[flushPageIndex].take();
                     }
 
                     // notify all waiting (transaction) threads.
-                    logManager.getLogPage(pageToFlush).notifyAll();
+                    logManager.getLogPage(flushPageIndex).notifyAll();
+
+                    if (resetFlushPageIndex) {
+                        flushPageIndex = logManager.getNextPageInSequence(flushPageIndex);
+                        resetFlushPageIndex = false;
+                    }
                 }
             } catch (IOException ioe) {
                 ioe.printStackTrace();
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManagerProperties.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManagerProperties.java
index 5040fa9..581ce4c 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManagerProperties.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManagerProperties.java
@@ -28,6 +28,7 @@
     public static final String NUM_LOG_PAGES_KEY = "num_log_pages";
     public static final String LOG_FILE_PREFIX_KEY = "log_file_prefix";
     public static final String GROUP_COMMIT_WAIT_PERIOD_KEY = "group_commit_wait_period";
+    public static final String DISK_SECTOR_SIZE_KEY = "disk_sector_size";
 
     private static final int DEFAULT_LOG_PAGE_SIZE = 128 * 1024; //128KB
     private static final int DEFAULT_NUM_LOG_PAGES = 8;
@@ -35,6 +36,7 @@
     private static final long DEFAULT_GROUP_COMMIT_WAIT_PERIOD = 200; // time in millisec.
     private static final String DEFAULT_LOG_FILE_PREFIX = "asterix_transaction_log";
     private static final String DEFAULT_LOG_DIRECTORY = "asterix_logs/";
+    private static final int DEFAULT_DISK_SECTOR_SIZE = 4096;
 
     // follow the naming convention <logFilePrefix>_<number> where number starts from 0
     private final String logFilePrefix;
@@ -51,6 +53,8 @@
     private final int logBufferSize;
     // maximum size of each log file
     private final long logPartitionSize;
+    // default disk sector size
+    private final int diskSectorSize;
 
     public LogManagerProperties(Properties properties, String nodeId) {
         this.logDirKey = new String(nodeId + LOG_DIR_SUFFIX_KEY);
@@ -66,6 +70,8 @@
         this.logBufferSize = logPageSize * numLogPages;
         //make sure that the log partition size is the multiple of log buffer size.
         this.logPartitionSize = (logPartitionSize / logBufferSize) * logBufferSize;
+        this.diskSectorSize = Integer.parseInt(properties.getProperty(DISK_SECTOR_SIZE_KEY, ""
+                + DEFAULT_DISK_SECTOR_SIZE));
     }
 
     public long getLogPartitionSize() {
@@ -99,6 +105,10 @@
     public String getLogDirKey() {
         return logDirKey;
     }
+    
+    public int getDiskSectorSize() {
+        return diskSectorSize;
+    }
 
     public String toString() {
         StringBuilder builder = new StringBuilder();
@@ -108,6 +118,7 @@
         builder.append("num_log_pages : " + numLogPages + FileUtil.lineSeparator);
         builder.append("log_partition_size : " + logPartitionSize + FileUtil.lineSeparator);
         builder.append("group_commit_wait_period : " + groupCommitWaitPeriod + FileUtil.lineSeparator);
+        builder.append("disk_sector_size : " + diskSectorSize + FileUtil.lineSeparator);
         return builder.toString();
     }
 }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java
index 6b882ef..8eae204 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java
@@ -60,7 +60,7 @@
     private final int RESOURCE_ID_POS = 25;
     private final int RESOURCE_MGR_ID_POS = 33;
     private final int LOG_RECORD_SIZE_POS = 34;
-
+    
     private ILogManager logManager;
 
     public LogRecordHelper(ILogManager logManager) {
@@ -118,7 +118,11 @@
 
     @Override
     public int getLogContentSize(LogicalLogLocator logicalLogLocater) {
-        return logicalLogLocater.getBuffer().readInt(logicalLogLocater.getMemoryOffset() + LOG_RECORD_SIZE_POS);
+        if (getLogType(logicalLogLocater) == LogType.COMMIT || getLogType(logicalLogLocater) == LogType.ENTITY_COMMIT) {
+            return 0;
+        } else {
+            return logicalLogLocater.getBuffer().readInt(logicalLogLocater.getMemoryOffset() + LOG_RECORD_SIZE_POS);
+        }
     }
 
     @Override