checkpoint toward making rollback work
git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_lsm_stabilization@934 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java b/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
index ef6748e..980aedb 100644
--- a/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
+++ b/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
@@ -93,11 +93,13 @@
List<CompilationUnit> cUnits = tcCtx.getTestCase().getCompilationUnit();
for (CompilationUnit cUnit : cUnits) {
File testFile = tcCtx.getTestFile(cUnit);
+
/*************** to avoid run failure cases ****************/
if (testFile.getAbsolutePath().contains("runtimets/queries/failure/")) {
continue;
}
/***********************************************************/
+ System.out.println(""+testFile.getAbsolutePath());
File expectedResultFile = tcCtx.getExpectedResultFile(cUnit);
File actualFile = new File(PATH_ACTUAL + File.separator
+ tcCtx.getTestCase().getFilePath().replace(File.separator, "_") + "_" + cUnit.getName() + ".adm");
diff --git a/asterix-app/src/test/resources/metadata-transactions/check-state-queries.txt b/asterix-app/src/test/resources/metadata-transactions/check-state-queries.txt
index 5cee728..6618ddd 100644
--- a/asterix-app/src/test/resources/metadata-transactions/check-state-queries.txt
+++ b/asterix-app/src/test/resources/metadata-transactions/check-state-queries.txt
@@ -1,6 +1,6 @@
-//check_dataset.aql
-//check_datatype.aql
-//check_dataverse.aql
-//check_index.aql
-//check_node.aql
-//check_nodegroup.aql
+check_dataset.aql
+check_datatype.aql
+check_dataverse.aql
+check_index.aql
+check_node.aql
+check_nodegroup.aql
diff --git a/asterix-app/src/test/resources/metadata-transactions/init-state-queries.txt b/asterix-app/src/test/resources/metadata-transactions/init-state-queries.txt
index 5fb1c42..c9ef6ee 100644
--- a/asterix-app/src/test/resources/metadata-transactions/init-state-queries.txt
+++ b/asterix-app/src/test/resources/metadata-transactions/init-state-queries.txt
@@ -1 +1 @@
-//customers_orders.aql
+customers_orders.aql
diff --git a/asterix-app/src/test/resources/metadata-transactions/queries.txt b/asterix-app/src/test/resources/metadata-transactions/queries.txt
index 5f1589e..0762364 100644
--- a/asterix-app/src/test/resources/metadata-transactions/queries.txt
+++ b/asterix-app/src/test/resources/metadata-transactions/queries.txt
@@ -1,20 +1,20 @@
-//create_duplicate_dataset.aql
-//create_duplicate_dataverse.aql
-//create_duplicate_index.aql
-//create_duplicate_nodegroup.aql
-//create_duplicate_type.aql
-//drop_nonexistent_dataset.aql
-//drop_nonexistent_datatype.aql
-//drop_nonexistent_dataverse.aql
-//drop_nonexistent_index.aql
-//drop_nonexistent_nodegroup.aql
-//rollback_drop_dataset.aql
-//rollback_drop_datatype.aql
-//rollback_drop_dataverse.aql
-//rollback_drop_index.aql
-//rollback_drop_nodegroup.aql
-//rollback_new_dataset.aql
-//rollback_new_datatype.aql
-//rollback_new_dataverse.aql
-//rollback_new_index.aql
-//rollback_new_nodegroup.aql
+create_duplicate_dataset.aql
+create_duplicate_dataverse.aql
+create_duplicate_index.aql
+create_duplicate_nodegroup.aql
+create_duplicate_type.aql
+drop_nonexistent_dataset.aql
+drop_nonexistent_datatype.aql
+drop_nonexistent_dataverse.aql
+drop_nonexistent_index.aql
+drop_nonexistent_nodegroup.aql
+rollback_drop_dataset.aql
+rollback_drop_datatype.aql
+rollback_drop_dataverse.aql
+rollback_drop_index.aql
+rollback_drop_nodegroup.aql
+rollback_new_dataset.aql
+rollback_new_datatype.aql
+rollback_new_dataverse.aql
+rollback_new_index.aql
+rollback_new_nodegroup.aql
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
index d3c7eb0..6e9e936 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
@@ -67,6 +67,9 @@
int pkHash = computePrimaryKeyHashValue(after, primaryKeyFields, primaryKeyHashFunctions);
LSMBTreeTupleReference lsmBTreeTuple = (LSMBTreeTupleReference) before;
IndexOperation oldOp = IndexOperation.INSERT;
+ if (before == null) {
+ oldOp = IndexOperation.NOOP;
+ }
if (lsmBTreeTuple != null && lsmBTreeTuple.isAntimatter()) {
oldOp = IndexOperation.DELETE;
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogRecordHelper.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogRecordHelper.java
index 4f28c7d..80f74cb 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogRecordHelper.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogRecordHelper.java
@@ -41,7 +41,7 @@
byte getResourceMgrId(LogicalLogLocator logicalLogLocater);
- int getLogRecordSize(LogicalLogLocator logicalLogLocater);
+ int getLogContentSize(LogicalLogLocator logicalLogLocater);
long getLogChecksum(LogicalLogLocator logicalLogLocator);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexResourceManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexResourceManager.java
index 5e6ef34..cb6a67a 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexResourceManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexResourceManager.java
@@ -43,16 +43,6 @@
long resourceId = logRecordHelper.getResourceId(logLocator);
int offset = logRecordHelper.getLogContentBeginPos(logLocator);
- /*
- byte[] logBufferContent = logLocator.getBuffer().getArray();
- // read the length of resource id byte array
- int resourceIdLength = DataUtil.byteArrayToInt(logBufferContent, logContentBeginPos);
- byte[] resourceIdBytes = new byte[resourceIdLength];
-
- // copy the resource if bytes
- System.arraycopy(logBufferContent, logContentBeginPos + 4, resourceIdBytes, 0, resourceIdLength);
- */
-
// look up the repository to obtain the resource object
IIndex index = (IIndex) provider.getTransactionalResourceRepository().getTransactionalResource(resourceId);
@@ -106,10 +96,10 @@
}
} else {
//For LSMRtree and LSMInvertedIndex
- //delete --> physical delete
- //insert --> logical delete
+ //delete --> insert
+ //insert --> delete
if (newOperation == (byte) IndexOperation.DELETE.ordinal()) {
- indexAccessor.physicalDelete(newTuple);
+ indexAccessor.insert(newTuple);
} else {
indexAccessor.delete(newTuple);
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java
index 8e89301..f29c847 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java
@@ -21,33 +21,42 @@
public class LogCursor implements ILogCursor {
- private final ILogManager logManager;
+ private final LogManager logManager;
private final ILogFilter logFilter;
private IFileBasedBuffer readOnlyBuffer;
private LogicalLogLocator logicalLogLocator = null;
private int bufferIndex = 0;
+ private boolean firstNext = true;
+ private long readLSN = 0;
/**
* @param logFilter
*/
- public LogCursor(final ILogManager logManager, ILogFilter logFilter) throws ACIDException {
+ public LogCursor(final LogManager logManager, ILogFilter logFilter) throws ACIDException {
this.logFilter = logFilter;
this.logManager = logManager;
}
- public LogCursor(final ILogManager logManager, PhysicalLogLocator startingPhysicalLogLocator, ILogFilter logFilter)
- throws IOException {
+ public LogCursor(final LogManager logManager, PhysicalLogLocator startingPhysicalLogLocator, ILogFilter logFilter)
+ throws IOException, ACIDException {
this.logFilter = logFilter;
this.logManager = logManager;
initialize(startingPhysicalLogLocator);
}
- private void initialize(final PhysicalLogLocator startingPhysicalLogLocator) throws IOException {
- readOnlyBuffer = getReadOnlyBuffer(startingPhysicalLogLocator.getLsn(), logManager.getLogManagerProperties()
- .getLogBufferSize());
- logicalLogLocator = new LogicalLogLocator(startingPhysicalLogLocator.getLsn(), readOnlyBuffer, 0, logManager);
-
+ private void initialize(final PhysicalLogLocator startingPhysicalLogLocator) throws IOException, ACIDException {
+ if (startingPhysicalLogLocator.getLsn() > logManager.getLastFlushedLsn().get()) {
+ readLSN = startingPhysicalLogLocator.getLsn();
+ } else {
+ //read from disk
+ readOnlyBuffer = getReadOnlyBuffer(startingPhysicalLogLocator.getLsn(), logManager
+ .getLogManagerProperties().getLogBufferSize());
+ logicalLogLocator = new LogicalLogLocator(startingPhysicalLogLocator.getLsn(), readOnlyBuffer, 0,
+ logManager);
+ readLSN = logicalLogLocator.getLsn();
+ }
+ return;
}
private IFileBasedBuffer getReadOnlyBuffer(long lsn, int size) throws IOException {
@@ -77,7 +86,12 @@
int integerRead = -1;
boolean logRecordBeginPosFound = false;
long bytesSkipped = 0;
-
+
+ if (readLSN > logManager.getLastFlushedLsn().get()) {
+ readFromMemory(readLSN, nextLogicalLogLocator);
+ return true;
+ }
+
//check whether the currentOffset has enough space to have new log record by comparing
//the smallest log record type(which is commit)'s log header.
while (logicalLogLocator.getMemoryOffset() <= readOnlyBuffer.getSize()
@@ -112,7 +126,9 @@
}
}
- int logLength = logManager.getLogRecordHelper().getLogRecordSize(logicalLogLocator);
+ int logLength = logManager.getLogRecordHelper().getLogRecordSize(
+ logManager.getLogRecordHelper().getLogType(logicalLogLocator),
+ logManager.getLogRecordHelper().getLogContentSize(logicalLogLocator));
if (logManager.getLogRecordHelper().validateLogRecord(logicalLogLocator)) {
if (nextLogicalLogLocator == null) {
nextLogicalLogLocator = new LogicalLogLocator(0, readOnlyBuffer, -1, logManager);
@@ -138,4 +154,57 @@
return logFilter;
}
+ private void readFromMemory(long lsn, LogicalLogLocator currentLogLocator) throws ACIDException {
+ byte[] logRecord = null;
+ if (lsn > logManager.getCurrentLsn().get()) {
+ throw new ACIDException(" invalid lsn " + lsn);
+ }
+
+ /* check if the log record in the log buffer or has reached the disk. */
+ int pageIndex = logManager.getLogPageIndex(lsn);
+ int pageOffset = logManager.getLogPageOffset(lsn);
+
+ byte[] pageContent = new byte[logManager.getLogManagerProperties().getLogPageSize()];
+ // take a lock on the log page so that the page is not flushed to
+ // disk interim
+ IFileBasedBuffer logPage = logManager.getLogPage(pageIndex);
+ int logRecordSize = 0;
+ synchronized (logPage) {
+ // need to check again
+ if (lsn > logManager.getLastFlushedLsn().get()) {
+ // get the log record length
+ logPage.getBytes(pageContent, 0, pageContent.length);
+ byte logType = pageContent[pageOffset + 4];
+ int logHeaderSize = logManager.getLogRecordHelper().getLogHeaderSize(logType);
+ int logBodySize = DataUtil.byteArrayToInt(pageContent, pageOffset + logHeaderSize - 4);
+ logRecordSize = logHeaderSize + logBodySize + logManager.getLogRecordHelper().getLogChecksumSize();
+ logRecord = new byte[logRecordSize];
+
+ //copy the log content
+ System.arraycopy(pageContent, pageOffset, logRecord, 0, logRecordSize);
+ MemBasedBuffer memBuffer = new MemBasedBuffer(logRecord);
+ if (logicalLogLocator == null) {
+ logicalLogLocator = new LogicalLogLocator(lsn, memBuffer, 0, logManager);
+ } else {
+ logicalLogLocator.setLsn(lsn);
+ logicalLogLocator.setBuffer(memBuffer);
+ logicalLogLocator.setMemoryOffset(0);
+ }
+
+ currentLogLocator.setLsn(lsn);
+ currentLogLocator.setBuffer(memBuffer);
+ currentLogLocator.setMemoryOffset(0);
+
+ try {
+ // validate the log record by comparing checksums
+ if (!logManager.getLogRecordHelper().validateLogRecord(logicalLogLocator)) {
+ throw new ACIDException(" invalid log record at lsn " + lsn);
+ }
+ } catch (Exception e) {
+ throw new ACIDException("exception encoutered in validating log record at lsn " + lsn, e);
+ }
+ }
+ }
+ readLSN = readLSN + logRecordSize;
+ }
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
index f295347..149498b 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
@@ -36,14 +36,8 @@
public class LogManager implements ILogManager {
- /*
- * Log Record Structure HEADER
- * <(log_magic_number,4)(log_length,8)(log_type,1
- * )(log_action_type,1)(log_timestamp
- * ,8)(log_transaction_id,8)(resource_manager_id
- * ,1)(page_id,8)(previous_lsn,8) <CONTENT> TAIL <(checksum,8)>
- */
-
+
+ public static final boolean IS_DEBUG_MODE = false;//true
private static final Logger LOGGER = Logger.getLogger(LogManager.class.getName());
private TransactionSubsystem provider;
private LogManagerProperties logManagerProperties;
@@ -522,8 +516,8 @@
}
// all constraints checked and we are good to go and acquire a lsn.
- long previousLogLocator = -1;
- long myLogLocator; // the will be set to the location (a long value)
+ long previousLSN = -1;
+ long currentLSN; // the will be set to the location (a long value)
// where the log record needs to be placed.
/*
@@ -534,10 +528,10 @@
* the last log record written by (any thread of) the transaction.
*/
synchronized (context) {
- previousLogLocator = context.getLastLogLocator().getLsn();
- myLogLocator = getLsn(totalLogSize, logType);
- context.getLastLogLocator().setLsn(myLogLocator);
- logicalLogLocator.setLsn(myLogLocator);
+ previousLSN = context.getLastLogLocator().getLsn();
+ currentLSN = getLsn(totalLogSize, logType);
+ context.setLastLSN(currentLSN);
+ logicalLogLocator.setLsn(currentLSN);
}
/*
@@ -555,7 +549,7 @@
// thread has submitted a flush
// request.
- int pageIndex = (int) getLogPageIndex(myLogLocator);
+ int pageIndex = (int) getLogPageIndex(currentLSN);
/*
* the lsn has been obtained for the log record. need to set the
@@ -565,13 +559,13 @@
try {
logicalLogLocator.setBuffer(logPages[pageIndex]);
- int pageOffset = getLogPageOffset(myLogLocator);
+ int pageOffset = getLogPageOffset(currentLSN);
logicalLogLocator.setMemoryOffset(pageOffset);
/*
* write the log header.
*/
- logRecordHelper.writeLogHeader(logicalLogLocator, logType, context, datasetId, PKHashValue, previousLogLocator,
+ logRecordHelper.writeLogHeader(logicalLogLocator, logType, context, datasetId, PKHashValue, previousLSN,
resourceId, resourceMgrId, logContentSize);
// increment the offset so that the transaction can fill up the
@@ -590,6 +584,11 @@
// record content at the allocated space.
logger.log(context, logicalLogLocator, logContentSize, reusableLogContentObject);
logger.postLog(context, reusableLogContentObject);
+ if (IS_DEBUG_MODE) {
+ logicalLogLocator.setMemoryOffset(logicalLogLocator.getMemoryOffset() - logRecordHelper.getLogHeaderSize(logType));
+ System.out.println(logRecordHelper.getLogRecordForDisplay(logicalLogLocator));
+ logicalLogLocator.increaseMemoryOffset(logRecordHelper.getLogHeaderSize(logType));
+ }
}
/*
@@ -628,7 +627,7 @@
* been flushed to disk because the containing log page filled up.
*/
if (logType == LogType.COMMIT) {
- if (getLastFlushedLsn().get() < myLogLocator) {
+ if (getLastFlushedLsn().get() < currentLSN) {
if (!addedFlushRequest) {
addFlushRequest(pageIndex);
}
@@ -640,7 +639,7 @@
* waiting threads of the flush event.
*/
synchronized (logPages[pageIndex]) {
- while (getLastFlushedLsn().get() < myLogLocator) {
+ while (getLastFlushedLsn().get() < currentLSN) {
logPages[pageIndex].wait();
}
}
@@ -701,7 +700,11 @@
FileChannel fileChannel = raf.getChannel();
fileChannel.read(buffer);
buffer.position(0);
- buffer.limit(buffer.getInt(4));
+ byte logType = buffer.get(4);
+ int logHeaderSize = logRecordHelper.getLogHeaderSize(logType);
+ int logBodySize = buffer.getInt(logHeaderSize-4);
+ int logRecordSize = logHeaderSize + logBodySize + logRecordHelper.getLogChecksumSize();
+ buffer.limit(logRecordSize);
MemBasedBuffer memBuffer = new MemBasedBuffer(buffer.slice());
logicalLogLocator = new LogicalLogLocator(physicalLogLocator.getLsn(), memBuffer, 0, this);
if (!logRecordHelper.validateLogRecord(logicalLogLocator)) {
@@ -737,6 +740,10 @@
if (lsnValue > getLastFlushedLsn().get()) {
int pageIndex = getLogPageIndex(lsnValue);
int pageOffset = getLogPageOffset(lsnValue);
+
+ //TODO
+ //minimize memory allocation overhead. current code allocates 10MBytes per reading a log record.
+
byte[] pageContent = new byte[logManagerProperties.getLogPageSize()];
// take a lock on the log page so that the page is not flushed to
// disk interim
@@ -750,13 +757,16 @@
// get the log record length
logPages[pageIndex].getBytes(pageContent, 0, pageContent.length);
- int logRecordLength = DataUtil.byteArrayToInt(pageContent, pageOffset + 4);
- logRecord = new byte[logRecordLength];
+ byte logType = pageContent[pageOffset+4];
+ int logHeaderSize = logRecordHelper.getLogHeaderSize(logType);
+ int logBodySize = DataUtil.byteArrayToInt(pageContent, pageOffset + logHeaderSize - 4);
+ int logRecordSize = logHeaderSize + logBodySize + logRecordHelper.getLogChecksumSize();
+ logRecord = new byte[logRecordSize];
/*
* copy the log record content
*/
- System.arraycopy(pageContent, pageOffset, logRecord, 0, logRecordLength);
+ System.arraycopy(pageContent, pageOffset, logRecord, 0, logRecordSize);
MemBasedBuffer memBuffer = new MemBasedBuffer(logRecord);
logLocator = new LogicalLogLocator(lsnValue, memBuffer, 0, this);
try {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java
index ef8fc54..d53d842 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java
@@ -36,6 +36,7 @@
* LogRecordSize(4)
* --------------------------- COMMIT doesn't have Body fields.
* [Body] The Body size is given through the parameter reusableLogContentObjectLength
+ * TupleFieldCount(4)
* NewOp(1)
* NewValueLength(4)
* NewValue(NewValueLength)
@@ -116,14 +117,15 @@
}
@Override
- public int getLogRecordSize(LogicalLogLocator logicalLogLocater) {
+ public int getLogContentSize(LogicalLogLocator logicalLogLocater) {
return logicalLogLocater.getBuffer().readInt(logicalLogLocater.getMemoryOffset() + LOG_RECORD_SIZE_POS);
}
@Override
public long getLogChecksum(LogicalLogLocator logicalLogLocator) {
return (logicalLogLocator.getBuffer()).readLong(logicalLogLocator.getMemoryOffset()
- + getLogRecordSize(logicalLogLocator) - LOG_CHECKSUM_SIZE);
+ + getLogRecordSize(getLogType(logicalLogLocator), getLogContentSize(logicalLogLocator))
+ - LOG_CHECKSUM_SIZE);
}
@Override
@@ -133,7 +135,9 @@
@Override
public int getLogContentEndPos(LogicalLogLocator logicalLogLocator) {
- return logicalLogLocator.getMemoryOffset() + getLogRecordSize(logicalLogLocator) - LOG_CHECKSUM_SIZE;
+ return logicalLogLocator.getMemoryOffset()
+ + getLogRecordSize(getLogType(logicalLogLocator), getLogContentSize(logicalLogLocator))
+ - LOG_CHECKSUM_SIZE;
}
@Override
@@ -153,10 +157,13 @@
builder.append(" Job Id : ").append(getJobId(logicalLogLocator));
builder.append(" Dataset Id : ").append(getDatasetId(logicalLogLocator));
builder.append(" PK Hash Value : ").append(getPKHashValue(logicalLogLocator));
- builder.append(" PrevLSN : ").append(getPrevLSN(logicalLogLocator));
- builder.append(" Resource Id : ").append(getResourceId(logicalLogLocator));
- builder.append(" ResourceMgr Id : ").append(getResourceMgrId(logicalLogLocator));
- builder.append(" Log Record Size : ").append(getLogRecordSize(logicalLogLocator));
+ if (logType == LogType.UPDATE) {
+ builder.append(" PrevLSN : ").append(getPrevLSN(logicalLogLocator).getLsn());
+ builder.append(" Resource Id : ").append(getResourceId(logicalLogLocator));
+ builder.append(" ResourceMgr Id : ").append(getResourceMgrId(logicalLogLocator));
+ builder.append(" Log Record Size : ").append(
+ getLogRecordSize(logType, getLogContentSize(logicalLogLocator)));
+ }
return builder.toString();
}
@@ -198,12 +205,13 @@
/* log record size */
(logicalLogLocator.getBuffer()).writeInt(logicalLogLocator.getMemoryOffset() + LOG_RECORD_SIZE_POS,
logRecordSize);
+
}
}
@Override
public boolean validateLogRecord(LogicalLogLocator logicalLogLocator) {
- int logLength = this.getLogRecordSize(logicalLogLocator);
+ int logLength = this.getLogRecordSize(getLogType(logicalLogLocator), getLogContentSize(logicalLogLocator));
long expectedChecksum = DataUtil.getChecksum(logicalLogLocator.getBuffer(),
logicalLogLocator.getMemoryOffset(), logLength - LOG_CHECKSUM_SIZE);
long actualChecksum = getLogChecksum(logicalLogLocator);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
index 9a097d6..780debc 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -20,9 +20,14 @@
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -40,6 +45,7 @@
import edu.uci.ics.asterix.transaction.management.service.logging.LogicalLogLocator;
import edu.uci.ics.asterix.transaction.management.service.logging.PhysicalLogLocator;
import edu.uci.ics.asterix.transaction.management.service.transaction.IResourceManager;
+import edu.uci.ics.asterix.transaction.management.service.transaction.JobId;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
@@ -158,23 +164,27 @@
}
/**
- * Rollback a transaction (non-Javadoc)
+ * Rollback a transaction
*
* @see edu.uci.ics.transaction.management.service.recovery.IRecoveryManager# rollbackTransaction (edu.uci.ics.TransactionContext.management.service.transaction .TransactionContext)
*/
@Override
public void rollbackTransaction(TransactionContext txnContext) throws ACIDException {
ILogManager logManager = txnSubsystem.getLogManager();
- ILogRecordHelper parser = logManager.getLogRecordHelper();
+ ILogRecordHelper logRecordHelper = logManager.getLogRecordHelper();
+ Map<TxnId, List<Long>> loserTxnTable = new HashMap<TxnId, List<Long>>();
+ TxnId tempKeyTxnId = new TxnId(-1, -1, -1);
- // Obtain the last log record written by the transaction
- PhysicalLogLocator lsn = txnContext.getLastLogLocator();
+ // Obtain the first log record written by the Job
+ PhysicalLogLocator firstLSNLogLocator = txnContext.getFirstLogLocator();
+ PhysicalLogLocator lastLSNLogLocator = txnContext.getLastLogLocator();
if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info(" rollbacking transaction log records at lsn " + lsn.getLsn());
+ LOGGER.info(" rollbacking transaction log records from " + firstLSNLogLocator.getLsn() + "to"
+ + lastLSNLogLocator.getLsn());
}
// check if the transaction actually wrote some logs.
- if (lsn.getLsn() == TransactionManagementConstants.LogManagerConstants.TERMINAL_LSN) {
+ if (firstLSNLogLocator.getLsn() == TransactionManagementConstants.LogManagerConstants.TERMINAL_LSN) {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info(" no need to roll back as there were no operations by the transaction "
+ txnContext.getJobId());
@@ -182,68 +192,142 @@
return;
}
- // a dummy logLocator instance that is re-used during rollback
- LogicalLogLocator logLocator = LogUtil.getDummyLogicalLogLocator(logManager);
+ // While reading log records from firstLSN to lastLSN, collect uncommitted txn's LSNs
+ ILogCursor logCursor;
+ try {
+ logCursor = logManager.readLog(firstLSNLogLocator, new ILogFilter() {
+ @Override
+ public boolean accept(IBuffer buffer, long startOffset, int length) {
+ return true;
+ }
+ });
+ } catch (IOException e) {
+ throw new ACIDException("Failed to create LogCursor with LSN:" + firstLSNLogLocator.getLsn(), e);
+ }
- while (true) {
+ LogicalLogLocator currentLogLocator = LogUtil.getDummyLogicalLogLocator(logManager);
+ boolean valid;
+ byte logType;
+ List<Long> undoLSNSet = null;
+
+ while (currentLogLocator.getLsn() != lastLSNLogLocator.getLsn()) {
try {
- // read the log record at the given position
- logLocator = logManager.readLog(lsn);
- } catch (Exception e) {
- e.printStackTrace();
- state = SystemState.CORRUPTED;
- throw new ACIDException(" could not read log at lsn :" + lsn, e);
+ valid = logCursor.next(currentLogLocator);
+ } catch (IOException e) {
+ throw new ACIDException("Failed to read log at LSN:" + currentLogLocator.getLsn(), e);
+ }
+ if (!valid) {
+ if (currentLogLocator.getLsn() != lastLSNLogLocator.getLsn()) {
+ throw new ACIDException("Log File Corruption: lastLSN mismatch");
+ } else {
+ break;//End of Log File
+ }
}
- byte logType = parser.getLogType(logLocator);
- if (LOGGER.isLoggable(Level.FINE)) {
- LOGGER.fine(" reading LSN value inside rollback transaction method " + txnContext.getLastLogLocator()
- + " jodId " + parser.getJobId(logLocator) + " log type " + logType);
- }
+ tempKeyTxnId.setTxnId(logRecordHelper.getJobId(currentLogLocator), logRecordHelper.getDatasetId(currentLogLocator),
+ logRecordHelper.getPKHashValue(currentLogLocator));
+ logType = logRecordHelper.getLogType(currentLogLocator);
switch (logType) {
case LogType.UPDATE:
-
- // extract the resource manager id from the log record.
- byte resourceMgrId = parser.getResourceMgrId(logLocator);
- if (LOGGER.isLoggable(Level.FINE)) {
- LOGGER.fine(parser.getLogRecordForDisplay(logLocator));
+ undoLSNSet = loserTxnTable.get(tempKeyTxnId);
+ if (undoLSNSet == null) {
+ TxnId txnId = new TxnId(logRecordHelper.getJobId(currentLogLocator),
+ logRecordHelper.getDatasetId(currentLogLocator), logRecordHelper.getPKHashValue(currentLogLocator));
+ undoLSNSet = new ArrayList<Long>();
+ loserTxnTable.put(txnId, undoLSNSet);
}
-
- // look up the repository to get the resource manager
- IResourceManager resourceMgr = txnSubsystem.getTransactionalResourceRepository()
- .getTransactionalResourceMgr(resourceMgrId);
-
- // register resourceMgr if it is not registered.
- if (resourceMgr == null) {
- resourceMgr = new IndexResourceManager(resourceMgrId, txnSubsystem);
- txnSubsystem.getTransactionalResourceRepository().registerTransactionalResourceManager(
- resourceMgrId, resourceMgr);
- }
- resourceMgr.undo(parser, logLocator);
+ undoLSNSet.add(currentLogLocator.getLsn());
break;
case LogType.COMMIT:
- throw new ACIDException(txnContext, " cannot rollback commmitted transaction");
+ undoLSNSet = loserTxnTable.get(tempKeyTxnId);
+ if (undoLSNSet != null) {
+ loserTxnTable.remove(tempKeyTxnId);
+ }
+ break;
default:
throw new ACIDException("Unsupported LogType: " + logType);
-
- }
-
- // follow the previous LSN pointer to get the previous log record
- // written by the transaction
- // If the return value is true, the logLocator, it indicates that
- // the logLocator object has been
- // appropriately set to the location of the next log record to be
- // processed as part of the roll back
- boolean moreLogs = parser.getPrevLSN(lsn, logLocator);
- if (!moreLogs) {
- // no more logs to process
- break;
}
}
+ //undo loserTxn's effect
+ TxnId txnId;
+ Iterator<Entry<TxnId, List<Long>>> iter = loserTxnTable.entrySet().iterator();
+ byte resourceMgrId;
+ while (iter.hasNext()) {
+ Map.Entry<TxnId, List<Long>> loserTxn = (Map.Entry<TxnId, List<Long>>) iter.next();
+ txnId = loserTxn.getKey();
+
+ undoLSNSet = loserTxn.getValue();
+ Comparator<Long> comparator = Collections.reverseOrder();
+ Collections.sort(undoLSNSet, comparator);
+
+ for (long undoLSN : undoLSNSet) {
+ currentLogLocator.setLsn(undoLSN);
+ // here, all the log records are UPDATE type. So, we don't need to check the type again.
+
+ // extract the resource manager id from the log record.
+ resourceMgrId = logRecordHelper.getResourceMgrId(currentLogLocator);
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine(logRecordHelper.getLogRecordForDisplay(currentLogLocator));
+ }
+
+ // look up the repository to get the resource manager
+ IResourceManager resourceMgr = txnSubsystem.getTransactionalResourceRepository()
+ .getTransactionalResourceMgr(resourceMgrId);
+
+ // register resourceMgr if it is not registered.
+ if (resourceMgr == null) {
+ resourceMgr = new IndexResourceManager(resourceMgrId, txnSubsystem);
+ txnSubsystem.getTransactionalResourceRepository().registerTransactionalResourceManager(
+ resourceMgrId, resourceMgr);
+ }
+ resourceMgr.undo(logRecordHelper, currentLogLocator);
+ }
+ }
+ }
+}
+
+class TxnId {
+ public int jobId;
+ public int datasetId;
+ public int pkHashVal;
+
+ public TxnId(int jobId, int datasetId, int pkHashVal) {
+ this.jobId = jobId;
+ this.datasetId = datasetId;
+ this.pkHashVal = pkHashVal;
}
+ public void setTxnId(int jobId, int datasetId, int pkHashVal) {
+ this.jobId = jobId;
+ this.datasetId = datasetId;
+ this.pkHashVal = pkHashVal;
+ }
+
+ public void setTxnId(TxnId txnId) {
+ this.jobId = txnId.jobId;
+ this.datasetId = txnId.datasetId;
+ this.pkHashVal = txnId.pkHashVal;
+ }
+
+ @Override
+ public int hashCode() {
+ return pkHashVal;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == this) {
+ return true;
+ }
+ if (!(o instanceof JobId)) {
+ return false;
+ }
+ TxnId txnId = (TxnId) o;
+
+ return (txnId.pkHashVal == pkHashVal && txnId.datasetId == datasetId && txnId.jobId == jobId);
+ }
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
index bbade35..a88ed5f 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
@@ -52,8 +52,9 @@
private static final long serialVersionUID = -6105616785783310111L;
private TransactionSubsystem transactionSubsystem;
- private LogicalLogLocator lastLogLocator;
- private TransactionState txnState;
+ private LogicalLogLocator firstLogLocator;//firstLSN of the Job
+ private LogicalLogLocator lastLogLocator;//lastLSN of the Job
+ private TransactionState txnState;
private long startWaitTime;
private int status;
private Set<ICloseable> resources = new HashSet<ICloseable>();
@@ -74,6 +75,7 @@
}
private void init() throws ACIDException {
+ firstLogLocator = LogUtil.getDummyLogicalLogLocator(transactionSubsystem.getLogManager());
lastLogLocator = LogUtil.getDummyLogicalLogLocator(transactionSubsystem.getLogManager());
txnState = TransactionState.ACTIVE;
startWaitTime = INVALID_TIME;
@@ -117,12 +119,19 @@
resources.add(resource);
}
+ public LogicalLogLocator getFirstLogLocator() {
+ return firstLogLocator;
+ }
+
public LogicalLogLocator getLastLogLocator() {
return lastLogLocator;
}
- public void setLastLSN(LogicalLogLocator lastLogLocator) {
- this.lastLogLocator = lastLogLocator;
+ public void setLastLSN(long lsn) {
+ if (firstLogLocator.getLsn() == -1) {
+ firstLogLocator.setLsn(lsn);
+ }
+ lastLogLocator.setLsn(lsn);
}
public JobId getJobId() {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
index a22a7a2..3f71540 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -51,6 +51,7 @@
if (LOGGER.isLoggable(Level.SEVERE)) {
LOGGER.severe(msg);
}
+ ae.printStackTrace();
throw new Error(msg);
} finally {
txnContext.releaseResources();