Changes to fix issues 621 and 622: move entity-level commit logging out of the lock manager's unlock path into CommitRuntime, and extend the log-record format to carry the primary-key value so that recovery can distinguish entities whose PK hash values collide.
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java
index a7c2fdb..6dd11bd 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java
@@ -19,10 +19,11 @@
 
 import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
 import edu.uci.ics.asterix.common.exceptions.ACIDException;
-import edu.uci.ics.asterix.common.transactions.DatasetId;
+import edu.uci.ics.asterix.common.transactions.ILogManager;
 import edu.uci.ics.asterix.common.transactions.ITransactionContext;
 import edu.uci.ics.asterix.common.transactions.ITransactionManager;
 import edu.uci.ics.asterix.common.transactions.JobId;
+import edu.uci.ics.asterix.transaction.management.service.logging.LogRecord;
 import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
@@ -39,11 +40,13 @@
 
     private final IHyracksTaskContext hyracksTaskCtx;
     private final ITransactionManager transactionManager;
+    private final ILogManager logMgr;
     private final JobId jobId;
-    private final DatasetId datasetId;
+    private final int datasetId;
     private final int[] primaryKeyFields;
     private final boolean isWriteTransaction;
     private final long[] longHashes;
+    private final LogRecord logRecord;
 
     private ITransactionContext transactionContext;
     private RecordDescriptor inputRecordDesc;
@@ -56,12 +59,14 @@
         IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
                 .getApplicationContext().getApplicationObject();
         this.transactionManager = runtimeCtx.getTransactionSubsystem().getTransactionManager();
+        this.logMgr = runtimeCtx.getTransactionSubsystem().getLogManager();
         this.jobId = jobId;
-        this.datasetId = new DatasetId(datasetId);
+        this.datasetId = datasetId;
         this.primaryKeyFields = primaryKeyFields;
         this.frameTupleReference = new FrameTupleReference();
         this.isWriteTransaction = isWriteTransaction;
         this.longHashes = new long[2];
+        this.logRecord = new LogRecord();
     }
 
     @Override
@@ -82,11 +87,9 @@
         for (int t = 0; t < nTuple; t++) {
             frameTupleReference.reset(frameTupleAccessor, t);
             pkHash = computePrimaryKeyHashValue(frameTupleReference, primaryKeyFields);
-            try {
-                transactionManager.commitTransaction(transactionContext, datasetId, pkHash);
-            } catch (ACIDException e) {
-                throw new HyracksDataException(e);
-            }
+            logRecord.formEntityCommitLogRecord(transactionContext, datasetId, pkHash, frameTupleReference,
+                    primaryKeyFields);
+            logMgr.log(logRecord);
         }
     }
 
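Note on the CommitRuntime change: entity commit no longer goes through ITransactionManager.commitTransaction() per tuple; the runtime forms the ENTITY_COMMIT record itself and appends it to the log manager, reusing one preallocated LogRecord across all tuples of a frame. A minimal JDK-only sketch of that reuse pattern (class and method names are illustrative, not the AsterixDB API):

    final class EntityCommitSketch {
        // Stand-in for LogRecord: a single mutable instance reused for every tuple.
        static final class MutableLogRecord {
            int datasetId;
            int pkHash;
            void form(int datasetId, int pkHash) {
                this.datasetId = datasetId;
                this.pkHash = pkHash;
            }
        }

        interface LogAppender {
            void log(MutableLogRecord record); // assumed to copy the record's bytes before returning
        }

        private final MutableLogRecord logRecord = new MutableLogRecord(); // allocated once per operator

        void commitFrame(int datasetId, int[] pkHashes, LogAppender logMgr) {
            for (int pkHash : pkHashes) {
                logRecord.form(datasetId, pkHash); // re-formed in place, no per-tuple allocation
                logMgr.log(logRecord);
            }
        }
    }
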
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILockManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILockManager.java
index 54c86af..a752afa 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILockManager.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILockManager.java
@@ -70,17 +70,6 @@
             throws ACIDException;
 
     /**
-     * @param datasetId
-     * @param entityHashValue
-     * @param txnContext
-     * @throws ACIDException
-     *             TODO
-     * @return 
-     */
-    public void unlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext, boolean commitFlag)
-            throws ACIDException;
-
-    /**
      * Call to lock and unlock a specific resource in a specific lock mode
      * 
      * @param datasetId
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogRecord.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogRecord.java
index d810ebd..d13ef6c 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogRecord.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogRecord.java
@@ -20,16 +20,18 @@
 
 public interface ILogRecord {
 
-    public static final int COMMIT_LOG_SIZE = 21;
-    public static final int UPDATE_LOG_BASE_SIZE = 56;
+    public static final int JOB_COMMIT_LOG_SIZE = 13;
+    public static final int ENTITY_COMMIT_LOG_BASE_SIZE = 29;
+    public static final int UPDATE_LOG_BASE_SIZE = 64;
 
     public boolean readLogRecord(ByteBuffer buffer);
 
     public void writeLogRecord(ByteBuffer buffer);
-    
-    public void formCommitLogRecord(ITransactionContext txnCtx, byte logType, int jobId, int datasetId, int PKHashValue);
 
-    public void setUpdateLogSize();
+    public void formJobCommitLogRecord(ITransactionContext txnCtx);
+
+    public void formEntityCommitLogRecord(ITransactionContext txnCtx, int datasetId, int PKHashValue,
+            ITupleReference tupleReference, int[] primaryKeyFields);
 
     public ITransactionContext getTxnCtx();
 
@@ -98,11 +100,23 @@
     public long getChecksum();
 
     public void setChecksum(long checksum);
-    
+
     public long getLSN();
 
     public void setLSN(long LSN);
 
     public String getLogRecordForDisplay();
 
+    public void computeAndSetLogSize();
+
+    public int getPKValueSize();
+
+    public ITupleReference getPKValue();
+
+    public void setPKFields(int[] primaryKeyFields);
+
+    public void computeAndSetPKValueSize();
+
+    public void setPKValue(ITupleReference PKValue);
+
 }
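The three size constants follow from the field widths in the LogRecordFormat comment of LogRecord.java later in this patch. A sanity check of the arithmetic; the fourth Header3 field is elided by the diff context and assumed here to be LogSize(4), which matches the stated 21-byte total:

    public class LogSizeCheck {
        static final int HEADER1 = 1 + 4;                // LogType + JobId
        static final int HEADER2 = 4 + 4 + 4 + 4;        // DatasetId + PKHashValue + PKFieldCnt + PKValueSize
        static final int HEADER3 = 8 + 8 + 1 + 4;        // PrevLSN + ResourceId + ResourceType + LogSize (assumed)
        static final int BODY_FIXED = 4 + 1 + 4 + 1 + 4; // FieldCnt + NewOp + NewValueSize + OldOp + OldValueSize
        static final int TAIL = 8;                       // Checksum

        public static void main(String[] args) {
            System.out.println(HEADER1 + TAIL);                                  // 13 = JOB_COMMIT_LOG_SIZE
            System.out.println(HEADER1 + HEADER2 + TAIL);                        // 29 = ENTITY_COMMIT_LOG_BASE_SIZE
            System.out.println(HEADER1 + HEADER2 + HEADER3 + BODY_FIXED + TAIL); // 64 = UPDATE_LOG_BASE_SIZE
        }
    }
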
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
index dac3e95..031a26e 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
@@ -58,6 +58,9 @@
     protected void log(int PKHash, ITupleReference newValue, IndexOperation oldOp, ITupleReference oldValue)
             throws ACIDException {
         logRecord.setPKHashValue(PKHash);
+        logRecord.setPKFields(primaryKeyFields);
+        logRecord.setPKValue(newValue);
+        logRecord.computeAndSetPKValueSize();
         if (newValue != null) {
             logRecord.setNewValueSize(tupleWriter.bytesRequired(newValue));
             logRecord.setNewValue(newValue);
@@ -73,7 +76,7 @@
                 logRecord.setOldValueSize(0);
             }
         }
-        logRecord.setUpdateLogSize();
+        logRecord.computeAndSetLogSize();
         txnSubsystem.getLogManager().log(logRecord);
     }
 }
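The call order in log() matters: setPKFields()/setPKValue() must precede computeAndSetPKValueSize(), and computeAndSetLogSize() must come last, because the update-log size depends on PKValueSize plus the new/old value sizes. A reduced model of that dependency (only the field names mirror the diff; the rest is illustrative):

    final class UpdateLogSizing {
        static final int UPDATE_LOG_BASE_SIZE = 64;
        int pkValueSize;
        int newValueSize;
        int oldValueSize;
        int logSize;

        // Valid only after the three component sizes above are set;
        // calling this earlier would bake stale sizes into the header.
        void computeAndSetLogSize() {
            logSize = UPDATE_LOG_BASE_SIZE + pkValueSize + newValueSize + oldValueSize;
        }
    }
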
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
index e0baa3f..6d86f70 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
@@ -29,7 +29,6 @@
 import edu.uci.ics.asterix.common.exceptions.ACIDException;
 import edu.uci.ics.asterix.common.transactions.DatasetId;
 import edu.uci.ics.asterix.common.transactions.ILockManager;
-import edu.uci.ics.asterix.common.transactions.ILogRecord;
 import edu.uci.ics.asterix.common.transactions.ITransactionContext;
 import edu.uci.ics.asterix.common.transactions.ITransactionManager;
 import edu.uci.ics.asterix.common.transactions.JobId;
@@ -86,7 +85,6 @@
 
     private LockRequestTracker lockRequestTracker; //for debugging
     private ConsecutiveWakeupContext consecutiveWakeupContext;
-    private final ILogRecord logRecord;
 
     public LockManager(TransactionSubsystem txnSubsystem) throws ACIDException {
         this.txnSubsystem = txnSubsystem;
@@ -104,7 +102,6 @@
         this.tempDatasetIdObj = new DatasetId(0);
         this.tempJobIdObj = new JobId(0);
         this.consecutiveWakeupContext = new ConsecutiveWakeupContext();
-        this.logRecord = new LogRecord();
         if (IS_DEBUG_MODE) {
             this.lockRequestTracker = new LockRequestTracker();
         }
@@ -642,22 +639,16 @@
 
     @Override
     public void unlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext) throws ACIDException {
-        internalUnlock(datasetId, entityHashValue, txnContext, false, false);
-    }
-
-    @Override
-    public void unlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext, boolean commitFlag)
-            throws ACIDException {
-        internalUnlock(datasetId, entityHashValue, txnContext, false, commitFlag);
+        internalUnlock(datasetId, entityHashValue, txnContext, false);
     }
 
     private void instantUnlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext)
             throws ACIDException {
-        internalUnlock(datasetId, entityHashValue, txnContext, true, false);
+        internalUnlock(datasetId, entityHashValue, txnContext, true);
     }
 
     private void internalUnlock(DatasetId datasetId, int entityHashValue, ITransactionContext txnContext,
-            boolean isInstant, boolean commitFlag) throws ACIDException {
+            boolean isInstant) throws ACIDException {
         JobId jobId = txnContext.getJobId();
         int eLockInfo = -1;
         DatasetLockInfo dLockInfo = null;
@@ -701,22 +692,6 @@
                         + "," + entityHashValue + "]: Corresponding lock info doesn't exist.");
             }
 
-            //////////////////////////////////////////////////////////
-            //[Notice]
-            //If both EntityLockCount and DatasetLockCount are 1, 
-            //then write entity-commit log and return without releasing the lock.
-            //The lock will be released when the entity-commit log is flushed. 
-            if (commitFlag && entityInfoManager.getEntityLockCount(entityInfo) == 1
-                    && entityInfoManager.getDatasetLockCount(entityInfo) == 1) {
-                if (txnContext.isWriteTxn()) {
-                    logRecord.formCommitLogRecord(txnContext, LogType.ENTITY_COMMIT, jobId.getId(), datasetId.getId(),
-                            entityHashValue);
-                    txnSubsystem.getLogManager().log(logRecord);
-                }
-                return;
-            }
-            //////////////////////////////////////////////////////////
-
             datasetLockMode = entityInfoManager.getDatasetLockMode(entityInfo) == LockMode.S ? LockMode.IS
                     : LockMode.IX;
 
@@ -758,11 +733,6 @@
                     waiterObjId = waiterObj.getNextWaiterObjId();
                 }
                 if (threadCount == 0) {
-                    if (entityInfoManager.getEntityLockMode(entityInfo) == LockMode.X) {
-                        //TODO
-                        //write a commit log for the unlocked resource
-                        //need to figure out that instantLock() also needs to write a commit log. 
-                    }
                     entityInfoManager.deallocate(entityInfo);
                 }
             }
@@ -2243,17 +2213,11 @@
                     tempDatasetIdObj.setId(logRecord.getDatasetId());
                     tempJobIdObj.setId(logRecord.getJobId());
                     txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(tempJobIdObj);
-                    if (txnCtx == null) {
-                        throw new IllegalStateException("TransactionContext[" + tempJobIdObj + "] doesn't exist.");
-                    }
                     unlock(tempDatasetIdObj, logRecord.getPKHashValue(), txnCtx);
                     txnCtx.notifyOptracker(false);
                 } else if (logRecord.getLogType() == LogType.JOB_COMMIT) {
                     tempJobIdObj.setId(logRecord.getJobId());
                     txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(tempJobIdObj);
-                    if (txnCtx == null) {
-                        throw new IllegalStateException("TransactionContext[" + tempJobIdObj + "] doesn't exist.");
-                    }
                     txnCtx.notifyOptracker(true);
                     ((LogPage) logPage).notifyJobCommitter();
                 }
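With the commitFlag path gone, the flush-time callback above is the single place where committed entities are handled: once a log page is durable, each ENTITY_COMMIT record releases its entity lock and each JOB_COMMIT record wakes the waiting job. A self-contained model of that loop; the real code iterates the records of the flushed LogPage rather than draining a queue, and all names here are illustrative:

    import java.util.concurrent.LinkedBlockingQueue;

    final class FlushedRecordProcessor implements Runnable {
        static final byte ENTITY_COMMIT = 1; // illustrative type tags
        static final byte JOB_COMMIT = 2;

        static final class Flushed {
            byte logType;
            int jobId, datasetId, pkHash;
        }

        private final LinkedBlockingQueue<Flushed> flushedQ = new LinkedBlockingQueue<Flushed>();

        public void run() {
            try {
                while (true) {
                    Flushed f = flushedQ.take();
                    if (f.logType == ENTITY_COMMIT) {
                        unlock(f.datasetId, f.pkHash); // the entity lock is released only now
                    } else if (f.logType == JOB_COMMIT) {
                        notifyJobCommitter(f.jobId);   // unblock the synchronously committing job
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        void unlock(int datasetId, int pkHash) { /* release in the lock table */ }
        void notifyJobCommitter(int jobId) { /* wake the committer */ }
    }
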
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
index 7fce48d..ad9ff2a 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
@@ -365,7 +365,7 @@
 }
 
 class LogFlusher extends Thread {
-    private final static LogPage POISON_PILL = new LogPage(null, ILogRecord.COMMIT_LOG_SIZE, null);
+    private final static LogPage POISON_PILL = new LogPage(null, ILogRecord.JOB_COMMIT_LOG_SIZE, null);
     private final LogManager logMgr;//for debugging
     private final LinkedBlockingQueue<LogPage> emptyQ;
     private final LinkedBlockingQueue<LogPage> flushQ;
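POISON_PILL is the standard blocking-queue shutdown idiom: a distinguished sentinel page whose arrival makes the flusher thread leave its take() loop (only its identity matters, which is why the size constant used to build it is arbitrary). A self-contained sketch of the idiom with illustrative names:

    import java.util.concurrent.LinkedBlockingQueue;

    final class FlusherSketch extends Thread {
        static final Object POISON_PILL = new Object();
        private final LinkedBlockingQueue<Object> flushQ = new LinkedBlockingQueue<Object>();

        public void run() {
            try {
                Object page;
                while ((page = flushQ.take()) != POISON_PILL) {
                    flush(page); // write the full page to the log file
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        void flush(Object page) { /* fileChannel.write(...) */ }

        void shutdown() throws InterruptedException {
            flushQ.put(POISON_PILL); // drained in FIFO order after all real pages
        }
    }
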
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
index 45c3e65..b954c6e 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
@@ -58,7 +58,7 @@
         appendOffset = 0;
         flushOffset = 0;
         isLastPage = false;
-        syncCommitQ = new LinkedBlockingQueue<ILogRecord>(logPageSize / ILogRecord.COMMIT_LOG_SIZE);
+        syncCommitQ = new LinkedBlockingQueue<ILogRecord>(logPageSize / ILogRecord.JOB_COMMIT_LOG_SIZE);
     }
 
     ////////////////////////////////////
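The queue is sized by the smallest record the new layout permits: a JOB_COMMIT record is 13 bytes, so one page can hold at most logPageSize / 13 records awaiting synchronous-commit notification, and syncCommitQ can never run out of capacity. The arithmetic, for an assumed 128 KB page:

    public class SyncCommitQueueCapacity {
        public static void main(String[] args) {
            int logPageSize = 128 * 1024;                       // page size assumed for illustration
            int jobCommitLogSize = 13;                          // ILogRecord.JOB_COMMIT_LOG_SIZE
            System.out.println(logPageSize / jobCommitLogSize); // 10082 queue slots
        }
    }
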
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogReader.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogReader.java
index 173088c..9dc966c 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogReader.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogReader.java
@@ -121,7 +121,7 @@
         readBuffer.limit(logPageSize);
         try {
             fileChannel.position(readLSN % logFileSize);
-            size = fileChannel.read(readBuffer, logPageSize);
+            size = fileChannel.read(readBuffer);
         } catch (IOException e) {
             throw new ACIDException(e);
         }
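This one-line change is a real bug fix, not a cleanup: FileChannel.read(dst, position) reads at the given absolute file offset and ignores the channel position set on the previous line, so the old code always read at offset logPageSize regardless of readLSN; read(dst) reads at the current channel position. A demonstration of the two overloads against a scratch file:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.file.Paths;
    import java.nio.file.StandardOpenOption;

    public class ReadOverloads {
        public static void main(String[] args) throws IOException {
            try (FileChannel ch = FileChannel.open(Paths.get("scratch.log"), StandardOpenOption.READ)) {
                ByteBuffer buf = ByteBuffer.allocate(4096);
                ch.position(8192);  // seek to the page we want
                ch.read(buf);       // reads at offset 8192, as intended
                buf.clear();
                ch.read(buf, 4096); // reads at absolute offset 4096; position(8192) is ignored
            }
        }
    }
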
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecord.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecord.java
index 380e524..4b0e1f2 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecord.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecord.java
@@ -30,13 +30,18 @@
 /*
  * == LogRecordFormat ==
  * ---------------------------
- * [Header1] (13 bytes) : for all log types
+ * [Header1] (5 bytes) : for all log types
  * LogType(1)
  * JobId(4)
+ * ---------------------------
+ * [Header2] (16 bytes + PKValueSize) : for entity_commit and update log types 
  * DatasetId(4) //stored in dataset_dataset in Metadata Node
  * PKHashValue(4)
+ * PKFieldCnt(4)
+ * PKValueSize(4)
+ * PKValue(PKValueSize)
  * ---------------------------
- * [Header2] (21 bytes) : only for update log type
+ * [Header3] (21 bytes) : only for update log type
  * PrevLSN(8)
  * ResourceId(8) //stored in .metadata of the corresponding index in NC node
  * ResourceType(1)
@@ -45,18 +50,21 @@
  * [Body] (Variable size) : only for update log type
  * FieldCnt(4)
  * NewOp(1)
- * NewValueLength(4)
- * NewValue(NewValueLength)
+ * NewValueSize(4)
+ * NewValue(NewValueSize)
  * OldOp(1)
- * OldValueLength(4)
- * OldValue(OldValueLength)
+ * OldValueSize(4)
+ * OldValue(OldValueSize)
  * ---------------------------
  * [Tail] (8 bytes) : for all log types
  * Checksum(8)
  * ---------------------------
  * = LogSize =
- * 1) JOB_COMMIT and ENTITY_COMMIT: 21 bytes
- * 2) UPDATE: 56 + old and new value size (13 + 21 + 14 + old and newValueSize + 8)
+ * 1) JOB_COMMIT: 13 bytes (5 + 8) --> JOB_COMMIT_LOG_SIZE = 13
+ * 2) ENTITY_COMMIT: 29 + PKSize (5 + 16 + PKSize + 8)
+ *    --> ENTITY_COMMIT_LOG_BASE_SIZE = 29
+ * 3) UPDATE: 64 + PKSize + New/OldValueSize (5 + 16 + PKSize + 21 + 14 + New/OldValueSize + 8)
+ *    --> UPDATE_LOG_BASE_SIZE = 64
  */
 public class LogRecord implements ILogRecord {
 
@@ -65,6 +73,9 @@
     private int jobId;
     private int datasetId;
     private int PKHashValue;
+    private int PKFieldCnt;
+    private int PKValueSize;
+    private ITupleReference PKValue;
     private long prevLSN;
     private long resourceId;
     private byte resourceType;
@@ -84,13 +95,18 @@
     private long LSN;
     private final AtomicBoolean isFlushed;
     private final SimpleTupleWriter tupleWriter;
-    private final SimpleTupleReference newTuple;
+    private final SimpleTupleReference readPKValue;
+    private final SimpleTupleReference readNewValue;
+    private final SimpleTupleReference readOldValue;
     private final CRC32 checksumGen;
+    private int[] PKFields;
 
     public LogRecord() {
         isFlushed = new AtomicBoolean(false);
         tupleWriter = new SimpleTupleWriter();
-        newTuple = (SimpleTupleReference) tupleWriter.createTupleReference();
+        readPKValue = (SimpleTupleReference) tupleWriter.createTupleReference();
+        readNewValue = (SimpleTupleReference) tupleWriter.createTupleReference();
+        readOldValue = (SimpleTupleReference) tupleWriter.createTupleReference();
         checksumGen = new CRC32();
     }
 
@@ -99,8 +115,16 @@
         int beginOffset = buffer.position();
         buffer.put(logType);
         buffer.putInt(jobId);
-        buffer.putInt(datasetId);
-        buffer.putInt(PKHashValue);
+        if (logType != LogType.JOB_COMMIT) {
+            buffer.putInt(datasetId);
+            buffer.putInt(PKHashValue);
+            buffer.putInt(PKFieldCnt);
+            if (PKValueSize <= 0) {
+                throw new IllegalStateException("Primary Key Size is less than or equal to 0");
+            }
+            buffer.putInt(PKValueSize);
+            writePKValue(buffer);
+        }
         if (logType == LogType.UPDATE) {
             buffer.putLong(prevLSN);
             buffer.putLong(resourceId);
@@ -124,8 +148,16 @@
         buffer.putLong(checksum);
     }
 
+    private void writePKValue(ByteBuffer buffer) {
+        int i;
+        for (i = 0; i < PKFieldCnt; i++) {
+            buffer.put(PKValue.getFieldData(0), PKValue.getFieldStart(PKFields[i]), PKValue.getFieldLength(PKFields[i]));
+        }
+    }
+
     private void writeTuple(ByteBuffer buffer, ITupleReference tuple, int size) {
         tupleWriter.writeTuple(tuple, buffer.array(), buffer.position());
+        //writeTuple() does not advance the buffer position, so advance it manually.
         buffer.position(buffer.position() + size);
     }
 
@@ -141,8 +173,19 @@
         try {
             logType = buffer.get();
             jobId = buffer.getInt();
-            datasetId = buffer.getInt();
-            PKHashValue = buffer.getInt();
+            if (logType == LogType.JOB_COMMIT) {
+                datasetId = -1;
+                PKHashValue = -1;
+            } else {
+                datasetId = buffer.getInt();
+                PKHashValue = buffer.getInt();
+                PKFieldCnt = buffer.getInt();
+                PKValueSize = buffer.getInt();
+                if (PKValueSize <= 0) {
+                    throw new IllegalStateException("Primary Key Size is less than or equal to 0");
+                }
+                PKValue = readPKValue(buffer);
+            }
             if (logType == LogType.UPDATE) {
                 prevLSN = buffer.getLong();
                 resourceId = buffer.getLong();
@@ -151,18 +194,18 @@
                 fieldCnt = buffer.getInt();
                 newOp = buffer.get();
                 newValueSize = buffer.getInt();
-                newValue = readTuple(buffer, newValueSize);
+                newValue = readTuple(buffer, readNewValue, fieldCnt, newValueSize);
                 if (resourceType == ResourceType.LSM_BTREE) {
                     oldOp = buffer.get();
                     if (oldOp != (byte) (IndexOperation.NOOP.ordinal())) {
                         oldValueSize = buffer.getInt();
                         if (oldValueSize > 0) {
-                            oldValue = readTuple(buffer, oldValueSize);
+                            oldValue = readTuple(buffer, readOldValue, fieldCnt, oldValueSize);
                         }
                     }
                 }
             } else {
-                logSize = COMMIT_LOG_SIZE;
+                computeAndSetLogSize();
             }
             checksum = buffer.getLong();
             if (checksum != generateChecksum(buffer, beginOffset, logSize - CHECKSUM_SIZE)) {
@@ -174,27 +217,54 @@
         }
         return true;
     }
+    
+    private ITupleReference readPKValue(ByteBuffer buffer) {
+        return readTuple(buffer, readPKValue, PKFieldCnt, PKValueSize);
+    }
 
-    private ITupleReference readTuple(ByteBuffer buffer, int size) {
-        newTuple.setFieldCount(fieldCnt);
-        newTuple.resetByTupleOffset(buffer, buffer.position());
-        buffer.position(buffer.position() + size);
-        return newTuple;
+    private ITupleReference readTuple(ByteBuffer srcBuffer, SimpleTupleReference destTuple, int fieldCnt, int size) {
+        destTuple.setFieldCount(fieldCnt);
+        destTuple.resetByTupleOffset(srcBuffer, srcBuffer.position());
+        srcBuffer.position(srcBuffer.position() + size);
+        return destTuple;
     }
 
     @Override
-    public void formCommitLogRecord(ITransactionContext txnCtx, byte logType, int jobId, int datasetId, int PKHashValue) {
+    public void formJobCommitLogRecord(ITransactionContext txnCtx) {
         this.txnCtx = txnCtx;
-        this.logType = logType;
-        this.jobId = jobId;
+        this.logType = LogType.JOB_COMMIT;
+        this.jobId = txnCtx.getJobId().getId();
+        this.datasetId = -1;
+        this.PKHashValue = -1;
+        computeAndSetLogSize();
+    }
+
+    @Override
+    public void formEntityCommitLogRecord(ITransactionContext txnCtx, int datasetId, int PKHashValue,
+            ITupleReference PKValue, int[] PKFields) {
+        this.txnCtx = txnCtx;
+        this.logType = LogType.ENTITY_COMMIT;
+        this.jobId = txnCtx.getJobId().getId();
         this.datasetId = datasetId;
         this.PKHashValue = PKHashValue;
-        this.logSize = COMMIT_LOG_SIZE;
+        this.PKFieldCnt = PKFields.length;
+        this.PKValue = PKValue;
+        this.PKFields = PKFields;
+        computeAndSetPKValueSize();
+        computeAndSetLogSize();
     }
 
     @Override
-    public void setUpdateLogSize() {
-        logSize = UPDATE_LOG_BASE_SIZE + newValueSize + oldValueSize;
+    public void computeAndSetPKValueSize() {
+        int i;
+        PKValueSize = 0;
+        for (i = 0; i < PKFieldCnt; i++) {
+            PKValueSize += PKValue.getFieldLength(PKFields[i]);
+        }
+    }
+
+    private void setUpdateLogSize() {
+        logSize = UPDATE_LOG_BASE_SIZE + PKValueSize + newValueSize + oldValueSize;
         if (resourceType != ResourceType.LSM_BTREE) {
             logSize -= 5; //oldOp(byte: 1) + oldValueLength(int: 4)
         } else {
@@ -205,18 +275,39 @@
     }
 
     @Override
+    public void computeAndSetLogSize() {
+        switch (logType) {
+            case LogType.UPDATE:
+                setUpdateLogSize();
+                break;
+            case LogType.JOB_COMMIT:
+                logSize = JOB_COMMIT_LOG_SIZE;
+                break;
+            case LogType.ENTITY_COMMIT:
+                logSize = ENTITY_COMMIT_LOG_BASE_SIZE + PKValueSize;
+                break;
+            default:
+                throw new IllegalStateException("Unsupported Log Type");
+        }
+    }
+
+    @Override
     public String getLogRecordForDisplay() {
         StringBuilder builder = new StringBuilder();
         builder.append(" LSN : ").append(LSN);
         builder.append(" LogType : ").append(LogType.toString(logType));
+        builder.append(" LogSize : ").append(logSize);
         builder.append(" JobId : ").append(jobId);
-        builder.append(" DatasetId : ").append(datasetId);
-        builder.append(" PKHashValue : ").append(PKHashValue);
+        if (logType != LogType.JOB_COMMIT) {
+            builder.append(" DatasetId : ").append(datasetId);
+            builder.append(" PKHashValue : ").append(PKHashValue);
+            builder.append(" PKFieldCnt : ").append(PKFieldCnt);
+            builder.append(" PKSize: ").append(PKValueSize);
+        }
         if (logType == LogType.UPDATE) {
             builder.append(" PrevLSN : ").append(prevLSN);
             builder.append(" ResourceId : ").append(resourceId);
             builder.append(" ResourceType : ").append(resourceType);
-            builder.append(" LogSize : ").append(logSize);
         }
         return builder.toString();
     }
@@ -405,4 +496,25 @@
     public void setLSN(long LSN) {
         this.LSN = LSN;
     }
+    
+    @Override
+    public int getPKValueSize() {
+        return PKValueSize;
+    }
+    
+    @Override
+    public ITupleReference getPKValue() {
+        return PKValue;
+    }
+    
+    @Override
+    public void setPKFields(int[] primaryKeyFields) {
+        PKFields = primaryKeyFields;
+        PKFieldCnt = PKFields.length;
+    }
+
+    @Override
+    public void setPKValue(ITupleReference PKValue) {
+        this.PKValue = PKValue;
+    }
 }
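A reduced, JDK-only round-trip of the new ENTITY_COMMIT layout (Header1 + Header2 + PKValue + Checksum) as described in the format comment above; the real writeLogRecord()/readLogRecord() additionally handle the tuple-reference plumbing and the UPDATE body:

    import java.nio.ByteBuffer;
    import java.util.zip.CRC32;

    public class EntityCommitRoundTrip {
        public static void main(String[] args) {
            byte[] pkValue = { 0x0A, 0x0B, 0x0C };
            ByteBuffer b = ByteBuffer.allocate(29 + pkValue.length); // ENTITY_COMMIT_LOG_BASE_SIZE + PKValueSize
            b.put((byte) 1);          // LogType(1), illustrative ENTITY_COMMIT tag
            b.putInt(42);             // JobId(4)
            b.putInt(7);              // DatasetId(4)
            b.putInt(12345);          // PKHashValue(4)
            b.putInt(1);              // PKFieldCnt(4)
            b.putInt(pkValue.length); // PKValueSize(4)
            b.put(pkValue);           // PKValue(PKValueSize)
            CRC32 crc = new CRC32();
            crc.update(b.array(), 0, b.position()); // checksum covers everything before the tail
            b.putLong(crc.getValue());              // Checksum(8)

            b.flip();
            b.get();                  // LogType
            int jobId = b.getInt();
            int datasetId = b.getInt();
            int pkHash = b.getInt();
            int pkFieldCnt = b.getInt();
            byte[] pk = new byte[b.getInt()];
            b.get(pk);
            boolean ok = b.getLong() == crc.getValue();
            System.out.println(jobId + "/" + datasetId + "/" + pkHash + "/" + pkFieldCnt + " checksum ok: " + ok);
        }
    }
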
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
index 81a73d5..2ad3055 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -27,6 +27,7 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -52,6 +53,7 @@
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
@@ -133,8 +135,10 @@
     public void startRecovery(boolean synchronous) throws IOException, ACIDException {
 
         int updateLogCount = 0;
-        int commitLogCount = 0;
+        int entityCommitLogCount = 0;
+        int jobCommitLogCount = 0;
         int redoCount = 0;
+        int jobId = -1;
 
         state = SystemState.RECOVERING;
 
@@ -142,9 +146,12 @@
             LOGGER.info("[RecoveryMgr] starting recovery ...");
         }
 
-        //winnerTxnTable is used to add pairs, <committed TxnId, the most recent commit Lsn of the TxnId>
-        Map<TxnId, Long> winnerTxnTable = new HashMap<TxnId, Long>();
-        TxnId tempKeyTxnId = new TxnId(-1, -1, -1);
+        Set<Integer> winnerJobSet = new HashSet<Integer>();
+        Map<Integer, Set<TxnId>> jobId2WinnerEntitiesMap = new HashMap<Integer, Set<TxnId>>();
+        //winnerEntitySet collects, per job, the entities (TxnIds) committed via ENTITY_COMMIT
+        Set<TxnId> winnerEntitySet = null;
+        TxnId tempKeyTxnId = new TxnId(-1, -1, -1, null, -1, false);
+        TxnId winnerEntity = null;
 
         //#. read checkpoint file and set lowWaterMark where analysis and redo start
         CheckpointObject checkpointObject = readCheckpoint();
@@ -157,8 +164,6 @@
         //-------------------------------------------------------------------------
         //  [ analysis phase ]
         //  - collect all committed Lsn 
-        //  - if there are duplicate commits for the same TxnId, 
-        //    keep only the mostRecentCommitLsn among the duplicates.
         //-------------------------------------------------------------------------
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("[RecoveryMgr] in analysis phase");
@@ -176,23 +181,34 @@
             if (logRecord.getJobId() > maxJobId) {
                 maxJobId = logRecord.getJobId();
             }
-            TxnId commitTxnId = null;
             switch (logRecord.getLogType()) {
                 case LogType.UPDATE:
                     if (IS_DEBUG_MODE) {
                         updateLogCount++;
                     }
                     break;
-
                 case LogType.JOB_COMMIT:
-                case LogType.ENTITY_COMMIT:
-                    commitTxnId = new TxnId(logRecord.getJobId(), logRecord.getDatasetId(), logRecord.getPKHashValue());
-                    winnerTxnTable.put(commitTxnId, logRecord.getLSN());
+                    winnerJobSet.add(Integer.valueOf(logRecord.getJobId()));
+                    jobId2WinnerEntitiesMap.remove(Integer.valueOf(logRecord.getJobId()));
                     if (IS_DEBUG_MODE) {
-                        commitLogCount++;
+                        jobCommitLogCount++;
                     }
                     break;
-
+                case LogType.ENTITY_COMMIT:
+                    jobId = logRecord.getJobId();
+                    winnerEntity = new TxnId(jobId, logRecord.getDatasetId(), logRecord.getPKHashValue(),
+                            logRecord.getPKValue(), logRecord.getPKValueSize(), true);
+                    if (!jobId2WinnerEntitiesMap.containsKey(Integer.valueOf(jobId))) {
+                        winnerEntitySet = new HashSet<TxnId>();
+                        jobId2WinnerEntitiesMap.put(Integer.valueOf(jobId), winnerEntitySet);
+                    } else {
+                        winnerEntitySet = jobId2WinnerEntitiesMap.get(Integer.valueOf(jobId));
+                    }
+                    winnerEntitySet.add(winnerEntity);
+                    if (IS_DEBUG_MODE) {
+                        entityCommitLogCount++;
+                    }
+                    break;
                 default:
                     throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
             }
@@ -203,55 +219,48 @@
         //  [ redo phase ]
         //  - redo if
         //    1) The TxnId is committed && --> guarantee durability
-        //    2) lsn < commitLog's Lsn && --> deal with a case of pkHashValue collision
-        //    3) lsn > maxDiskLastLsn of the index --> guarantee idempotance
+        //    2) lsn > maxDiskLastLsn of the index --> guarantee idempotence
         //-------------------------------------------------------------------------
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("[RecoveryMgr] in redo phase");
         }
-        //#. set log reader to the lowWaterMarkLsn again.
-        logReader.initializeScan(lowWaterMarkLsn);
-
         long resourceId;
         long maxDiskLastLsn;
-        long lsn = -1;
-        long commitLsn = -1;
+        long LSN = -1;
         ILSMIndex index = null;
         LocalResource localResource = null;
         ILocalResourceMetadata localResourceMetadata = null;
-        Map<Long, Long> resourceId2MaxLsnMap = new HashMap<Long, Long>();
-        TxnId jobLevelTxnId = new TxnId(-1, -1, -1);
-        boolean foundWinnerTxn = false;
+        Map<Long, Long> resourceId2MaxLSNMap = new HashMap<Long, Long>();
+        boolean foundWinner = false;
 
         //#. get indexLifeCycleManager 
         IAsterixAppRuntimeContextProvider appRuntimeContext = txnSubsystem.getAsterixAppRuntimeContextProvider();
         IIndexLifecycleManager indexLifecycleManager = appRuntimeContext.getIndexLifecycleManager();
         ILocalResourceRepository localResourceRepository = appRuntimeContext.getLocalResourceRepository();
 
+        //#. set log reader to the lowWaterMarkLsn again.
+        logReader.initializeScan(lowWaterMarkLsn);
         logRecord = logReader.next();
         while (logRecord != null) {
-            lsn = logRecord.getLSN();
-            foundWinnerTxn = false;
             if (LogManager.IS_DEBUG_MODE) {
                 System.out.println(logRecord.getLogRecordForDisplay());
             }
+            LSN = logRecord.getLSN();
+            jobId = logRecord.getJobId();
+            foundWinner = false;
             switch (logRecord.getLogType()) {
                 case LogType.UPDATE:
-                    tempKeyTxnId.setTxnId(logRecord.getJobId(), logRecord.getDatasetId(), logRecord.getPKHashValue());
-                    jobLevelTxnId.setTxnId(logRecord.getJobId(), -1, -1);
-                    if (winnerTxnTable.containsKey(tempKeyTxnId)) {
-                        commitLsn = winnerTxnTable.get(tempKeyTxnId);
-                        if (lsn < commitLsn) {
-                            foundWinnerTxn = true;
-                        }
-                    } else if (winnerTxnTable.containsKey(jobLevelTxnId)) {
-                        commitLsn = winnerTxnTable.get(jobLevelTxnId);
-                        if (lsn < commitLsn) {
-                            foundWinnerTxn = true;
+                    if (winnerJobSet.contains(Integer.valueOf(jobId))) {
+                        foundWinner = true;
+                    } else if (jobId2WinnerEntitiesMap.containsKey(Integer.valueOf(jobId))) {
+                        winnerEntitySet = jobId2WinnerEntitiesMap.get(Integer.valueOf(jobId));
+                        tempKeyTxnId.setTxnId(jobId, logRecord.getDatasetId(), logRecord.getPKHashValue(),
+                                logRecord.getPKValue(), logRecord.getPKValueSize());
+                        if (winnerEntitySet.contains(tempKeyTxnId)) {
+                            foundWinner = true;
                         }
                     }
-
-                    if (foundWinnerTxn) {
+                    if (foundWinner) {
                         resourceId = logRecord.getResourceId();
                         localResource = localResourceRepository.getResourceById(resourceId);
 
@@ -294,12 +303,12 @@
                             maxDiskLastLsn = abstractLSMIOCallback.getComponentLSN(index.getImmutableComponents());
 
                             //#. set resourceId and maxDiskLastLSN to the map
-                            resourceId2MaxLsnMap.put(resourceId, maxDiskLastLsn);
+                            resourceId2MaxLSNMap.put(Long.valueOf(resourceId), Long.valueOf(maxDiskLastLsn));
                         } else {
-                            maxDiskLastLsn = resourceId2MaxLsnMap.get(resourceId);
+                            maxDiskLastLsn = resourceId2MaxLSNMap.get(Long.valueOf(resourceId));
                         }
 
-                        if (lsn > maxDiskLastLsn) {
+                        if (LSN > maxDiskLastLsn) {
                             redo(logRecord);
                             if (IS_DEBUG_MODE) {
                                 redoCount++;
@@ -316,12 +325,11 @@
                 default:
                     throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
             }
-
             logRecord = logReader.next();
         }
 
         //close all indexes
-        Set<Long> resourceIdList = resourceId2MaxLsnMap.keySet();
+        Set<Long> resourceIdList = resourceId2MaxLSNMap.keySet();
         for (long r : resourceIdList) {
             indexLifecycleManager.close(r);
         }
@@ -332,8 +340,8 @@
             LOGGER.info("[RecoveryMgr] recovery is completed.");
         }
         if (IS_DEBUG_MODE) {
-            System.out.println("[RecoveryMgr] Count: Update/Commit/Redo = " + updateLogCount + "/" + commitLogCount
-                    + "/" + redoCount);
+            System.out.println("[RecoveryMgr] Count: Update/EntityCommit/JobCommit/Redo = " + updateLogCount + "/"
+                    + entityCommitLogCount + "/" + jobCommitLogCount + "/" + redoCount);
         }
     }
 
@@ -533,15 +541,18 @@
     @Override
     public void rollbackTransaction(ITransactionContext txnContext) throws ACIDException {
         Map<TxnId, List<Long>> loserTxnTable = new HashMap<TxnId, List<Long>>();
-        TxnId tempKeyTxnId = new TxnId(-1, -1, -1);
+        TxnId tempKeyTxnId = new TxnId(-1, -1, -1, null, -1, false);
 
         int updateLogCount = 0;
-        int commitLogCount = 0;
+        int entityCommitLogCount = 0;
+        int jobId = -1;
+        int abortedJobId = txnContext.getJobId().getId();
+        long currentLSN = -1;
+        TxnId loserEntity = null;
 
-        // Obtain the first log record written by the Job
+        // Obtain the first/last log record LSNs written by the Job
         long firstLSN = txnContext.getFirstLSN();
         long lastLSN = txnContext.getLastLSN();
-        //TODO: make sure that the lastLsn is not updated anymore by another thread belonging to the same job.
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info(" rolling back transaction log records from " + firstLSN + " to " + lastLSN);
         }
@@ -559,62 +570,62 @@
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info(" collecting loser transaction's LSNs from " + firstLSN + " to " + lastLSN);
         }
-        boolean reachedLastLog = false;
         List<Long> undoLSNSet = null;
         ILogReader logReader = logMgr.getLogReader(false);
         logReader.initializeScan(firstLSN);
-        ILogRecord logRecord = logReader.next();
-        while (logRecord != null) {
-            if (IS_DEBUG_MODE) {
-                System.out.println(logRecord.getLogRecordForDisplay());
+        ILogRecord logRecord = null;
+        while (currentLSN < lastLSN) {
+            logRecord = logReader.next();
+            if (logRecord == null) {
+                break;
+            } else {
+                if (IS_DEBUG_MODE) {
+                    System.out.println(logRecord.getLogRecordForDisplay());
+                }
+                currentLSN = logRecord.getLSN();
             }
-
-            tempKeyTxnId.setTxnId(logRecord.getJobId(), logRecord.getDatasetId(), logRecord.getPKHashValue());
+            jobId = logRecord.getJobId();
+            if (jobId != abortedJobId) {
+                continue;
+            }
+            tempKeyTxnId.setTxnId(jobId, logRecord.getDatasetId(), logRecord.getPKHashValue(), logRecord.getPKValue(),
+                    logRecord.getPKValueSize());
             switch (logRecord.getLogType()) {
                 case LogType.UPDATE:
                     undoLSNSet = loserTxnTable.get(tempKeyTxnId);
                     if (undoLSNSet == null) {
-                        TxnId txnId = new TxnId(logRecord.getJobId(), logRecord.getDatasetId(),
-                                logRecord.getPKHashValue());
+                        loserEntity = new TxnId(jobId, logRecord.getDatasetId(), logRecord.getPKHashValue(),
+                                logRecord.getPKValue(), logRecord.getPKValueSize(), true);
                         undoLSNSet = new LinkedList<Long>();
-                        loserTxnTable.put(txnId, undoLSNSet);
+                        loserTxnTable.put(loserEntity, undoLSNSet);
                     }
-                    undoLSNSet.add(logRecord.getLSN());
+                    undoLSNSet.add(Long.valueOf(currentLSN));
                     if (IS_DEBUG_MODE) {
                         updateLogCount++;
-                        System.out.println("" + Thread.currentThread().getId() + "======> update[" + logRecord.getLSN()
-                                + "]:" + tempKeyTxnId);
+                        System.out.println("" + Thread.currentThread().getId() + "======> update[" + currentLSN + "]:"
+                                + tempKeyTxnId);
                     }
                     break;
 
                 case LogType.JOB_COMMIT:
+                    throw new ACIDException("Unexpected LogType(" + logRecord.getLogType() + ") during abort.");
+
                 case LogType.ENTITY_COMMIT:
-                    undoLSNSet = loserTxnTable.get(tempKeyTxnId);
-                    if (undoLSNSet != null) {
-                        loserTxnTable.remove(tempKeyTxnId);
-                    }
+                    loserTxnTable.remove(tempKeyTxnId);
                     if (IS_DEBUG_MODE) {
-                        commitLogCount++;
-                        System.out.println("" + Thread.currentThread().getId() + "======> commit[" + logRecord.getLSN()
-                                + "]" + tempKeyTxnId);
+                        entityCommitLogCount++;
+                        System.out.println("" + Thread.currentThread().getId() + "======> entity_commit[" + currentLSN + "]"
+                                + tempKeyTxnId);
                     }
                     break;
 
                 default:
                     throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
             }
-            if (logRecord.getLSN() == lastLSN) {
-                reachedLastLog = true;
-                break;
-            } else if (logRecord.getLSN() > lastLSN) {
-                throw new IllegalStateException("LastLSN mismatch");
-            }
-            logRecord = logReader.next();
         }
-
-        if (!reachedLastLog) {
-            throw new ACIDException("LastLSN mismatch: " + lastLSN + " vs " + logRecord.getLSN()
-                    + " during Rollback a transaction( " + txnContext.getJobId() + ")");
+        if (currentLSN != lastLSN) {
+            throw new ACIDException("LastLSN mismatch: lastLSN(" + lastLSN + ") vs currentLSN(" + currentLSN
+                    + ") during abort( " + txnContext.getJobId() + ")");
         }
 
         //undo loserTxn's effect
@@ -622,7 +633,6 @@
             LOGGER.info(" undoing loser transaction's effect");
         }
 
-        TxnId txnId = null;
         Iterator<Entry<TxnId, List<Long>>> iter = loserTxnTable.entrySet().iterator();
         int undoCount = 0;
         while (iter.hasNext()) {
@@ -630,16 +640,15 @@
             //Sort the lsns in order to undo in one pass. 
 
             Map.Entry<TxnId, List<Long>> loserTxn = (Map.Entry<TxnId, List<Long>>) iter.next();
-            txnId = loserTxn.getKey();
-
             undoLSNSet = loserTxn.getValue();
 
             for (long undoLSN : undoLSNSet) {
-                // here, all the log records are UPDATE type. So, we don't need to check the type again.
-
+                //here, all the log records are UPDATE type. So, we don't need to check the type again.
                 //read the corresponding log record to be undone.
                 logRecord = logReader.read(undoLSN);
-                assert logRecord != null;
+                if (logRecord == null) {
+                    throw new ACIDException("Undo failed: no log record found at LSN " + undoLSN + " during abort(" + txnContext.getJobId() + ")");
+                }
                 if (IS_DEBUG_MODE) {
                     System.out.println(logRecord.getLogRecordForDisplay());
                 }
@@ -656,8 +665,8 @@
             LOGGER.info(" undone loser transaction's effect");
         }
         if (IS_DEBUG_MODE) {
-            System.out.println("UpdateLogCount/CommitLogCount/UndoCount:" + updateLogCount + "/" + commitLogCount + "/"
-                    + undoCount);
+            System.out.println("UpdateLogCount/CommitLogCount/UndoCount:" + updateLogCount + "/" + entityCommitLogCount
+                    + "/" + undoCount);
         }
     }
 
@@ -720,36 +729,53 @@
 }
 
 class TxnId {
+    public boolean isByteArrayPKValue;
     public int jobId;
     public int datasetId;
-    public int pkHashVal;
+    public int pkHashValue;
+    public int pkSize;
+    public byte[] byteArrayPKValue;
+    public ITupleReference tupleReferencePKValue;
 
-    public TxnId(int jobId, int datasetId, int pkHashVal) {
+    public TxnId(int jobId, int datasetId, int pkHashValue, ITupleReference pkValue, int pkSize,
+            boolean isByteArrayPKValue) {
         this.jobId = jobId;
         this.datasetId = datasetId;
-        this.pkHashVal = pkHashVal;
+        this.pkHashValue = pkHashValue;
+        this.pkSize = pkSize;
+        this.isByteArrayPKValue = isByteArrayPKValue;
+        if (isByteArrayPKValue) {
+            this.byteArrayPKValue = new byte[pkSize];
+            readPKValueIntoByteArray(pkValue, pkSize, byteArrayPKValue);
+        } else {
+            this.tupleReferencePKValue = pkValue;
+        }
     }
 
-    public void setTxnId(int jobId, int datasetId, int pkHashVal) {
+    private void readPKValueIntoByteArray(ITupleReference pkValue, int pkSize, byte[] byteArrayPKValue) {
+        int readOffset = pkValue.getFieldStart(0);
+        byte[] readBuffer = pkValue.getFieldData(0);
+        for (int i = 0; i < pkSize; i++) {
+            byteArrayPKValue[i] = readBuffer[readOffset + i];
+        }
+    }
+
+    public void setTxnId(int jobId, int datasetId, int pkHashValue, ITupleReference pkValue, int pkSize) {
         this.jobId = jobId;
         this.datasetId = datasetId;
-        this.pkHashVal = pkHashVal;
-    }
-
-    public void setTxnId(TxnId txnId) {
-        this.jobId = txnId.jobId;
-        this.datasetId = txnId.datasetId;
-        this.pkHashVal = txnId.pkHashVal;
+        this.pkHashValue = pkHashValue;
+        this.tupleReferencePKValue = pkValue;
+        isByteArrayPKValue = false;
     }
 
     @Override
     public String toString() {
-        return "[" + jobId + "," + datasetId + "," + pkHashVal + "]";
+        return "[" + jobId + "," + datasetId + "," + pkHashValue + "," + pkSize + "]";
     }
 
     @Override
     public int hashCode() {
-        return pkHashVal;
+        return pkHashValue;
     }
 
     @Override
@@ -761,7 +787,52 @@
             return false;
         }
         TxnId txnId = (TxnId) o;
+        return (txnId.pkHashValue == pkHashValue && txnId.datasetId == datasetId && txnId.jobId == jobId
+                && pkSize == txnId.pkSize && isEqualTo(txnId));
+    }
 
-        return (txnId.pkHashVal == pkHashVal && txnId.datasetId == datasetId && txnId.jobId == jobId);
+    private boolean isEqualTo(TxnId txnId) {
+        if (isByteArrayPKValue && txnId.isByteArrayPKValue) {
+            return isEqual(byteArrayPKValue, txnId.byteArrayPKValue, pkSize);
+        } else if (isByteArrayPKValue && (!txnId.isByteArrayPKValue)) {
+            return isEqual(byteArrayPKValue, txnId.tupleReferencePKValue, pkSize);
+        } else if ((!isByteArrayPKValue) && txnId.isByteArrayPKValue) {
+            return isEqual(txnId.byteArrayPKValue, tupleReferencePKValue, pkSize);
+        } else {
+            return isEqual(tupleReferencePKValue, txnId.tupleReferencePKValue, pkSize);
+        }
+    }
+
+    private boolean isEqual(byte[] a, byte[] b, int size) {
+        for (int i = 0; i < size; i++) {
+            if (a[i] != b[i]) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private boolean isEqual(byte[] a, ITupleReference b, int size) {
+        int readOffset = b.getFieldStart(0);
+        byte[] readBuffer = b.getFieldData(0);
+        for (int i = 0; i < size; i++) {
+            if (a[i] != readBuffer[readOffset + i]) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private boolean isEqual(ITupleReference a, ITupleReference b, int size) {
+        int aOffset = a.getFieldStart(0);
+        byte[] aBuffer = a.getFieldData(0);
+        int bOffset = b.getFieldStart(0);
+        byte[] bBuffer = b.getFieldData(0);
+        for (int i = 0; i < size; i++) {
+            if (aBuffer[aOffset + i] != bBuffer[bOffset + i]) {
+                return false;
+            }
+        }
+        return true;
     }
 }
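The analysis/redo rewrite replaces the old map of <committed TxnId, commit LSN> with a winner-job set plus per-job winner-entity sets, and drops the lsn < commitLSN test: PK-hash collisions are now resolved by comparing the PK value itself, which is exactly what the extended TxnId above carries. A reduced model of the new redo decision (types simplified to JDK ones):

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    final class WinnerCheck {
        private final Set<Integer> winnerJobs = new HashSet<Integer>();
        private final Map<Integer, Set<String>> winnerEntitiesPerJob = new HashMap<Integer, Set<String>>();

        void onJobCommit(int jobId) {
            winnerJobs.add(jobId);
            winnerEntitiesPerJob.remove(jobId); // subsumed by the job-level commit
        }

        void onEntityCommit(int jobId, String entityKey) { // entityKey stands in for TxnId
            Set<String> entities = winnerEntitiesPerJob.get(jobId);
            if (entities == null) {
                entities = new HashSet<String>();
                winnerEntitiesPerJob.put(jobId, entities);
            }
            entities.add(entityKey);
        }

        boolean shouldRedo(int jobId, String entityKey) {
            if (winnerJobs.contains(jobId)) {
                return true; // the whole job committed
            }
            Set<String> entities = winnerEntitiesPerJob.get(jobId);
            return entities != null && entities.contains(entityKey); // the entity committed
        }
    }
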
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
index 01bce83..a83604c 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -29,7 +29,6 @@
 import edu.uci.ics.asterix.common.transactions.ITransactionManager;
 import edu.uci.ics.asterix.common.transactions.JobId;
 import edu.uci.ics.asterix.transaction.management.service.logging.LogRecord;
-import edu.uci.ics.asterix.transaction.management.service.logging.LogType;
 import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
 
 /**
@@ -60,7 +59,7 @@
             if (LOGGER.isLoggable(Level.SEVERE)) {
                 LOGGER.severe(msg);
             }
-            throw new Error(msg, ae);
+            throw new ACIDException(msg, ae);
         } finally {
             txnSubsystem.getLockManager().releaseLocks(txnCtx);
             transactionContextRepository.remove(txnCtx.getJobId());
@@ -90,20 +89,11 @@
 
     @Override
     public void commitTransaction(ITransactionContext txnCtx, DatasetId datasetId, int PKHashVal) throws ACIDException {
-        //There is either job-level commit or entity-level commit.
-        //The job-level commit will have -1 value both for datasetId and PKHashVal.
-
-        //for entity-level commit
-        if (PKHashVal != -1) {
-            txnSubsystem.getLockManager().unlock(datasetId, PKHashVal, txnCtx, true);
-            return;
-        }
-
-        //for job-level commit
+        //Only job-level commit calls this method; entity-level commits are logged directly by CommitRuntime.
         try {
             if (txnCtx.isWriteTxn()) {
                 LogRecord logRecord = ((TransactionContext) txnCtx).getLogRecord();
-                logRecord.formCommitLogRecord(txnCtx, LogType.JOB_COMMIT, txnCtx.getJobId().getId(), -1, -1);
+                logRecord.formJobCommitLogRecord(txnCtx);
                 txnSubsystem.getLogManager().log(logRecord);
             }
         } catch (Exception ae) {
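Job-level commit is now the only caller of this path: read-only jobs write no log at all, while write jobs append a single JOB_COMMIT record and block until it is durable. A minimal model of that synchronous hand-off, with a CountDownLatch standing in for LogPage's syncCommitQ (all names illustrative):

    import java.util.concurrent.CountDownLatch;

    final class JobCommitSketch {
        static final class JobCommitRecord {
            final int jobId;
            final CountDownLatch flushed = new CountDownLatch(1);
            JobCommitRecord(int jobId) {
                this.jobId = jobId;
            }
        }

        void commit(int jobId, boolean isWriteTxn) throws InterruptedException {
            if (!isWriteTxn) {
                return; // read-only jobs have nothing to make durable
            }
            JobCommitRecord record = new JobCommitRecord(jobId);
            append(record);         // hand the record to the log manager
            record.flushed.await(); // commit returns only once the record is on disk
        }

        void append(final JobCommitRecord record) {
            // Stand-in for the LogFlusher thread marking the page durable.
            new Thread(new Runnable() {
                public void run() {
                    record.flushed.countDown();
                }
            }).start();
        }
    }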