removed the information of the numOfPartitions and refined synchronization blocks in transaction related classes.
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
index bba6272..18a81e5 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContext.java
@@ -121,7 +121,7 @@
IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider = new AsterixAppRuntimeContextProviderForRecovery(
this);
txnSubsystem = new TransactionSubsystem(ncApplicationContext.getNodeId(), asterixAppRuntimeContextProvider,
- txnProperties, ioManager.getIODevices().size());
+ txnProperties);
isShuttingdown = false;
// The order of registration is important. The buffer cache must registered before recovery and transaction managers.
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
index 418e0a6..3fa9e56 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/aql/translator/AqlTranslator.java
@@ -128,6 +128,7 @@
ADDED_PENDINGOP_RECORD_TO_METADATA
}
+ public static final boolean IS_DEBUG_MODE = true;//true
private final List<Statement> aqlStatements;
private final PrintWriter out;
private final SessionConfig sessionConfig;
@@ -1506,8 +1507,9 @@
private void abort(Exception rootE, Exception parentE, MetadataTransactionContext mdTxnCtx) {
try {
- //TODO: remove stacktrace
- rootE.printStackTrace();
+ if (IS_DEBUG_MODE) {
+ rootE.printStackTrace();
+ }
MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
} catch (Exception e2) {
parentE.addSuppressed(e2);
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java
index 8984403..531b10f 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java
@@ -48,9 +48,4 @@
public boolean isMetadataTransaction();
public void notifyOptracker(boolean isJobLevelCommit);
-
- public void decrementNumOfActiveJobs();
-
- public int getNumOfActiveJobs();
-
}
diff --git a/asterix-common/src/test/java/edu/uci/ics/asterix/test/aql/TestsUtils.java b/asterix-common/src/test/java/edu/uci/ics/asterix/test/aql/TestsUtils.java
index 861cce5..560e0bd 100644
--- a/asterix-common/src/test/java/edu/uci/ics/asterix/test/aql/TestsUtils.java
+++ b/asterix-common/src/test/java/edu/uci/ics/asterix/test/aql/TestsUtils.java
@@ -332,6 +332,9 @@
for (TestFileContext ctx : testFileCtxs) {
testFile = ctx.getFile();
+ if (!testFile.getName().contains("insert-into-loaded-dataset_02")) {
+ continue;
+ }
statement = TestsUtils.readTestFile(testFile);
try {
switch (ctx.getType()) {
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
index 3602974..be95c99 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -92,6 +92,7 @@
*/
public class MetadataBootstrap {
private static final Logger LOGGER = Logger.getLogger(MetadataBootstrap.class.getName());
+ public static final boolean IS_DEBUG_MODE = true;//true
private static IAsterixAppRuntimeContext runtimeContext;
@@ -190,8 +191,9 @@
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
} catch (Exception e) {
try {
- //TODO: remove stacktrace
- e.printStackTrace();
+ if (IS_DEBUG_MODE) {
+ e.printStackTrace();
+ }
MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
} catch (Exception e2) {
e.addSuppressed(e2);
@@ -434,8 +436,9 @@
}
} catch (Exception e) {
try {
- //TODO: remove stacktrace
- e.printStackTrace();
+ if (IS_DEBUG_MODE) {
+ e.printStackTrace();
+ }
MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
} catch (Exception e2) {
e.addSuppressed(e2);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java
index 43164cf..caf04e0 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerDeterministicUnitTest.java
@@ -58,7 +58,7 @@
public LockRequestController(String requestFileName) throws ACIDException, AsterixException {
this.txnProvider = new TransactionSubsystem("LockManagerPredefinedUnitTest", null,
- new AsterixTransactionProperties(new AsterixPropertiesAccessor()), 1);
+ new AsterixTransactionProperties(new AsterixPropertiesAccessor()));
this.workerReadyQueue = new WorkerReadyQueue();
this.requestList = new ArrayList<LockRequest>();
this.expectedResultList = new ArrayList<ArrayList<Integer>>();
@@ -277,7 +277,7 @@
lockMode = scanner.next();
txnContext = jobMap.get(jobId);
if (txnContext == null) {
- txnContext = new TransactionContext(new JobId(jobId), txnProvider, 1);
+ txnContext = new TransactionContext(new JobId(jobId), txnProvider);
jobMap.put(jobId, txnContext);
}
log("LockRequest[" + i++ + "]:T" + threadId + "," + requestType + ",J" + jobId + ",D" + datasetId
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerRandomUnitTest.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerRandomUnitTest.java
index aba41a9..e6f2798 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerRandomUnitTest.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManagerRandomUnitTest.java
@@ -48,7 +48,7 @@
public static void main(String args[]) throws ACIDException, AsterixException {
int i;
TransactionSubsystem txnProvider = new TransactionSubsystem("LockManagerRandomUnitTest", null,
- new AsterixTransactionProperties(new AsterixPropertiesAccessor()), 1);
+ new AsterixTransactionProperties(new AsterixPropertiesAccessor()));
rand = new Random(System.currentTimeMillis());
for (i = 0; i < MAX_NUM_OF_ENTITY_LOCK_JOB; i++) {
System.out.println("Creating " + i + "th EntityLockJob..");
@@ -119,7 +119,7 @@
private static TransactionContext generateTxnContext(TransactionSubsystem txnProvider) {
try {
- return new TransactionContext(new JobId(jobId++), txnProvider, 1);
+ return new TransactionContext(new JobId(jobId++), txnProvider);
} catch (ACIDException e) {
e.printStackTrace();
return null;
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
index f5c1264..d51a95b 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
@@ -29,7 +29,7 @@
public class LogPage implements ILogPage {
- public static final boolean IS_DEBUG_MODE = true;//true
+ public static final boolean IS_DEBUG_MODE = false;//true
private static final Logger LOGGER = Logger.getLogger(LogPage.class.getName());
private final LockManager lockMgr;
private final LogPageReader logPageReader;
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
index 504c5e2..940ba23 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
@@ -45,7 +45,7 @@
private static final long serialVersionUID = -6105616785783310111L;
private TransactionSubsystem transactionSubsystem;
-
+
//jobId is set once and read concurrently.
private final JobId jobId;
@@ -59,7 +59,7 @@
//txnState is read and written concurrently.
private AtomicInteger txnState;
- //isTimeout are read and written under the lockMgr's tableLatch
+ //isTimeout is read and written under the lockMgr's tableLatch
//Thus, no other synchronization is required separately.
private boolean isTimeout;
@@ -86,16 +86,11 @@
private MutableLong tempResourceIdForSetLSN;
private LogRecord logRecord;
- //numOfActiveJobs is accessed under a synchronized block on TransactionContext in callers.
- private int numOfActiveJobs;
-
//TODO: implement transactionContext pool in order to avoid object creations.
// also, the pool can throttle the number of concurrent active jobs at every moment.
- public TransactionContext(JobId jobId, TransactionSubsystem transactionSubsystem, int numOfPartitions)
- throws ACIDException {
+ public TransactionContext(JobId jobId, TransactionSubsystem transactionSubsystem) throws ACIDException {
this.jobId = jobId;
this.transactionSubsystem = transactionSubsystem;
- this.numOfActiveJobs = numOfPartitions;
firstLSN = new AtomicLong(-1);
lastLSN = new AtomicLong(-1);
txnState = new AtomicInteger(ITransactionManager.ACTIVE);
@@ -215,26 +210,15 @@
public String prettyPrint() {
StringBuilder sb = new StringBuilder();
sb.append("\n" + jobId + "\n");
- sb.append("isWriterTxn: " + isWriteTxn + "\n");
+ sb.append("isWriteTxn: " + isWriteTxn + "\n");
sb.append("firstLSN: " + firstLSN.get() + "\n");
sb.append("lastLSN: " + lastLSN.get() + "\n");
sb.append("TransactionState: " + txnState + "\n");
- sb.append("status: " + isTimeout + "\n");
+ sb.append("isTimeout: " + isTimeout + "\n");
return sb.toString();
}
public LogRecord getLogRecord() {
return logRecord;
}
-
- @Override
- public void decrementNumOfActiveJobs() {
- assert numOfActiveJobs > 0;
- numOfActiveJobs--;
- }
-
- @Override
- public int getNumOfActiveJobs() {
- return numOfActiveJobs;
- }
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
index b73b2db..82d980c 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -41,40 +41,30 @@
public static final boolean IS_DEBUG_MODE = false;//true
private static final Logger LOGGER = Logger.getLogger(TransactionManager.class.getName());
private final TransactionSubsystem txnSubsystem;
- private final int numOfPartitions;
private Map<JobId, ITransactionContext> transactionContextRepository = new ConcurrentHashMap<JobId, ITransactionContext>();
private AtomicInteger maxJobId = new AtomicInteger(0);
- public TransactionManager(TransactionSubsystem provider, int numOfPartitions) {
+ public TransactionManager(TransactionSubsystem provider) {
this.txnSubsystem = provider;
- this.numOfPartitions = numOfPartitions;
}
@Override
public void abortTransaction(ITransactionContext txnCtx, DatasetId datasetId, int PKHashVal) throws ACIDException {
- synchronized (txnCtx) {
- if (txnCtx.getTxnState() != ITransactionManager.ABORTED) {
- txnCtx.setTxnState(ITransactionManager.ABORTED);
+ if (txnCtx.getTxnState() != ITransactionManager.ABORTED) {
+ txnCtx.setTxnState(ITransactionManager.ABORTED);
+ }
+ try {
+ txnSubsystem.getRecoveryManager().rollbackTransaction(txnCtx);
+ } catch (Exception ae) {
+ String msg = "Could not complete rollback! System is in an inconsistent state";
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.severe(msg);
}
- if (!txnCtx.isMetadataTransaction()) {
- txnCtx.decrementNumOfActiveJobs();
- if (txnCtx.getNumOfActiveJobs() != 0) {
- return;
- }
- }
- try {
- txnSubsystem.getRecoveryManager().rollbackTransaction(txnCtx);
- } catch (Exception ae) {
- String msg = "Could not complete rollback! System is in an inconsistent state";
- if (LOGGER.isLoggable(Level.SEVERE)) {
- LOGGER.severe(msg);
- }
- ae.printStackTrace();
- throw new Error(msg);
- } finally {
- txnSubsystem.getLockManager().releaseLocks(txnCtx);
- transactionContextRepository.remove(txnCtx.getJobId());
- }
+ ae.printStackTrace();
+ throw new Error(msg);
+ } finally {
+ txnSubsystem.getLockManager().releaseLocks(txnCtx);
+ transactionContextRepository.remove(txnCtx.getJobId());
}
}
@@ -84,51 +74,48 @@
}
@Override
- public synchronized ITransactionContext getTransactionContext(JobId jobId) throws ACIDException {
+ public ITransactionContext getTransactionContext(JobId jobId) throws ACIDException {
setMaxJobId(jobId.getId());
ITransactionContext txnCtx = transactionContextRepository.get(jobId);
if (txnCtx == null) {
- txnCtx = new TransactionContext(jobId, txnSubsystem, numOfPartitions);
- transactionContextRepository.put(jobId, txnCtx);
+ synchronized (this) {
+ txnCtx = transactionContextRepository.get(jobId);
+ if (txnCtx == null) {
+ txnCtx = new TransactionContext(jobId, txnSubsystem);
+ transactionContextRepository.put(jobId, txnCtx);
+ }
+ }
}
return txnCtx;
}
@Override
public void commitTransaction(ITransactionContext txnCtx, DatasetId datasetId, int PKHashVal) throws ACIDException {
- synchronized (txnCtx) {
- //There is either job-level commit or entity-level commit.
- //The job-level commit will have -1 value both for datasetId and PKHashVal.
+ //There is either job-level commit or entity-level commit.
+ //The job-level commit will have -1 value both for datasetId and PKHashVal.
- //for entity-level commit
- if (PKHashVal != -1) {
- txnSubsystem.getLockManager().unlock(datasetId, PKHashVal, txnCtx, true);
- return;
- }
+ //for entity-level commit
+ if (PKHashVal != -1) {
+ txnSubsystem.getLockManager().unlock(datasetId, PKHashVal, txnCtx, true);
+ return;
+ }
- //for job-level commit
- if (!txnCtx.isMetadataTransaction()) {
- txnCtx.decrementNumOfActiveJobs();
- if (txnCtx.getNumOfActiveJobs() != 0) {
- return;
- }
+ //for job-level commit
+ try {
+ if (txnCtx.isWriteTxn()) {
+ LogRecord logRecord = ((TransactionContext) txnCtx).getLogRecord();
+ logRecord.formCommitLogRecord(txnCtx, LogType.JOB_COMMIT, txnCtx.getJobId().getId(), -1, -1);
+ txnSubsystem.getLogManager().log(logRecord);
}
- try {
- if (txnCtx.isWriteTxn()) {
- LogRecord logRecord = ((TransactionContext) txnCtx).getLogRecord();
- logRecord.formCommitLogRecord(txnCtx, LogType.JOB_COMMIT, txnCtx.getJobId().getId(), -1, -1);
- txnSubsystem.getLogManager().log(logRecord);
- }
- } catch (Exception ae) {
- if (LOGGER.isLoggable(Level.SEVERE)) {
- LOGGER.severe(" caused exception in commit !" + txnCtx.getJobId());
- }
- throw ae;
- } finally {
- txnSubsystem.getLockManager().releaseLocks(txnCtx); // release
- transactionContextRepository.remove(txnCtx.getJobId());
- txnCtx.setTxnState(ITransactionManager.COMMITTED);
+ } catch (Exception ae) {
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ LOGGER.severe(" caused exception in commit !" + txnCtx.getJobId());
}
+ throw ae;
+ } finally {
+ txnSubsystem.getLockManager().releaseLocks(txnCtx); // release
+ transactionContextRepository.remove(txnCtx.getJobId());
+ txnCtx.setTxnState(ITransactionManager.COMMITTED);
}
}
@@ -148,7 +135,10 @@
}
public void setMaxJobId(int jobId) {
- maxJobId.set(Math.max(maxJobId.get(), jobId));
+ int maxId = maxJobId.get();
+ if (jobId > maxId) {
+ maxJobId.compareAndSet(maxId, jobId);
+ }
}
public int getMaxJobId() {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java
index dc06463..aceeb82 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionSubsystem.java
@@ -42,10 +42,10 @@
private final AsterixTransactionProperties txnProperties;
public TransactionSubsystem(String id, IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider,
- AsterixTransactionProperties txnProperties, int numOfPartitions) throws ACIDException {
+ AsterixTransactionProperties txnProperties) throws ACIDException {
this.id = id;
this.txnProperties = txnProperties;
- this.transactionManager = new TransactionManager(this, numOfPartitions);
+ this.transactionManager = new TransactionManager(this);
this.lockManager = new LockManager(this);
this.logManager = new LogManager(this);
this.recoveryManager = new RecoveryManager(this);