1) corrected the way that the logFlusher thread is stopped, 2) fixed the issue of multiple job-level commits, and 3) cleaned up the TransactionContext class
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java
index 269526b..76a2bee 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java
@@ -21,7 +21,6 @@
import edu.uci.ics.asterix.common.exceptions.ACIDException;
import edu.uci.ics.asterix.common.transactions.DatasetId;
import edu.uci.ics.asterix.common.transactions.ITransactionContext;
-import edu.uci.ics.asterix.common.transactions.ITransactionContext.TransactionType;
import edu.uci.ics.asterix.common.transactions.ITransactionManager;
import edu.uci.ics.asterix.common.transactions.JobId;
import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
@@ -35,7 +34,7 @@
import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.MurmurHash128Bit;
public class CommitRuntime implements IPushRuntime {
-
+
private final static long SEED = 0L;
private final IHyracksTaskContext hyracksTaskCtx;
@@ -44,7 +43,7 @@
private final DatasetId datasetId;
private final int[] primaryKeyFields;
private final boolean isWriteTransaction;
- private final long[] longHashes;
+ private final long[] longHashes;
private ITransactionContext transactionContext;
private RecordDescriptor inputRecordDesc;
@@ -54,23 +53,22 @@
public CommitRuntime(IHyracksTaskContext ctx, JobId jobId, int datasetId, int[] primaryKeyFields,
boolean isWriteTransaction) {
this.hyracksTaskCtx = ctx;
- IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
- .getApplicationObject();
+ IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
+ .getApplicationContext().getApplicationObject();
this.transactionManager = runtimeCtx.getTransactionSubsystem().getTransactionManager();
this.jobId = jobId;
this.datasetId = new DatasetId(datasetId);
this.primaryKeyFields = primaryKeyFields;
this.frameTupleReference = new FrameTupleReference();
this.isWriteTransaction = isWriteTransaction;
- this.longHashes= new long[2];
+ this.longHashes = new long[2];
}
@Override
public void open() throws HyracksDataException {
try {
transactionContext = transactionManager.getTransactionContext(jobId);
- transactionContext.setTransactionType(isWriteTransaction ? TransactionType.READ_WRITE
- : TransactionType.READ);
+ transactionContext.isWriteTxn(isWriteTransaction);
} catch (ACIDException e) {
throw new HyracksDataException(e);
}
@@ -91,10 +89,10 @@
}
}
}
-
+
private int computePrimaryKeyHashValue(ITupleReference tuple, int[] primaryKeyFields) {
MurmurHash128Bit.hash3_x64_128(tuple, primaryKeyFields, SEED, longHashes);
- return Math.abs((int) longHashes[0]);
+ return Math.abs((int) longHashes[0]);
}
@Override
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
index 18a81e5..bba6272 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
@@ -121,7 +121,7 @@
IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider = new AsterixAppRuntimeContextProviderForRecovery(
this);
txnSubsystem = new TransactionSubsystem(ncApplicationContext.getNodeId(), asterixAppRuntimeContextProvider,
- txnProperties);
+ txnProperties, ioManager.getIODevices().size());
isShuttingdown = false;
// The order of registration is important. The buffer cache must registered before recovery and transaction managers.
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogPage.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogPage.java
index 7d9bba7..be205a9 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogPage.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogPage.java
@@ -18,6 +18,6 @@
public void append(ILogRecord logRecord, long appendLsn);
- public void flush() throws InterruptedException;
+ public void flush();
}
\ No newline at end of file
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java
index b9ad1bd..8984403 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java
@@ -14,7 +14,6 @@
*/
package edu.uci.ics.asterix.common.transactions;
-import edu.uci.ics.asterix.common.transactions.ITransactionManager.TransactionState;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
public interface ITransactionContext {
@@ -24,17 +23,13 @@
public JobId getJobId();
- public void setStartWaitTime(long time);
+ public void isTimeout(boolean isTimeout);
- public long getStartWaitTime();
+ public boolean isTimeout();
- public void setStatus(int status);
+ public void setTxnState(int txnState);
- public int getStatus();
-
- public void setTxnState(TransactionState txnState);
-
- public TransactionState getTxnState();
+ public int getTxnState();
public long getFirstLSN();
@@ -42,25 +37,20 @@
public void setLastLSN(long resourceId, long LSN);
- public TransactionType getTransactionType();
+ public boolean isWriteTxn();
- public void setTransactionType(TransactionType transactionType);
+ public void isWriteTxn(boolean isWriterTxn);
public String prettyPrint();
- // used for showing a transaction is not waiting.
- public static final long INVALID_TIME = -1l;
+ public void isMetadataTransaction(boolean isMetadataTxn);
- public static final int ACTIVE_STATUS = 0;
- public static final int TIMED_OUT_STATUS = 1;
+ public boolean isMetadataTransaction();
- public enum TransactionType {
- READ,
- READ_WRITE
- }
+ public void notifyOptracker(boolean isJobLevelCommit);
- public void setExclusiveJobLevelCommit();
+ public void decrementNumOfActiveJobs();
- void notifyOptracker(boolean isJobLevelCommit);
+ public int getNumOfActiveJobs();
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionManager.java
index 9a99dc9..5404ceb 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionManager.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionManager.java
@@ -29,12 +29,17 @@
* transaction has committed. ABORTED: The transaction has aborted.
* TIMED_OUT: The transaction has timed out waiting to acquire a lock.
*/
- public enum TransactionState {
- ACTIVE,
- COMMITTED,
- ABORTED,
- TIMED_OUT,
- };
+// public enum TransactionState {
+// ACTIVE,
+// COMMITTED,
+// ABORTED,
+// TIMED_OUT,
+// };
+
+ public static final int ACTIVE = 0;
+ public static final int COMMITTED = 1;
+ public static final int ABORTED = 2;
+ public static final int TIMED_OUT = 3;
/**
* Begins a transaction identified by a transaction id and returns the
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
index 13f1ec0..f48f267 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
@@ -29,7 +29,6 @@
import edu.uci.ics.asterix.common.transactions.DatasetId;
import edu.uci.ics.asterix.common.transactions.IRecoveryManager.ResourceType;
import edu.uci.ics.asterix.common.transactions.ITransactionContext;
-import edu.uci.ics.asterix.common.transactions.ITransactionContext.TransactionType;
import edu.uci.ics.asterix.common.transactions.ITransactionSubsystem;
import edu.uci.ics.asterix.common.transactions.JobId;
import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
@@ -111,13 +110,12 @@
@Override
public void beginTransaction(JobId transactionId) throws ACIDException, RemoteException {
ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().beginTransaction(transactionId);
- txnCtx.setExclusiveJobLevelCommit();
+ txnCtx.isMetadataTransaction(true);
}
@Override
public void commitTransaction(JobId jobId) throws RemoteException, ACIDException {
ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId);
- txnCtx.setExclusiveJobLevelCommit();
transactionSubsystem.getTransactionManager().commitTransaction(txnCtx, new DatasetId(-1), -1);
}
@@ -276,7 +274,7 @@
ILSMIndexAccessor indexAccessor = lsmIndex.createAccessor(modCallback, NoOpOperationCallback.INSTANCE);
ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId);
- txnCtx.setTransactionType(TransactionType.READ_WRITE);
+ txnCtx.isWriteTxn(true);
txnCtx.registerIndexAndCallback(resourceID, lsmIndex, (AbstractOperationCallback) modCallback,
metadataIndex.isPrimaryIndex());
@@ -584,7 +582,7 @@
ILSMIndexAccessor indexAccessor = lsmIndex.createAccessor(modCallback, NoOpOperationCallback.INSTANCE);
ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId);
- txnCtx.setTransactionType(TransactionType.READ_WRITE);
+ txnCtx.isWriteTxn(true);
txnCtx.registerIndexAndCallback(resourceID, lsmIndex, (AbstractOperationCallback) modCallback,
metadataIndex.isPrimaryIndex());
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/job/listener/JobEventListenerFactory.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/job/listener/JobEventListenerFactory.java
index 8092ecd..2782e63 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/job/listener/JobEventListenerFactory.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/job/listener/JobEventListenerFactory.java
@@ -18,7 +18,6 @@
import edu.uci.ics.asterix.common.exceptions.ACIDException;
import edu.uci.ics.asterix.common.transactions.DatasetId;
import edu.uci.ics.asterix.common.transactions.ITransactionContext;
-import edu.uci.ics.asterix.common.transactions.ITransactionContext.TransactionType;
import edu.uci.ics.asterix.common.transactions.ITransactionManager;
import edu.uci.ics.asterix.common.transactions.JobId;
import edu.uci.ics.hyracks.api.context.IHyracksJobletContext;
@@ -51,8 +50,7 @@
ITransactionManager txnManager = ((IAsterixAppRuntimeContext) jobletContext.getApplicationContext()
.getApplicationObject()).getTransactionSubsystem().getTransactionManager();
ITransactionContext txnContext = txnManager.getTransactionContext(jobId);
- txnContext.setTransactionType(transactionalWrite ? TransactionType.READ_WRITE
- : TransactionType.READ);
+ txnContext.isWriteTxn(transactionalWrite);
txnManager.completedTransaction(txnContext, new DatasetId(-1), -1,
!(jobStatus == JobStatus.FAILURE));
} catch (ACIDException e) {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
index fdfc822..759b45b 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
@@ -31,7 +31,7 @@
import edu.uci.ics.asterix.common.transactions.ILockManager;
import edu.uci.ics.asterix.common.transactions.ILogRecord;
import edu.uci.ics.asterix.common.transactions.ITransactionContext;
-import edu.uci.ics.asterix.common.transactions.ITransactionManager.TransactionState;
+import edu.uci.ics.asterix.common.transactions.ITransactionManager;
import edu.uci.ics.asterix.common.transactions.JobId;
import edu.uci.ics.asterix.transaction.management.service.logging.LogPage;
import edu.uci.ics.asterix.transaction.management.service.logging.LogPageReader;
@@ -305,9 +305,9 @@
}
private void validateJob(ITransactionContext txnContext) throws ACIDException {
- if (txnContext.getTxnState() == TransactionState.ABORTED) {
+ if (txnContext.getTxnState() == ITransactionManager.ABORTED) {
throw new ACIDException("" + txnContext.getJobId() + " is in ABORTED state.");
- } else if (txnContext.getStatus() == ITransactionContext.TIMED_OUT_STATUS) {
+ } else if (txnContext.isTimeout()) {
requestAbort(txnContext);
}
}
@@ -708,7 +708,7 @@
//The lock will be released when the entity-commit log is flushed.
if (commitFlag && entityInfoManager.getEntityLockCount(entityInfo) == 1
&& entityInfoManager.getDatasetLockCount(entityInfo) == 1) {
- if (txnContext.getTransactionType().equals(ITransactionContext.TransactionType.READ_WRITE)) {
+ if (txnContext.isWriteTxn()) {
logRecord.formCommitLogRecord(txnContext, LogType.ENTITY_COMMIT, jobId.getId(), datasetId.getId(),
entityHashValue);
txnSubsystem.getLogManager().log(logRecord);
@@ -972,8 +972,8 @@
jobHT.remove(jobId);
if (existWaiter) {
- txnContext.setStatus(ITransactionContext.TIMED_OUT_STATUS);
- txnContext.setTxnState(TransactionState.ABORTED);
+ txnContext.isTimeout(true);
+ txnContext.setTxnState(ITransactionManager.ABORTED);
}
if (IS_DEBUG_MODE) {
@@ -1823,7 +1823,7 @@
//waiter woke up -> remove/deallocate waiter object and abort if timeout
latchLockTable();
- if (txnContext.getStatus() == ITransactionContext.TIMED_OUT_STATUS || waiter.isVictim()) {
+ if (txnContext.isTimeout() || waiter.isVictim()) {
requestAbort(txnContext);
}
@@ -1880,8 +1880,7 @@
}
private void requestAbort(ITransactionContext txnContext) throws ACIDException {
- txnContext.setStatus(ITransactionContext.TIMED_OUT_STATUS);
- txnContext.setStartWaitTime(ITransactionContext.INVALID_TIME);
+ txnContext.isTimeout(true);
throw new ACIDException("Transaction " + txnContext.getJobId()
+ " should abort (requested by the Lock Manager)");
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java
index e97c177..43164cf 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java
@@ -21,17 +21,13 @@
import java.util.NoSuchElementException;
import java.util.Scanner;
-import edu.uci.ics.asterix.common.config.AsterixCompilerProperties;
-import edu.uci.ics.asterix.common.config.AsterixExternalProperties;
-import edu.uci.ics.asterix.common.config.AsterixMetadataProperties;
import edu.uci.ics.asterix.common.config.AsterixPropertiesAccessor;
-import edu.uci.ics.asterix.common.config.AsterixStorageProperties;
import edu.uci.ics.asterix.common.config.AsterixTransactionProperties;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.common.transactions.DatasetId;
import edu.uci.ics.asterix.common.transactions.ILockManager;
-import edu.uci.ics.asterix.common.transactions.ITransactionManager.TransactionState;
+import edu.uci.ics.asterix.common.transactions.ITransactionManager;
import edu.uci.ics.asterix.common.transactions.JobId;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
@@ -62,7 +58,7 @@
public LockRequestController(String requestFileName) throws ACIDException, AsterixException {
this.txnProvider = new TransactionSubsystem("LockManagerPredefinedUnitTest", null,
- new AsterixTransactionProperties(new AsterixPropertiesAccessor()));
+ new AsterixTransactionProperties(new AsterixPropertiesAccessor()), 1);
this.workerReadyQueue = new WorkerReadyQueue();
this.requestList = new ArrayList<LockRequest>();
this.expectedResultList = new ArrayList<ArrayList<Integer>>();
@@ -281,7 +277,7 @@
lockMode = scanner.next();
txnContext = jobMap.get(jobId);
if (txnContext == null) {
- txnContext = new TransactionContext(new JobId(jobId), txnProvider);
+ txnContext = new TransactionContext(new JobId(jobId), txnProvider, 1);
jobMap.put(jobId, txnContext);
}
log("LockRequest[" + i++ + "]:T" + threadId + "," + requestType + ",J" + jobId + ",D" + datasetId
@@ -428,9 +424,9 @@
try {
sendRequest(lockRequest);
} catch (ACIDException e) {
- if (lockRequest.txnContext.getStatus() == TransactionContext.TIMED_OUT_STATUS) {
- if (lockRequest.txnContext.getTxnState() != TransactionState.ABORTED) {
- lockRequest.txnContext.setTxnState(TransactionState.ABORTED);
+ if (lockRequest.txnContext.isTimeout()) {
+ if (lockRequest.txnContext.getTxnState() != ITransactionManager.ABORTED) {
+ lockRequest.txnContext.setTxnState(ITransactionManager.ABORTED);
log("*** " + lockRequest.txnContext.getJobId() + " lock request causing deadlock ***");
log("Abort --> Releasing all locks acquired by " + lockRequest.txnContext.getJobId());
try {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerRandomUnitTest.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerRandomUnitTest.java
index f73460d..aba41a9 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerRandomUnitTest.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerRandomUnitTest.java
@@ -24,7 +24,7 @@
import edu.uci.ics.asterix.common.transactions.DatasetId;
import edu.uci.ics.asterix.common.transactions.ILockManager;
import edu.uci.ics.asterix.common.transactions.ITransactionContext;
-import edu.uci.ics.asterix.common.transactions.ITransactionManager.TransactionState;
+import edu.uci.ics.asterix.common.transactions.ITransactionManager;
import edu.uci.ics.asterix.common.transactions.JobId;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
@@ -48,7 +48,7 @@
public static void main(String args[]) throws ACIDException, AsterixException {
int i;
TransactionSubsystem txnProvider = new TransactionSubsystem("LockManagerRandomUnitTest", null,
- new AsterixTransactionProperties(new AsterixPropertiesAccessor()));
+ new AsterixTransactionProperties(new AsterixPropertiesAccessor()), 1);
rand = new Random(System.currentTimeMillis());
for (i = 0; i < MAX_NUM_OF_ENTITY_LOCK_JOB; i++) {
System.out.println("Creating " + i + "th EntityLockJob..");
@@ -119,7 +119,7 @@
private static TransactionContext generateTxnContext(TransactionSubsystem txnProvider) {
try {
- return new TransactionContext(new JobId(jobId++), txnProvider);
+ return new TransactionContext(new JobId(jobId++), txnProvider, 1);
} catch (ACIDException e) {
e.printStackTrace();
return null;
@@ -287,8 +287,8 @@
} else {
try {
synchronized (txnContext) {
- if (txnContext.getTxnState() != TransactionState.ABORTED) {
- txnContext.setTxnState(TransactionState.ABORTED);
+ if (txnContext.getTxnState() != ITransactionManager.ABORTED) {
+ txnContext.setTxnState(ITransactionManager.ABORTED);
mayRelease = true;
}
}
@@ -446,9 +446,9 @@
try {
lockMgr.lock(request.datasetIdObj, request.entityHashValue, request.lockMode, request.txnContext);
} catch (ACIDException e) {
- if (request.txnContext.getStatus() == TransactionContext.TIMED_OUT_STATUS) {
- if (request.txnContext.getTxnState() != TransactionState.ABORTED) {
- request.txnContext.setTxnState(TransactionState.ABORTED);
+ if (request.txnContext.isTimeout()) {
+ if (request.txnContext.getTxnState() != ITransactionManager.ABORTED) {
+ request.txnContext.setTxnState(ITransactionManager.ABORTED);
log("*** " + request.txnContext.getJobId() + " lock request causing deadlock ***");
log("Abort --> Releasing all locks acquired by " + request.txnContext.getJobId());
try {
@@ -469,9 +469,9 @@
lockMgr.instantLock(request.datasetIdObj, request.entityHashValue, request.lockMode,
request.txnContext);
} catch (ACIDException e) {
- if (request.txnContext.getStatus() == TransactionContext.TIMED_OUT_STATUS) {
- if (request.txnContext.getTxnState() != TransactionState.ABORTED) {
- request.txnContext.setTxnState(TransactionState.ABORTED);
+ if (request.txnContext.isTimeout()) {
+ if (request.txnContext.getTxnState() != ITransactionManager.ABORTED) {
+ request.txnContext.setTxnState(ITransactionManager.ABORTED);
log("*** " + request.txnContext.getJobId() + " lock request causing deadlock ***");
log("Abort --> Releasing all locks acquired by " + request.txnContext.getJobId());
try {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
index 49c07c1..e378789 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
@@ -387,7 +387,10 @@
public void terminate() {
if (flushPage != null) {
- flushPage.isStop(true);
+ synchronized (flushPage) {
+ flushPage.isStop(true);
+ flushPage.notify();
+ }
}
stop = true;
this.interrupt();
@@ -400,6 +403,9 @@
try {
flushPage = flushQ.take();
flushPage.flush();
+ if (stop) {
+ break;
+ }
} catch (InterruptedException e) {
if (stop) {
break;
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
index 5c051a3..04200a7 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
@@ -120,7 +120,7 @@
////////////////////////////////////
@Override
- public void flush() throws InterruptedException {
+ public void flush() {
try {
int endOffset;
while (!full.get()) {
@@ -128,12 +128,12 @@
if (appendOffset - flushOffset == 0 && !full.get()) {
try {
this.wait();
- } catch (InterruptedException e) {
if (stop) {
- throw e;
- } else {
- throw new IllegalStateException(e);
+ fileChannel.close();
+ break;
}
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(e);
}
}
endOffset = appendOffset;
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
index eee6bad..504c5e2 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
@@ -17,6 +17,8 @@
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import edu.uci.ics.asterix.common.context.BaseOperationTracker;
@@ -24,7 +26,7 @@
import edu.uci.ics.asterix.common.exceptions.ACIDException;
import edu.uci.ics.asterix.common.transactions.AbstractOperationCallback;
import edu.uci.ics.asterix.common.transactions.ITransactionContext;
-import edu.uci.ics.asterix.common.transactions.ITransactionManager.TransactionState;
+import edu.uci.ics.asterix.common.transactions.ITransactionManager;
import edu.uci.ics.asterix.common.transactions.JobId;
import edu.uci.ics.asterix.common.transactions.MutableLong;
import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallback;
@@ -33,41 +35,73 @@
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMOperationType;
-/**
- * Represents a holder object that contains all information related to a
- * transaction. A TransactionContext instance can be used as a token and
- * provided to Transaction sub-systems (Log/Lock/Recovery/Transaction)Manager to
- * initiate an operation on the behalf of the transaction associated with the
- * context.
+/*
+ * An object of TransactionContext is created and accessed(read/written) by multiple threads which work for
+ * a single job identified by a jobId. Thus, the member variables in the object can be read/written
+ * concurrently. Please see each variable declaration to know which one is accessed concurrently and
+ * which one is not.
*/
public class TransactionContext implements ITransactionContext, Serializable {
private static final long serialVersionUID = -6105616785783310111L;
private TransactionSubsystem transactionSubsystem;
- private final AtomicLong firstLSN;
- private final AtomicLong lastLSN;
- private TransactionState txnState;
- private long startWaitTime;
- private int status;
- private TransactionType transactionType = TransactionType.READ;
- private JobId jobId;
- private boolean exlusiveJobLevelCommit;
- private final Map<MutableLong, BaseOperationTracker> indexMap;
+
+ //jobId is set once and read concurrently.
+ private final JobId jobId;
+
+ //There are no concurrent writers on both firstLSN and lastLSN
+ //since both values are updated by serialized log appenders.
+ //But readers and writers can be different threads,
+ //so both LSNs are atomic variables in order to be read and written atomically.
+ private AtomicLong firstLSN;
+ private AtomicLong lastLSN;
+
+ //txnState is read and written concurrently.
+ private AtomicInteger txnState;
+
+ //isTimeout are read and written under the lockMgr's tableLatch
+ //Thus, no other synchronization is required separately.
+ private boolean isTimeout;
+
+ //isWriteTxn can be set concurrently by multiple threads.
+ private AtomicBoolean isWriteTxn;
+
+ //isMetadataTxn is accessed by a single thread since the metadata is not partitioned
+ private boolean isMetadataTxn;
+
+ //indexMap is concurrently accessed by multiple threads,
+ //so those threads are synchronized on indexMap object itself
+ private Map<MutableLong, BaseOperationTracker> indexMap;
+
+ //TODO: fix ComponentLSNs' issues.
+ //primaryIndex, primaryIndexCallback, and primaryIndexOptracker will be modified accordingly
+ //when the issues of componentLSNs are fixed.
private ILSMIndex primaryIndex;
private PrimaryIndexModificationOperationCallback primaryIndexCallback;
private PrimaryIndexOperationTracker primaryIndexOpTracker;
- private final MutableLong tempResourceIdForRegister;
- private final MutableLong tempResourceIdForSetLSN;
- private final LogRecord logRecord;
- public TransactionContext(JobId jobId, TransactionSubsystem transactionSubsystem) throws ACIDException {
+ //The following three variables are used as temporary variables in order to avoid object creations.
+ //Those are used in synchronized methods.
+ private MutableLong tempResourceIdForRegister;
+ private MutableLong tempResourceIdForSetLSN;
+ private LogRecord logRecord;
+
+ //numOfActiveJobs is accessed under a synchronized block on TransactionContext in callers.
+ private int numOfActiveJobs;
+
+ //TODO: implement transactionContext pool in order to avoid object creations.
+ // also, the pool can throttle the number of concurrent active jobs at every moment.
+ public TransactionContext(JobId jobId, TransactionSubsystem transactionSubsystem, int numOfPartitions)
+ throws ACIDException {
this.jobId = jobId;
this.transactionSubsystem = transactionSubsystem;
+ this.numOfActiveJobs = numOfPartitions;
firstLSN = new AtomicLong(-1);
lastLSN = new AtomicLong(-1);
- txnState = TransactionState.ACTIVE;
- startWaitTime = INVALID_TIME;
- status = ACTIVE_STATUS;
+ txnState = new AtomicInteger(ITransactionManager.ACTIVE);
+ isTimeout = false;
+ isWriteTxn = new AtomicBoolean(false);
+ isMetadataTxn = false;
indexMap = new HashMap<MutableLong, BaseOperationTracker>();
primaryIndex = null;
tempResourceIdForRegister = new MutableLong();
@@ -109,9 +143,9 @@
@Override
public void notifyOptracker(boolean isJobLevelCommit) {
try {
- if (isJobLevelCommit && exlusiveJobLevelCommit) {
+ if (isJobLevelCommit && isMetadataTxn) {
primaryIndexOpTracker.exclusiveJobCommitted();
- } else if (!isJobLevelCommit){
+ } else if (!isJobLevelCommit) {
primaryIndexOpTracker
.completeOperation(null, LSMOperationType.MODIFICATION, null, primaryIndexCallback);
}
@@ -120,12 +154,12 @@
}
}
- public void setTransactionType(TransactionType transactionType) {
- this.transactionType = transactionType;
+ public void isWriteTxn(boolean isWriteTxn) {
+ this.isWriteTxn.set(isWriteTxn);
}
- public TransactionType getTransactionType() {
- return transactionType;
+ public boolean isWriteTxn() {
+ return isWriteTxn.get();
}
@Override
@@ -138,39 +172,24 @@
return lastLSN.get();
}
- public void setLastLSN(long LSN) {
- if (firstLSN.get() == -1) {
- firstLSN.set(LSN);
- }
- lastLSN.set(LSN);
- }
-
public JobId getJobId() {
return jobId;
}
- public void setStartWaitTime(long time) {
- this.startWaitTime = time;
+ public void isTimeout(boolean isTimeout) {
+ this.isTimeout = isTimeout;
}
- public long getStartWaitTime() {
- return startWaitTime;
+ public boolean isTimeout() {
+ return isTimeout;
}
- public void setStatus(int status) {
- this.status = status;
+ public void setTxnState(int txnState) {
+ this.txnState.set(txnState);
}
- public int getStatus() {
- return status;
- }
-
- public void setTxnState(TransactionState txnState) {
- this.txnState = txnState;
- }
-
- public TransactionState getTxnState() {
- return txnState;
+ public int getTxnState() {
+ return txnState.get();
}
@Override
@@ -184,23 +203,38 @@
}
@Override
- public void setExclusiveJobLevelCommit() {
- exlusiveJobLevelCommit = true;
+ public void isMetadataTransaction(boolean isMetadataTxn) {
+ this.isMetadataTxn = isMetadataTxn;
+ }
+
+ @Override
+ public boolean isMetadataTransaction() {
+ return isMetadataTxn;
}
public String prettyPrint() {
StringBuilder sb = new StringBuilder();
sb.append("\n" + jobId + "\n");
- sb.append("transactionType: " + transactionType);
+ sb.append("isWriterTxn: " + isWriteTxn + "\n");
sb.append("firstLSN: " + firstLSN.get() + "\n");
sb.append("lastLSN: " + lastLSN.get() + "\n");
sb.append("TransactionState: " + txnState + "\n");
- sb.append("startWaitTime: " + startWaitTime + "\n");
- sb.append("status: " + status + "\n");
+ sb.append("status: " + isTimeout + "\n");
return sb.toString();
}
public LogRecord getLogRecord() {
return logRecord;
}
+
+ @Override
+ public void decrementNumOfActiveJobs() {
+ assert numOfActiveJobs > 0;
+ numOfActiveJobs--;
+ }
+
+ @Override
+ public int getNumOfActiveJobs() {
+ return numOfActiveJobs;
+ }
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
index 9e18ccf..b73b2db 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -41,22 +41,29 @@
public static final boolean IS_DEBUG_MODE = false;//true
private static final Logger LOGGER = Logger.getLogger(TransactionManager.class.getName());
private final TransactionSubsystem txnSubsystem;
+ private final int numOfPartitions;
private Map<JobId, ITransactionContext> transactionContextRepository = new ConcurrentHashMap<JobId, ITransactionContext>();
private AtomicInteger maxJobId = new AtomicInteger(0);
- public TransactionManager(TransactionSubsystem provider) {
+ public TransactionManager(TransactionSubsystem provider, int numOfPartitions) {
this.txnSubsystem = provider;
+ this.numOfPartitions = numOfPartitions;
}
@Override
- public void abortTransaction(ITransactionContext txnContext, DatasetId datasetId, int PKHashVal)
- throws ACIDException {
- synchronized (txnContext) {
- if (txnContext.getTxnState().equals(TransactionState.ABORTED)) {
- return;
+ public void abortTransaction(ITransactionContext txnCtx, DatasetId datasetId, int PKHashVal) throws ACIDException {
+ synchronized (txnCtx) {
+ if (txnCtx.getTxnState() != ITransactionManager.ABORTED) {
+ txnCtx.setTxnState(ITransactionManager.ABORTED);
+ }
+ if (!txnCtx.isMetadataTransaction()) {
+ txnCtx.decrementNumOfActiveJobs();
+ if (txnCtx.getNumOfActiveJobs() != 0) {
+ return;
+ }
}
try {
- txnSubsystem.getRecoveryManager().rollbackTransaction(txnContext);
+ txnSubsystem.getRecoveryManager().rollbackTransaction(txnCtx);
} catch (Exception ae) {
String msg = "Could not complete rollback! System is in an inconsistent state";
if (LOGGER.isLoggable(Level.SEVERE)) {
@@ -65,9 +72,8 @@
ae.printStackTrace();
throw new Error(msg);
} finally {
- txnSubsystem.getLockManager().releaseLocks(txnContext);
- transactionContextRepository.remove(txnContext.getJobId());
- txnContext.setTxnState(TransactionState.ABORTED);
+ txnSubsystem.getLockManager().releaseLocks(txnCtx);
+ transactionContextRepository.remove(txnCtx.getJobId());
}
}
}
@@ -82,7 +88,7 @@
setMaxJobId(jobId.getId());
ITransactionContext txnCtx = transactionContextRepository.get(jobId);
if (txnCtx == null) {
- txnCtx = new TransactionContext(jobId, txnSubsystem);
+ txnCtx = new TransactionContext(jobId, txnSubsystem, numOfPartitions);
transactionContextRepository.put(jobId, txnCtx);
}
return txnCtx;
@@ -91,10 +97,6 @@
@Override
public void commitTransaction(ITransactionContext txnCtx, DatasetId datasetId, int PKHashVal) throws ACIDException {
synchronized (txnCtx) {
- if ((txnCtx.getTxnState().equals(TransactionState.COMMITTED))) {
- return;
- }
-
//There is either job-level commit or entity-level commit.
//The job-level commit will have -1 value both for datasetId and PKHashVal.
@@ -105,8 +107,14 @@
}
//for job-level commit
+ if (!txnCtx.isMetadataTransaction()) {
+ txnCtx.decrementNumOfActiveJobs();
+ if (txnCtx.getNumOfActiveJobs() != 0) {
+ return;
+ }
+ }
try {
- if (txnCtx.getTransactionType().equals(ITransactionContext.TransactionType.READ_WRITE)) {
+ if (txnCtx.isWriteTxn()) {
LogRecord logRecord = ((TransactionContext) txnCtx).getLogRecord();
logRecord.formCommitLogRecord(txnCtx, LogType.JOB_COMMIT, txnCtx.getJobId().getId(), -1, -1);
txnSubsystem.getLogManager().log(logRecord);
@@ -119,7 +127,7 @@
} finally {
txnSubsystem.getLockManager().releaseLocks(txnCtx); // release
transactionContextRepository.remove(txnCtx.getJobId());
- txnCtx.setTxnState(TransactionState.COMMITTED);
+ txnCtx.setTxnState(ITransactionManager.COMMITTED);
}
}
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java
index 2dd7e64..dc06463 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java
@@ -42,12 +42,12 @@
private final AsterixTransactionProperties txnProperties;
public TransactionSubsystem(String id, IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider,
- AsterixTransactionProperties txnProperties) throws ACIDException {
+ AsterixTransactionProperties txnProperties, int numOfPartitions) throws ACIDException {
this.id = id;
this.txnProperties = txnProperties;
- this.transactionManager = new TransactionManager(this);
- this.logManager = new LogManager(this);
+ this.transactionManager = new TransactionManager(this, numOfPartitions);
this.lockManager = new LockManager(this);
+ this.logManager = new LogManager(this);
this.recoveryManager = new RecoveryManager(this);
this.asterixAppRuntimeContextProvider = asterixAppRuntimeContextProvider;
if (asterixAppRuntimeContextProvider != null) {