added transaction support for exclusive job-level commits (e.g. Metadata)
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/BaseOperationTracker.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/BaseOperationTracker.java
index 2e50bf6..6313a09 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/BaseOperationTracker.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/BaseOperationTracker.java
@@ -61,4 +61,6 @@
IModificationOperationCallback modificationCallback) throws HyracksDataException {
}
+ public void exclusiveJobCommitted() throws HyracksDataException {
+ }
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
index 0d7dd93..8b489a6 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -72,14 +72,19 @@
public void completeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
IModificationOperationCallback modificationCallback) throws HyracksDataException {
int nActiveOps = numActiveOperations.decrementAndGet();
-
// Decrement transactor-local active operations count.
AbstractOperationCallback opCallback = getOperationCallback(searchCallback, modificationCallback);
if (opCallback != null) {
opCallback.decrementLocalNumActiveOperations();
}
+ if (opType != LSMOperationType.FLUSH) {
+ flushIfFull(nActiveOps);
+ }
+ }
+
+ private void flushIfFull(int nActiveOps) throws HyracksDataException {
// If we need a flush, and this is the last completing operation, then schedule the flush.
- if (datasetBufferCache.isFull() && nActiveOps == 0 && opType != LSMOperationType.FLUSH) {
+ if (datasetBufferCache.isFull() && nActiveOps == 0) {
Set<ILSMIndex> indexes = datasetLifecycleManager.getDatasetIndexes(datasetID);
for (ILSMIndex lsmIndex : indexes) {
ILSMIndexAccessor accessor = (ILSMIndexAccessor) lsmIndex.createAccessor(
@@ -90,6 +95,11 @@
}
}
+ public void exclusiveJobCommitted() throws HyracksDataException {
+ numActiveOperations.set(0);
+ flushIfFull(0);
+ }
+
private AbstractOperationCallback getOperationCallback(ISearchOperationCallback searchCallback,
IModificationOperationCallback modificationCallback) {
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java
index 1ffd5f1..6f4c9f5 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionContext.java
@@ -53,4 +53,6 @@
READ_WRITE
}
+ public void setExclusiveJobLevelCommit();
+
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
index bfdda15..dc6f111 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
@@ -110,9 +110,6 @@
@Override
public void lock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
throws ACIDException {
- if (entityHashValue == 379839425) {
- System.out.println("break");
- }
internalLock(datasetId, entityHashValue, lockMode, txnContext, false);
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
index fc07457..b51a767 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
@@ -426,12 +426,9 @@
if (logType != LogType.ENTITY_COMMIT) {
if (logType == LogType.COMMIT) {
- int count = txnCtx.getActiveOperationCountOnIndexes();
+ txnCtx.setExclusiveJobLevelCommit();
map = activeTxnCountMaps.get(pageIndex);
- if (map.containsKey(txnCtx)) {
- count += (Integer) map.get(txnCtx);
- }
- map.put(txnCtx, count);
+ map.put(txnCtx, 1);
}
// release the ownership as the log record has been placed in
// created space.
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
index 4bdad6b..d53286a 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
@@ -52,6 +52,7 @@
private Set<ICloseable> resources = new HashSet<ICloseable>();
private TransactionType transactionType = TransactionType.READ;
private JobId jobId;
+ private boolean exlusiveJobLevelCommit;
// List of indexes on which operations were performed on behalf of this transaction.
private final Set<ILSMIndex> indexes = new HashSet<ILSMIndex>();
@@ -102,8 +103,12 @@
Iterator<BaseOperationTracker> trackerIt = opTrackers.iterator();
while (trackerIt.hasNext()) {
IModificationOperationCallback modificationCallback = (IModificationOperationCallback) cbIt.next();
- ((BaseOperationTracker) trackerIt.next()).completeOperation(null, LSMOperationType.MODIFICATION, null,
- modificationCallback);
+ BaseOperationTracker opTracker = (BaseOperationTracker) trackerIt.next();
+ if (exlusiveJobLevelCommit) {
+ opTracker.exclusiveJobCommitted();
+ } else {
+ opTracker.completeOperation(null, LSMOperationType.MODIFICATION, null, modificationCallback);
+ }
}
}
}
@@ -192,4 +197,9 @@
return (o == this);
}
+ @Override
+ public void setExclusiveJobLevelCommit() {
+ exlusiveJobLevelCommit = true;
+ }
+
}