Fix the LogCursor bug that appeared during rollback when log records are read from both the on-disk log file and the in-memory log buffer

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_lsm_stabilization@1336 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java
index 5d57603..8533e74 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java
@@ -44,6 +44,7 @@
     private final DatasetId datasetId;
     private final int[] primaryKeyFields;
     private final boolean isWriteTransaction;
+    private final long[] longHashes; 
 
     private TransactionContext transactionContext;
     private RecordDescriptor inputRecordDesc;
@@ -61,6 +62,7 @@
         this.primaryKeyFields = primaryKeyFields;
         this.frameTupleReference = new FrameTupleReference();
         this.isWriteTransaction = isWriteTransaction;
+        this.longHashes= new long[2];
     }
 
     @Override
@@ -91,7 +93,6 @@
     }
     
     private int computePrimaryKeyHashValue(ITupleReference tuple, int[] primaryKeyFields) {
-        long[] longHashes= new long[2];
         MurmurHash128Bit.hash3_x64_128(tuple, primaryKeyFields, SEED, longHashes);
         return Math.abs((int) longHashes[0]); 
     }
diff --git a/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java b/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
index 36236d4..4e4d78b 100644
--- a/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
+++ b/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
@@ -106,12 +106,11 @@
         for (CompilationUnit cUnit : cUnits) {
             File testFile = tcCtx.getTestFile(cUnit);
 
-            /*************** to avoid run failure cases ****************
-            if (!testFile.getAbsolutePath().contains("queries/failure")) {//&& !testFile.getAbsolutePath().contains("partition-by-nonexistent-field.aql"))       ) {
+            /*************** to avoid/run failure cases ****************
+            if (testFile.getAbsolutePath().contains("queries/failure")) {
                 continue;
                 //System.out.println(testFile.getAbsolutePath());
             }
-            System.out.println(testFile.getAbsolutePath());
             ************************************************************/
 
             File expectedResultFile = tcCtx.getExpectedResultFile(cUnit);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/AbstractOperationCallback.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/AbstractOperationCallback.java
index 9e4c944..fdbb707 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/AbstractOperationCallback.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/AbstractOperationCallback.java
@@ -30,6 +30,7 @@
     protected final ILockManager lockManager;
     protected final TransactionContext txnCtx;
     protected int transactorLocalNumActiveOperations = 0;
+    protected final long[] longHashes;
 
     public AbstractOperationCallback(int datasetId, int[] primaryKeyFields,
             TransactionContext txnCtx, ILockManager lockManager) {
@@ -37,10 +38,10 @@
         this.primaryKeyFields = primaryKeyFields;
         this.txnCtx = txnCtx;
         this.lockManager = lockManager;
+        this.longHashes= new long[2];
     }
 
     public int computePrimaryKeyHashValue(ITupleReference tuple, int[] primaryKeyFields) {
-        long[] longHashes= new long[2];
         MurmurHash128Bit.hash3_x64_128(tuple, primaryKeyFields, SEED, longHashes);
         return Math.abs((int) longHashes[0]); 
     }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java
index 61c47f4..e04ca95 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java
@@ -82,8 +82,8 @@
         boolean logRecordBeginPosFound = false;
         long bytesSkipped = 0;
 
-        //if the lsn to read is greater than the most recent lsn, then return false
-        if (logicalLogLocator.getLsn() > logManager.getCurrentLsn().get()) {
+        //if the lsn to read is greater than or equal to the most recent lsn, then return false
+        if (logicalLogLocator.getLsn() >= logManager.getCurrentLsn().get()) {
             return false;
         }
 
@@ -98,9 +98,10 @@
             readOnlyBuffer = getReadOnlyBuffer(logicalLogLocator.getLsn(), logManager.getLogManagerProperties()
                     .getLogBufferSize());
             logicalLogLocator.setBuffer(readOnlyBuffer);
+            logicalLogLocator.setMemoryOffset(0);
             needReloadBuffer = false;
         }
-
+        
         //check whether the currentOffset has enough space to have new log record by comparing
         //the smallest log record type(which is commit)'s log header.
         while (logicalLogLocator.getMemoryOffset() <= readOnlyBuffer.getSize()
@@ -179,7 +180,6 @@
         needReloadBuffer = true;
 
         int pageIndex = logManager.getLogPageIndex(lsn);
-        //int pageOffset = logManager.getLogPageOffset(lsn);
         logicalLogLocator.setMemoryOffset(logManager.getLogPageOffset(lsn));
 
         // take a lock on the log page so that the page is not flushed to
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
index 8d66f6b..64376aa 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
@@ -73,9 +73,9 @@
      * point, the ownership count is decremented as the transaction is done with
      * using the page. When a page is requested to be flushed, logPageFlusher
      * set the count to 0(LOG_FLUSHER: meaning that the page is being flushed)
-     * only if the count is 1(LOG_WRITER: meaning that there is no other 
-     * transactions who own the page to write logs.) After flushing the page, 
-     * logPageFlusher set this count to 1. 
+     * only if the count is 1(LOG_WRITER: meaning that there is no other
+     * transactions who own the page to write logs.) After flushing the page,
+     * logPageFlusher set this count to 1.
      */
     private AtomicInteger[] logPageOwnerCount;
 
@@ -86,15 +86,16 @@
 
     /*
      * LogPageStatus: A page is either ACTIVE or INACTIVE. The status for each
-     * page is maintained in logPageStatus. A page is ACTIVE when the LogManager 
-     * can allocate space in the page for writing a log record. Initially all 
-     * pages are ACTIVE. As transactions fill up space by writing log records, 
-     * a page may not have sufficient space left for serving a request by a 
+     * page is maintained in logPageStatus. A page is ACTIVE when the LogManager
+     * can allocate space in the page for writing a log record. Initially all
+     * pages are ACTIVE. As transactions fill up space by writing log records, a
+     * page may not have sufficient space left for serving a request by a
      * transaction. When this happens, the page is flushed to disk by calling
-     * logPageFlusher.requestFlush(). In the requestFlush(), after groupCommitWaitTime,
-     * the page status is set to INACTIVE. Then, there is no more writer on the
-     * page(meaning the corresponding logPageOwnerCount is 1), the page is flushed
-     * by the logPageFlusher and the status is reset to ACTIVE by the logPageFlusher.   
+     * logPageFlusher.requestFlush(). In the requestFlush(), after
+     * groupCommitWaitTime, the page status is set to INACTIVE. Then, there is
+     * no more writer on the page(meaning the corresponding logPageOwnerCount is
+     * 1), the page is flushed by the logPageFlusher and the status is reset to
+     * ACTIVE by the logPageFlusher.
      */
     private AtomicInteger[] logPageStatus;
 
@@ -297,22 +298,28 @@
             boolean forwardPage = false;
             long old = lsn.get();
 
-            //get the log page corresponding to the current lsn value
+            // get the log page corresponding to the current lsn value
             int pageIndex = getLogPageIndex(old);
             long retVal = old;
 
-            // the lsn value for the next request if the current request is served.
+            // the lsn value for the next request if the current request is
+            // served.
             long next = old + entrySize;
             int prevPage = -1;
 
-            // check if the log  record will cross page boundaries, a case that is not allowed.
+            // check if the log record will cross page boundaries, a case that
+            // is not allowed.
             if ((next - 1) / pageSize != old / pageSize || (next % pageSize == 0)) {
 
                 if ((old != 0 && old % pageSize == 0)) {
-                    // On second thought, this shall never be the case as it means that the lsn is
-                    // currently at the beginning of a page and we still need to forward the page which
-                    // means that the entrySize exceeds a log page size. If this is the case, an
-                    // exception is thrown before calling this API. would remove this case.
+                    // On second thought, this shall never be the case as it
+                    // means that the lsn is
+                    // currently at the beginning of a page and we still need to
+                    // forward the page which
+                    // means that the entrySize exceeds a log page size. If this
+                    // is the case, an
+                    // exception is thrown before calling this API. would remove
+                    // this case.
                     retVal = old;
 
                 } else {
@@ -322,7 +329,8 @@
 
                 next = retVal;
 
-                // as the log record shall cross log page boundary, we must re-assign the lsn so
+                // as the log record shall cross log page boundary, we must
+                // re-assign the lsn so
                 // that the log record begins on a different location.
                 forwardPage = true;
 
@@ -345,7 +353,8 @@
             waitUntillPageIsAvailableForWritingLog(pageIndex);
 
             if (!lsn.compareAndSet(old, next)) {
-                // Atomic call -> returns true only when the value represented by lsn is same as
+                // Atomic call -> returns true only when the value represented
+                // by lsn is same as
                 // "old". The value is updated to "next".
                 continue;
             }
@@ -367,8 +376,10 @@
                 // space in the log page and hence is an owner.
                 logPageOwnerCount[pageIndex].incrementAndGet();
 
-                // Before the count is incremented, if the flusher flushed the allocated page, 
-                // then retry to get new LSN. Otherwise, the log with allocated lsn will be lost. 
+                // Before the count is incremented, if the flusher flushed the
+                // allocated page,
+                // then retry to get new LSN. Otherwise, the log with allocated
+                // lsn will be lost.
                 if (lastFlushedLSN.get() >= retVal) {
                     logPageOwnerCount[pageIndex].decrementAndGet();
                     continue;
@@ -387,7 +398,8 @@
         HashMap<TransactionContext, Integer> map = null;
         int activeTxnCount;
 
-        // logLocator is a re-usable object that is appropriately set in each invocation. 
+        // logLocator is a re-usable object that is appropriately set in each
+        // invocation.
         // If the reference is null, the log manager must throw an exception.
         if (logicalLogLocator == null) {
             throw new ACIDException(
@@ -409,7 +421,8 @@
         // all constraints checked and we are good to go and acquire a lsn.
         long previousLSN = -1;
 
-        // the will be set to the location (a long value) where the log record needs to be placed.
+        // the will be set to the location (a long value) where the log record
+        // needs to be placed.
         long currentLSN;
 
         // The logs written by a transaction need to be linked to each other for
@@ -435,7 +448,8 @@
          * performed correctly that is ownership is released.
          */
 
-        // indicates if the transaction thread has release ownership of the page.
+        // indicates if the transaction thread has release ownership of the
+        // page.
         boolean decremented = false;
 
         int pageIndex = (int) getLogPageIndex(currentLSN);
@@ -455,8 +469,9 @@
             // content in the correct region of the allocated space.
             logicalLogLocator.increaseMemoryOffset(logRecordHelper.getLogHeaderSize(logType));
 
-            // a COMMIT log record does not have any content and hence 
-            // the logger (responsible for putting the log content) is not invoked.
+            // a COMMIT log record does not have any content and hence
+            // the logger (responsible for putting the log content) is not
+            // invoked.
             if (logContentSize != 0) {
                 logger.preLog(txnCtx, reusableLogContentObject);
             }
@@ -486,7 +501,8 @@
                 System.out.println("--------------> LSN(" + currentLSN + ") is written");
             }
 
-            // release the ownership as the log record has been placed in created space.
+            // release the ownership as the log record has been placed in
+            // created space.
             logPageOwnerCount[pageIndex].decrementAndGet();
 
             // indicating that the transaction thread has released ownership
@@ -548,14 +564,17 @@
     private void readDiskLog(long lsnValue, LogicalLogLocator logicalLogLocator) throws ACIDException {
         String filePath = LogUtil.getLogFilePath(logManagerProperties, LogUtil.getFileId(this, lsnValue));
         long fileOffset = LogUtil.getFileOffset(this, lsnValue);
+
         ByteBuffer buffer = ByteBuffer.allocate(logManagerProperties.getLogPageSize());
         RandomAccessFile raf = null;
+        FileChannel fileChannel = null;
         try {
             raf = new RandomAccessFile(filePath, "r");
-            raf.seek(fileOffset);
-            FileChannel fileChannel = raf.getChannel();
+            fileChannel = raf.getChannel();
+            fileChannel.position(fileOffset);
             fileChannel.read(buffer);
             buffer.position(0);
+            
             byte logType = buffer.get(4);
             int logHeaderSize = logRecordHelper.getLogHeaderSize(logType);
             int logBodySize = buffer.getInt(logHeaderSize - 4);
@@ -573,16 +592,19 @@
                 throw new ACIDException(" invalid log record at lsn " + lsnValue);
             }
         } catch (Exception fnfe) {
+            fnfe.printStackTrace();
             throw new ACIDException(" unable to retrieve log record with lsn " + lsnValue + " from the file system",
                     fnfe);
         } finally {
             try {
-                if (raf != null) {
+                if (fileChannel != null) {
+                    fileChannel.close();
+                } else if (raf != null) {
                     raf.close();
                 }
             } catch (IOException ioe) {
                 ioe.printStackTrace();
-                throw new ACIDException(" exception in closing " + raf, ioe);
+                throw new ACIDException(" exception in closing a file: " + filePath, ioe);
             }
         }
     }
@@ -590,8 +612,8 @@
     @Override
     public void readLog(long lsnValue, LogicalLogLocator logicalLogLocator) throws ACIDException {
         byte[] logRecord = null;
-        //long lsnValue = physicalLogLocator.getLsn();
-        if (lsnValue > lsn.get()) {
+
+        if (lsnValue >= lsn.get()) {
             throw new ACIDException(" invalid lsn " + lsnValue);
         }
 
@@ -600,15 +622,18 @@
             int pageIndex = getLogPageIndex(lsnValue);
             int pageOffset = getLogPageOffset(lsnValue);
 
-            //TODO
-            //minimize memory allocation overhead. current code allocates the log page size per reading a log record.
+            // TODO
+            // minimize memory allocation overhead. current code allocates the
+            // log page size per reading a log record.
 
             byte[] pageContent = new byte[logManagerProperties.getLogPageSize()];
 
-            // take a lock on the log page so that the page is not flushed to disk interim
+            // take a lock on the log page so that the page is not flushed to
+            // disk interim
             synchronized (logPages[pageIndex]) {
 
-                // need to check again (this thread may have got de-scheduled and must refresh!)
+                // need to check again (this thread may have got de-scheduled
+                // and must refresh!)
                 if (lsnValue > getLastFlushedLsn().get()) {
 
                     // get the log record length
@@ -669,9 +694,9 @@
         lsn.set(startingLSN);
         return nextPhysicalLsn;
     }
-    
+
     private void closeLogPages() throws ACIDException {
-        for (int i=0; i < numLogPages; i++) {
+        for (int i = 0; i < numLogPages; i++) {
             try {
                 logPages[i].close();
             } catch (IOException e) {
@@ -837,20 +862,21 @@
 
     public void requestFlush(int pageIndex, long lsn, boolean isSynchronous) {
         synchronized (logManager.getLogPage(pageIndex)) {
-            //return if flushedLSN >= lsn
+            // return if flushedLSN >= lsn
             if (logManager.getLastFlushedLsn().get() >= lsn) {
                 return;
             }
 
-            //put a new request to the queue only if the request on the page is not in the queue.
+            // put a new request to the queue only if the request on the page is
+            // not in the queue.
             flushRequestQueue[pageIndex].offer(flushRequests[pageIndex]);
 
-            //return if the request is asynchronous
+            // return if the request is asynchronous
             if (!isSynchronous) {
                 return;
             }
 
-            //wait until there is flush.
+            // wait until there is flush.
             boolean isNotified = false;
             while (!isNotified) {
                 try {
@@ -889,23 +915,26 @@
                                 logManager.getLogManagerProperties().getLogPageSize());
                     }
 
-                    //#. sleep during the groupCommitWaitTime
+                    // #. sleep during the groupCommitWaitTime
                     sleep(groupCommitWaitPeriod);
 
-                    //#. set the logPageStatus to INACTIVE in order to prevent other txns from writing on this page.
+                    // #. set the logPageStatus to INACTIVE in order to prevent
+                    // other txns from writing on this page.
                     logManager.getLogPageStatus(pageToFlush).set(PageState.INACTIVE);
 
-                    //#. need to wait until the logPageOwnerCount reaches 1 (LOG_WRITER) 
-                    //   meaning every one has finished writing logs on this page.
+                    // #. need to wait until the logPageOwnerCount reaches 1
+                    // (LOG_WRITER)
+                    // meaning every one has finished writing logs on this page.
                     while (logManager.getLogPageOwnershipCount(pageToFlush).get() != PageOwnershipStatus.LOG_WRITER) {
                         sleep(0);
                     }
 
-                    //#. set the logPageOwnerCount to 0 (LOG_FLUSHER)
-                    //   meaning it is flushing. 
+                    // #. set the logPageOwnerCount to 0 (LOG_FLUSHER)
+                    // meaning it is flushing.
                     logManager.getLogPageOwnershipCount(pageToFlush).set(PageOwnershipStatus.LOG_FLUSHER);
 
-                    // put the content to disk (the thread still has a lock on the log page)
+                    // put the content to disk (the thread still has a lock on
+                    // the log page)
                     logManager.getLogPage(pageToFlush).flush();
 
                     // increment the last flushed lsn and lastFlushedPage
@@ -927,8 +956,9 @@
                     // mark the page as ACTIVE
                     logManager.getLogPageStatus(pageToFlush).set(LogManager.PageState.ACTIVE);
 
-                    //#. checks the queue whether there is another flush request on the same log buffer
-                    //   If there is another request, then simply remove it.
+                    // #. checks the queue whether there is another flush
+                    // request on the same log buffer
+                    // If there is another request, then simply remove it.
                     if (flushRequestQueue[pageToFlush].peek() != null) {
                         flushRequestQueue[pageToFlush].take();
                     }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
index fdd3f07..226c81b 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -625,7 +625,7 @@
                 }
             }
 
-            if (LogManager.IS_DEBUG_MODE) {
+            if (IS_DEBUG_MODE) {
                 System.out.println(logManager.getLogRecordHelper().getLogRecordForDisplay(currentLogLocator));
             }
 
@@ -640,7 +640,7 @@
                         TxnId txnId = new TxnId(logRecordHelper.getJobId(currentLogLocator),
                                 logRecordHelper.getDatasetId(currentLogLocator),
                                 logRecordHelper.getPKHashValue(currentLogLocator));
-                        undoLSNSet = new ArrayList<Long>();
+                        undoLSNSet = new LinkedList<Long>();
                         loserTxnTable.put(txnId, undoLSNSet);
                     }
                     undoLSNSet.add(currentLogLocator.getLsn());
@@ -679,13 +679,13 @@
         byte resourceMgrId;
         int undoCount = 0;
         while (iter.hasNext()) {
-
+            //TODO 
+            //Sort the lsns in order to undo in one pass. 
+            
             Map.Entry<TxnId, List<Long>> loserTxn = (Map.Entry<TxnId, List<Long>>) iter.next();
             txnId = loserTxn.getKey();
 
             undoLSNSet = loserTxn.getValue();
-            Comparator<Long> comparator = Collections.reverseOrder();
-            Collections.sort(undoLSNSet, comparator);
 
             for (long undoLSN : undoLSNSet) {
                 // here, all the log records are UPDATE type. So, we don't need to check the type again.
@@ -693,7 +693,7 @@
                 //read the corresponding log record to be undone.
                 logManager.readLog(undoLSN, currentLogLocator);
 
-                if (LogManager.IS_DEBUG_MODE) {
+                if (IS_DEBUG_MODE) {
                     System.out.println(logManager.getLogRecordHelper().getLogRecordForDisplay(currentLogLocator));
                 }
 
diff --git a/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/LogRecordReader.java b/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/LogRecordReader.java
index eb22dd7..260e5ff 100644
--- a/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/LogRecordReader.java
+++ b/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/LogRecordReader.java
@@ -51,12 +51,12 @@
                 return true;
             }
         });
-        LogicalLogLocator memLSN = LogUtil.getDummyLogicalLogLocator(logManager);
+        LogicalLogLocator currentLogLocator = LogUtil.getDummyLogicalLogLocator(logManager);
         int logCount = 0;
         while (true) {
-            boolean logValidity = logCursor.next(memLSN);
+            boolean logValidity = logCursor.next(currentLogLocator);
             if (logValidity) {
-                System.out.println(++logCount + parser.getLogRecordForDisplay(memLSN));
+                System.out.println(++logCount + parser.getLogRecordForDisplay(currentLogLocator));
             } else {
                 break;
             }
@@ -73,16 +73,13 @@
      * @param args
      */
     public static void main(String[] args) throws ACIDException, Exception {
-        long lsnValue = 10747454;
-        String id = "nc1";
-        String logDir = "/home/raman/research/work/hyracks-branches/svn/trunk/hyracks/asterix_logs/";
         Properties props = new Properties();
-        props.setProperty(LogManagerProperties.LOG_DIR_KEY, logDir + "/" + id);
+        props.setProperty(LogManagerProperties.LOG_DIR_KEY,
+                "/home/kisskys/workspace/lsm_merge/asterix_lsm_stabilization/debug/asterix_logs/nc1");
         LogManagerProperties logProps = new LogManagerProperties(props);
         LogManager logManager = new LogManager(null, logProps);
         LogRecordReader logReader = new LogRecordReader(logManager);
         logReader.readLogs(0);
-        //   logReader.readLogRecord(1703620);
     }
 
 }