corrected the way that the transaction contexts are create/retrieved concurrently and the job-level commit log records are written. Still the transaction context class needs changes in order to deal with multiple job-level commits caused by the partitioned indexes in a NC
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
index 59ffe5b..eee6bad 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
@@ -28,6 +28,7 @@
import edu.uci.ics.asterix.common.transactions.JobId;
import edu.uci.ics.asterix.common.transactions.MutableLong;
import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallback;
+import edu.uci.ics.asterix.transaction.management.service.logging.LogRecord;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMOperationType;
@@ -57,6 +58,7 @@
private PrimaryIndexOperationTracker primaryIndexOpTracker;
private final MutableLong tempResourceIdForRegister;
private final MutableLong tempResourceIdForSetLSN;
+ private final LogRecord logRecord;
public TransactionContext(JobId jobId, TransactionSubsystem transactionSubsystem) throws ACIDException {
this.jobId = jobId;
@@ -70,6 +72,7 @@
primaryIndex = null;
tempResourceIdForRegister = new MutableLong();
tempResourceIdForSetLSN = new MutableLong();
+ logRecord = new LogRecord();
}
public void registerIndexAndCallback(long resourceId, ILSMIndex index, AbstractOperationCallback callback,
@@ -196,4 +199,8 @@
sb.append("status: " + status + "\n");
return sb.toString();
}
+
+ public LogRecord getLogRecord() {
+ return logRecord;
+ }
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
index cf614a5..9e18ccf 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -16,22 +16,20 @@
import java.io.IOException;
import java.io.OutputStream;
-import java.util.HashMap;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
import edu.uci.ics.asterix.common.transactions.DatasetId;
-import edu.uci.ics.asterix.common.transactions.ILogRecord;
import edu.uci.ics.asterix.common.transactions.ITransactionContext;
import edu.uci.ics.asterix.common.transactions.ITransactionManager;
import edu.uci.ics.asterix.common.transactions.JobId;
import edu.uci.ics.asterix.transaction.management.service.logging.LogRecord;
import edu.uci.ics.asterix.transaction.management.service.logging.LogType;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
/**
@@ -43,13 +41,11 @@
public static final boolean IS_DEBUG_MODE = false;//true
private static final Logger LOGGER = Logger.getLogger(TransactionManager.class.getName());
private final TransactionSubsystem txnSubsystem;
- private Map<JobId, ITransactionContext> transactionContextRepository = new HashMap<JobId, ITransactionContext>();
+ private Map<JobId, ITransactionContext> transactionContextRepository = new ConcurrentHashMap<JobId, ITransactionContext>();
private AtomicInteger maxJobId = new AtomicInteger(0);
- private final ILogRecord logRecord;
public TransactionManager(TransactionSubsystem provider) {
this.txnSubsystem = provider;
- logRecord = new LogRecord();
}
@Override
@@ -59,7 +55,6 @@
if (txnContext.getTxnState().equals(TransactionState.ABORTED)) {
return;
}
-
try {
txnSubsystem.getRecoveryManager().rollbackTransaction(txnContext);
} catch (Exception ae) {
@@ -79,27 +74,18 @@
@Override
public ITransactionContext beginTransaction(JobId jobId) throws ACIDException {
- setMaxJobId(jobId.getId());
- ITransactionContext txnContext = new TransactionContext(jobId, txnSubsystem);
- synchronized (this) {
- transactionContextRepository.put(jobId, txnContext);
- }
- return txnContext;
+ return getTransactionContext(jobId);
}
@Override
- public ITransactionContext getTransactionContext(JobId jobId) throws ACIDException {
+ public synchronized ITransactionContext getTransactionContext(JobId jobId) throws ACIDException {
setMaxJobId(jobId.getId());
- synchronized (transactionContextRepository) {
-
- ITransactionContext context = transactionContextRepository.get(jobId);
- if (context == null) {
- context = transactionContextRepository.get(jobId);
- context = new TransactionContext(jobId, txnSubsystem);
- transactionContextRepository.put(jobId, context);
- }
- return context;
+ ITransactionContext txnCtx = transactionContextRepository.get(jobId);
+ if (txnCtx == null) {
+ txnCtx = new TransactionContext(jobId, txnSubsystem);
+ transactionContextRepository.put(jobId, txnCtx);
}
+ return txnCtx;
}
@Override
@@ -121,6 +107,7 @@
//for job-level commit
try {
if (txnCtx.getTransactionType().equals(ITransactionContext.TransactionType.READ_WRITE)) {
+ LogRecord logRecord = ((TransactionContext) txnCtx).getLogRecord();
logRecord.formCommitLogRecord(txnCtx, LogType.JOB_COMMIT, txnCtx.getJobId().getId(), -1, -1);
txnSubsystem.getLogManager().log(logRecord);
}