fixed bugs related to rollback and primary key length
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java
index 6dd11bd..32bfa58 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java
@@ -72,7 +72,7 @@
@Override
public void open() throws HyracksDataException {
try {
- transactionContext = transactionManager.getTransactionContext(jobId);
+ transactionContext = transactionManager.getTransactionContext(jobId, false);
transactionContext.setWriteTxn(isWriteTransaction);
} catch (ACIDException e) {
throw new HyracksDataException(e);
@@ -89,7 +89,11 @@
pkHash = computePrimaryKeyHashValue(frameTupleReference, primaryKeyFields);
logRecord.formEntityCommitLogRecord(transactionContext, datasetId, pkHash, frameTupleReference,
primaryKeyFields);
- logMgr.log(logRecord);
+ try {
+ logMgr.log(logRecord);
+ } catch (ACIDException e) {
+ throw new HyracksDataException(e);
+ }
}
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
index 2ed4b0ec..da18693 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -18,6 +18,7 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
+import edu.uci.ics.asterix.common.transactions.AbstractOperationCallback;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
@@ -43,7 +44,7 @@
public void beforeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
IModificationOperationCallback modificationCallback) throws HyracksDataException {
if (opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION) {
- numActiveOperations.incrementAndGet();
+ incrementNumActiveOperations(modificationCallback);
} else if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE) {
datasetLifecycleManager.declareActiveIOOperation(datasetID);
}
@@ -62,14 +63,11 @@
public void completeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
IModificationOperationCallback modificationCallback) throws HyracksDataException {
if (opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION) {
- numActiveOperations.decrementAndGet();
+ decrementNumActiveOperations(modificationCallback);
+ flushIfFull();
} else if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE) {
datasetLifecycleManager.undeclareActiveIOOperation(datasetID);
}
-
- if (opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION) {
- flushIfFull();
- }
}
private void flushIfFull() throws HyracksDataException {
@@ -103,4 +101,25 @@
public int getNumActiveOperations() {
return numActiveOperations.get();
}
+
+ private void incrementNumActiveOperations(IModificationOperationCallback modificationCallback) {
+ if (modificationCallback != null && modificationCallback != NoOpOperationCallback.INSTANCE) {
+ numActiveOperations.incrementAndGet();
+ ((AbstractOperationCallback) modificationCallback).incrementLocalNumActiveOperations();
+ }
+ }
+
+ private void decrementNumActiveOperations(IModificationOperationCallback modificationCallback) {
+ if (modificationCallback != null && modificationCallback != NoOpOperationCallback.INSTANCE) {
+ numActiveOperations.decrementAndGet();
+ ((AbstractOperationCallback) modificationCallback).decrementLocalNumActiveOperations();
+ }
+ }
+
+ public void cleanupNumActiveOperationsForAbortedJob(AbstractOperationCallback callback) {
+ int delta = callback.getLocalNumActiveOperations() * -1;
+ numActiveOperations.getAndAdd(delta);
+ callback.resetLocalNumActiveOperations();
+ }
+
}
\ No newline at end of file
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
index b6025cb..e046db0 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
@@ -32,7 +32,7 @@
@Override
public void afterOperation(List<ILSMComponent> oldComponents, ILSMComponent newComponent)
throws HyracksDataException {
- if (oldComponents != null && newComponent != null) {
+ if (newComponent != null) {
LSMBTreeDiskComponent btreeComponent = (LSMBTreeDiskComponent) newComponent;
putLSNIntoMetadata(btreeComponent.getBTree(), oldComponents);
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/AbstractOperationCallback.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/AbstractOperationCallback.java
index d4b26f7..c549e7d 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/AbstractOperationCallback.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/AbstractOperationCallback.java
@@ -15,6 +15,8 @@
package edu.uci.ics.asterix.common.transactions;
+import java.util.concurrent.atomic.AtomicInteger;
+
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.MurmurHash128Bit;
@@ -27,6 +29,7 @@
protected final ITransactionContext txnCtx;
protected final ILockManager lockManager;
protected final long[] longHashes;
+ protected final AtomicInteger transactorLocalNumActiveOperations;
public AbstractOperationCallback(int datasetId, int[] primaryKeyFields, ITransactionContext txnCtx,
ILockManager lockManager) {
@@ -34,6 +37,7 @@
this.primaryKeyFields = primaryKeyFields;
this.txnCtx = txnCtx;
this.lockManager = lockManager;
+ this.transactorLocalNumActiveOperations = new AtomicInteger(0);
this.longHashes = new long[2];
}
@@ -41,4 +45,21 @@
MurmurHash128Bit.hash3_x64_128(tuple, primaryKeyFields, SEED, longHashes);
return Math.abs((int) longHashes[0]);
}
+
+ public void resetLocalNumActiveOperations() {
+ transactorLocalNumActiveOperations.set(0);
+ }
+
+ public int getLocalNumActiveOperations() {
+ return transactorLocalNumActiveOperations.get();
+ }
+
+ public void incrementLocalNumActiveOperations() {
+ transactorLocalNumActiveOperations.incrementAndGet();
+ }
+
+ public void decrementLocalNumActiveOperations() {
+ transactorLocalNumActiveOperations.decrementAndGet();
+ }
+
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogManager.java
index 8913f8a..27a91a4 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogManager.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogManager.java
@@ -14,9 +14,11 @@
*/
package edu.uci.ics.asterix.common.transactions;
+import edu.uci.ics.asterix.common.exceptions.ACIDException;
+
public interface ILogManager {
- public void log(ILogRecord logRecord);
+ public void log(ILogRecord logRecord) throws ACIDException;
public ILogReader getLogReader(boolean isRecoveryMode);
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogRecord.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogRecord.java
index d13ef6c..3068867 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogRecord.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogRecord.java
@@ -20,15 +20,15 @@
public interface ILogRecord {
- public static final int JOB_COMMIT_LOG_SIZE = 13;
- public static final int ENTITY_COMMIT_LOG_BASE_SIZE = 29;
- public static final int UPDATE_LOG_BASE_SIZE = 64;
+ public static final int JOB_TERMINATE_LOG_SIZE = 13; //JOB_COMMIT or ABORT log type
+ public static final int ENTITY_COMMIT_LOG_BASE_SIZE = 25;
+ public static final int UPDATE_LOG_BASE_SIZE = 60;
public boolean readLogRecord(ByteBuffer buffer);
public void writeLogRecord(ByteBuffer buffer);
- public void formJobCommitLogRecord(ITransactionContext txnCtx);
+ public void formJobTerminateLogRecord(ITransactionContext txnCtx, boolean isCommit);
public void formEntityCommitLogRecord(ITransactionContext txnCtx, int datasetId, int PKHashValue,
ITupleReference tupleReference, int[] primaryKeyFields);
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionManager.java
index 77e960b..ffd4cc2 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionManager.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionManager.java
@@ -52,10 +52,11 @@
*
* @param jobId
* a unique value for the transaction id.
+ * @param createIfNotExist TODO
* @return
* @throws ACIDException
*/
- public ITransactionContext getTransactionContext(JobId jobId) throws ACIDException;
+ public ITransactionContext getTransactionContext(JobId jobId, boolean createIfNotExist) throws ACIDException;
/**
* Commits a transaction.
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
index 8765aae..ee0791b 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
@@ -115,14 +115,15 @@
@Override
public void commitTransaction(JobId jobId) throws RemoteException, ACIDException {
- ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId);
+ ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId, false);
transactionSubsystem.getTransactionManager().commitTransaction(txnCtx, new DatasetId(-1), -1);
}
@Override
public void abortTransaction(JobId jobId) throws RemoteException, ACIDException {
try {
- ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId);
+ ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId,
+ false);
transactionSubsystem.getTransactionManager().abortTransaction(txnCtx, new DatasetId(-1), -1);
} catch (ACIDException e) {
e.printStackTrace();
@@ -132,13 +133,13 @@
@Override
public void lock(JobId jobId, byte lockMode) throws ACIDException, RemoteException {
- ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId);
+ ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId, false);
transactionSubsystem.getLockManager().lock(METADATA_DATASET_ID, -1, lockMode, txnCtx);
}
@Override
public void unlock(JobId jobId) throws ACIDException, RemoteException {
- ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId);
+ ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId, false);
transactionSubsystem.getLockManager().unlock(METADATA_DATASET_ID, -1, txnCtx);
}
@@ -273,7 +274,7 @@
ILSMIndexAccessor indexAccessor = lsmIndex.createAccessor(modCallback, NoOpOperationCallback.INSTANCE);
- ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId);
+ ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId, false);
txnCtx.setWriteTxn(true);
txnCtx.registerIndexAndCallback(resourceID, lsmIndex, (AbstractOperationCallback) modCallback,
metadataIndex.isPrimaryIndex());
@@ -286,7 +287,7 @@
private IModificationOperationCallback createIndexModificationCallback(JobId jobId, long resourceId,
IMetadataIndex metadataIndex, ILSMIndex lsmIndex, IndexOperation indexOp) throws Exception {
- ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId);
+ ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId, false);
if (metadataIndex.isPrimaryIndex()) {
return new PrimaryIndexModificationOperationCallback(metadataIndex.getDatasetId().getId(),
@@ -581,7 +582,7 @@
lsmIndex, IndexOperation.DELETE);
ILSMIndexAccessor indexAccessor = lsmIndex.createAccessor(modCallback, NoOpOperationCallback.INSTANCE);
- ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId);
+ ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId, false);
txnCtx.setWriteTxn(true);
txnCtx.registerIndexAndCallback(resourceID, lsmIndex, (AbstractOperationCallback) modCallback,
metadataIndex.isPrimaryIndex());
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/job/listener/JobEventListenerFactory.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/job/listener/JobEventListenerFactory.java
index b44755c..4f4eba2 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/job/listener/JobEventListenerFactory.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/job/listener/JobEventListenerFactory.java
@@ -49,7 +49,7 @@
try {
ITransactionManager txnManager = ((IAsterixAppRuntimeContext) jobletContext.getApplicationContext()
.getApplicationObject()).getTransactionSubsystem().getTransactionManager();
- ITransactionContext txnContext = txnManager.getTransactionContext(jobId);
+ ITransactionContext txnContext = txnManager.getTransactionContext(jobId, false);
txnContext.setWriteTxn(transactionalWrite);
txnManager.completedTransaction(txnContext, new DatasetId(-1), -1,
!(jobStatus == JobStatus.FAILURE));
@@ -62,7 +62,7 @@
public void jobletStart() {
try {
((IAsterixAppRuntimeContext) jobletContext.getApplicationContext().getApplicationObject())
- .getTransactionSubsystem().getTransactionManager().getTransactionContext(jobId);
+ .getTransactionSubsystem().getTransactionManager().getTransactionContext(jobId, true);
} catch (ACIDException e) {
throw new Error(e);
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
index a6537cd..00f0c4a 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
@@ -41,7 +41,7 @@
throws HyracksDataException {
ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
try {
- ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId);
+ ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId, false);
return new PrimaryIndexInstantSearchOperationCallback(datasetId, primaryKeyFields,
txnSubsystem.getLockManager(), txnCtx);
} catch (ACIDException e) {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
index abeec62..07daee4 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
@@ -58,7 +58,7 @@
}
try {
- ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId);
+ ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId, false);
IModificationOperationCallback modCallback = new PrimaryIndexModificationOperationCallback(datasetId,
primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resourceId, resourceType,
indexOp);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
index b59eb75..01cb725 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
@@ -41,7 +41,7 @@
throws HyracksDataException {
ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
try {
- ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId);
+ ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId, false);
return new PrimaryIndexSearchOperationCallback(datasetId, primaryKeyFields, txnSubsystem.getLockManager(),
txnCtx);
} catch (ACIDException e) {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
index d5bc877..563e9b7 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
@@ -55,7 +55,7 @@
}
try {
- ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId);
+ ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId, false);
IModificationOperationCallback modCallback = new SecondaryIndexModificationOperationCallback(datasetId, primaryKeyFields, txnCtx,
txnSubsystem.getLockManager(), txnSubsystem, resourceId, resourceType, indexOp);
txnCtx.registerIndexAndCallback(resourceId, index, (AbstractOperationCallback) modCallback, false);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
index 6d86f70..c7df2f2 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
@@ -665,8 +665,6 @@
latchLockTable();
try {
- validateJob(txnContext);
-
if (IS_DEBUG_MODE) {
trackLockRequest("Requested", RequestType.UNLOCK, datasetId, entityHashValue, (byte) 0, txnContext,
dLockInfo, eLockInfo);
@@ -2212,14 +2210,14 @@
if (logRecord.getLogType() == LogType.ENTITY_COMMIT) {
tempDatasetIdObj.setId(logRecord.getDatasetId());
tempJobIdObj.setId(logRecord.getJobId());
- txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(tempJobIdObj);
+ txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(tempJobIdObj, false);
unlock(tempDatasetIdObj, logRecord.getPKHashValue(), txnCtx);
txnCtx.notifyOptracker(false);
- } else if (logRecord.getLogType() == LogType.JOB_COMMIT) {
+ } else if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) {
tempJobIdObj.setId(logRecord.getJobId());
- txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(tempJobIdObj);
+ txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(tempJobIdObj, false);
txnCtx.notifyOptracker(true);
- ((LogPage) logPage).notifyJobCommitter();
+ ((LogPage) logPage).notifyJobTerminator();
}
logRecord = logPageReader.next();
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
index 4f0bb59..933afcd 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
@@ -38,6 +38,7 @@
import edu.uci.ics.asterix.common.transactions.ILogReader;
import edu.uci.ics.asterix.common.transactions.ILogRecord;
import edu.uci.ics.asterix.common.transactions.ITransactionContext;
+import edu.uci.ics.asterix.common.transactions.ITransactionManager;
import edu.uci.ics.asterix.common.transactions.LogManagerProperties;
import edu.uci.ics.asterix.common.transactions.MutableLong;
import edu.uci.ics.asterix.transaction.management.service.locking.LockManager;
@@ -74,16 +75,16 @@
logDir = logManagerProperties.getLogDir();
logFilePrefix = logManagerProperties.getLogFilePrefix();
flushLSN = new MutableLong();
- initializeLogManager();
+ initializeLogManager(0);
}
- private void initializeLogManager() {
+ private void initializeLogManager(long nextLogFileId) {
emptyQ = new LinkedBlockingQueue<LogPage>(numLogPages);
flushQ = new LinkedBlockingQueue<LogPage>(numLogPages);
for (int i = 0; i < numLogPages; i++) {
emptyQ.offer(new LogPage((LockManager) txnSubsystem.getLockManager(), logPageSize, flushLSN));
}
- appendLSN = initializeLogAnchor();
+ appendLSN = initializeLogAnchor(nextLogFileId);
flushLSN.set(appendLSN);
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("LogManager starts logging in LSN: " + appendLSN);
@@ -95,12 +96,13 @@
}
@Override
- public void log(ILogRecord logRecord) {
+ public void log(ILogRecord logRecord) throws ACIDException {
if (logRecord.getLogSize() > logPageSize) {
throw new IllegalStateException();
}
syncLog(logRecord);
- if (logRecord.getLogType() == LogType.JOB_COMMIT && !logRecord.isFlushed()) {
+ if ((logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT)
+ && !logRecord.isFlushed()) {
synchronized (logRecord) {
while (!logRecord.isFlushed()) {
try {
@@ -113,13 +115,16 @@
}
}
- private synchronized void syncLog(ILogRecord logRecord) {
+ private synchronized void syncLog(ILogRecord logRecord) throws ACIDException {
ITransactionContext txnCtx = logRecord.getTxnCtx();
+ if (txnCtx.getTxnState() == ITransactionManager.ABORTED && logRecord.getLogType() != LogType.ABORT) {
+ throw new ACIDException("Aborted job(" + txnCtx.getJobId() + ") tried to write non-abort type log record.");
+ }
if (getLogFileOffset(appendLSN) + logRecord.getLogSize() > logFileSize) {
prepareNextLogFile();
appendPage.isFull(true);
getAndInitNewPage();
- } else if (!appendPage.hasSpace(logRecord.getLogSize(), getLogFileOffset(appendLSN))) {
+ } else if (!appendPage.hasSpace(logRecord.getLogSize())) {
appendPage.isFull(true);
getAndInitNewPage();
}
@@ -141,7 +146,6 @@
}
appendPage.reset();
appendPage.setFileChannel(appendChannel);
- appendPage.setInitialFlushOffset(getLogFileOffset(appendLSN));
flushQ.offer(appendPage);
}
@@ -229,7 +233,7 @@
return flushLSN;
}
- private long initializeLogAnchor() {
+ private long initializeLogAnchor(long nextLogFileId) {
long fileId = 0;
long offset = 0;
File fileLogDir = new File(logDir);
@@ -237,9 +241,10 @@
if (fileLogDir.exists()) {
List<Long> logFileIds = getLogFileIds();
if (logFileIds == null) {
- createFileIfNotExists(getLogFilePath(0));
+ fileId = nextLogFileId;
+ createFileIfNotExists(getLogFilePath(fileId));
if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("created a log file: " + getLogFilePath(0));
+ LOGGER.info("created a log file: " + getLogFilePath(fileId));
}
} else {
fileId = logFileIds.get(logFileIds.size() - 1);
@@ -247,13 +252,14 @@
offset = logFile.length();
}
} else {
+ fileId = nextLogFileId;
createNewDirectory(logDir);
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("created the log directory: " + logManagerProperties.getLogDir());
}
- createFileIfNotExists(getLogFilePath(0));
+ createFileIfNotExists(getLogFilePath(fileId));
if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("created a log file: " + getLogFilePath(0));
+ LOGGER.info("created a log file: " + getLogFilePath(fileId));
}
}
} catch (IOException ioe) {
@@ -267,8 +273,8 @@
public void renewLogFiles() {
terminateLogFlusher();
- deleteAllLogFiles();
- initializeLogManager();
+ long lastMaxLogFileId = deleteAllLogFiles();
+ initializeLogManager(lastMaxLogFileId + 1);
}
private void terminateLogFlusher() {
@@ -290,7 +296,7 @@
}
}
- private void deleteAllLogFiles() {
+ private long deleteAllLogFiles() {
if (appendChannel != null) {
try {
appendChannel.close();
@@ -305,6 +311,7 @@
throw new IllegalStateException("Failed to delete a file: " + file.getAbsolutePath());
}
}
+ return logFileIds.get(logFileIds.size() - 1);
}
private List<Long> getLogFileIds() {
@@ -384,10 +391,16 @@
}
return newFileChannel;
}
+
+ public long getReadableSmallestLSN() {
+ List<Long> logFileIds = getLogFileIds();
+ return logFileIds.get(0) * logFileSize;
+ }
}
class LogFlusher implements Callable<Boolean> {
- private final static LogPage POISON_PILL = new LogPage(null, ILogRecord.JOB_COMMIT_LOG_SIZE, null);
+ private static final Logger LOGGER = Logger.getLogger(LogFlusher.class.getName());
+ private final static LogPage POISON_PILL = new LogPage(null, ILogRecord.JOB_TERMINATE_LOG_SIZE, null);
private final LogManager logMgr;//for debugging
private final LinkedBlockingQueue<LogPage> emptyQ;
private final LinkedBlockingQueue<LogPage> flushQ;
@@ -402,13 +415,13 @@
flushPage = null;
isStarted = new AtomicBoolean(false);
terminateFlag = new AtomicBoolean(false);
-
+
}
public void terminate() {
//make sure the LogFlusher thread started before terminating it.
synchronized (isStarted) {
- while(!isStarted.get()) {
+ while (!isStarted.get()) {
try {
isStarted.wait();
} catch (InterruptedException e) {
@@ -416,7 +429,7 @@
}
}
}
-
+
terminateFlag.set(true);
if (flushPage != null) {
synchronized (flushPage) {
@@ -432,24 +445,34 @@
@Override
public Boolean call() {
- synchronized(isStarted) {
+ synchronized (isStarted) {
isStarted.set(true);
isStarted.notify();
}
- while (true) {
- flushPage = null;
- try {
- flushPage = flushQ.take();
- if (flushPage == POISON_PILL || terminateFlag.get()) {
- return true;
+ try {
+ while (true) {
+ flushPage = null;
+ try {
+ flushPage = flushQ.take();
+ if (flushPage == POISON_PILL || terminateFlag.get()) {
+ return true;
+ }
+ } catch (InterruptedException e) {
+ if (flushPage == null) {
+ continue;
+ }
}
- } catch (InterruptedException e) {
- if (flushPage == null) {
- continue;
- }
+ flushPage.flush();
+ emptyQ.offer(flushPage);
}
- flushPage.flush();
- emptyQ.offer(flushPage);
+ } catch (Exception e) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("-------------------------------------------------------------------------");
+ LOGGER.info("LogFlusher is terminating abnormally. System is in unusalbe state.");
+ LOGGER.info("-------------------------------------------------------------------------");
+ }
+ e.printStackTrace();
+ throw e;
}
}
}
\ No newline at end of file
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
index edfec69..8921f35 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
@@ -58,7 +58,7 @@
appendOffset = 0;
flushOffset = 0;
isLastPage = false;
- syncCommitQ = new LinkedBlockingQueue<ILogRecord>(logPageSize / ILogRecord.JOB_COMMIT_LOG_SIZE);
+ syncCommitQ = new LinkedBlockingQueue<ILogRecord>(logPageSize / ILogRecord.JOB_TERMINATE_LOG_SIZE);
}
////////////////////////////////////
@@ -76,7 +76,7 @@
if (IS_DEBUG_MODE) {
LOGGER.info("append()| appendOffset: " + appendOffset);
}
- if (logRecord.getLogType() == LogType.JOB_COMMIT) {
+ if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) {
logRecord.isFlushed(false);
syncCommitQ.offer(logRecord);
}
@@ -105,7 +105,7 @@
this.isLastPage = isLastPage;
}
- public boolean hasSpace(int logSize, long logFileOffset) {
+ public boolean hasSpace(int logSize) {
return appendOffset + logSize <= logPageSize;
}
@@ -192,7 +192,7 @@
}
}
- public void notifyJobCommitter() {
+ public void notifyJobTerminator() {
ILogRecord logRecord = null;
while (logRecord == null) {
try {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecord.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecord.java
index 4b0e1f2..dd81df7 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecord.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecord.java
@@ -34,10 +34,9 @@
* LogType(1)
* JobId(4)
* ---------------------------
- * [Header2] (16 bytes + PKValueSize) : for entity_commit and update log types
+ * [Header2] (12 bytes + PKValueSize) : for entity_commit and update log types
* DatasetId(4) //stored in dataset_dataset in Metadata Node
* PKHashValue(4)
- * PKFieldCnt(4)
* PKValueSize(4)
* PKValue(PKValueSize)
* ---------------------------
@@ -61,10 +60,10 @@
* ---------------------------
* = LogSize =
* 1) JOB_COMMIT_LOG_SIZE: 13 bytes (5 + 8)
- * 2) ENTITY_COMMIT: 29 + PKSize (5 + 16 + PKSize + 8)
- * --> ENTITY_COMMIT_LOG_BASE_SIZE = 29
- * 3) UPDATE: 64 + PKSize + New/OldValueSize (5 + 16 + PKSize + 21 + 14 + New/OldValueSize + 8)
- * --> UPDATE_LOG_BASE_SIZE = 64
+ * 2) ENTITY_COMMIT: 25 + PKSize (5 + 12 + PKSize + 8)
+ * --> ENTITY_COMMIT_LOG_BASE_SIZE = 25
+ * 3) UPDATE: 64 + PKSize + New/OldValueSize (5 + 12 + PKSize + 21 + 14 + New/OldValueSize + 8)
+ * --> UPDATE_LOG_BASE_SIZE = 60
*/
public class LogRecord implements ILogRecord {
@@ -73,7 +72,6 @@
private int jobId;
private int datasetId;
private int PKHashValue;
- private int PKFieldCnt;
private int PKValueSize;
private ITupleReference PKValue;
private long prevLSN;
@@ -90,12 +88,13 @@
private long checksum;
//------------- fields in a log record (end) --------------//
+ private int PKFieldCnt;
private static final int CHECKSUM_SIZE = 8;
private ITransactionContext txnCtx;
private long LSN;
private final AtomicBoolean isFlushed;
private final SimpleTupleWriter tupleWriter;
- private final SimpleTupleReference readPKValue;
+ private final PrimaryKeyTupleReference readPKValue;
private final SimpleTupleReference readNewValue;
private final SimpleTupleReference readOldValue;
private final CRC32 checksumGen;
@@ -104,7 +103,7 @@
public LogRecord() {
isFlushed = new AtomicBoolean(false);
tupleWriter = new SimpleTupleWriter();
- readPKValue = (SimpleTupleReference) tupleWriter.createTupleReference();
+ readPKValue = new PrimaryKeyTupleReference();
readNewValue = (SimpleTupleReference) tupleWriter.createTupleReference();
readOldValue = (SimpleTupleReference) tupleWriter.createTupleReference();
checksumGen = new CRC32();
@@ -115,10 +114,9 @@
int beginOffset = buffer.position();
buffer.put(logType);
buffer.putInt(jobId);
- if (logType != LogType.JOB_COMMIT) {
+ if (logType == LogType.UPDATE || logType == LogType.ENTITY_COMMIT) {
buffer.putInt(datasetId);
buffer.putInt(PKHashValue);
- buffer.putInt(PKFieldCnt);
if (PKValueSize <= 0) {
throw new IllegalStateException("Primary Key Size is less than or equal to 0");
}
@@ -173,13 +171,12 @@
try {
logType = buffer.get();
jobId = buffer.getInt();
- if (logType == LogType.JOB_COMMIT) {
+ if (logType == LogType.JOB_COMMIT || logType == LogType.ABORT) {
datasetId = -1;
PKHashValue = -1;
} else {
- datasetId = buffer.getInt();
+ datasetId = buffer.getInt();
PKHashValue = buffer.getInt();
- PKFieldCnt = buffer.getInt();
PKValueSize = buffer.getInt();
if (PKValueSize <= 0) {
throw new IllegalStateException("Primary Key Size is less than or equal to 0");
@@ -217,12 +214,20 @@
}
return true;
}
-
+
private ITupleReference readPKValue(ByteBuffer buffer) {
- return readTuple(buffer, readPKValue, PKFieldCnt, PKValueSize);
+ if (buffer.position() + PKValueSize > buffer.limit()) {
+ throw new BufferUnderflowException();
+ }
+ readPKValue.reset(buffer.array(), buffer.position(), PKValueSize);
+ buffer.position(buffer.position() + PKValueSize);
+ return readPKValue;
}
private ITupleReference readTuple(ByteBuffer srcBuffer, SimpleTupleReference destTuple, int fieldCnt, int size) {
+ if (srcBuffer.position() + size > srcBuffer.limit()) {
+ throw new BufferUnderflowException();
+ }
destTuple.setFieldCount(fieldCnt);
destTuple.resetByTupleOffset(srcBuffer, srcBuffer.position());
srcBuffer.position(srcBuffer.position() + size);
@@ -230,9 +235,9 @@
}
@Override
- public void formJobCommitLogRecord(ITransactionContext txnCtx) {
+ public void formJobTerminateLogRecord(ITransactionContext txnCtx, boolean isCommit) {
this.txnCtx = txnCtx;
- this.logType = LogType.JOB_COMMIT;
+ this.logType = isCommit ? LogType.JOB_COMMIT : LogType.ABORT;
this.jobId = txnCtx.getJobId().getId();
this.datasetId = -1;
this.PKHashValue = -1;
@@ -281,7 +286,8 @@
setUpdateLogSize();
break;
case LogType.JOB_COMMIT:
- logSize = JOB_COMMIT_LOG_SIZE;
+ case LogType.ABORT:
+ logSize = JOB_TERMINATE_LOG_SIZE;
break;
case LogType.ENTITY_COMMIT:
logSize = ENTITY_COMMIT_LOG_BASE_SIZE + PKValueSize;
@@ -298,7 +304,7 @@
builder.append(" LogType : ").append(LogType.toString(logType));
builder.append(" LogSize : ").append(logSize);
builder.append(" JobId : ").append(jobId);
- if (logType != LogType.JOB_COMMIT) {
+ if (logType == LogType.ENTITY_COMMIT || logType == LogType.UPDATE) {
builder.append(" DatasetId : ").append(datasetId);
builder.append(" PKHashValue : ").append(PKHashValue);
builder.append(" PKFieldCnt : ").append(PKFieldCnt);
@@ -496,17 +502,17 @@
public void setLSN(long LSN) {
this.LSN = LSN;
}
-
+
@Override
public int getPKValueSize() {
return PKValueSize;
}
-
+
@Override
public ITupleReference getPKValue() {
return PKValue;
}
-
+
@Override
public void setPKFields(int[] primaryKeyFields) {
PKFields = primaryKeyFields;
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java
index 823c8d3..f9e9304 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java
@@ -19,11 +19,14 @@
public static final byte UPDATE = 0;
public static final byte JOB_COMMIT = 1;
public static final byte ENTITY_COMMIT = 2;
+ public static final byte ABORT = 3;
private static final String STRING_UPDATE = "UPDATE";
private static final String STRING_JOB_COMMIT = "JOB_COMMIT";
private static final String STRING_ENTITY_COMMIT = "ENTITY_COMMIT";
+ private static final String STRING_ABORT = "ABORT";
private static final String STRING_INVALID_LOG_TYPE = "INVALID_LOG_TYPE";
+
public static String toString(byte logType) {
switch (logType) {
case LogType.UPDATE:
@@ -32,6 +35,8 @@
return STRING_JOB_COMMIT;
case LogType.ENTITY_COMMIT:
return STRING_ENTITY_COMMIT;
+ case LogType.ABORT:
+ return STRING_ABORT;
default:
return STRING_INVALID_LOG_TYPE;
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/PrimaryKeyTupleReference.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/PrimaryKeyTupleReference.java
new file mode 100644
index 0000000..d45b209
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/PrimaryKeyTupleReference.java
@@ -0,0 +1,36 @@
+package edu.uci.ics.asterix.transaction.management.service.logging;
+
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+public class PrimaryKeyTupleReference implements ITupleReference {
+ private byte[] fieldData;
+ private int start;
+ private int length;
+
+ public void reset(byte[] fieldData, int start, int length) {
+ this.fieldData = fieldData;
+ this.start = start;
+ this.length = length;
+ }
+
+ @Override
+ public int getFieldCount() {
+ return 1;
+ }
+
+ @Override
+ public byte[] getFieldData(int fIdx) {
+ return fieldData;
+ }
+
+ @Override
+ public int getFieldStart(int fIdx) {
+ return start;
+ }
+
+ @Override
+ public int getFieldLength(int fIdx) {
+ return length;
+ }
+
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
index 2ad3055..f31c164 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -77,6 +77,7 @@
private final TransactionSubsystem txnSubsystem;
private final LogManager logMgr;
private final int checkpointHistory;
+ private final long SHARP_CHECKPOINT_LSN = -1;
/**
* A file at a known location that contains the LSN of the last log record
@@ -115,18 +116,23 @@
return state;
}
- //#. if minMCTFirstLSN is equal to -1 &&
- // checkpointLSN in the checkpoint file is equal to the lastLSN in the log file,
- // then return healthy state. Otherwise, return corrupted.
- if ((checkpointObject.getMinMCTFirstLsn() == -2 && logMgr.getAppendLSN() == 0)
- || (checkpointObject.getMinMCTFirstLsn() == -1 && checkpointObject.getCheckpointLsn() == logMgr
- .getAppendLSN())) {
+ long readableSmallestLSN = logMgr.getReadableSmallestLSN();
+ if (logMgr.getAppendLSN() == readableSmallestLSN) {
+ if (checkpointObject.getMinMCTFirstLsn() != SHARP_CHECKPOINT_LSN) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("[Warning] ---------------------------------------------------");
+ LOGGER.info("[Warning] Some(or all) of transaction log files are lost.");
+ LOGGER.info("[Warning] ---------------------------------------------------");
+ //No choice but continuing when the log files are lost.
+ }
+ }
+ state = SystemState.HEALTHY;
+ return state;
+ } else if (checkpointObject.getCheckpointLsn() == logMgr.getAppendLSN()
+ && checkpointObject.getMinMCTFirstLsn() == SHARP_CHECKPOINT_LSN) {
state = SystemState.HEALTHY;
return state;
} else {
- if (logMgr.getAppendLSN() == 0) {
- throw new IllegalStateException("Transaction log files are lost.");
- }
state = SystemState.CORRUPTED;
return state;
}
@@ -154,10 +160,11 @@
TxnId winnerEntity = null;
//#. read checkpoint file and set lowWaterMark where anaylsis and redo start
+ long readableSmallestLSN = logMgr.getReadableSmallestLSN();
CheckpointObject checkpointObject = readCheckpoint();
- long lowWaterMarkLsn = checkpointObject.getMinMCTFirstLsn();
- if (lowWaterMarkLsn == -1 || lowWaterMarkLsn == -2) {
- lowWaterMarkLsn = 0;
+ long lowWaterMarkLSN = checkpointObject.getMinMCTFirstLsn();
+ if (lowWaterMarkLSN < readableSmallestLSN) {
+ lowWaterMarkLSN = readableSmallestLSN;
}
int maxJobId = checkpointObject.getMaxJobId();
@@ -171,11 +178,11 @@
//#. set log reader to the lowWaterMarkLsn
ILogReader logReader = logMgr.getLogReader(true);
- logReader.initializeScan(lowWaterMarkLsn);
+ logReader.initializeScan(lowWaterMarkLSN);
ILogRecord logRecord = logReader.next();
while (logRecord != null) {
if (IS_DEBUG_MODE) {
- System.out.println(logRecord.getLogRecordForDisplay());
+ LOGGER.info(logRecord.getLogRecordForDisplay());
}
//update max jobId
if (logRecord.getJobId() > maxJobId) {
@@ -183,16 +190,12 @@
}
switch (logRecord.getLogType()) {
case LogType.UPDATE:
- if (IS_DEBUG_MODE) {
- updateLogCount++;
- }
+ updateLogCount++;
break;
case LogType.JOB_COMMIT:
winnerJobSet.add(Integer.valueOf(logRecord.getJobId()));
jobId2WinnerEntitiesMap.remove(Integer.valueOf(logRecord.getJobId()));
- if (IS_DEBUG_MODE) {
- jobCommitLogCount++;
- }
+ jobCommitLogCount++;
break;
case LogType.ENTITY_COMMIT:
jobId = logRecord.getJobId();
@@ -205,9 +208,10 @@
winnerEntitySet = jobId2WinnerEntitiesMap.get(Integer.valueOf(jobId));
}
winnerEntitySet.add(winnerEntity);
- if (IS_DEBUG_MODE) {
- entityCommitLogCount++;
- }
+ entityCommitLogCount++;
+ break;
+ case LogType.ABORT:
+ //ignore
break;
default:
throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
@@ -239,11 +243,11 @@
ILocalResourceRepository localResourceRepository = appRuntimeContext.getLocalResourceRepository();
//#. set log reader to the lowWaterMarkLsn again.
- logReader.initializeScan(lowWaterMarkLsn);
+ logReader.initializeScan(lowWaterMarkLSN);
logRecord = logReader.next();
while (logRecord != null) {
- if (LogManager.IS_DEBUG_MODE) {
- System.out.println(logRecord.getLogRecordForDisplay());
+ if (IS_DEBUG_MODE) {
+ LOGGER.info(logRecord.getLogRecordForDisplay());
}
LSN = logRecord.getLSN();
jobId = logRecord.getJobId();
@@ -310,15 +314,14 @@
if (LSN > maxDiskLastLsn) {
redo(logRecord);
- if (IS_DEBUG_MODE) {
- redoCount++;
- }
+ redoCount++;
}
}
break;
case LogType.JOB_COMMIT:
case LogType.ENTITY_COMMIT:
+ case LogType.ABORT:
//do nothing
break;
@@ -338,9 +341,7 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("[RecoveryMgr] recovery is completed.");
- }
- if (IS_DEBUG_MODE) {
- System.out.println("[RecoveryMgr] Count: Update/EntityCommit/JobCommit/Redo = " + updateLogCount + "/"
+ LOGGER.info("[RecoveryMgr] Count: Update/EntityCommit/JobCommit/Redo = " + updateLogCount + "/"
+ entityCommitLogCount + "/" + jobCommitLogCount + "/" + redoCount);
}
}
@@ -393,7 +394,7 @@
throw new ACIDException(e);
}
}
- minMCTFirstLSN = -2;
+ minMCTFirstLSN = SHARP_CHECKPOINT_LSN;
} else {
long firstLSN;
minMCTFirstLSN = Long.MAX_VALUE;
@@ -403,7 +404,7 @@
minMCTFirstLSN = Math.min(minMCTFirstLSN, firstLSN);
}
} else {
- minMCTFirstLSN = -1;
+ minMCTFirstLSN = SHARP_CHECKPOINT_LSN;
}
}
CheckpointObject checkpointObject = new CheckpointObject(logMgr.getAppendLSN(), minMCTFirstLSN,
@@ -580,7 +581,7 @@
break;
} else {
if (IS_DEBUG_MODE) {
- System.out.println(logRecord.getLogRecordForDisplay());
+ LOGGER.info(logRecord.getLogRecordForDisplay());
}
currentLSN = logRecord.getLSN();
}
@@ -600,9 +601,9 @@
loserTxnTable.put(loserEntity, undoLSNSet);
}
undoLSNSet.add(Long.valueOf(currentLSN));
+ updateLogCount++;
if (IS_DEBUG_MODE) {
- updateLogCount++;
- System.out.println("" + Thread.currentThread().getId() + "======> update[" + currentLSN + "]:"
+ LOGGER.info("" + Thread.currentThread().getId() + "======> update[" + currentLSN + "]:"
+ tempKeyTxnId);
}
break;
@@ -611,14 +612,21 @@
throw new ACIDException("Unexpected LogType(" + logRecord.getLogType() + ") during abort.");
case LogType.ENTITY_COMMIT:
- loserTxnTable.remove(tempKeyTxnId);
+ undoLSNSet = loserTxnTable.remove(tempKeyTxnId);
+ if (undoLSNSet == null) {
+ undoLSNSet = loserTxnTable.remove(tempKeyTxnId);
+ }
+ entityCommitLogCount++;
if (IS_DEBUG_MODE) {
- entityCommitLogCount++;
- System.out.println("" + Thread.currentThread().getId() + "======> entity_commit[" + currentLSN + "]"
+ LOGGER.info("" + Thread.currentThread().getId() + "======> entity_commit[" + currentLSN + "]"
+ tempKeyTxnId);
}
break;
+ case LogType.ABORT:
+ //ignore
+ break;
+
default:
throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
}
@@ -650,23 +658,17 @@
throw new ACIDException("IllegalState exception during abort( " + txnContext.getJobId() + ")");
}
if (IS_DEBUG_MODE) {
- System.out.println(logRecord.getLogRecordForDisplay());
+ LOGGER.info(logRecord.getLogRecordForDisplay());
}
undo(logRecord);
- if (IS_DEBUG_MODE) {
- undoCount++;
- }
+ undoCount++;
}
}
-
logReader.close();
-
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info(" undone loser transaction's effect");
- }
- if (IS_DEBUG_MODE) {
- System.out.println("UpdateLogCount/CommitLogCount/UndoCount:" + updateLogCount + "/" + entityCommitLogCount
- + "/" + undoCount);
+ LOGGER.info("UpdateLogCount/CommitLogCount/UndoCount:" + updateLogCount + "/" + entityCommitLogCount + "/"
+ + undoCount);
}
}
@@ -765,6 +767,7 @@
this.datasetId = datasetId;
this.pkHashValue = pkHashValue;
this.tupleReferencePKValue = pkValue;
+ this.pkSize = pkSize;
isByteArrayPKValue = false;
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
index 678956b..0b0a6f5 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
@@ -221,4 +221,10 @@
public LogRecord getLogRecord() {
return logRecord;
}
+
+ public void cleanupForAbort() {
+ if (primaryIndexOpTracker != null) {
+ primaryIndexOpTracker.cleanupNumActiveOperationsForAbortedJob(primaryIndexCallback);
+ }
+ }
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
index 01b38c2..07fc152 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -53,7 +53,12 @@
txnCtx.setTxnState(ITransactionManager.ABORTED);
}
try {
- txnSubsystem.getRecoveryManager().rollbackTransaction(txnCtx);
+ if (txnCtx.isWriteTxn()) {
+ LogRecord logRecord = ((TransactionContext) txnCtx).getLogRecord();
+ logRecord.formJobTerminateLogRecord(txnCtx, false);
+ txnSubsystem.getLogManager().log(logRecord);
+ txnSubsystem.getRecoveryManager().rollbackTransaction(txnCtx);
+ }
} catch (Exception ae) {
String msg = "Could not complete rollback! System is in an inconsistent state";
if (LOGGER.isLoggable(Level.SEVERE)) {
@@ -62,6 +67,7 @@
ae.printStackTrace();
throw new ACIDException(msg, ae);
} finally {
+ ((TransactionContext) txnCtx).cleanupForAbort();
txnSubsystem.getLockManager().releaseLocks(txnCtx);
transactionContextRepository.remove(txnCtx.getJobId());
}
@@ -69,20 +75,24 @@
@Override
public ITransactionContext beginTransaction(JobId jobId) throws ACIDException {
- return getTransactionContext(jobId);
+ return getTransactionContext(jobId, true);
}
@Override
- public ITransactionContext getTransactionContext(JobId jobId) throws ACIDException {
+ public ITransactionContext getTransactionContext(JobId jobId, boolean createIfNotExist) throws ACIDException {
setMaxJobId(jobId.getId());
ITransactionContext txnCtx = transactionContextRepository.get(jobId);
if (txnCtx == null) {
- synchronized (this) {
- txnCtx = transactionContextRepository.get(jobId);
- if (txnCtx == null) {
- txnCtx = new TransactionContext(jobId, txnSubsystem);
- transactionContextRepository.put(jobId, txnCtx);
+ if (createIfNotExist) {
+ synchronized (this) {
+ txnCtx = transactionContextRepository.get(jobId);
+ if (txnCtx == null) {
+ txnCtx = new TransactionContext(jobId, txnSubsystem);
+ transactionContextRepository.put(jobId, txnCtx);
+ }
}
+ } else {
+ throw new ACIDException("TransactionContext of " + jobId + " doesn't exist.");
}
}
return txnCtx;
@@ -94,7 +104,7 @@
try {
if (txnCtx.isWriteTxn()) {
LogRecord logRecord = ((TransactionContext) txnCtx).getLogRecord();
- logRecord.formJobCommitLogRecord(txnCtx);
+ logRecord.formJobTerminateLogRecord(txnCtx, true);
txnSubsystem.getLogManager().log(logRecord);
}
} catch (Exception ae) {