Fix the LogCursor bug that appeared during rollback when log records are read from both the on-disk log file and the in-memory log buffer
git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_lsm_stabilization@1336 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java
index 5d57603..8533e74 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java
@@ -44,6 +44,7 @@
private final DatasetId datasetId;
private final int[] primaryKeyFields;
private final boolean isWriteTransaction;
+ private final long[] longHashes;
private TransactionContext transactionContext;
private RecordDescriptor inputRecordDesc;
@@ -61,6 +62,7 @@
this.primaryKeyFields = primaryKeyFields;
this.frameTupleReference = new FrameTupleReference();
this.isWriteTransaction = isWriteTransaction;
+ this.longHashes= new long[2];
}
@Override
@@ -91,7 +93,6 @@
}
private int computePrimaryKeyHashValue(ITupleReference tuple, int[] primaryKeyFields) {
- long[] longHashes= new long[2];
MurmurHash128Bit.hash3_x64_128(tuple, primaryKeyFields, SEED, longHashes);
return Math.abs((int) longHashes[0]);
}
diff --git a/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java b/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
index 36236d4..4e4d78b 100644
--- a/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
+++ b/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
@@ -106,12 +106,11 @@
for (CompilationUnit cUnit : cUnits) {
File testFile = tcCtx.getTestFile(cUnit);
- /*************** to avoid run failure cases ****************
- if (!testFile.getAbsolutePath().contains("queries/failure")) {//&& !testFile.getAbsolutePath().contains("partition-by-nonexistent-field.aql")) ) {
+ /*************** to avoid/run failure cases ****************
+ if (testFile.getAbsolutePath().contains("queries/failure")) {
continue;
//System.out.println(testFile.getAbsolutePath());
}
- System.out.println(testFile.getAbsolutePath());
************************************************************/
File expectedResultFile = tcCtx.getExpectedResultFile(cUnit);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/AbstractOperationCallback.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/AbstractOperationCallback.java
index 9e4c944..fdbb707 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/AbstractOperationCallback.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/AbstractOperationCallback.java
@@ -30,6 +30,7 @@
protected final ILockManager lockManager;
protected final TransactionContext txnCtx;
protected int transactorLocalNumActiveOperations = 0;
+ protected final long[] longHashes;
public AbstractOperationCallback(int datasetId, int[] primaryKeyFields,
TransactionContext txnCtx, ILockManager lockManager) {
@@ -37,10 +38,10 @@
this.primaryKeyFields = primaryKeyFields;
this.txnCtx = txnCtx;
this.lockManager = lockManager;
+ this.longHashes= new long[2];
}
public int computePrimaryKeyHashValue(ITupleReference tuple, int[] primaryKeyFields) {
- long[] longHashes= new long[2];
MurmurHash128Bit.hash3_x64_128(tuple, primaryKeyFields, SEED, longHashes);
return Math.abs((int) longHashes[0]);
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java
index 61c47f4..e04ca95 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java
@@ -82,8 +82,8 @@
boolean logRecordBeginPosFound = false;
long bytesSkipped = 0;
- //if the lsn to read is greater than the most recent lsn, then return false
- if (logicalLogLocator.getLsn() > logManager.getCurrentLsn().get()) {
+ //if the lsn to read is greater than or equal to the most recent lsn, then return false
+ if (logicalLogLocator.getLsn() >= logManager.getCurrentLsn().get()) {
return false;
}
@@ -98,9 +98,10 @@
readOnlyBuffer = getReadOnlyBuffer(logicalLogLocator.getLsn(), logManager.getLogManagerProperties()
.getLogBufferSize());
logicalLogLocator.setBuffer(readOnlyBuffer);
+ logicalLogLocator.setMemoryOffset(0);
needReloadBuffer = false;
}
-
+
//check whether the currentOffset has enough space to have new log record by comparing
//the smallest log record type(which is commit)'s log header.
while (logicalLogLocator.getMemoryOffset() <= readOnlyBuffer.getSize()
@@ -179,7 +180,6 @@
needReloadBuffer = true;
int pageIndex = logManager.getLogPageIndex(lsn);
- //int pageOffset = logManager.getLogPageOffset(lsn);
logicalLogLocator.setMemoryOffset(logManager.getLogPageOffset(lsn));
// take a lock on the log page so that the page is not flushed to
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
index 8d66f6b..64376aa 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
@@ -73,9 +73,9 @@
* point, the ownership count is decremented as the transaction is done with
* using the page. When a page is requested to be flushed, logPageFlusher
* set the count to 0(LOG_FLUSHER: meaning that the page is being flushed)
- * only if the count is 1(LOG_WRITER: meaning that there is no other
- * transactions who own the page to write logs.) After flushing the page,
- * logPageFlusher set this count to 1.
+ * only if the count is 1(LOG_WRITER: meaning that there is no other
+ * transactions who own the page to write logs.) After flushing the page,
+ * logPageFlusher set this count to 1.
*/
private AtomicInteger[] logPageOwnerCount;
@@ -86,15 +86,16 @@
/*
* LogPageStatus: A page is either ACTIVE or INACTIVE. The status for each
- * page is maintained in logPageStatus. A page is ACTIVE when the LogManager
- * can allocate space in the page for writing a log record. Initially all
- * pages are ACTIVE. As transactions fill up space by writing log records,
- * a page may not have sufficient space left for serving a request by a
+ * page is maintained in logPageStatus. A page is ACTIVE when the LogManager
+ * can allocate space in the page for writing a log record. Initially all
+ * pages are ACTIVE. As transactions fill up space by writing log records, a
+ * page may not have sufficient space left for serving a request by a
* transaction. When this happens, the page is flushed to disk by calling
- * logPageFlusher.requestFlush(). In the requestFlush(), after groupCommitWaitTime,
- * the page status is set to INACTIVE. Then, there is no more writer on the
- * page(meaning the corresponding logPageOwnerCount is 1), the page is flushed
- * by the logPageFlusher and the status is reset to ACTIVE by the logPageFlusher.
+ * logPageFlusher.requestFlush(). In the requestFlush(), after
+ * groupCommitWaitTime, the page status is set to INACTIVE. Then, there is
+ * no more writer on the page(meaning the corresponding logPageOwnerCount is
+ * 1), the page is flushed by the logPageFlusher and the status is reset to
+ * ACTIVE by the logPageFlusher.
*/
private AtomicInteger[] logPageStatus;
@@ -297,22 +298,28 @@
boolean forwardPage = false;
long old = lsn.get();
- //get the log page corresponding to the current lsn value
+ // get the log page corresponding to the current lsn value
int pageIndex = getLogPageIndex(old);
long retVal = old;
- // the lsn value for the next request if the current request is served.
+ // the lsn value for the next request if the current request is
+ // served.
long next = old + entrySize;
int prevPage = -1;
- // check if the log record will cross page boundaries, a case that is not allowed.
+ // check if the log record will cross page boundaries, a case that
+ // is not allowed.
if ((next - 1) / pageSize != old / pageSize || (next % pageSize == 0)) {
if ((old != 0 && old % pageSize == 0)) {
- // On second thought, this shall never be the case as it means that the lsn is
- // currently at the beginning of a page and we still need to forward the page which
- // means that the entrySize exceeds a log page size. If this is the case, an
- // exception is thrown before calling this API. would remove this case.
+ // On second thought, this shall never be the case as it
+ // means that the lsn is
+ // currently at the beginning of a page and we still need to
+ // forward the page which
+ // means that the entrySize exceeds a log page size. If this
+ // is the case, an
+ // exception is thrown before calling this API. would remove
+ // this case.
retVal = old;
} else {
@@ -322,7 +329,8 @@
next = retVal;
- // as the log record shall cross log page boundary, we must re-assign the lsn so
+ // as the log record shall cross log page boundary, we must
+ // re-assign the lsn so
// that the log record begins on a different location.
forwardPage = true;
@@ -345,7 +353,8 @@
waitUntillPageIsAvailableForWritingLog(pageIndex);
if (!lsn.compareAndSet(old, next)) {
- // Atomic call -> returns true only when the value represented by lsn is same as
+ // Atomic call -> returns true only when the value represented
+ // by lsn is same as
// "old". The value is updated to "next".
continue;
}
@@ -367,8 +376,10 @@
// space in the log page and hence is an owner.
logPageOwnerCount[pageIndex].incrementAndGet();
- // Before the count is incremented, if the flusher flushed the allocated page,
- // then retry to get new LSN. Otherwise, the log with allocated lsn will be lost.
+ // Before the count is incremented, if the flusher flushed the
+ // allocated page,
+ // then retry to get new LSN. Otherwise, the log with allocated
+ // lsn will be lost.
if (lastFlushedLSN.get() >= retVal) {
logPageOwnerCount[pageIndex].decrementAndGet();
continue;
@@ -387,7 +398,8 @@
HashMap<TransactionContext, Integer> map = null;
int activeTxnCount;
- // logLocator is a re-usable object that is appropriately set in each invocation.
+ // logLocator is a re-usable object that is appropriately set in each
+ // invocation.
// If the reference is null, the log manager must throw an exception.
if (logicalLogLocator == null) {
throw new ACIDException(
@@ -409,7 +421,8 @@
// all constraints checked and we are good to go and acquire a lsn.
long previousLSN = -1;
- // the will be set to the location (a long value) where the log record needs to be placed.
+ // the will be set to the location (a long value) where the log record
+ // needs to be placed.
long currentLSN;
// The logs written by a transaction need to be linked to each other for
@@ -435,7 +448,8 @@
* performed correctly that is ownership is released.
*/
- // indicates if the transaction thread has release ownership of the page.
+ // indicates if the transaction thread has release ownership of the
+ // page.
boolean decremented = false;
int pageIndex = (int) getLogPageIndex(currentLSN);
@@ -455,8 +469,9 @@
// content in the correct region of the allocated space.
logicalLogLocator.increaseMemoryOffset(logRecordHelper.getLogHeaderSize(logType));
- // a COMMIT log record does not have any content and hence
- // the logger (responsible for putting the log content) is not invoked.
+ // a COMMIT log record does not have any content and hence
+ // the logger (responsible for putting the log content) is not
+ // invoked.
if (logContentSize != 0) {
logger.preLog(txnCtx, reusableLogContentObject);
}
@@ -486,7 +501,8 @@
System.out.println("--------------> LSN(" + currentLSN + ") is written");
}
- // release the ownership as the log record has been placed in created space.
+ // release the ownership as the log record has been placed in
+ // created space.
logPageOwnerCount[pageIndex].decrementAndGet();
// indicating that the transaction thread has released ownership
@@ -548,14 +564,17 @@
private void readDiskLog(long lsnValue, LogicalLogLocator logicalLogLocator) throws ACIDException {
String filePath = LogUtil.getLogFilePath(logManagerProperties, LogUtil.getFileId(this, lsnValue));
long fileOffset = LogUtil.getFileOffset(this, lsnValue);
+
ByteBuffer buffer = ByteBuffer.allocate(logManagerProperties.getLogPageSize());
RandomAccessFile raf = null;
+ FileChannel fileChannel = null;
try {
raf = new RandomAccessFile(filePath, "r");
- raf.seek(fileOffset);
- FileChannel fileChannel = raf.getChannel();
+ fileChannel = raf.getChannel();
+ fileChannel.position(fileOffset);
fileChannel.read(buffer);
buffer.position(0);
+
byte logType = buffer.get(4);
int logHeaderSize = logRecordHelper.getLogHeaderSize(logType);
int logBodySize = buffer.getInt(logHeaderSize - 4);
@@ -573,16 +592,19 @@
throw new ACIDException(" invalid log record at lsn " + lsnValue);
}
} catch (Exception fnfe) {
+ fnfe.printStackTrace();
throw new ACIDException(" unable to retrieve log record with lsn " + lsnValue + " from the file system",
fnfe);
} finally {
try {
- if (raf != null) {
+ if (fileChannel != null) {
+ fileChannel.close();
+ } else if (raf != null) {
raf.close();
}
} catch (IOException ioe) {
ioe.printStackTrace();
- throw new ACIDException(" exception in closing " + raf, ioe);
+ throw new ACIDException(" exception in closing a file: " + filePath, ioe);
}
}
}
@@ -590,8 +612,8 @@
@Override
public void readLog(long lsnValue, LogicalLogLocator logicalLogLocator) throws ACIDException {
byte[] logRecord = null;
- //long lsnValue = physicalLogLocator.getLsn();
- if (lsnValue > lsn.get()) {
+
+ if (lsnValue >= lsn.get()) {
throw new ACIDException(" invalid lsn " + lsnValue);
}
@@ -600,15 +622,18 @@
int pageIndex = getLogPageIndex(lsnValue);
int pageOffset = getLogPageOffset(lsnValue);
- //TODO
- //minimize memory allocation overhead. current code allocates the log page size per reading a log record.
+ // TODO
+ // minimize memory allocation overhead. current code allocates the
+ // log page size per reading a log record.
byte[] pageContent = new byte[logManagerProperties.getLogPageSize()];
- // take a lock on the log page so that the page is not flushed to disk interim
+ // take a lock on the log page so that the page is not flushed to
+ // disk interim
synchronized (logPages[pageIndex]) {
- // need to check again (this thread may have got de-scheduled and must refresh!)
+ // need to check again (this thread may have got de-scheduled
+ // and must refresh!)
if (lsnValue > getLastFlushedLsn().get()) {
// get the log record length
@@ -669,9 +694,9 @@
lsn.set(startingLSN);
return nextPhysicalLsn;
}
-
+
private void closeLogPages() throws ACIDException {
- for (int i=0; i < numLogPages; i++) {
+ for (int i = 0; i < numLogPages; i++) {
try {
logPages[i].close();
} catch (IOException e) {
@@ -837,20 +862,21 @@
public void requestFlush(int pageIndex, long lsn, boolean isSynchronous) {
synchronized (logManager.getLogPage(pageIndex)) {
- //return if flushedLSN >= lsn
+ // return if flushedLSN >= lsn
if (logManager.getLastFlushedLsn().get() >= lsn) {
return;
}
- //put a new request to the queue only if the request on the page is not in the queue.
+ // put a new request to the queue only if the request on the page is
+ // not in the queue.
flushRequestQueue[pageIndex].offer(flushRequests[pageIndex]);
- //return if the request is asynchronous
+ // return if the request is asynchronous
if (!isSynchronous) {
return;
}
- //wait until there is flush.
+ // wait until there is flush.
boolean isNotified = false;
while (!isNotified) {
try {
@@ -889,23 +915,26 @@
logManager.getLogManagerProperties().getLogPageSize());
}
- //#. sleep during the groupCommitWaitTime
+ // #. sleep during the groupCommitWaitTime
sleep(groupCommitWaitPeriod);
- //#. set the logPageStatus to INACTIVE in order to prevent other txns from writing on this page.
+ // #. set the logPageStatus to INACTIVE in order to prevent
+ // other txns from writing on this page.
logManager.getLogPageStatus(pageToFlush).set(PageState.INACTIVE);
- //#. need to wait until the logPageOwnerCount reaches 1 (LOG_WRITER)
- // meaning every one has finished writing logs on this page.
+ // #. need to wait until the logPageOwnerCount reaches 1
+ // (LOG_WRITER)
+ // meaning every one has finished writing logs on this page.
while (logManager.getLogPageOwnershipCount(pageToFlush).get() != PageOwnershipStatus.LOG_WRITER) {
sleep(0);
}
- //#. set the logPageOwnerCount to 0 (LOG_FLUSHER)
- // meaning it is flushing.
+ // #. set the logPageOwnerCount to 0 (LOG_FLUSHER)
+ // meaning it is flushing.
logManager.getLogPageOwnershipCount(pageToFlush).set(PageOwnershipStatus.LOG_FLUSHER);
- // put the content to disk (the thread still has a lock on the log page)
+ // put the content to disk (the thread still has a lock on
+ // the log page)
logManager.getLogPage(pageToFlush).flush();
// increment the last flushed lsn and lastFlushedPage
@@ -927,8 +956,9 @@
// mark the page as ACTIVE
logManager.getLogPageStatus(pageToFlush).set(LogManager.PageState.ACTIVE);
- //#. checks the queue whether there is another flush request on the same log buffer
- // If there is another request, then simply remove it.
+ // #. checks the queue whether there is another flush
+ // request on the same log buffer
+ // If there is another request, then simply remove it.
if (flushRequestQueue[pageToFlush].peek() != null) {
flushRequestQueue[pageToFlush].take();
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
index fdd3f07..226c81b 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -625,7 +625,7 @@
}
}
- if (LogManager.IS_DEBUG_MODE) {
+ if (IS_DEBUG_MODE) {
System.out.println(logManager.getLogRecordHelper().getLogRecordForDisplay(currentLogLocator));
}
@@ -640,7 +640,7 @@
TxnId txnId = new TxnId(logRecordHelper.getJobId(currentLogLocator),
logRecordHelper.getDatasetId(currentLogLocator),
logRecordHelper.getPKHashValue(currentLogLocator));
- undoLSNSet = new ArrayList<Long>();
+ undoLSNSet = new LinkedList<Long>();
loserTxnTable.put(txnId, undoLSNSet);
}
undoLSNSet.add(currentLogLocator.getLsn());
@@ -679,13 +679,13 @@
byte resourceMgrId;
int undoCount = 0;
while (iter.hasNext()) {
-
+ //TODO
+ //Sort the lsns in order to undo in one pass.
+
Map.Entry<TxnId, List<Long>> loserTxn = (Map.Entry<TxnId, List<Long>>) iter.next();
txnId = loserTxn.getKey();
undoLSNSet = loserTxn.getValue();
- Comparator<Long> comparator = Collections.reverseOrder();
- Collections.sort(undoLSNSet, comparator);
for (long undoLSN : undoLSNSet) {
// here, all the log records are UPDATE type. So, we don't need to check the type again.
@@ -693,7 +693,7 @@
//read the corresponding log record to be undone.
logManager.readLog(undoLSN, currentLogLocator);
- if (LogManager.IS_DEBUG_MODE) {
+ if (IS_DEBUG_MODE) {
System.out.println(logManager.getLogRecordHelper().getLogRecordForDisplay(currentLogLocator));
}
diff --git a/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/LogRecordReader.java b/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/LogRecordReader.java
index eb22dd7..260e5ff 100644
--- a/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/LogRecordReader.java
+++ b/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/LogRecordReader.java
@@ -51,12 +51,12 @@
return true;
}
});
- LogicalLogLocator memLSN = LogUtil.getDummyLogicalLogLocator(logManager);
+ LogicalLogLocator currentLogLocator = LogUtil.getDummyLogicalLogLocator(logManager);
int logCount = 0;
while (true) {
- boolean logValidity = logCursor.next(memLSN);
+ boolean logValidity = logCursor.next(currentLogLocator);
if (logValidity) {
- System.out.println(++logCount + parser.getLogRecordForDisplay(memLSN));
+ System.out.println(++logCount + parser.getLogRecordForDisplay(currentLogLocator));
} else {
break;
}
@@ -73,16 +73,13 @@
* @param args
*/
public static void main(String[] args) throws ACIDException, Exception {
- long lsnValue = 10747454;
- String id = "nc1";
- String logDir = "/home/raman/research/work/hyracks-branches/svn/trunk/hyracks/asterix_logs/";
Properties props = new Properties();
- props.setProperty(LogManagerProperties.LOG_DIR_KEY, logDir + "/" + id);
+ props.setProperty(LogManagerProperties.LOG_DIR_KEY,
+ "/home/kisskys/workspace/lsm_merge/asterix_lsm_stabilization/debug/asterix_logs/nc1");
LogManagerProperties logProps = new LogManagerProperties(props);
LogManager logManager = new LogManager(null, logProps);
LogRecordReader logReader = new LogRecordReader(logManager);
logReader.readLogs(0);
- // logReader.readLogRecord(1703620);
}
}