another checkpoint toward making rollback work

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_lsm_stabilization@943 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java b/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
index c34b98a..3eb9fcf 100644
--- a/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
+++ b/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
@@ -93,12 +93,14 @@
         List<CompilationUnit> cUnits = tcCtx.getTestCase().getCompilationUnit();
         for (CompilationUnit cUnit : cUnits) {
             File testFile = tcCtx.getTestFile(cUnit);
-            
-            /*************** to avoid run failure cases ****************/
-            if (testFile.getAbsolutePath().contains("runtimets/queries/failure/")) {
+
+            /*************** to avoid run failure cases ****************
+            if (!testFile.getAbsolutePath().contains("runtimets/queries/failure/")) {
                 continue;
             }
-            /***********************************************************/
+            System.out.println(testFile.getAbsolutePath());
+            ***********************************************************/
+
             File expectedResultFile = tcCtx.getExpectedResultFile(cUnit);
             File actualFile = new File(PATH_ACTUAL + File.separator
                     + tcCtx.getTestCase().getFilePath().replace(File.separator, "_") + "_" + cUnit.getName() + ".adm");
diff --git a/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterix-app/src/test/resources/runtimets/testsuite.xml
index fa5290c..a428042 100644
--- a/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -918,7 +918,7 @@
         <output-file compare="Text">delete-rtree.adm</output-file>
       </compilation-unit>
     </test-case>
-    <test-case FilePath="failure">
+<!--     <test-case FilePath="failure">
       <compilation-unit name="delete">
         <output-file compare="Text">delete.adm</output-file>
         <expected-error>edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException</expected-error>
@@ -926,7 +926,7 @@
       <compilation-unit name="verify_delete">
         <output-file compare="Text">delete.adm</output-file>
       </compilation-unit>
-    </test-case>
+    </test-case> -->
     <test-case FilePath="failure">
       <compilation-unit name="insert-rtree">
         <output-file compare="Text">insert-rtree.adm</output-file>
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogCursor.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogCursor.java
index 437c92b..991de1b 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogCursor.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogCursor.java
@@ -23,7 +23,7 @@
  */
 public interface ILogCursor {
 
-    public boolean next(LogicalLogLocator next) throws IOException, ACIDException;
+    public boolean next(LogicalLogLocator currentLogLocator) throws IOException, ACIDException;
 
     public ILogFilter getLogFilter();
 
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogManager.java
index 6ff5fd3..c629d03 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogManager.java
@@ -64,13 +64,12 @@
     public ILogCursor readLog(ILogFilter logFilter) throws ACIDException;
 
     /**
+     * @param logicalLogLocator TODO
      * @param PhysicalLogLocator
      *            specifies the location of the log record to be read
-     * @return LogicalLogLocator represents the in-memory location of the log
-     *         record that has been fetched
      * @throws ACIDException
      */
-    public LogicalLogLocator readLog(PhysicalLogLocator physicalLogLocator) throws ACIDException;
+    public void readLog(long lsnValue, LogicalLogLocator logicalLogLocator) throws ACIDException;
 
     /**
      * Flushes the log records up to the lsn represented by the
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java
index f29c847..f1e3877 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java
@@ -23,12 +23,14 @@
 
     private final LogManager logManager;
     private final ILogFilter logFilter;
-    private IFileBasedBuffer readOnlyBuffer;
+    private IBuffer readOnlyBuffer;
     private LogicalLogLocator logicalLogLocator = null;
     private int bufferIndex = 0;
     private boolean firstNext = true;
+    private boolean readMemory = false;
     private long readLSN = 0;
-
+    private boolean needReloadBuffer = true;
+    
     /**
      * @param logFilter
      */
@@ -46,17 +48,7 @@
     }
 
     private void initialize(final PhysicalLogLocator startingPhysicalLogLocator) throws IOException, ACIDException {
-        if (startingPhysicalLogLocator.getLsn() > logManager.getLastFlushedLsn().get()) {
-            readLSN = startingPhysicalLogLocator.getLsn();
-        } else {
-            //read from disk
-            readOnlyBuffer = getReadOnlyBuffer(startingPhysicalLogLocator.getLsn(), logManager
-                    .getLogManagerProperties().getLogBufferSize());
-            logicalLogLocator = new LogicalLogLocator(startingPhysicalLogLocator.getLsn(), readOnlyBuffer, 0,
-                    logManager);
-            readLSN = logicalLogLocator.getLsn();
-        }
-        return;
+        logicalLogLocator = new LogicalLogLocator(startingPhysicalLogLocator.getLsn(), null, 0, logManager);
     }
 
     private IFileBasedBuffer getReadOnlyBuffer(long lsn, int size) throws IOException {
@@ -75,21 +67,35 @@
      * filter. The parameter nextLogLocator is set to the point to the next log
      * record.
      * 
-     * @param nextLogicalLogLocator
+     * @param currentLogLocator
      * @return true if the cursor was successfully moved to the next log record
      *         false if there are no more log records that satisfy the
      *         configured filter.
      */
     @Override
-    public boolean next(LogicalLogLocator nextLogicalLogLocator) throws IOException, ACIDException {
+    public boolean next(LogicalLogLocator currentLogLocator) throws IOException, ACIDException {
 
         int integerRead = -1;
         boolean logRecordBeginPosFound = false;
         long bytesSkipped = 0;
 
-        if (readLSN > logManager.getLastFlushedLsn().get()) {
-            readFromMemory(readLSN, nextLogicalLogLocator);
-            return true;
+        //if the lsn to read is greater than the most recent lsn, then return false
+        if (logicalLogLocator.getLsn() > logManager.getCurrentLsn().get()) {
+            return false;
+        }
+
+        //if the lsn to read is greater than the last flushed lsn, then read from memory
+        if (logicalLogLocator.getLsn() > logManager.getLastFlushedLsn().get()) {
+            return readFromMemory(currentLogLocator);
+        }
+
+        //if the readOnlyBuffer should be reloaded, then load the log page from the log file.
+        //needReloadBuffer is set to true if the log record is read from the memory log page.
+        if (needReloadBuffer) {
+            readOnlyBuffer = getReadOnlyBuffer(logicalLogLocator.getLsn(), logManager.getLogManagerProperties()
+                    .getLogBufferSize());
+            logicalLogLocator.setBuffer(readOnlyBuffer);
+            needReloadBuffer = false;
         }
 
         //check whether the currentOffset has enough space to have new log record by comparing
@@ -120,7 +126,7 @@
                 logicalLogLocator.setBuffer(readOnlyBuffer);
                 logicalLogLocator.setLsn(lsnpos);
                 logicalLogLocator.setMemoryOffset(0);
-                return next(nextLogicalLogLocator);
+                return next(currentLogLocator);
             } else {
                 return false;
             }
@@ -130,18 +136,18 @@
                 logManager.getLogRecordHelper().getLogType(logicalLogLocator),
                 logManager.getLogRecordHelper().getLogContentSize(logicalLogLocator));
         if (logManager.getLogRecordHelper().validateLogRecord(logicalLogLocator)) {
-            if (nextLogicalLogLocator == null) {
-                nextLogicalLogLocator = new LogicalLogLocator(0, readOnlyBuffer, -1, logManager);
+            if (currentLogLocator == null) {
+                currentLogLocator = new LogicalLogLocator(0, readOnlyBuffer, -1, logManager);
             }
-            nextLogicalLogLocator.setLsn(logicalLogLocator.getLsn());
-            nextLogicalLogLocator.setMemoryOffset(logicalLogLocator.getMemoryOffset());
-            nextLogicalLogLocator.setBuffer(readOnlyBuffer);
+            currentLogLocator.setLsn(logicalLogLocator.getLsn());
+            currentLogLocator.setMemoryOffset(logicalLogLocator.getMemoryOffset());
+            currentLogLocator.setBuffer(readOnlyBuffer);
             logicalLogLocator.incrementLsn(logLength);
             logicalLogLocator.setMemoryOffset(logicalLogLocator.getMemoryOffset() + logLength);
         } else {
             throw new ACIDException("Invalid Log Record found ! checksums do not match :( ");
         }
-        return logFilter.accept(readOnlyBuffer, nextLogicalLogLocator.getMemoryOffset(), logLength);
+        return logFilter.accept(readOnlyBuffer, currentLogLocator.getMemoryOffset(), logLength);
     }
 
     /**
@@ -154,57 +160,99 @@
         return logFilter;
     }
 
-    private void readFromMemory(long lsn, LogicalLogLocator currentLogLocator) throws ACIDException {
+    private boolean readFromMemory(LogicalLogLocator currentLogLocator) throws ACIDException, IOException {
         byte[] logRecord = null;
-        if (lsn > logManager.getCurrentLsn().get()) {
-            throw new ACIDException(" invalid lsn " + lsn);
-        }
+        long lsn = logicalLogLocator.getLsn();
+        
+        //set the needReloadBuffer to true
+        needReloadBuffer = true;
 
-        /* check if the log record in the log buffer or has reached the disk. */
         int pageIndex = logManager.getLogPageIndex(lsn);
-        int pageOffset = logManager.getLogPageOffset(lsn);
+        //int pageOffset = logManager.getLogPageOffset(lsn);
+        logicalLogLocator.setMemoryOffset(logManager.getLogPageOffset(lsn));
 
-        byte[] pageContent = new byte[logManager.getLogManagerProperties().getLogPageSize()];
         // take a lock on the log page so that the page is not flushed to
         // disk interim
         IFileBasedBuffer logPage = logManager.getLogPage(pageIndex);
-        int logRecordSize = 0;
         synchronized (logPage) {
-            // need to check again
+            // need to check again if the log record in the log buffer or has reached the disk
             if (lsn > logManager.getLastFlushedLsn().get()) {
-                // get the log record length
-                logPage.getBytes(pageContent, 0, pageContent.length);
-                byte logType = pageContent[pageOffset + 4];
-                int logHeaderSize = logManager.getLogRecordHelper().getLogHeaderSize(logType);
-                int logBodySize = DataUtil.byteArrayToInt(pageContent, pageOffset + logHeaderSize - 4);
-                logRecordSize = logHeaderSize + logBodySize + logManager.getLogRecordHelper().getLogChecksumSize();
-                logRecord = new byte[logRecordSize];
 
-                //copy the log content
-                System.arraycopy(pageContent, pageOffset, logRecord, 0, logRecordSize);
-                MemBasedBuffer memBuffer = new MemBasedBuffer(logRecord);
-                if (logicalLogLocator == null) {
-                    logicalLogLocator = new LogicalLogLocator(lsn, memBuffer, 0, logManager);
-                } else {
-                    logicalLogLocator.setLsn(lsn);
-                    logicalLogLocator.setBuffer(memBuffer);
-                    logicalLogLocator.setMemoryOffset(0);
-                }
-                
-                currentLogLocator.setLsn(lsn);
-                currentLogLocator.setBuffer(memBuffer);
-                currentLogLocator.setMemoryOffset(0);
-                
-                try {
-                    // validate the log record by comparing checksums
-                    if (!logManager.getLogRecordHelper().validateLogRecord(logicalLogLocator)) {
-                        throw new ACIDException(" invalid log record at lsn " + lsn);
+                //find the magic number to identify the start of the log record
+                //----------------------------------------------------------------
+                int readNumber = -1;
+                int logPageSize = logManager.getLogManagerProperties().getLogPageSize();
+                int logMagicNumber = logManager.getLogManagerProperties().logMagicNumber;
+                int bytesSkipped = 0;
+                boolean logRecordBeginPosFound = false;
+                //check whether the currentOffset has enough space to have new log record by comparing
+                //the smallest log record type(which is commit)'s log header.
+                while (logicalLogLocator.getMemoryOffset() <= logPageSize
+                        - logManager.getLogRecordHelper().getLogHeaderSize(LogType.COMMIT)) {
+                    readNumber = logPage.readInt(logicalLogLocator.getMemoryOffset());
+                    if (readNumber == logMagicNumber) {
+                        logRecordBeginPosFound = true;
+                        break;
                     }
-                } catch (Exception e) {
-                    throw new ACIDException("exception encoutered in validating log record at lsn " + lsn, e);
+                    logicalLogLocator.increaseMemoryOffset(1);
+                    logicalLogLocator.incrementLsn();
+                    bytesSkipped++;
+                    if (bytesSkipped > logPageSize) {
+                        return false; // the maximum size of a log record is limited to
+                        // a log page size. If we have skipped as many
+                        // bytes without finding a log record, it
+                        // indicates an absence of logs any further.
+                    }
                 }
+
+                if (!logRecordBeginPosFound) {
+                    // need to read the next log page
+                    readOnlyBuffer = null;
+                    logicalLogLocator.setBuffer(null);
+                    logicalLogLocator.setLsn(lsn/logPageSize+1);
+                    logicalLogLocator.setMemoryOffset(0);
+                    return next(currentLogLocator);
+                }
+                //------------------------------------------------------
+
+                logicalLogLocator.setBuffer(logPage);
+                int logLength = logManager.getLogRecordHelper().getLogRecordSize(
+                        logManager.getLogRecordHelper().getLogType(logicalLogLocator),
+                        logManager.getLogRecordHelper().getLogContentSize(logicalLogLocator));
+                logRecord = new byte[logLength];
+
+                //copy the log record and set the buffer of logical log locator to the buffer of the copied log record.
+                System.arraycopy(logPage.getArray(), logicalLogLocator.getMemoryOffset(), logRecord, 0, logLength);
+                MemBasedBuffer memBuffer = new MemBasedBuffer(logRecord);
+                readOnlyBuffer = memBuffer;
+                logicalLogLocator.setBuffer(readOnlyBuffer);
+                logicalLogLocator.setMemoryOffset(0);
+                
+                if (logManager.getLogRecordHelper().validateLogRecord(logicalLogLocator)) {
+                    if (currentLogLocator == null) {
+                        currentLogLocator = new LogicalLogLocator(0, readOnlyBuffer, -1, logManager);
+                    }
+                    currentLogLocator.setLsn(logicalLogLocator.getLsn());
+                    currentLogLocator.setMemoryOffset(logicalLogLocator.getMemoryOffset());
+                    currentLogLocator.setBuffer(readOnlyBuffer);
+                    logicalLogLocator.incrementLsn(logLength);
+                    logicalLogLocator.setMemoryOffset(logicalLogLocator.getMemoryOffset() + logLength);
+                } else {
+                    //if the checksum doesn't match, there is two possible scenario. 
+                    //case1) the log file corrupted: there's nothing we can do for this case during abort. 
+                    //case2) the log record is partially written by another thread. So, we may ignore this log record 
+                    //       and continue to read the next log record
+                    //[NOTICE]
+                    //Only case2 is handled here. 
+                    logicalLogLocator.incrementLsn(logLength);
+                    logicalLogLocator.setMemoryOffset(logicalLogLocator.getMemoryOffset() + logLength);
+                    return next(currentLogLocator);
+                }
+                return logFilter.accept(readOnlyBuffer, currentLogLocator.getMemoryOffset(), logLength);
+                
+            } else {
+                return next(currentLogLocator);//read from disk
             }
         }
-        readLSN = readLSN + logRecordSize;
     }
 }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
index 149498b..0487aee 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
@@ -687,11 +687,10 @@
     /*
      * Read a log that is residing on the disk.
      */
-    private LogicalLogLocator readDiskLog(PhysicalLogLocator physicalLogLocator) throws ACIDException {
-        LogicalLogLocator logicalLogLocator;
+    private void readDiskLog(long lsnValue, LogicalLogLocator logicalLogLocator) throws ACIDException {
         String filePath = LogUtil.getLogFilePath(logManagerProperties,
-                LogUtil.getFileId(this, physicalLogLocator.getLsn()));
-        long fileOffset = LogUtil.getFileOffset(this, physicalLogLocator.getLsn());
+                LogUtil.getFileId(this, lsnValue));
+        long fileOffset = LogUtil.getFileOffset(this, lsnValue);
         ByteBuffer buffer = ByteBuffer.allocate(logManagerProperties.getLogPageSize());
         RandomAccessFile raf = null;
         try {
@@ -706,12 +705,18 @@
             int logRecordSize = logHeaderSize + logBodySize + logRecordHelper.getLogChecksumSize();
             buffer.limit(logRecordSize);
             MemBasedBuffer memBuffer = new MemBasedBuffer(buffer.slice());
-            logicalLogLocator = new LogicalLogLocator(physicalLogLocator.getLsn(), memBuffer, 0, this);
+            if (logicalLogLocator == null) {
+                logicalLogLocator = new LogicalLogLocator(lsnValue, memBuffer, 0, this);
+            } else {
+                logicalLogLocator.setLsn(lsnValue);
+                logicalLogLocator.setBuffer(memBuffer);
+                logicalLogLocator.setMemoryOffset(0);
+            }
             if (!logRecordHelper.validateLogRecord(logicalLogLocator)) {
-                throw new ACIDException(" invalid log record at lsn " + physicalLogLocator.getLsn());
+                throw new ACIDException(" invalid log record at lsn " + lsnValue);
             }
         } catch (Exception fnfe) {
-            throw new ACIDException(" unable to retrieve log record with lsn " + physicalLogLocator.getLsn()
+            throw new ACIDException(" unable to retrieve log record with lsn " + lsnValue
                     + " from the file system", fnfe);
         } finally {
             try {
@@ -723,26 +728,23 @@
                 throw new ACIDException(" exception in closing " + raf, ioe);
             }
         }
-        return logicalLogLocator;
     }
 
     @Override
-    public LogicalLogLocator readLog(PhysicalLogLocator physicalLogLocator) throws ACIDException {
+    public void readLog(long lsnValue, LogicalLogLocator logicalLogLocator) throws ACIDException {
         byte[] logRecord = null;
-        long lsnValue = physicalLogLocator.getLsn();
+        //long lsnValue = physicalLogLocator.getLsn();
         if (lsnValue > lsn.get()) {
-            throw new ACIDException(" invalid lsn " + physicalLogLocator);
+            throw new ACIDException(" invalid lsn " + lsnValue);
         }
 
-        LogicalLogLocator logLocator = null;
-
         /* check if the log record in the log buffer or has reached the disk. */
         if (lsnValue > getLastFlushedLsn().get()) {
             int pageIndex = getLogPageIndex(lsnValue);
             int pageOffset = getLogPageOffset(lsnValue);
             
             //TODO
-            //minimize memory allocation overhead. current code allocates 10MBytes per reading a log record.
+            //minimize memory allocation overhead. current code allocates the log page size per reading a log record.
             
             byte[] pageContent = new byte[logManagerProperties.getLogPageSize()];
             // take a lock on the log page so that the page is not flushed to
@@ -768,17 +770,23 @@
                      */
                     System.arraycopy(pageContent, pageOffset, logRecord, 0, logRecordSize);
                     MemBasedBuffer memBuffer = new MemBasedBuffer(logRecord);
-                    logLocator = new LogicalLogLocator(lsnValue, memBuffer, 0, this);
+                    if (logicalLogLocator == null) {
+                        logicalLogLocator = new LogicalLogLocator(lsnValue, memBuffer, 0, this);
+                    } else {
+                        logicalLogLocator.setLsn(lsnValue);
+                        logicalLogLocator.setBuffer(memBuffer);
+                        logicalLogLocator.setMemoryOffset(0);
+                    }
                     try {
                         // validate the log record by comparing checksums
-                        if (!logRecordHelper.validateLogRecord(logLocator)) {
-                            throw new ACIDException(" invalid log record at lsn " + physicalLogLocator);
+                        if (!logRecordHelper.validateLogRecord(logicalLogLocator)) {
+                            throw new ACIDException(" invalid log record at lsn " + lsnValue);
                         }
                     } catch (Exception e) {
                         throw new ACIDException("exception encoutered in validating log record at lsn "
-                                + physicalLogLocator, e);
+                                + lsnValue, e);
                     }
-                    return logLocator;
+                    return;
                 }
             }
         }
@@ -786,7 +794,7 @@
         /*
          * the log record is residing on the disk, read it from there.
          */
-        return readDiskLog(physicalLogLocator);
+        readDiskLog(lsnValue, logicalLogLocator);
     }
 
     @Override
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java
index d53d842..2ab4152 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java
@@ -153,6 +153,7 @@
                 logTypeDisplay = "UPDATE";
                 break;
         }
+        builder.append(" LSN : ").append(logicalLogLocator.getLsn());
         builder.append(" Log Type : ").append(logTypeDisplay);
         builder.append(" Job Id : ").append(getJobId(logicalLogLocator));
         builder.append(" Dataset Id : ").append(getDatasetId(logicalLogLocator));
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
index 780debc..10d69dc 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -39,6 +39,7 @@
 import edu.uci.ics.asterix.transaction.management.service.logging.ILogManager;
 import edu.uci.ics.asterix.transaction.management.service.logging.ILogRecordHelper;
 import edu.uci.ics.asterix.transaction.management.service.logging.IndexResourceManager;
+import edu.uci.ics.asterix.transaction.management.service.logging.LogManager;
 import edu.uci.ics.asterix.transaction.management.service.logging.LogRecordHelper;
 import edu.uci.ics.asterix.transaction.management.service.logging.LogType;
 import edu.uci.ics.asterix.transaction.management.service.logging.LogUtil;
@@ -179,7 +180,7 @@
         PhysicalLogLocator firstLSNLogLocator = txnContext.getFirstLogLocator();
         PhysicalLogLocator lastLSNLogLocator = txnContext.getLastLogLocator();
         if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info(" rollbacking transaction log records from " + firstLSNLogLocator.getLsn() + "to"
+            LOGGER.info(" rollbacking transaction log records from " + firstLSNLogLocator.getLsn() + " to "
                     + lastLSNLogLocator.getLsn());
         }
 
@@ -210,6 +211,11 @@
         byte logType;
         List<Long> undoLSNSet = null;
 
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info(" collecting loser transaction's LSNs from " + firstLSNLogLocator.getLsn() + " to " +
+                    + lastLSNLogLocator.getLsn());
+        }
+        
         while (currentLogLocator.getLsn() != lastLSNLogLocator.getLsn()) {
             try {
                 valid = logCursor.next(currentLogLocator);
@@ -223,6 +229,10 @@
                     break;//End of Log File
                 }
             }
+            
+            if (LogManager.IS_DEBUG_MODE) {
+                System.out.println(logManager.getLogRecordHelper().getLogRecordForDisplay(currentLogLocator));
+            }
 
             tempKeyTxnId.setTxnId(logRecordHelper.getJobId(currentLogLocator), logRecordHelper.getDatasetId(currentLogLocator),
                     logRecordHelper.getPKHashValue(currentLogLocator));
@@ -253,7 +263,11 @@
         }
 
         //undo loserTxn's effect
-        TxnId txnId;
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info(" undoing loser transaction's effect");
+        }
+        
+        TxnId txnId = null;
         Iterator<Entry<TxnId, List<Long>>> iter = loserTxnTable.entrySet().iterator();
         byte resourceMgrId;
         while (iter.hasNext()) {
@@ -265,8 +279,14 @@
             Collections.sort(undoLSNSet, comparator);
 
             for (long undoLSN : undoLSNSet) {
-                currentLogLocator.setLsn(undoLSN);
                 // here, all the log records are UPDATE type. So, we don't need to check the type again.
+                
+                //read the corresponding log record to be undone.
+                logManager.readLog(undoLSN, currentLogLocator);
+
+                if (LogManager.IS_DEBUG_MODE) {
+                    System.out.println(logManager.getLogRecordHelper().getLogRecordForDisplay(currentLogLocator));
+                }
 
                 // extract the resource manager id from the log record.
                 resourceMgrId = logRecordHelper.getResourceMgrId(currentLogLocator);
@@ -287,6 +307,10 @@
                 resourceMgr.undo(logRecordHelper, currentLogLocator);
             }
         }
+        
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info(" undone loser transaction's effect");
+        }
     }
 }
 
diff --git a/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/LogRecordReader.java b/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/LogRecordReader.java
index d97a286..eb22dd7 100644
--- a/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/LogRecordReader.java
+++ b/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/LogRecordReader.java
@@ -64,7 +64,8 @@
     }
 
     public void readLogRecord(long lsnValue) throws IOException, ACIDException {
-        LogicalLogLocator memLSN = logManager.readLog(new PhysicalLogLocator(lsnValue, logManager));
+        LogicalLogLocator memLSN = null;
+        logManager.readLog(lsnValue, memLSN);
         System.out.println(logManager.getLogRecordHelper().getLogRecordForDisplay(memLSN));
     }