changes to fix issues, 621 and 622
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java
index a7c2fdb..6dd11bd 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java
@@ -19,10 +19,11 @@
import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
-import edu.uci.ics.asterix.common.transactions.DatasetId;
+import edu.uci.ics.asterix.common.transactions.ILogManager;
import edu.uci.ics.asterix.common.transactions.ITransactionContext;
import edu.uci.ics.asterix.common.transactions.ITransactionManager;
import edu.uci.ics.asterix.common.transactions.JobId;
+import edu.uci.ics.asterix.transaction.management.service.logging.LogRecord;
import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -39,11 +40,13 @@
private final IHyracksTaskContext hyracksTaskCtx;
private final ITransactionManager transactionManager;
+ private final ILogManager logMgr;
private final JobId jobId;
- private final DatasetId datasetId;
+ private final int datasetId;
private final int[] primaryKeyFields;
private final boolean isWriteTransaction;
private final long[] longHashes;
+ private final LogRecord logRecord;
private ITransactionContext transactionContext;
private RecordDescriptor inputRecordDesc;
@@ -56,12 +59,14 @@
IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
.getApplicationContext().getApplicationObject();
this.transactionManager = runtimeCtx.getTransactionSubsystem().getTransactionManager();
+ this.logMgr = runtimeCtx.getTransactionSubsystem().getLogManager();
this.jobId = jobId;
- this.datasetId = new DatasetId(datasetId);
+ this.datasetId = datasetId;
this.primaryKeyFields = primaryKeyFields;
this.frameTupleReference = new FrameTupleReference();
this.isWriteTransaction = isWriteTransaction;
this.longHashes = new long[2];
+ this.logRecord = new LogRecord();
}
@Override
@@ -82,11 +87,9 @@
for (int t = 0; t < nTuple; t++) {
frameTupleReference.reset(frameTupleAccessor, t);
pkHash = computePrimaryKeyHashValue(frameTupleReference, primaryKeyFields);
- try {
- transactionManager.commitTransaction(transactionContext, datasetId, pkHash);
- } catch (ACIDException e) {
- throw new HyracksDataException(e);
- }
+ logRecord.formEntityCommitLogRecord(transactionContext, datasetId, pkHash, frameTupleReference,
+ primaryKeyFields);
+ logMgr.log(logRecord);
}
}
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILockManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILockManager.java
index 54c86af..a752afa 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILockManager.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILockManager.java
@@ -70,17 +70,6 @@
throws ACIDException;
/**
- * @param datasetId
- * @param entityHashValue
- * @param txnContext
- * @throws ACIDException
- * TODO
- * @return
- */
- public void unlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext, boolean commitFlag)
- throws ACIDException;
-
- /**
* Call to lock and unlock a specific resource in a specific lock mode
*
* @param datasetId
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogRecord.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogRecord.java
index d810ebd..d13ef6c 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogRecord.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogRecord.java
@@ -20,16 +20,18 @@
public interface ILogRecord {
- public static final int COMMIT_LOG_SIZE = 21;
- public static final int UPDATE_LOG_BASE_SIZE = 56;
+ public static final int JOB_COMMIT_LOG_SIZE = 13;
+ public static final int ENTITY_COMMIT_LOG_BASE_SIZE = 29;
+ public static final int UPDATE_LOG_BASE_SIZE = 64;
public boolean readLogRecord(ByteBuffer buffer);
public void writeLogRecord(ByteBuffer buffer);
-
- public void formCommitLogRecord(ITransactionContext txnCtx, byte logType, int jobId, int datasetId, int PKHashValue);
- public void setUpdateLogSize();
+ public void formJobCommitLogRecord(ITransactionContext txnCtx);
+
+ public void formEntityCommitLogRecord(ITransactionContext txnCtx, int datasetId, int PKHashValue,
+ ITupleReference tupleReference, int[] primaryKeyFields);
public ITransactionContext getTxnCtx();
@@ -98,11 +100,23 @@
public long getChecksum();
public void setChecksum(long checksum);
-
+
public long getLSN();
public void setLSN(long LSN);
public String getLogRecordForDisplay();
+ public void computeAndSetLogSize();
+
+ public int getPKValueSize();
+
+ public ITupleReference getPKValue();
+
+ public void setPKFields(int[] primaryKeyFields);
+
+ public void computeAndSetPKValueSize();
+
+ public void setPKValue(ITupleReference PKValue);
+
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
index dac3e95..031a26e 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
@@ -58,6 +58,9 @@
protected void log(int PKHash, ITupleReference newValue, IndexOperation oldOp, ITupleReference oldValue)
throws ACIDException {
logRecord.setPKHashValue(PKHash);
+ logRecord.setPKFields(primaryKeyFields);
+ logRecord.setPKValue(newValue);
+ logRecord.computeAndSetPKValueSize();
if (newValue != null) {
logRecord.setNewValueSize(tupleWriter.bytesRequired(newValue));
logRecord.setNewValue(newValue);
@@ -73,7 +76,7 @@
logRecord.setOldValueSize(0);
}
}
- logRecord.setUpdateLogSize();
+ logRecord.computeAndSetLogSize();
txnSubsystem.getLogManager().log(logRecord);
}
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
index e0baa3f..6d86f70 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
@@ -29,7 +29,6 @@
import edu.uci.ics.asterix.common.exceptions.ACIDException;
import edu.uci.ics.asterix.common.transactions.DatasetId;
import edu.uci.ics.asterix.common.transactions.ILockManager;
-import edu.uci.ics.asterix.common.transactions.ILogRecord;
import edu.uci.ics.asterix.common.transactions.ITransactionContext;
import edu.uci.ics.asterix.common.transactions.ITransactionManager;
import edu.uci.ics.asterix.common.transactions.JobId;
@@ -86,7 +85,6 @@
private LockRequestTracker lockRequestTracker; //for debugging
private ConsecutiveWakeupContext consecutiveWakeupContext;
- private final ILogRecord logRecord;
public LockManager(TransactionSubsystem txnSubsystem) throws ACIDException {
this.txnSubsystem = txnSubsystem;
@@ -104,7 +102,6 @@
this.tempDatasetIdObj = new DatasetId(0);
this.tempJobIdObj = new JobId(0);
this.consecutiveWakeupContext = new ConsecutiveWakeupContext();
- this.logRecord = new LogRecord();
if (IS_DEBUG_MODE) {
this.lockRequestTracker = new LockRequestTracker();
}
@@ -642,22 +639,16 @@
@Override
public void unlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext) throws ACIDException {
- internalUnlock(datasetId, entityHashValue, txnContext, false, false);
- }
-
- @Override
- public void unlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext, boolean commitFlag)
- throws ACIDException {
- internalUnlock(datasetId, entityHashValue, txnContext, false, commitFlag);
+ internalUnlock(datasetId, entityHashValue, txnContext, false);
}
private void instantUnlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext)
throws ACIDException {
- internalUnlock(datasetId, entityHashValue, txnContext, true, false);
+ internalUnlock(datasetId, entityHashValue, txnContext, true);
}
private void internalUnlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext,
- boolean isInstant, boolean commitFlag) throws ACIDException {
+ boolean isInstant) throws ACIDException {
JobId jobId = txnContext.getJobId();
int eLockInfo = -1;
DatasetLockInfo dLockInfo = null;
@@ -701,22 +692,6 @@
+ "," + entityHashValue + "]: Corresponding lock info doesn't exist.");
}
- //////////////////////////////////////////////////////////
- //[Notice]
- //If both EntityLockCount and DatasetLockCount are 1,
- //then write entity-commit log and return without releasing the lock.
- //The lock will be released when the entity-commit log is flushed.
- if (commitFlag && entityInfoManager.getEntityLockCount(entityInfo) == 1
- && entityInfoManager.getDatasetLockCount(entityInfo) == 1) {
- if (txnContext.isWriteTxn()) {
- logRecord.formCommitLogRecord(txnContext, LogType.ENTITY_COMMIT, jobId.getId(), datasetId.getId(),
- entityHashValue);
- txnSubsystem.getLogManager().log(logRecord);
- }
- return;
- }
- //////////////////////////////////////////////////////////
-
datasetLockMode = entityInfoManager.getDatasetLockMode(entityInfo) == LockMode.S ? LockMode.IS
: LockMode.IX;
@@ -758,11 +733,6 @@
waiterObjId = waiterObj.getNextWaiterObjId();
}
if (threadCount == 0) {
- if (entityInfoManager.getEntityLockMode(entityInfo) == LockMode.X) {
- //TODO
- //write a commit log for the unlocked resource
- //need to figure out that instantLock() also needs to write a commit log.
- }
entityInfoManager.deallocate(entityInfo);
}
}
@@ -2243,17 +2213,11 @@
tempDatasetIdObj.setId(logRecord.getDatasetId());
tempJobIdObj.setId(logRecord.getJobId());
txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(tempJobIdObj);
- if (txnCtx == null) {
- throw new IllegalStateException("TransactionContext[" + tempJobIdObj + "] doesn't exist.");
- }
unlock(tempDatasetIdObj, logRecord.getPKHashValue(), txnCtx);
txnCtx.notifyOptracker(false);
} else if (logRecord.getLogType() == LogType.JOB_COMMIT) {
tempJobIdObj.setId(logRecord.getJobId());
txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(tempJobIdObj);
- if (txnCtx == null) {
- throw new IllegalStateException("TransactionContext[" + tempJobIdObj + "] doesn't exist.");
- }
txnCtx.notifyOptracker(true);
((LogPage) logPage).notifyJobCommitter();
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
index 7fce48d..ad9ff2a 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
@@ -365,7 +365,7 @@
}
class LogFlusher extends Thread {
- private final static LogPage POISON_PILL = new LogPage(null, ILogRecord.COMMIT_LOG_SIZE, null);
+ private final static LogPage POISON_PILL = new LogPage(null, ILogRecord.JOB_COMMIT_LOG_SIZE, null);
private final LogManager logMgr;//for debugging
private final LinkedBlockingQueue<LogPage> emptyQ;
private final LinkedBlockingQueue<LogPage> flushQ;
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
index 45c3e65..b954c6e 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
@@ -58,7 +58,7 @@
appendOffset = 0;
flushOffset = 0;
isLastPage = false;
- syncCommitQ = new LinkedBlockingQueue<ILogRecord>(logPageSize / ILogRecord.COMMIT_LOG_SIZE);
+ syncCommitQ = new LinkedBlockingQueue<ILogRecord>(logPageSize / ILogRecord.JOB_COMMIT_LOG_SIZE);
}
////////////////////////////////////
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogReader.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogReader.java
index 173088c..9dc966c 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogReader.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogReader.java
@@ -121,7 +121,7 @@
readBuffer.limit(logPageSize);
try {
fileChannel.position(readLSN % logFileSize);
- size = fileChannel.read(readBuffer, logPageSize);
+ size = fileChannel.read(readBuffer);
} catch (IOException e) {
throw new ACIDException(e);
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecord.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecord.java
index 380e524..4b0e1f2 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecord.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecord.java
@@ -30,13 +30,18 @@
/*
* == LogRecordFormat ==
* ---------------------------
- * [Header1] (13 bytes) : for all log types
+ * [Header1] (5 bytes) : for all log types
* LogType(1)
* JobId(4)
+ * ---------------------------
+ * [Header2] (16 bytes + PKValueSize) : for entity_commit and update log types
* DatasetId(4) //stored in dataset_dataset in Metadata Node
* PKHashValue(4)
+ * PKFieldCnt(4)
+ * PKValueSize(4)
+ * PKValue(PKValueSize)
* ---------------------------
- * [Header2] (21 bytes) : only for update log type
+ * [Header3] (21 bytes) : only for update log type
* PrevLSN(8)
* ResourceId(8) //stored in .metadata of the corresponding index in NC node
* ResourceType(1)
@@ -45,18 +50,21 @@
* [Body] (Variable size) : only for update log type
* FieldCnt(4)
* NewOp(1)
- * NewValueLength(4)
- * NewValue(NewValueLength)
+ * NewValueSize(4)
+ * NewValue(NewValueSize)
* OldOp(1)
- * OldValueLength(4)
- * OldValue(OldValueLength)
+ * OldValueSize(4)
+ * OldValue(OldValueSize)
* ---------------------------
* [Tail] (8 bytes) : for all log types
* Checksum(8)
* ---------------------------
* = LogSize =
- * 1) JOB_COMMIT and ENTITY_COMMIT: 21 bytes
- * 2) UPDATE: 56 + old and new value size (13 + 21 + 14 + old and newValueSize + 8)
+ * 1) JOB_COMMIT_LOG_SIZE: 13 bytes (5 + 8)
+ * 2) ENTITY_COMMIT: 29 + PKSize (5 + 16 + PKSize + 8)
+ * --> ENTITY_COMMIT_LOG_BASE_SIZE = 29
+ * 3) UPDATE: 64 + PKSize + New/OldValueSize (5 + 16 + PKSize + 21 + 14 + New/OldValueSize + 8)
+ * --> UPDATE_LOG_BASE_SIZE = 64
*/
public class LogRecord implements ILogRecord {
@@ -65,6 +73,9 @@
private int jobId;
private int datasetId;
private int PKHashValue;
+ private int PKFieldCnt;
+ private int PKValueSize;
+ private ITupleReference PKValue;
private long prevLSN;
private long resourceId;
private byte resourceType;
@@ -84,13 +95,18 @@
private long LSN;
private final AtomicBoolean isFlushed;
private final SimpleTupleWriter tupleWriter;
- private final SimpleTupleReference newTuple;
+ private final SimpleTupleReference readPKValue;
+ private final SimpleTupleReference readNewValue;
+ private final SimpleTupleReference readOldValue;
private final CRC32 checksumGen;
+ private int[] PKFields;
public LogRecord() {
isFlushed = new AtomicBoolean(false);
tupleWriter = new SimpleTupleWriter();
- newTuple = (SimpleTupleReference) tupleWriter.createTupleReference();
+ readPKValue = (SimpleTupleReference) tupleWriter.createTupleReference();
+ readNewValue = (SimpleTupleReference) tupleWriter.createTupleReference();
+ readOldValue = (SimpleTupleReference) tupleWriter.createTupleReference();
checksumGen = new CRC32();
}
@@ -99,8 +115,16 @@
int beginOffset = buffer.position();
buffer.put(logType);
buffer.putInt(jobId);
- buffer.putInt(datasetId);
- buffer.putInt(PKHashValue);
+ if (logType != LogType.JOB_COMMIT) {
+ buffer.putInt(datasetId);
+ buffer.putInt(PKHashValue);
+ buffer.putInt(PKFieldCnt);
+ if (PKValueSize <= 0) {
+ throw new IllegalStateException("Primary Key Size is less than or equal to 0");
+ }
+ buffer.putInt(PKValueSize);
+ writePKValue(buffer);
+ }
if (logType == LogType.UPDATE) {
buffer.putLong(prevLSN);
buffer.putLong(resourceId);
@@ -124,8 +148,16 @@
buffer.putLong(checksum);
}
+ private void writePKValue(ByteBuffer buffer) {
+ int i;
+ for (i = 0; i < PKFieldCnt; i++) {
+ buffer.put(PKValue.getFieldData(0), PKValue.getFieldStart(PKFields[i]), PKValue.getFieldLength(PKFields[i]));
+ }
+ }
+
private void writeTuple(ByteBuffer buffer, ITupleReference tuple, int size) {
tupleWriter.writeTuple(tuple, buffer.array(), buffer.position());
+ //writeTuple() doesn't change the position of the buffer.
buffer.position(buffer.position() + size);
}
@@ -141,8 +173,19 @@
try {
logType = buffer.get();
jobId = buffer.getInt();
- datasetId = buffer.getInt();
- PKHashValue = buffer.getInt();
+ if (logType == LogType.JOB_COMMIT) {
+ datasetId = -1;
+ PKHashValue = -1;
+ } else {
+ datasetId = buffer.getInt();
+ PKHashValue = buffer.getInt();
+ PKFieldCnt = buffer.getInt();
+ PKValueSize = buffer.getInt();
+ if (PKValueSize <= 0) {
+ throw new IllegalStateException("Primary Key Size is less than or equal to 0");
+ }
+ PKValue = readPKValue(buffer);
+ }
if (logType == LogType.UPDATE) {
prevLSN = buffer.getLong();
resourceId = buffer.getLong();
@@ -151,18 +194,18 @@
fieldCnt = buffer.getInt();
newOp = buffer.get();
newValueSize = buffer.getInt();
- newValue = readTuple(buffer, newValueSize);
+ newValue = readTuple(buffer, readNewValue, fieldCnt, newValueSize);
if (resourceType == ResourceType.LSM_BTREE) {
oldOp = buffer.get();
if (oldOp != (byte) (IndexOperation.NOOP.ordinal())) {
oldValueSize = buffer.getInt();
if (oldValueSize > 0) {
- oldValue = readTuple(buffer, oldValueSize);
+ oldValue = readTuple(buffer, readOldValue, fieldCnt, oldValueSize);
}
}
}
} else {
- logSize = COMMIT_LOG_SIZE;
+ computeAndSetLogSize();
}
checksum = buffer.getLong();
if (checksum != generateChecksum(buffer, beginOffset, logSize - CHECKSUM_SIZE)) {
@@ -174,27 +217,54 @@
}
return true;
}
+
+ private ITupleReference readPKValue(ByteBuffer buffer) {
+ return readTuple(buffer, readPKValue, PKFieldCnt, PKValueSize);
+ }
- private ITupleReference readTuple(ByteBuffer buffer, int size) {
- newTuple.setFieldCount(fieldCnt);
- newTuple.resetByTupleOffset(buffer, buffer.position());
- buffer.position(buffer.position() + size);
- return newTuple;
+ private ITupleReference readTuple(ByteBuffer srcBuffer, SimpleTupleReference destTuple, int fieldCnt, int size) {
+ destTuple.setFieldCount(fieldCnt);
+ destTuple.resetByTupleOffset(srcBuffer, srcBuffer.position());
+ srcBuffer.position(srcBuffer.position() + size);
+ return destTuple;
}
@Override
- public void formCommitLogRecord(ITransactionContext txnCtx, byte logType, int jobId, int datasetId, int PKHashValue) {
+ public void formJobCommitLogRecord(ITransactionContext txnCtx) {
this.txnCtx = txnCtx;
- this.logType = logType;
- this.jobId = jobId;
+ this.logType = LogType.JOB_COMMIT;
+ this.jobId = txnCtx.getJobId().getId();
+ this.datasetId = -1;
+ this.PKHashValue = -1;
+ computeAndSetLogSize();
+ }
+
+ @Override
+ public void formEntityCommitLogRecord(ITransactionContext txnCtx, int datasetId, int PKHashValue,
+ ITupleReference PKValue, int[] PKFields) {
+ this.txnCtx = txnCtx;
+ this.logType = LogType.ENTITY_COMMIT;
+ this.jobId = txnCtx.getJobId().getId();
this.datasetId = datasetId;
this.PKHashValue = PKHashValue;
- this.logSize = COMMIT_LOG_SIZE;
+ this.PKFieldCnt = PKFields.length;
+ this.PKValue = PKValue;
+ this.PKFields = PKFields;
+ computeAndSetPKValueSize();
+ computeAndSetLogSize();
}
@Override
- public void setUpdateLogSize() {
- logSize = UPDATE_LOG_BASE_SIZE + newValueSize + oldValueSize;
+ public void computeAndSetPKValueSize() {
+ int i;
+ PKValueSize = 0;
+ for (i = 0; i < PKFieldCnt; i++) {
+ PKValueSize += PKValue.getFieldLength(PKFields[i]);
+ }
+ }
+
+ private void setUpdateLogSize() {
+ logSize = UPDATE_LOG_BASE_SIZE + PKValueSize + newValueSize + oldValueSize;
if (resourceType != ResourceType.LSM_BTREE) {
logSize -= 5; //oldOp(byte: 1) + oldValueLength(int: 4)
} else {
@@ -205,18 +275,39 @@
}
@Override
+ public void computeAndSetLogSize() {
+ switch (logType) {
+ case LogType.UPDATE:
+ setUpdateLogSize();
+ break;
+ case LogType.JOB_COMMIT:
+ logSize = JOB_COMMIT_LOG_SIZE;
+ break;
+ case LogType.ENTITY_COMMIT:
+ logSize = ENTITY_COMMIT_LOG_BASE_SIZE + PKValueSize;
+ break;
+ default:
+ throw new IllegalStateException("Unsupported Log Type");
+ }
+ }
+
+ @Override
public String getLogRecordForDisplay() {
StringBuilder builder = new StringBuilder();
builder.append(" LSN : ").append(LSN);
builder.append(" LogType : ").append(LogType.toString(logType));
+ builder.append(" LogSize : ").append(logSize);
builder.append(" JobId : ").append(jobId);
- builder.append(" DatasetId : ").append(datasetId);
- builder.append(" PKHashValue : ").append(PKHashValue);
+ if (logType != LogType.JOB_COMMIT) {
+ builder.append(" DatasetId : ").append(datasetId);
+ builder.append(" PKHashValue : ").append(PKHashValue);
+ builder.append(" PKFieldCnt : ").append(PKFieldCnt);
+ builder.append(" PKSize: ").append(PKValueSize);
+ }
if (logType == LogType.UPDATE) {
builder.append(" PrevLSN : ").append(prevLSN);
builder.append(" ResourceId : ").append(resourceId);
builder.append(" ResourceType : ").append(resourceType);
- builder.append(" LogSize : ").append(logSize);
}
return builder.toString();
}
@@ -405,4 +496,25 @@
public void setLSN(long LSN) {
this.LSN = LSN;
}
+
+ @Override
+ public int getPKValueSize() {
+ return PKValueSize;
+ }
+
+ @Override
+ public ITupleReference getPKValue() {
+ return PKValue;
+ }
+
+ @Override
+ public void setPKFields(int[] primaryKeyFields) {
+ PKFields = primaryKeyFields;
+ PKFieldCnt = PKFields.length;
+ }
+
+ @Override
+ public void setPKValue(ITupleReference PKValue) {
+ this.PKValue = PKValue;
+ }
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
index 81a73d5..2ad3055 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -27,6 +27,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@@ -52,6 +53,7 @@
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
@@ -133,8 +135,10 @@
public void startRecovery(boolean synchronous) throws IOException, ACIDException {
int updateLogCount = 0;
- int commitLogCount = 0;
+ int entityCommitLogCount = 0;
+ int jobCommitLogCount = 0;
int redoCount = 0;
+ int jobId = -1;
state = SystemState.RECOVERING;
@@ -142,9 +146,12 @@
LOGGER.info("[RecoveryMgr] starting recovery ...");
}
- //winnerTxnTable is used to add pairs, <committed TxnId, the most recent commit Lsn of the TxnId>
- Map<TxnId, Long> winnerTxnTable = new HashMap<TxnId, Long>();
- TxnId tempKeyTxnId = new TxnId(-1, -1, -1);
+ Set<Integer> winnerJobSet = new HashSet<Integer>();
+ Map<Integer, Set<TxnId>> jobId2WinnerEntitiesMap = new HashMap<Integer, Set<TxnId>>();
+ //winnerEntity is used to add pairs, <committed TxnId, the most recent commit Lsn of the TxnId>
+ Set<TxnId> winnerEntitySet = null;
+ TxnId tempKeyTxnId = new TxnId(-1, -1, -1, null, -1, false);
+ TxnId winnerEntity = null;
//#. read checkpoint file and set lowWaterMark where anaylsis and redo start
CheckpointObject checkpointObject = readCheckpoint();
@@ -157,8 +164,6 @@
//-------------------------------------------------------------------------
// [ analysis phase ]
// - collect all committed Lsn
- // - if there are duplicate commits for the same TxnId,
- // keep only the mostRecentCommitLsn among the duplicates.
//-------------------------------------------------------------------------
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("[RecoveryMgr] in analysis phase");
@@ -176,23 +181,34 @@
if (logRecord.getJobId() > maxJobId) {
maxJobId = logRecord.getJobId();
}
- TxnId commitTxnId = null;
switch (logRecord.getLogType()) {
case LogType.UPDATE:
if (IS_DEBUG_MODE) {
updateLogCount++;
}
break;
-
case LogType.JOB_COMMIT:
- case LogType.ENTITY_COMMIT:
- commitTxnId = new TxnId(logRecord.getJobId(), logRecord.getDatasetId(), logRecord.getPKHashValue());
- winnerTxnTable.put(commitTxnId, logRecord.getLSN());
+ winnerJobSet.add(Integer.valueOf(logRecord.getJobId()));
+ jobId2WinnerEntitiesMap.remove(Integer.valueOf(logRecord.getJobId()));
if (IS_DEBUG_MODE) {
- commitLogCount++;
+ jobCommitLogCount++;
}
break;
-
+ case LogType.ENTITY_COMMIT:
+ jobId = logRecord.getJobId();
+ winnerEntity = new TxnId(jobId, logRecord.getDatasetId(), logRecord.getPKHashValue(),
+ logRecord.getPKValue(), logRecord.getPKValueSize(), true);
+ if (!jobId2WinnerEntitiesMap.containsKey(Integer.valueOf(jobId))) {
+ winnerEntitySet = new HashSet<TxnId>();
+ jobId2WinnerEntitiesMap.put(Integer.valueOf(jobId), winnerEntitySet);
+ } else {
+ winnerEntitySet = jobId2WinnerEntitiesMap.get(Integer.valueOf(jobId));
+ }
+ winnerEntitySet.add(winnerEntity);
+ if (IS_DEBUG_MODE) {
+ entityCommitLogCount++;
+ }
+ break;
default:
throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
}
@@ -203,55 +219,48 @@
// [ redo phase ]
// - redo if
// 1) The TxnId is committed && --> guarantee durability
- // 2) lsn < commitLog's Lsn && --> deal with a case of pkHashValue collision
- // 3) lsn > maxDiskLastLsn of the index --> guarantee idempotance
+ // 2) lsn > maxDiskLastLsn of the index --> guarantee idempotence
//-------------------------------------------------------------------------
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("[RecoveryMgr] in redo phase");
}
- //#. set log reader to the lowWaterMarkLsn again.
- logReader.initializeScan(lowWaterMarkLsn);
-
long resourceId;
long maxDiskLastLsn;
- long lsn = -1;
- long commitLsn = -1;
+ long LSN = -1;
ILSMIndex index = null;
LocalResource localResource = null;
ILocalResourceMetadata localResourceMetadata = null;
- Map<Long, Long> resourceId2MaxLsnMap = new HashMap<Long, Long>();
- TxnId jobLevelTxnId = new TxnId(-1, -1, -1);
- boolean foundWinnerTxn = false;
+ Map<Long, Long> resourceId2MaxLSNMap = new HashMap<Long, Long>();
+ boolean foundWinner = false;
//#. get indexLifeCycleManager
IAsterixAppRuntimeContextProvider appRuntimeContext = txnSubsystem.getAsterixAppRuntimeContextProvider();
IIndexLifecycleManager indexLifecycleManager = appRuntimeContext.getIndexLifecycleManager();
ILocalResourceRepository localResourceRepository = appRuntimeContext.getLocalResourceRepository();
+ //#. set log reader to the lowWaterMarkLsn again.
+ logReader.initializeScan(lowWaterMarkLsn);
logRecord = logReader.next();
while (logRecord != null) {
- lsn = logRecord.getLSN();
- foundWinnerTxn = false;
if (LogManager.IS_DEBUG_MODE) {
System.out.println(logRecord.getLogRecordForDisplay());
}
+ LSN = logRecord.getLSN();
+ jobId = logRecord.getJobId();
+ foundWinner = false;
switch (logRecord.getLogType()) {
case LogType.UPDATE:
- tempKeyTxnId.setTxnId(logRecord.getJobId(), logRecord.getDatasetId(), logRecord.getPKHashValue());
- jobLevelTxnId.setTxnId(logRecord.getJobId(), -1, -1);
- if (winnerTxnTable.containsKey(tempKeyTxnId)) {
- commitLsn = winnerTxnTable.get(tempKeyTxnId);
- if (lsn < commitLsn) {
- foundWinnerTxn = true;
- }
- } else if (winnerTxnTable.containsKey(jobLevelTxnId)) {
- commitLsn = winnerTxnTable.get(jobLevelTxnId);
- if (lsn < commitLsn) {
- foundWinnerTxn = true;
+ if (winnerJobSet.contains(Integer.valueOf(jobId))) {
+ foundWinner = true;
+ } else if (jobId2WinnerEntitiesMap.containsKey(Integer.valueOf(jobId))) {
+ winnerEntitySet = jobId2WinnerEntitiesMap.get(Integer.valueOf(jobId));
+ tempKeyTxnId.setTxnId(jobId, logRecord.getDatasetId(), logRecord.getPKHashValue(),
+ logRecord.getPKValue(), logRecord.getPKValueSize());
+ if (winnerEntitySet.contains(tempKeyTxnId)) {
+ foundWinner = true;
}
}
-
- if (foundWinnerTxn) {
+ if (foundWinner) {
resourceId = logRecord.getResourceId();
localResource = localResourceRepository.getResourceById(resourceId);
@@ -294,12 +303,12 @@
maxDiskLastLsn = abstractLSMIOCallback.getComponentLSN(index.getImmutableComponents());
//#. set resourceId and maxDiskLastLSN to the map
- resourceId2MaxLsnMap.put(resourceId, maxDiskLastLsn);
+ resourceId2MaxLSNMap.put(Long.valueOf(resourceId), Long.valueOf(maxDiskLastLsn));
} else {
- maxDiskLastLsn = resourceId2MaxLsnMap.get(resourceId);
+ maxDiskLastLsn = resourceId2MaxLSNMap.get(Long.valueOf(resourceId));
}
- if (lsn > maxDiskLastLsn) {
+ if (LSN > maxDiskLastLsn) {
redo(logRecord);
if (IS_DEBUG_MODE) {
redoCount++;
@@ -316,12 +325,11 @@
default:
throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
}
-
logRecord = logReader.next();
}
//close all indexes
- Set<Long> resourceIdList = resourceId2MaxLsnMap.keySet();
+ Set<Long> resourceIdList = resourceId2MaxLSNMap.keySet();
for (long r : resourceIdList) {
indexLifecycleManager.close(r);
}
@@ -332,8 +340,8 @@
LOGGER.info("[RecoveryMgr] recovery is completed.");
}
if (IS_DEBUG_MODE) {
- System.out.println("[RecoveryMgr] Count: Update/Commit/Redo = " + updateLogCount + "/" + commitLogCount
- + "/" + redoCount);
+ System.out.println("[RecoveryMgr] Count: Update/EntityCommit/JobCommit/Redo = " + updateLogCount + "/"
+ + entityCommitLogCount + "/" + jobCommitLogCount + "/" + redoCount);
}
}
@@ -533,15 +541,18 @@
@Override
public void rollbackTransaction(ITransactionContext txnContext) throws ACIDException {
Map<TxnId, List<Long>> loserTxnTable = new HashMap<TxnId, List<Long>>();
- TxnId tempKeyTxnId = new TxnId(-1, -1, -1);
+ TxnId tempKeyTxnId = new TxnId(-1, -1, -1, null, -1, false);
int updateLogCount = 0;
- int commitLogCount = 0;
+ int entityCommitLogCount = 0;
+ int jobId = -1;
+ int abortedJobId = txnContext.getJobId().getId();
+ long currentLSN = -1;
+ TxnId loserEntity = null;
- // Obtain the first log record written by the Job
+ // Obtain the first/last log record LSNs written by the Job
long firstLSN = txnContext.getFirstLSN();
long lastLSN = txnContext.getLastLSN();
- //TODO: make sure that the lastLsn is not updated anymore by another thread belonging to the same job.
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info(" rollbacking transaction log records from " + firstLSN + " to " + lastLSN);
}
@@ -559,62 +570,62 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info(" collecting loser transaction's LSNs from " + firstLSN + " to " + lastLSN);
}
- boolean reachedLastLog = false;
List<Long> undoLSNSet = null;
ILogReader logReader = logMgr.getLogReader(false);
logReader.initializeScan(firstLSN);
- ILogRecord logRecord = logReader.next();
- while (logRecord != null) {
- if (IS_DEBUG_MODE) {
- System.out.println(logRecord.getLogRecordForDisplay());
+ ILogRecord logRecord = null;
+ while (currentLSN < lastLSN) {
+ logRecord = logReader.next();
+ if (logRecord == null) {
+ break;
+ } else {
+ if (IS_DEBUG_MODE) {
+ System.out.println(logRecord.getLogRecordForDisplay());
+ }
+ currentLSN = logRecord.getLSN();
}
-
- tempKeyTxnId.setTxnId(logRecord.getJobId(), logRecord.getDatasetId(), logRecord.getPKHashValue());
+ jobId = logRecord.getJobId();
+ if (jobId != abortedJobId) {
+ continue;
+ }
+ tempKeyTxnId.setTxnId(jobId, logRecord.getDatasetId(), logRecord.getPKHashValue(), logRecord.getPKValue(),
+ logRecord.getPKValueSize());
switch (logRecord.getLogType()) {
case LogType.UPDATE:
undoLSNSet = loserTxnTable.get(tempKeyTxnId);
if (undoLSNSet == null) {
- TxnId txnId = new TxnId(logRecord.getJobId(), logRecord.getDatasetId(),
- logRecord.getPKHashValue());
+ loserEntity = new TxnId(jobId, logRecord.getDatasetId(), logRecord.getPKHashValue(),
+ logRecord.getPKValue(), logRecord.getPKValueSize(), true);
undoLSNSet = new LinkedList<Long>();
- loserTxnTable.put(txnId, undoLSNSet);
+ loserTxnTable.put(loserEntity, undoLSNSet);
}
- undoLSNSet.add(logRecord.getLSN());
+ undoLSNSet.add(Long.valueOf(currentLSN));
if (IS_DEBUG_MODE) {
updateLogCount++;
- System.out.println("" + Thread.currentThread().getId() + "======> update[" + logRecord.getLSN()
- + "]:" + tempKeyTxnId);
+ System.out.println("" + Thread.currentThread().getId() + "======> update[" + currentLSN + "]:"
+ + tempKeyTxnId);
}
break;
case LogType.JOB_COMMIT:
+ throw new ACIDException("Unexpected LogType(" + logRecord.getLogType() + ") during abort.");
+
case LogType.ENTITY_COMMIT:
- undoLSNSet = loserTxnTable.get(tempKeyTxnId);
- if (undoLSNSet != null) {
- loserTxnTable.remove(tempKeyTxnId);
- }
+ loserTxnTable.remove(tempKeyTxnId);
if (IS_DEBUG_MODE) {
- commitLogCount++;
- System.out.println("" + Thread.currentThread().getId() + "======> commit[" + logRecord.getLSN()
- + "]" + tempKeyTxnId);
+ entityCommitLogCount++;
+ System.out.println("" + Thread.currentThread().getId() + "======> entity_commit[" + currentLSN + "]"
+ + tempKeyTxnId);
}
break;
default:
throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
}
- if (logRecord.getLSN() == lastLSN) {
- reachedLastLog = true;
- break;
- } else if (logRecord.getLSN() > lastLSN) {
- throw new IllegalStateException("LastLSN mismatch");
- }
- logRecord = logReader.next();
}
-
- if (!reachedLastLog) {
- throw new ACIDException("LastLSN mismatch: " + lastLSN + " vs " + logRecord.getLSN()
- + " during Rollback a transaction( " + txnContext.getJobId() + ")");
+ if (currentLSN != lastLSN) {
+ throw new ACIDException("LastLSN mismatch: lastLSN(" + lastLSN + ") vs currentLSN(" + currentLSN
+ + ") during abort( " + txnContext.getJobId() + ")");
}
//undo loserTxn's effect
@@ -622,7 +633,6 @@
LOGGER.info(" undoing loser transaction's effect");
}
- TxnId txnId = null;
Iterator<Entry<TxnId, List<Long>>> iter = loserTxnTable.entrySet().iterator();
int undoCount = 0;
while (iter.hasNext()) {
@@ -630,16 +640,15 @@
//Sort the lsns in order to undo in one pass.
Map.Entry<TxnId, List<Long>> loserTxn = (Map.Entry<TxnId, List<Long>>) iter.next();
- txnId = loserTxn.getKey();
-
undoLSNSet = loserTxn.getValue();
for (long undoLSN : undoLSNSet) {
- // here, all the log records are UPDATE type. So, we don't need to check the type again.
-
+ //here, all the log records are UPDATE type. So, we don't need to check the type again.
//read the corresponding log record to be undone.
logRecord = logReader.read(undoLSN);
- assert logRecord != null;
+ if (logRecord == null) {
+ throw new ACIDException("IllegalState exception during abort( " + txnContext.getJobId() + ")");
+ }
if (IS_DEBUG_MODE) {
System.out.println(logRecord.getLogRecordForDisplay());
}
@@ -656,8 +665,8 @@
LOGGER.info(" undone loser transaction's effect");
}
if (IS_DEBUG_MODE) {
- System.out.println("UpdateLogCount/CommitLogCount/UndoCount:" + updateLogCount + "/" + commitLogCount + "/"
- + undoCount);
+ System.out.println("UpdateLogCount/CommitLogCount/UndoCount:" + updateLogCount + "/" + entityCommitLogCount
+ + "/" + undoCount);
}
}
@@ -720,36 +729,53 @@
}
class TxnId {
+ public boolean isByteArrayPKValue;
public int jobId;
public int datasetId;
- public int pkHashVal;
+ public int pkHashValue;
+ public int pkSize;
+ public byte[] byteArrayPKValue;
+ public ITupleReference tupleReferencePKValue;
- public TxnId(int jobId, int datasetId, int pkHashVal) {
+ public TxnId(int jobId, int datasetId, int pkHashValue, ITupleReference pkValue, int pkSize,
+ boolean isByteArrayPKValue) {
this.jobId = jobId;
this.datasetId = datasetId;
- this.pkHashVal = pkHashVal;
+ this.pkHashValue = pkHashValue;
+ this.pkSize = pkSize;
+ this.isByteArrayPKValue = isByteArrayPKValue;
+ if (isByteArrayPKValue) {
+ this.byteArrayPKValue = new byte[pkSize];
+ readPKValueIntoByteArray(pkValue, pkSize, byteArrayPKValue);
+ } else {
+ this.tupleReferencePKValue = pkValue;
+ }
}
- public void setTxnId(int jobId, int datasetId, int pkHashVal) {
+ private void readPKValueIntoByteArray(ITupleReference pkValue, int pkSize, byte[] byteArrayPKValue) {
+ int readOffset = pkValue.getFieldStart(0);
+ byte[] readBuffer = pkValue.getFieldData(0);
+ for (int i = 0; i < pkSize; i++) {
+ byteArrayPKValue[i] = readBuffer[readOffset + i];
+ }
+ }
+
+ public void setTxnId(int jobId, int datasetId, int pkHashValue, ITupleReference pkValue, int pkSize) {
this.jobId = jobId;
this.datasetId = datasetId;
- this.pkHashVal = pkHashVal;
- }
-
- public void setTxnId(TxnId txnId) {
- this.jobId = txnId.jobId;
- this.datasetId = txnId.datasetId;
- this.pkHashVal = txnId.pkHashVal;
+ this.pkHashValue = pkHashValue;
+ this.tupleReferencePKValue = pkValue;
+ isByteArrayPKValue = false;
}
@Override
public String toString() {
- return "[" + jobId + "," + datasetId + "," + pkHashVal + "]";
+ return "[" + jobId + "," + datasetId + "," + pkHashValue + "," + pkSize + "]";
}
@Override
public int hashCode() {
- return pkHashVal;
+ return pkHashValue;
}
@Override
@@ -761,7 +787,52 @@
return false;
}
TxnId txnId = (TxnId) o;
+ return (txnId.pkHashValue == pkHashValue && txnId.datasetId == datasetId && txnId.jobId == jobId
+ && pkSize == txnId.pkSize && isEqualTo(txnId));
+ }
- return (txnId.pkHashVal == pkHashVal && txnId.datasetId == datasetId && txnId.jobId == jobId);
+ private boolean isEqualTo(TxnId txnId) {
+ if (isByteArrayPKValue && txnId.isByteArrayPKValue) {
+ return isEqual(byteArrayPKValue, txnId.byteArrayPKValue, pkSize);
+ } else if (isByteArrayPKValue && (!txnId.isByteArrayPKValue)) {
+ return isEqual(byteArrayPKValue, txnId.tupleReferencePKValue, pkSize);
+ } else if ((!isByteArrayPKValue) && txnId.isByteArrayPKValue) {
+ return isEqual(txnId.byteArrayPKValue, tupleReferencePKValue, pkSize);
+ } else {
+ return isEqual(tupleReferencePKValue, txnId.tupleReferencePKValue, pkSize);
+ }
+ }
+
+ private boolean isEqual(byte[] a, byte[] b, int size) {
+ for (int i = 0; i < size; i++) {
+ if (a[i] != b[i]) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private boolean isEqual(byte[] a, ITupleReference b, int size) {
+ int readOffset = b.getFieldStart(0);
+ byte[] readBuffer = b.getFieldData(0);
+ for (int i = 0; i < size; i++) {
+ if (a[i] != readBuffer[readOffset + i]) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private boolean isEqual(ITupleReference a, ITupleReference b, int size) {
+ int aOffset = a.getFieldStart(0);
+ byte[] aBuffer = a.getFieldData(0);
+ int bOffset = b.getFieldStart(0);
+ byte[] bBuffer = b.getFieldData(0);
+ for (int i = 0; i < size; i++) {
+ if (aBuffer[aOffset + i] != bBuffer[bOffset + i]) {
+ return false;
+ }
+ }
+ return true;
}
}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
index 01bce83..a83604c 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -29,7 +29,6 @@
import edu.uci.ics.asterix.common.transactions.ITransactionManager;
import edu.uci.ics.asterix.common.transactions.JobId;
import edu.uci.ics.asterix.transaction.management.service.logging.LogRecord;
-import edu.uci.ics.asterix.transaction.management.service.logging.LogType;
import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
/**
@@ -60,7 +59,7 @@
if (LOGGER.isLoggable(Level.SEVERE)) {
LOGGER.severe(msg);
}
- throw new Error(msg, ae);
+ throw new ACIDException(msg, ae);
} finally {
txnSubsystem.getLockManager().releaseLocks(txnCtx);
transactionContextRepository.remove(txnCtx.getJobId());
@@ -90,20 +89,11 @@
@Override
public void commitTransaction(ITransactionContext txnCtx, DatasetId datasetId, int PKHashVal) throws ACIDException {
- //There is either job-level commit or entity-level commit.
- //The job-level commit will have -1 value both for datasetId and PKHashVal.
-
- //for entity-level commit
- if (PKHashVal != -1) {
- txnSubsystem.getLockManager().unlock(datasetId, PKHashVal, txnCtx, true);
- return;
- }
-
- //for job-level commit
+ //Only job-level commits call this method.
try {
if (txnCtx.isWriteTxn()) {
LogRecord logRecord = ((TransactionContext) txnCtx).getLogRecord();
- logRecord.formCommitLogRecord(txnCtx, LogType.JOB_COMMIT, txnCtx.getJobId().getId(), -1, -1);
+ logRecord.formJobCommitLogRecord(txnCtx);
txnSubsystem.getLogManager().log(logRecord);
}
} catch (Exception ae) {