Fix for issue 587.
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
index b980337..f742a64 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
@@ -115,6 +115,7 @@
@Override
public void commitTransaction(JobId jobId) throws RemoteException, ACIDException {
ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId);
+ txnCtx.setExclusiveJobLevelCommit();
transactionSubsystem.getTransactionManager().commitTransaction(txnCtx, new DatasetId(-1), -1);
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
index 5b10144..a612eff 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
@@ -53,7 +53,7 @@
public class LogManager implements ILogManager, ILifeCycleComponent {
- public static final boolean IS_DEBUG_MODE = false;//true
+ public static final boolean IS_DEBUG_MODE = false;// true
private static final Logger LOGGER = Logger.getLogger(LogManager.class.getName());
private final TransactionSubsystem provider;
private LogManagerProperties logManagerProperties;
@@ -397,7 +397,6 @@
if (logType != LogType.ENTITY_COMMIT) {
if (logType == LogType.COMMIT) {
- txnCtx.setExclusiveJobLevelCommit();
map = activeTxnCountMaps.get(pageIndex);
map.put(txnCtx, 1);
}
@@ -418,11 +417,13 @@
} else {
map.put(txnCtx, 1);
}
- //------------------------------------------------------------------------------
+ // ------------------------------------------------------------------------------
// [Notice]
- // reference count should be decremented
- // after activeTxnCount is incremented, but before addFlushRequest() is called.
- //------------------------------------------------------------------------------
+ // reference count should be decremented
+ // after activeTxnCount is incremented, but before
+ // addFlushRequest() is called.
+ // ------------------------------------------------------------------------------
+
// release the ownership as the log record has been placed in
// created space.
logPages[pageIndex].decRefCnt();
@@ -443,7 +444,7 @@
System.out.println("--------------> LSN(" + currentLSN + ") is written");
}
- //collect statistics
+ // collect statistics
statLogSize += totalLogSize;
statLogCount++;
@@ -739,22 +740,22 @@
@Override
public void start() {
- //no op
+ // no op
}
@Override
public void stop(boolean dumpState, OutputStream os) {
if (dumpState) {
- //#. dump Configurable Variables
+ // #. dump Configurable Variables
dumpConfVars(os);
- //#. dump LSNInfo
+ // #. dump LSNInfo
dumpLSNInfo(os);
try {
os.flush();
} catch (IOException e) {
- //ignore
+ // ignore
}
}
}
@@ -767,7 +768,7 @@
sb.append("\n>>dump_end\t>>----- [ConfVars] -----\n");
os.write(sb.toString().getBytes());
} catch (Exception e) {
- //ignore exception and continue dumping as much as possible.
+ // ignore exception and continue dumping as much as possible.
if (IS_DEBUG_MODE) {
e.printStackTrace();
}
@@ -784,7 +785,7 @@
sb.append("\n>>dump_end\t>>----- [LSNInfo] -----\n");
os.write(sb.toString().getBytes());
} catch (Exception e) {
- //ignore exception and continue dumping as much as possible.
+ // ignore exception and continue dumping as much as possible.
if (IS_DEBUG_MODE) {
e.printStackTrace();
}
@@ -893,8 +894,7 @@
}
continue;
}
-
- //if the log page is already full, don't wait.
+ // if the log page is already full, don't wait.
if (logManager.getLogPage(flushPageIndex).getBufferNextWriteOffset() < logPageSize
- logManager.getLogRecordHelper().getCommitLogSize()) {
// #. sleep for the groupCommitWaitTime
@@ -912,7 +912,8 @@
beforeFlushOffset = logManager.getLogPage(flushPageIndex).getBufferLastFlushOffset();
- // put the content to disk (the thread still has a lock on the log page)
+ // put the content to disk (the thread still has a lock
+ // on the log page)
logManager.getLogPage(flushPageIndex).flush();
afterFlushOffset = logManager.getLogPage(flushPageIndex).getBufferLastFlushOffset();
@@ -920,12 +921,14 @@
// increment the last flushed lsn
logManager.incrementLastFlushedLsn(afterFlushOffset - beforeFlushOffset);
- // increment currentLSN if currentLSN is less than flushLSN.
+ // increment currentLSN if currentLSN is less than
+ // flushLSN.
if (logManager.getLastFlushedLsn().get() + 1 > logManager.getCurrentLsn().get()) {
logManager.getCurrentLsn().set(logManager.getLastFlushedLsn().get() + 1);
}
- // Map the log page to a new region in the log file if the flushOffset reached the logPageSize
+ // Map the log page to a new region in the log file if
+ // the flushOffset reached the logPageSize
if (afterFlushOffset == logPageSize) {
long diskNextWriteOffset = logManager.getLogPages()[flushPageIndex]
.getDiskNextWriteOffset() + logBufferSize;
@@ -936,7 +939,6 @@
// decrement activeTxnCountOnIndexes
logManager.decrementActiveTxnCountOnIndexes(flushPageIndex);
-
} finally {
logManager.getLogPage(flushPageIndex).releaseWriteLatch();
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
index 33522e3..1ef949f 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
@@ -105,6 +105,7 @@
IModificationOperationCallback modificationCallback = (IModificationOperationCallback) cbIt.next();
BaseOperationTracker opTracker = (BaseOperationTracker) trackerIt.next();
if (exlusiveJobLevelCommit) {
+ // For metadata transactions only
opTracker.exclusiveJobCommitted();
} else {
opTracker.completeOperation(null, LSMOperationType.MODIFICATION, null, modificationCallback);