ASTERIXDB-1045: Log analysis fixes
-Avoid using exceptions for control flow in LogRecord
-Rename LogPage and ilk to LogBuffer
-Busywait on read() to fill entire buffer for fillLogBuffer rather than failing
-Distinguish between log truncation and checksum corruption
TODOs:
- Log IO and parsing still happen in lock-step.
- Busywaiting for read to return something other than 0 is unfortunate
Change-Id: I1658e938eb0f199f748407361ffee4833aac661c
Reviewed-on: https://asterix-gerrit.ics.uci.edu/289
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Young-Seok Kim <kisskys@gmail.com>
Reviewed-by: Murtadha Hubail <hubailmor@gmail.com>
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogPage.java b/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBuffer.java
similarity index 96%
rename from asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogPage.java
rename to asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBuffer.java
index bd174b5..9e28cda 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogPage.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBuffer.java
@@ -18,7 +18,7 @@
*/
package org.apache.asterix.common.transactions;
-public interface ILogPage {
+public interface ILogBuffer {
public void append(ILogRecord logRecord, long appendLsn);
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java b/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
index 90595e3..16c51fe 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
@@ -28,8 +28,15 @@
public static final int ENTITY_COMMIT_LOG_BASE_SIZE = 25;
public static final int UPDATE_LOG_BASE_SIZE = 54;
public static final int FLUSH_LOG_SIZE = 17;
-
- public boolean readLogRecord(ByteBuffer buffer);
+
+
+ public enum RECORD_STATUS{
+ TRUNCATED,
+ BAD_CHKSUM,
+ OK
+ }
+
+ public LogRecord.RECORD_STATUS readLogRecord(ByteBuffer buffer);
public void writeLogRecord(ByteBuffer buffer);
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java b/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
index c0b71e6..60e3097 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
@@ -83,7 +83,6 @@
//------------- fields in a log record (end) --------------//
private int PKFieldCnt;
- private static final int CHECKSUM_SIZE = 8;
private ITransactionContext txnCtx;
private long LSN;
private final AtomicBoolean isFlushed;
@@ -102,6 +101,24 @@
checksumGen = new CRC32();
}
+ private final static int TYPE_LEN = Byte.SIZE/Byte.SIZE;
+ private final static int JID_LEN = Integer.SIZE / Byte.SIZE;
+ private final static int DSID_LEN = Integer.SIZE / Byte.SIZE;
+ private final static int PKHASH_LEN = Integer.SIZE / Byte.SIZE;
+ private final static int PKSZ_LEN = Integer.SIZE / Byte.SIZE;
+ private final static int PRVLSN_LEN = Long.SIZE / Byte.SIZE;
+ private final static int RSID_LEN = Long.SIZE / Byte.SIZE;
+ private final static int LOGRCD_SZ_LEN = Integer.SIZE / Byte.SIZE;
+ private final static int FLDCNT_LEN = Integer.SIZE / Byte.SIZE;
+ private final static int NEWOP_LEN = Byte.SIZE/Byte.SIZE;
+ private final static int NEWVALSZ_LEN = Integer.SIZE / Byte.SIZE;
+ private final static int CHKSUM_LEN = Long.SIZE / Byte.SIZE;
+
+ private final static int ALL_RECORD_HEADER_LEN = TYPE_LEN + JID_LEN;
+ private final static int ENTITYCOMMIT_UPDATE_HEADER_LEN = DSID_LEN + PKHASH_LEN + PKSZ_LEN;
+ private final static int UPDATE_LSN_HEADER = PRVLSN_LEN + RSID_LEN + LOGRCD_SZ_LEN;
+ private final static int UPDATE_BODY_HEADER = FLDCNT_LEN + NEWOP_LEN + NEWVALSZ_LEN;
+
@Override
public void writeLogRecord(ByteBuffer buffer) {
int beginOffset = buffer.position();
@@ -130,7 +147,7 @@
buffer.putInt(datasetId);
}
- checksum = generateChecksum(buffer, beginOffset, logSize - CHECKSUM_SIZE);
+ checksum = generateChecksum(buffer, beginOffset, logSize - CHKSUM_LEN);
buffer.putLong(checksum);
}
@@ -154,52 +171,79 @@
}
@Override
- public boolean readLogRecord(ByteBuffer buffer) {
+ public RECORD_STATUS readLogRecord(ByteBuffer buffer) {
int beginOffset = buffer.position();
- try {
- logType = buffer.get();
- jobId = buffer.getInt();
- if(logType != LogType.FLUSH)
- {
- if (logType == LogType.JOB_COMMIT || logType == LogType.ABORT) {
- datasetId = -1;
- PKHashValue = -1;
- } else {
- datasetId = buffer.getInt();
- PKHashValue = buffer.getInt();
- PKValueSize = buffer.getInt();
- if (PKValueSize <= 0) {
- throw new IllegalStateException("Primary Key Size is less than or equal to 0");
- }
- PKValue = readPKValue(buffer);
- }
- if (logType == LogType.UPDATE) {
- prevLSN = buffer.getLong();
- resourceId = buffer.getLong();
- logSize = buffer.getInt();
- fieldCnt = buffer.getInt();
- newOp = buffer.get();
- newValueSize = buffer.getInt();
- newValue = readTuple(buffer, readNewValue, fieldCnt, newValueSize);
- } else {
- computeAndSetLogSize();
- }
- }
- else{
- computeAndSetLogSize();
- datasetId = buffer.getInt();
- resourceId = 0l;
- }
-
- checksum = buffer.getLong();
- if (checksum != generateChecksum(buffer, beginOffset, logSize - CHECKSUM_SIZE)) {
- throw new IllegalStateException();
- }
- } catch (BufferUnderflowException e) {
+ //first we need the logtype and Job ID, if the buffer isn't that big, then no dice.
+ if(buffer.remaining() < ALL_RECORD_HEADER_LEN) {
buffer.position(beginOffset);
- return false;
+ return RECORD_STATUS.TRUNCATED;
}
- return true;
+ logType = buffer.get();
+ jobId = buffer.getInt();
+ if(logType != LogType.FLUSH)
+ {
+ if (logType == LogType.JOB_COMMIT || logType == LogType.ABORT) {
+ datasetId = -1;
+ PKHashValue = -1;
+ } else {
+ //attempt to read in the dsid, PK hash and PK length
+ if(buffer.remaining() < ENTITYCOMMIT_UPDATE_HEADER_LEN){
+ buffer.position(beginOffset);
+ return RECORD_STATUS.TRUNCATED;
+ }
+ datasetId = buffer.getInt();
+ PKHashValue = buffer.getInt();
+ PKValueSize = buffer.getInt();
+ //attempt to read in the PK
+ if(buffer.remaining() < PKValueSize){
+ buffer.position(beginOffset);
+ return RECORD_STATUS.TRUNCATED;
+ }
+ if (PKValueSize <= 0) {
+ throw new IllegalStateException("Primary Key Size is less than or equal to 0");
+ }
+ PKValue = readPKValue(buffer);
+ }
+ if (logType == LogType.UPDATE) {
+ //attempt to read in the previous LSN, log size, new value size, and new record type
+ if(buffer.remaining() <UPDATE_LSN_HEADER + UPDATE_BODY_HEADER){
+ buffer.position(beginOffset);
+ return RECORD_STATUS.TRUNCATED;
+ }
+ prevLSN = buffer.getLong();
+ resourceId = buffer.getLong();
+ logSize = buffer.getInt();
+ fieldCnt = buffer.getInt();
+ newOp = buffer.get();
+ newValueSize = buffer.getInt();
+ if(buffer.remaining() < newValueSize){
+ buffer.position(beginOffset);
+ return RECORD_STATUS.TRUNCATED;
+ }
+ newValue = readTuple(buffer, readNewValue, fieldCnt, newValueSize);
+ } else {
+ computeAndSetLogSize();
+ }
+ }
+ else{
+ computeAndSetLogSize();
+ if(buffer.remaining() < DSID_LEN){
+ buffer.position(beginOffset);
+ return RECORD_STATUS.TRUNCATED;
+ }
+ datasetId = buffer.getInt();
+ resourceId = 0l;
+ }
+ //atempt to read checksum
+ if(buffer.remaining() < CHKSUM_LEN){
+ buffer.position(beginOffset);
+ return RECORD_STATUS.TRUNCATED;
+ }
+ checksum = buffer.getLong();
+ if (checksum != generateChecksum(buffer, beginOffset, logSize - CHKSUM_LEN)) {
+ return RECORD_STATUS.BAD_CHKSUM;
+ }
+ return RECORD_STATUS.OK;
}
private ITupleReference readPKValue(ByteBuffer buffer) {
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockManager.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockManager.java
index 6f36a6a..ee9fd84 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockManager.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/LockManager.java
@@ -37,8 +37,8 @@
import org.apache.asterix.common.transactions.JobId;
import org.apache.asterix.common.transactions.LogRecord;
import org.apache.asterix.common.transactions.LogType;
-import org.apache.asterix.transaction.management.service.logging.LogPage;
-import org.apache.asterix.transaction.management.service.logging.LogPageReader;
+import org.apache.asterix.transaction.management.service.logging.LogBuffer;
+import org.apache.asterix.transaction.management.service.logging.LogBufferTailReader;
import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
@@ -2204,11 +2204,11 @@
}
}
- public void batchUnlock(LogPage logPage, LogPageReader logPageReader) throws ACIDException {
+ public void batchUnlock(LogBuffer logPage, LogBufferTailReader logBufferTailReader) throws ACIDException {
latchLockTable();
try {
ITransactionContext txnCtx = null;
- LogRecord logRecord = logPageReader.next();
+ LogRecord logRecord = logBufferTailReader.next();
while (logRecord != null) {
if (logRecord.getLogType() == LogType.ENTITY_COMMIT) {
tempDatasetIdObj.setId(logRecord.getDatasetId());
@@ -2222,7 +2222,7 @@
txnCtx.notifyOptracker(true);
logPage.notifyJobTerminator();
}
- logRecord = logPageReader.next();
+ logRecord = logBufferTailReader.next();
}
} finally {
unlatchLockTable();
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogPage.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
similarity index 93%
rename from asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogPage.java
rename to asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
index 53e17d4..4d50294 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogPage.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
@@ -28,7 +28,7 @@
import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.transactions.DatasetId;
-import org.apache.asterix.common.transactions.ILogPage;
+import org.apache.asterix.common.transactions.ILogBuffer;
import org.apache.asterix.common.transactions.ILogRecord;
import org.apache.asterix.common.transactions.ITransactionContext;
import org.apache.asterix.common.transactions.JobId;
@@ -39,12 +39,12 @@
import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-public class LogPage implements ILogPage {
+public class LogBuffer implements ILogBuffer {
public static final boolean IS_DEBUG_MODE = false;//true
- private static final Logger LOGGER = Logger.getLogger(LogPage.class.getName());
+ private static final Logger LOGGER = Logger.getLogger(LogBuffer.class.getName());
private final TransactionSubsystem txnSubsystem;
- private final LogPageReader logPageReader;
+ private final LogBufferTailReader logBufferTailReader;
private final int logPageSize;
private final MutableLong flushLSN;
private final AtomicBoolean full;
@@ -62,14 +62,14 @@
private final DatasetId reusableDsId;
private final JobId reusableJobId;
- public LogPage(TransactionSubsystem txnSubsystem, int logPageSize, MutableLong flushLSN) {
+ public LogBuffer(TransactionSubsystem txnSubsystem, int logPageSize, MutableLong flushLSN) {
this.txnSubsystem = txnSubsystem;
this.logPageSize = logPageSize;
this.flushLSN = flushLSN;
appendBuffer = ByteBuffer.allocate(logPageSize);
flushBuffer = appendBuffer.duplicate();
unlockBuffer = appendBuffer.duplicate();
- logPageReader = getLogPageReader();
+ logBufferTailReader = getLogBufferTailReader();
full = new AtomicBoolean(false);
appendOffset = 0;
flushOffset = 0;
@@ -206,17 +206,17 @@
}
}
- private LogPageReader getLogPageReader() {
- return new LogPageReader(unlockBuffer);
+ private LogBufferTailReader getLogBufferTailReader() {
+ return new LogBufferTailReader(unlockBuffer);
}
private void batchUnlock(int beginOffset, int endOffset) throws ACIDException {
if (endOffset > beginOffset) {
- logPageReader.initializeScan(beginOffset, endOffset);
+ logBufferTailReader.initializeScan(beginOffset, endOffset);
ITransactionContext txnCtx = null;
- LogRecord logRecord = logPageReader.next();
+ LogRecord logRecord = logBufferTailReader.next();
while (logRecord != null) {
if (logRecord.getLogType() == LogType.ENTITY_COMMIT) {
reusableJobId.setId(logRecord.getJobId());
@@ -234,7 +234,7 @@
notifyFlushTerminator();
}
- logRecord = logPageReader.next();
+ logRecord = logBufferTailReader.next();
}
}
}
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogPageReader.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBufferTailReader.java
similarity index 78%
rename from asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogPageReader.java
rename to asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBufferTailReader.java
index 76648ae..f8e0253 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogPageReader.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBufferTailReader.java
@@ -20,15 +20,17 @@
import java.nio.ByteBuffer;
+import org.apache.asterix.common.transactions.ILogRecord;
+import org.apache.asterix.common.transactions.ILogRecord.RECORD_STATUS;
import org.apache.asterix.common.transactions.LogRecord;
-public class LogPageReader {
+public class LogBufferTailReader {
private final ByteBuffer buffer;
private final LogRecord logRecord;
private int endOffset;
- public LogPageReader(ByteBuffer buffer) {
+ public LogBufferTailReader(ByteBuffer buffer) {
this.buffer = buffer;
logRecord = new LogRecord();
}
@@ -42,7 +44,9 @@
if (buffer.position() == endOffset) {
return null;
}
- if (!logRecord.readLogRecord(buffer)) {
+ RECORD_STATUS status = logRecord.readLogRecord(buffer);
+ //underflow is not expected because we are at the very tail of the current log buffer
+ if (status != RECORD_STATUS.OK) {
throw new IllegalStateException();
}
return logRecord;
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index b531961..f14c146 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -62,11 +62,11 @@
private final String logDir;
private final String logFilePrefix;
private final MutableLong flushLSN;
- private LinkedBlockingQueue<LogPage> emptyQ;
- private LinkedBlockingQueue<LogPage> flushQ;
+ private LinkedBlockingQueue<LogBuffer> emptyQ;
+ private LinkedBlockingQueue<LogBuffer> flushQ;
private final AtomicLong appendLSN;
private FileChannel appendChannel;
- private LogPage appendPage;
+ private LogBuffer appendPage;
private LogFlusher logFlusher;
private Future<Object> futureLogFlusher;
private static final long SMALLEST_LOG_FILE_ID = 0;
@@ -86,10 +86,10 @@
}
private void initializeLogManager(long nextLogFileId) {
- emptyQ = new LinkedBlockingQueue<LogPage>(numLogPages);
- flushQ = new LinkedBlockingQueue<LogPage>(numLogPages);
+ emptyQ = new LinkedBlockingQueue<LogBuffer>(numLogPages);
+ flushQ = new LinkedBlockingQueue<LogBuffer>(numLogPages);
for (int i = 0; i < numLogPages; i++) {
- emptyQ.offer(new LogPage(txnSubsystem, logPageSize, flushLSN));
+ emptyQ.offer(new LogBuffer(txnSubsystem, logPageSize, flushLSN));
}
appendLSN.set(initializeLogAnchor(nextLogFileId));
flushLSN.set(appendLSN.get());
@@ -174,7 +174,7 @@
appendPage.isLastPage(true);
//[Notice]
//the current log file channel is closed if
- //LogPage.flush() completely flush the last page of the file.
+ //LogBuffer.flush() completely flush the last page of the file.
}
@Override
@@ -443,15 +443,15 @@
class LogFlusher implements Callable<Boolean> {
private static final Logger LOGGER = Logger.getLogger(LogFlusher.class.getName());
- private final static LogPage POISON_PILL = new LogPage(null, ILogRecord.JOB_TERMINATE_LOG_SIZE, null);
+ private final static LogBuffer POISON_PILL = new LogBuffer(null, ILogRecord.JOB_TERMINATE_LOG_SIZE, null);
private final LogManager logMgr;//for debugging
- private final LinkedBlockingQueue<LogPage> emptyQ;
- private final LinkedBlockingQueue<LogPage> flushQ;
- private LogPage flushPage;
+ private final LinkedBlockingQueue<LogBuffer> emptyQ;
+ private final LinkedBlockingQueue<LogBuffer> flushQ;
+ private LogBuffer flushPage;
private final AtomicBoolean isStarted;
private final AtomicBoolean terminateFlag;
- public LogFlusher(LogManager logMgr, LinkedBlockingQueue<LogPage> emptyQ, LinkedBlockingQueue<LogPage> flushQ) {
+ public LogFlusher(LogManager logMgr, LinkedBlockingQueue<LogBuffer> emptyQ, LinkedBlockingQueue<LogBuffer> flushQ) {
this.logMgr = logMgr;
this.emptyQ = emptyQ;
this.flushQ = flushQ;
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java
index 6fa0ebb..9900468 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java
@@ -29,6 +29,12 @@
import org.apache.asterix.common.transactions.LogRecord;
import org.apache.asterix.common.transactions.MutableLong;
+import static org.apache.asterix.common.transactions.LogRecord.*;
+
+/**
+ * NOTE: Many method calls of this class are not thread safe.
+ * Be very cautious using it in a multithreaded context.
+ */
public class LogReader implements ILogReader {
public static final boolean IS_DEBUG_MODE = false;//true
@@ -67,21 +73,57 @@
return;
}
getFileChannel();
- readPage();
+ fillLogReadBuffer();
}
- //for scanning
+ /**
+ * Get the next log record from the log file.
+ * @return A deserialized log record, or null if we have reached the end of the file.
+ * @throws ACIDException
+ */
@Override
public ILogRecord next() throws ACIDException {
if (waitForFlushOrReturnIfEOF() == ReturnState.EOF) {
return null;
}
- if (readBuffer.position() == readBuffer.limit() || !logRecord.readLogRecord(readBuffer)) {
- readNextPage();
- if (!logRecord.readLogRecord(readBuffer)) {
- throw new IllegalStateException();
+ if (readBuffer.position() == readBuffer.limit()) {
+ boolean eof = refillLogReadBuffer();
+ if (eof && isRecoveryMode && readLSN < flushLSN.get()) {
+ LOGGER.severe("Transaction log ends before expected. Log files may be missing.");
+ return null;
}
}
+
+ RECORD_STATUS status = logRecord.readLogRecord(readBuffer);
+ switch(status) {
+ case TRUNCATED: {
+ //we may have just read off the end of the buffer, so try refiling it
+ if(!refillLogReadBuffer()) {
+ return null;
+ }
+ //now see what we have in the refilled buffer
+ status = logRecord.readLogRecord(readBuffer);
+ switch(status){
+ case TRUNCATED: {
+ LOGGER.info("Log file has truncated log records.");
+ return null;
+ }
+ case BAD_CHKSUM:{
+ LOGGER.severe("Transaction log contains corrupt log records (perhaps due to medium error). Stopping recovery early.");
+ return null;
+ }
+ case OK: break;
+ }
+ //if we have exited the inner switch,
+ // this means status is really "OK" after buffer refill
+ break;
+ }
+ case BAD_CHKSUM:{
+ LOGGER.severe("Transaction log contains corrupt log records (perhaps due to medium error). Stopping recovery early.");
+ return null;
+ }
+ case OK: break;
+ }
logRecord.setLSN(readLSN);
readLSN += logRecord.getLogSize();
return logRecord;
@@ -107,32 +149,55 @@
}
}
- private void readNextPage() throws ACIDException {
+ /**
+ * Continues log analysis between log file splits.
+ * @return true if log continues, false if EOF
+ * @throws ACIDException
+ */
+ private boolean refillLogReadBuffer() throws ACIDException {
try {
if (readLSN % logFileSize == fileChannel.size()) {
fileChannel.close();
readLSN += logFileSize - (readLSN % logFileSize);
getFileChannel();
}
- readPage();
+ return fillLogReadBuffer();
} catch (IOException e) {
throw new ACIDException(e);
}
}
- private void readPage() throws ACIDException {
- int size;
+ /**
+ * Fills the log buffer with data from the log file at the current position
+ * @return false if EOF, true otherwise
+ * @throws ACIDException
+ */
+
+ private boolean fillLogReadBuffer() throws ACIDException {
+ int size=0;
+ int read=0;
readBuffer.position(0);
readBuffer.limit(logPageSize);
try {
fileChannel.position(readLSN % logFileSize);
- size = fileChannel.read(readBuffer);
+ //We loop here because read() may return 0, but this simply means we are waiting on IO.
+ //Therefore we want to break out only when either the buffer is full, or we reach EOF.
+ while( size < logPageSize && read != -1) {
+ read = fileChannel.read(readBuffer);
+ if(read>0) {
+ size += read;
+ }
+ }
} catch (IOException e) {
throw new ACIDException(e);
}
readBuffer.position(0);
readBuffer.limit(size);
+ if(size == 0 && read == -1){
+ return false; //EOF
+ }
bufferBeginLSN = readLSN;
+ return true;
}
//for random reading
@@ -151,25 +216,37 @@
try {
if (fileChannel == null) {
getFileChannel();
- readPage();
+ fillLogReadBuffer();
} else if (readLSN < fileBeginLSN || readLSN >= fileBeginLSN + fileChannel.size()) {
fileChannel.close();
getFileChannel();
- readPage();
+ fillLogReadBuffer();
} else if (readLSN < bufferBeginLSN || readLSN >= bufferBeginLSN + readBuffer.limit()) {
- readPage();
+ fillLogReadBuffer();
} else {
readBuffer.position((int) (readLSN - bufferBeginLSN));
}
} catch (IOException e) {
throw new ACIDException(e);
}
- if (!logRecord.readLogRecord(readBuffer)) {
- readNextPage();
- if (!logRecord.readLogRecord(readBuffer)) {
- throw new IllegalStateException();
+ boolean hasRemaining;
+ if(readBuffer.position() == readBuffer.limit()){
+ hasRemaining = refillLogReadBuffer();
+ if(!hasRemaining){
+ throw new ACIDException("LSN is out of bounds");
}
}
+ RECORD_STATUS status = logRecord.readLogRecord(readBuffer);
+ switch(status){
+ case TRUNCATED:{
+ throw new ACIDException("LSN is out of bounds");
+ }
+ case BAD_CHKSUM:{
+ throw new ACIDException("Log record has incorrect checksum");
+ }
+ case OK: break;
+
+ }
logRecord.setLSN(readLSN);
readLSN += logRecord.getLogSize();
return logRecord;