fixed bugs related to rollback and primary key length
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java
index 6dd11bd..32bfa58 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java
@@ -72,7 +72,7 @@
     @Override
     public void open() throws HyracksDataException {
         try {
-            transactionContext = transactionManager.getTransactionContext(jobId);
+            transactionContext = transactionManager.getTransactionContext(jobId, false);
             transactionContext.setWriteTxn(isWriteTransaction);
         } catch (ACIDException e) {
             throw new HyracksDataException(e);
@@ -89,7 +89,11 @@
             pkHash = computePrimaryKeyHashValue(frameTupleReference, primaryKeyFields);
             logRecord.formEntityCommitLogRecord(transactionContext, datasetId, pkHash, frameTupleReference,
                     primaryKeyFields);
-            logMgr.log(logRecord);
+            try {
+                logMgr.log(logRecord);
+            } catch (ACIDException e) {
+                throw new HyracksDataException(e);
+            }
         }
     }
 
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
index 2ed4b0ec..da18693 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -18,6 +18,7 @@
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import edu.uci.ics.asterix.common.transactions.AbstractOperationCallback;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
@@ -43,7 +44,7 @@
     public void beforeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
             IModificationOperationCallback modificationCallback) throws HyracksDataException {
         if (opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION) {
-            numActiveOperations.incrementAndGet();
+            incrementNumActiveOperations(modificationCallback);
         } else if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE) {
             datasetLifecycleManager.declareActiveIOOperation(datasetID);
         }
@@ -62,14 +63,11 @@
     public void completeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
             IModificationOperationCallback modificationCallback) throws HyracksDataException {
         if (opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION) {
-            numActiveOperations.decrementAndGet();
+            decrementNumActiveOperations(modificationCallback);
+            flushIfFull();
         } else if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE) {
             datasetLifecycleManager.undeclareActiveIOOperation(datasetID);
         }
-
-        if (opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION) {
-            flushIfFull();
-        }
     }
 
     private void flushIfFull() throws HyracksDataException {
@@ -103,4 +101,25 @@
     public int getNumActiveOperations() {
         return numActiveOperations.get();
     }
+
+    private void incrementNumActiveOperations(IModificationOperationCallback modificationCallback) {
+        if (modificationCallback != null && modificationCallback != NoOpOperationCallback.INSTANCE) {
+            numActiveOperations.incrementAndGet();
+            ((AbstractOperationCallback) modificationCallback).incrementLocalNumActiveOperations();
+        }
+    }
+
+    private void decrementNumActiveOperations(IModificationOperationCallback modificationCallback) {
+        if (modificationCallback != null && modificationCallback != NoOpOperationCallback.INSTANCE) {
+            numActiveOperations.decrementAndGet();
+            ((AbstractOperationCallback) modificationCallback).decrementLocalNumActiveOperations();
+        }
+    }
+    
+    public void cleanupNumActiveOperationsForAbortedJob(AbstractOperationCallback callback) {
+        int delta = callback.getLocalNumActiveOperations() * -1;
+        numActiveOperations.getAndAdd(delta);
+        callback.resetLocalNumActiveOperations();
+    }
+
 }
\ No newline at end of file
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
index b6025cb..e046db0 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
@@ -32,7 +32,7 @@
     @Override
     public void afterOperation(List<ILSMComponent> oldComponents, ILSMComponent newComponent)
             throws HyracksDataException {
-        if (oldComponents != null && newComponent != null) {
+        if (newComponent != null) {
             LSMBTreeDiskComponent btreeComponent = (LSMBTreeDiskComponent) newComponent;
             putLSNIntoMetadata(btreeComponent.getBTree(), oldComponents);
         }
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/AbstractOperationCallback.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/AbstractOperationCallback.java
index d4b26f7..c549e7d 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/AbstractOperationCallback.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/AbstractOperationCallback.java
@@ -15,6 +15,8 @@
 
 package edu.uci.ics.asterix.common.transactions;
 
+import java.util.concurrent.atomic.AtomicInteger;
+
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.MurmurHash128Bit;
 
@@ -27,6 +29,7 @@
     protected final ITransactionContext txnCtx;
     protected final ILockManager lockManager;
     protected final long[] longHashes;
+    protected final AtomicInteger transactorLocalNumActiveOperations;
 
     public AbstractOperationCallback(int datasetId, int[] primaryKeyFields, ITransactionContext txnCtx,
             ILockManager lockManager) {
@@ -34,6 +37,7 @@
         this.primaryKeyFields = primaryKeyFields;
         this.txnCtx = txnCtx;
         this.lockManager = lockManager;
+        this.transactorLocalNumActiveOperations = new AtomicInteger(0);
         this.longHashes = new long[2];
     }
 
@@ -41,4 +45,21 @@
         MurmurHash128Bit.hash3_x64_128(tuple, primaryKeyFields, SEED, longHashes);
         return Math.abs((int) longHashes[0]);
     }
+    
+    public void resetLocalNumActiveOperations() {
+        transactorLocalNumActiveOperations.set(0);
+    }
+
+    public int getLocalNumActiveOperations() {
+        return transactorLocalNumActiveOperations.get();
+    }
+
+    public void incrementLocalNumActiveOperations() {
+        transactorLocalNumActiveOperations.incrementAndGet();
+    }
+
+    public void decrementLocalNumActiveOperations() {
+        transactorLocalNumActiveOperations.decrementAndGet();
+    }
+
 }
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogManager.java
index 8913f8a..27a91a4 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogManager.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogManager.java
@@ -14,9 +14,11 @@
  */
 package edu.uci.ics.asterix.common.transactions;
 
+import edu.uci.ics.asterix.common.exceptions.ACIDException;
+
 public interface ILogManager {
 
-    public void log(ILogRecord logRecord);
+    public void log(ILogRecord logRecord) throws ACIDException;
 
     public ILogReader getLogReader(boolean isRecoveryMode);
 
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogRecord.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogRecord.java
index d13ef6c..3068867 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogRecord.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogRecord.java
@@ -20,15 +20,15 @@
 
 public interface ILogRecord {
 
-    public static final int JOB_COMMIT_LOG_SIZE = 13;
-    public static final int ENTITY_COMMIT_LOG_BASE_SIZE = 29;
-    public static final int UPDATE_LOG_BASE_SIZE = 64;
+    public static final int JOB_TERMINATE_LOG_SIZE = 13; //JOB_COMMIT or ABORT log type
+    public static final int ENTITY_COMMIT_LOG_BASE_SIZE = 25;
+    public static final int UPDATE_LOG_BASE_SIZE = 60;
 
     public boolean readLogRecord(ByteBuffer buffer);
 
     public void writeLogRecord(ByteBuffer buffer);
 
-    public void formJobCommitLogRecord(ITransactionContext txnCtx);
+    public void formJobTerminateLogRecord(ITransactionContext txnCtx, boolean isCommit);
 
     public void formEntityCommitLogRecord(ITransactionContext txnCtx, int datasetId, int PKHashValue,
             ITupleReference tupleReference, int[] primaryKeyFields);
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionManager.java
index 77e960b..ffd4cc2 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionManager.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionManager.java
@@ -52,10 +52,11 @@
      * 
      * @param jobId
      *            a unique value for the transaction id.
+     * @param createIfNotExist TODO
      * @return
      * @throws ACIDException
      */
-    public ITransactionContext getTransactionContext(JobId jobId) throws ACIDException;
+    public ITransactionContext getTransactionContext(JobId jobId, boolean createIfNotExist) throws ACIDException;
 
     /**
      * Commits a transaction.
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
index 8765aae..ee0791b 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
@@ -115,14 +115,15 @@
 
     @Override
     public void commitTransaction(JobId jobId) throws RemoteException, ACIDException {
-        ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId);
+        ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId, false);
         transactionSubsystem.getTransactionManager().commitTransaction(txnCtx, new DatasetId(-1), -1);
     }
 
     @Override
     public void abortTransaction(JobId jobId) throws RemoteException, ACIDException {
         try {
-            ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId);
+            ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId,
+                    false);
             transactionSubsystem.getTransactionManager().abortTransaction(txnCtx, new DatasetId(-1), -1);
         } catch (ACIDException e) {
             e.printStackTrace();
@@ -132,13 +133,13 @@
 
     @Override
     public void lock(JobId jobId, byte lockMode) throws ACIDException, RemoteException {
-        ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId);
+        ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId, false);
         transactionSubsystem.getLockManager().lock(METADATA_DATASET_ID, -1, lockMode, txnCtx);
     }
 
     @Override
     public void unlock(JobId jobId) throws ACIDException, RemoteException {
-        ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId);
+        ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId, false);
         transactionSubsystem.getLockManager().unlock(METADATA_DATASET_ID, -1, txnCtx);
     }
 
@@ -273,7 +274,7 @@
 
         ILSMIndexAccessor indexAccessor = lsmIndex.createAccessor(modCallback, NoOpOperationCallback.INSTANCE);
 
-        ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId);
+        ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId, false);
         txnCtx.setWriteTxn(true);
         txnCtx.registerIndexAndCallback(resourceID, lsmIndex, (AbstractOperationCallback) modCallback,
                 metadataIndex.isPrimaryIndex());
@@ -286,7 +287,7 @@
 
     private IModificationOperationCallback createIndexModificationCallback(JobId jobId, long resourceId,
             IMetadataIndex metadataIndex, ILSMIndex lsmIndex, IndexOperation indexOp) throws Exception {
-        ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId);
+        ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId, false);
 
         if (metadataIndex.isPrimaryIndex()) {
             return new PrimaryIndexModificationOperationCallback(metadataIndex.getDatasetId().getId(),
@@ -581,7 +582,7 @@
                 lsmIndex, IndexOperation.DELETE);
         ILSMIndexAccessor indexAccessor = lsmIndex.createAccessor(modCallback, NoOpOperationCallback.INSTANCE);
 
-        ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId);
+        ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId, false);
         txnCtx.setWriteTxn(true);
         txnCtx.registerIndexAndCallback(resourceID, lsmIndex, (AbstractOperationCallback) modCallback,
                 metadataIndex.isPrimaryIndex());
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/job/listener/JobEventListenerFactory.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/job/listener/JobEventListenerFactory.java
index b44755c..4f4eba2 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/job/listener/JobEventListenerFactory.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/job/listener/JobEventListenerFactory.java
@@ -49,7 +49,7 @@
                 try {
                     ITransactionManager txnManager = ((IAsterixAppRuntimeContext) jobletContext.getApplicationContext()
                             .getApplicationObject()).getTransactionSubsystem().getTransactionManager();
-                    ITransactionContext txnContext = txnManager.getTransactionContext(jobId);
+                    ITransactionContext txnContext = txnManager.getTransactionContext(jobId, false);
                     txnContext.setWriteTxn(transactionalWrite);
                     txnManager.completedTransaction(txnContext, new DatasetId(-1), -1,
                             !(jobStatus == JobStatus.FAILURE));
@@ -62,7 +62,7 @@
             public void jobletStart() {
                 try {
                     ((IAsterixAppRuntimeContext) jobletContext.getApplicationContext().getApplicationObject())
-                            .getTransactionSubsystem().getTransactionManager().getTransactionContext(jobId);
+                            .getTransactionSubsystem().getTransactionManager().getTransactionContext(jobId, true);
                 } catch (ACIDException e) {
                     throw new Error(e);
                 }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
index a6537cd..00f0c4a 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
@@ -41,7 +41,7 @@
             throws HyracksDataException {
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
         try {
-            ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId);
+            ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId, false);
             return new PrimaryIndexInstantSearchOperationCallback(datasetId, primaryKeyFields,
                     txnSubsystem.getLockManager(), txnCtx);
         } catch (ACIDException e) {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
index abeec62..07daee4 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
@@ -58,7 +58,7 @@
         }
 
         try {
-            ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId);
+            ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId, false);
             IModificationOperationCallback modCallback = new PrimaryIndexModificationOperationCallback(datasetId,
                     primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resourceId, resourceType,
                     indexOp);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
index b59eb75..01cb725 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
@@ -41,7 +41,7 @@
             throws HyracksDataException {
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
         try {
-            ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId);
+            ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId, false);
             return new PrimaryIndexSearchOperationCallback(datasetId, primaryKeyFields, txnSubsystem.getLockManager(),
                     txnCtx);
         } catch (ACIDException e) {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
index d5bc877..563e9b7 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
@@ -55,7 +55,7 @@
         }
 
         try {
-            ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId);
+            ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId, false);
             IModificationOperationCallback modCallback = new SecondaryIndexModificationOperationCallback(datasetId, primaryKeyFields, txnCtx,
                     txnSubsystem.getLockManager(), txnSubsystem, resourceId, resourceType, indexOp);
             txnCtx.registerIndexAndCallback(resourceId, index, (AbstractOperationCallback) modCallback, false);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
index 6d86f70..c7df2f2 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
@@ -665,8 +665,6 @@
 
         latchLockTable();
         try {
-            validateJob(txnContext);
-
             if (IS_DEBUG_MODE) {
                 trackLockRequest("Requested", RequestType.UNLOCK, datasetId, entityHashValue, (byte) 0, txnContext,
                         dLockInfo, eLockInfo);
@@ -2212,14 +2210,14 @@
                 if (logRecord.getLogType() == LogType.ENTITY_COMMIT) {
                     tempDatasetIdObj.setId(logRecord.getDatasetId());
                     tempJobIdObj.setId(logRecord.getJobId());
-                    txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(tempJobIdObj);
+                    txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(tempJobIdObj, false);
                     unlock(tempDatasetIdObj, logRecord.getPKHashValue(), txnCtx);
                     txnCtx.notifyOptracker(false);
-                } else if (logRecord.getLogType() == LogType.JOB_COMMIT) {
+                } else if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) {
                     tempJobIdObj.setId(logRecord.getJobId());
-                    txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(tempJobIdObj);
+                    txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(tempJobIdObj, false);
                     txnCtx.notifyOptracker(true);
-                    ((LogPage) logPage).notifyJobCommitter();
+                    ((LogPage) logPage).notifyJobTerminator();
                 }
                 logRecord = logPageReader.next();
             }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
index 4f0bb59..933afcd 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
@@ -38,6 +38,7 @@
 import edu.uci.ics.asterix.common.transactions.ILogReader;
 import edu.uci.ics.asterix.common.transactions.ILogRecord;
 import edu.uci.ics.asterix.common.transactions.ITransactionContext;
+import edu.uci.ics.asterix.common.transactions.ITransactionManager;
 import edu.uci.ics.asterix.common.transactions.LogManagerProperties;
 import edu.uci.ics.asterix.common.transactions.MutableLong;
 import edu.uci.ics.asterix.transaction.management.service.locking.LockManager;
@@ -74,16 +75,16 @@
         logDir = logManagerProperties.getLogDir();
         logFilePrefix = logManagerProperties.getLogFilePrefix();
         flushLSN = new MutableLong();
-        initializeLogManager();
+        initializeLogManager(0);
     }
 
-    private void initializeLogManager() {
+    private void initializeLogManager(long nextLogFileId) {
         emptyQ = new LinkedBlockingQueue<LogPage>(numLogPages);
         flushQ = new LinkedBlockingQueue<LogPage>(numLogPages);
         for (int i = 0; i < numLogPages; i++) {
             emptyQ.offer(new LogPage((LockManager) txnSubsystem.getLockManager(), logPageSize, flushLSN));
         }
-        appendLSN = initializeLogAnchor();
+        appendLSN = initializeLogAnchor(nextLogFileId);
         flushLSN.set(appendLSN);
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("LogManager starts logging in LSN: " + appendLSN);
@@ -95,12 +96,13 @@
     }
 
     @Override
-    public void log(ILogRecord logRecord) {
+    public void log(ILogRecord logRecord) throws ACIDException {
         if (logRecord.getLogSize() > logPageSize) {
             throw new IllegalStateException();
         }
         syncLog(logRecord);
-        if (logRecord.getLogType() == LogType.JOB_COMMIT && !logRecord.isFlushed()) {
+        if ((logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT)
+                && !logRecord.isFlushed()) {
             synchronized (logRecord) {
                 while (!logRecord.isFlushed()) {
                     try {
@@ -113,13 +115,16 @@
         }
     }
 
-    private synchronized void syncLog(ILogRecord logRecord) {
+    private synchronized void syncLog(ILogRecord logRecord) throws ACIDException {
         ITransactionContext txnCtx = logRecord.getTxnCtx();
+        if (txnCtx.getTxnState() == ITransactionManager.ABORTED && logRecord.getLogType() != LogType.ABORT) {
+            throw new ACIDException("Aborted job(" + txnCtx.getJobId() + ") tried to write non-abort type log record.");
+        }
         if (getLogFileOffset(appendLSN) + logRecord.getLogSize() > logFileSize) {
             prepareNextLogFile();
             appendPage.isFull(true);
             getAndInitNewPage();
-        } else if (!appendPage.hasSpace(logRecord.getLogSize(), getLogFileOffset(appendLSN))) {
+        } else if (!appendPage.hasSpace(logRecord.getLogSize())) {
             appendPage.isFull(true);
             getAndInitNewPage();
         }
@@ -141,7 +146,6 @@
         }
         appendPage.reset();
         appendPage.setFileChannel(appendChannel);
-        appendPage.setInitialFlushOffset(getLogFileOffset(appendLSN));
         flushQ.offer(appendPage);
     }
 
@@ -229,7 +233,7 @@
         return flushLSN;
     }
 
-    private long initializeLogAnchor() {
+    private long initializeLogAnchor(long nextLogFileId) {
         long fileId = 0;
         long offset = 0;
         File fileLogDir = new File(logDir);
@@ -237,9 +241,10 @@
             if (fileLogDir.exists()) {
                 List<Long> logFileIds = getLogFileIds();
                 if (logFileIds == null) {
-                    createFileIfNotExists(getLogFilePath(0));
+                    fileId = nextLogFileId;
+                    createFileIfNotExists(getLogFilePath(fileId));
                     if (LOGGER.isLoggable(Level.INFO)) {
-                        LOGGER.info("created a log file: " + getLogFilePath(0));
+                        LOGGER.info("created a log file: " + getLogFilePath(fileId));
                     }
                 } else {
                     fileId = logFileIds.get(logFileIds.size() - 1);
@@ -247,13 +252,14 @@
                     offset = logFile.length();
                 }
             } else {
+                fileId = nextLogFileId;
                 createNewDirectory(logDir);
                 if (LOGGER.isLoggable(Level.INFO)) {
                     LOGGER.info("created the log directory: " + logManagerProperties.getLogDir());
                 }
-                createFileIfNotExists(getLogFilePath(0));
+                createFileIfNotExists(getLogFilePath(fileId));
                 if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("created a log file: " + getLogFilePath(0));
+                    LOGGER.info("created a log file: " + getLogFilePath(fileId));
                 }
             }
         } catch (IOException ioe) {
@@ -267,8 +273,8 @@
 
     public void renewLogFiles() {
         terminateLogFlusher();
-        deleteAllLogFiles();
-        initializeLogManager();
+        long lastMaxLogFileId = deleteAllLogFiles();
+        initializeLogManager(lastMaxLogFileId + 1);
     }
 
     private void terminateLogFlusher() {
@@ -290,7 +296,7 @@
         }
     }
 
-    private void deleteAllLogFiles() {
+    private long deleteAllLogFiles() {
         if (appendChannel != null) {
             try {
                 appendChannel.close();
@@ -305,6 +311,7 @@
                 throw new IllegalStateException("Failed to delete a file: " + file.getAbsolutePath());
             }
         }
+        return logFileIds.get(logFileIds.size() - 1);
     }
 
     private List<Long> getLogFileIds() {
@@ -384,10 +391,16 @@
         }
         return newFileChannel;
     }
+
+    public long getReadableSmallestLSN() {
+        List<Long> logFileIds = getLogFileIds();
+        return logFileIds.get(0) * logFileSize;
+    }
 }
 
 class LogFlusher implements Callable<Boolean> {
-    private final static LogPage POISON_PILL = new LogPage(null, ILogRecord.JOB_COMMIT_LOG_SIZE, null);
+    private static final Logger LOGGER = Logger.getLogger(LogFlusher.class.getName());
+    private final static LogPage POISON_PILL = new LogPage(null, ILogRecord.JOB_TERMINATE_LOG_SIZE, null);
     private final LogManager logMgr;//for debugging
     private final LinkedBlockingQueue<LogPage> emptyQ;
     private final LinkedBlockingQueue<LogPage> flushQ;
@@ -402,13 +415,13 @@
         flushPage = null;
         isStarted = new AtomicBoolean(false);
         terminateFlag = new AtomicBoolean(false);
-        
+
     }
 
     public void terminate() {
         //make sure the LogFlusher thread started before terminating it.
         synchronized (isStarted) {
-            while(!isStarted.get()) {
+            while (!isStarted.get()) {
                 try {
                     isStarted.wait();
                 } catch (InterruptedException e) {
@@ -416,7 +429,7 @@
                 }
             }
         }
-        
+
         terminateFlag.set(true);
         if (flushPage != null) {
             synchronized (flushPage) {
@@ -432,24 +445,34 @@
 
     @Override
     public Boolean call() {
-        synchronized(isStarted) {
+        synchronized (isStarted) {
             isStarted.set(true);
             isStarted.notify();
         }
-        while (true) {
-            flushPage = null;
-            try {
-                flushPage = flushQ.take();
-                if (flushPage == POISON_PILL || terminateFlag.get()) {
-                    return true;
+        try {
+            while (true) {
+                flushPage = null;
+                try {
+                    flushPage = flushQ.take();
+                    if (flushPage == POISON_PILL || terminateFlag.get()) {
+                        return true;
+                    }
+                } catch (InterruptedException e) {
+                    if (flushPage == null) {
+                        continue;
+                    }
                 }
-            } catch (InterruptedException e) {
-                if (flushPage == null) {
-                    continue;
-                }
+                flushPage.flush();
+                emptyQ.offer(flushPage);
             }
-            flushPage.flush();
-            emptyQ.offer(flushPage);
+        } catch (Exception e) {
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("-------------------------------------------------------------------------");
+                LOGGER.info("LogFlusher is terminating abnormally. System is in unusalbe state.");
+                LOGGER.info("-------------------------------------------------------------------------");
+            }
+            e.printStackTrace();
+            throw e;
         }
     }
 }
\ No newline at end of file
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
index edfec69..8921f35 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
@@ -58,7 +58,7 @@
         appendOffset = 0;
         flushOffset = 0;
         isLastPage = false;
-        syncCommitQ = new LinkedBlockingQueue<ILogRecord>(logPageSize / ILogRecord.JOB_COMMIT_LOG_SIZE);
+        syncCommitQ = new LinkedBlockingQueue<ILogRecord>(logPageSize / ILogRecord.JOB_TERMINATE_LOG_SIZE);
     }
 
     ////////////////////////////////////
@@ -76,7 +76,7 @@
             if (IS_DEBUG_MODE) {
                 LOGGER.info("append()| appendOffset: " + appendOffset);
             }
-            if (logRecord.getLogType() == LogType.JOB_COMMIT) {
+            if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) {
                 logRecord.isFlushed(false);
                 syncCommitQ.offer(logRecord);
             }
@@ -105,7 +105,7 @@
         this.isLastPage = isLastPage;
     }
 
-    public boolean hasSpace(int logSize, long logFileOffset) {
+    public boolean hasSpace(int logSize) {
         return appendOffset + logSize <= logPageSize;
     }
 
@@ -192,7 +192,7 @@
         }
     }
 
-    public void notifyJobCommitter() {
+    public void notifyJobTerminator() {
         ILogRecord logRecord = null;
         while (logRecord == null) {
             try {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecord.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecord.java
index 4b0e1f2..dd81df7 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecord.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecord.java
@@ -34,10 +34,9 @@
  * LogType(1)
  * JobId(4)
  * ---------------------------
- * [Header2] (16 bytes + PKValueSize) : for entity_commit and update log types 
+ * [Header2] (12 bytes + PKValueSize) : for entity_commit and update log types 
  * DatasetId(4) //stored in dataset_dataset in Metadata Node
  * PKHashValue(4)
- * PKFieldCnt(4)
  * PKValueSize(4)
  * PKValue(PKValueSize)
  * ---------------------------
@@ -61,10 +60,10 @@
  * ---------------------------
  * = LogSize =
  * 1) JOB_COMMIT_LOG_SIZE: 13 bytes (5 + 8)
- * 2) ENTITY_COMMIT: 29 + PKSize (5 + 16 + PKSize + 8)
- *    --> ENTITY_COMMIT_LOG_BASE_SIZE = 29
- * 3) UPDATE: 64 + PKSize + New/OldValueSize (5 + 16 + PKSize + 21 + 14 + New/OldValueSize + 8)
- *    --> UPDATE_LOG_BASE_SIZE = 64
+ * 2) ENTITY_COMMIT: 25 + PKSize (5 + 12 + PKSize + 8)
+ *    --> ENTITY_COMMIT_LOG_BASE_SIZE = 25
+ * 3) UPDATE: 64 + PKSize + New/OldValueSize (5 + 12 + PKSize + 21 + 14 + New/OldValueSize + 8)
+ *    --> UPDATE_LOG_BASE_SIZE = 60
  */
 public class LogRecord implements ILogRecord {
 
@@ -73,7 +72,6 @@
     private int jobId;
     private int datasetId;
     private int PKHashValue;
-    private int PKFieldCnt;
     private int PKValueSize;
     private ITupleReference PKValue;
     private long prevLSN;
@@ -90,12 +88,13 @@
     private long checksum;
     //------------- fields in a log record (end) --------------//
 
+    private int PKFieldCnt;
     private static final int CHECKSUM_SIZE = 8;
     private ITransactionContext txnCtx;
     private long LSN;
     private final AtomicBoolean isFlushed;
     private final SimpleTupleWriter tupleWriter;
-    private final SimpleTupleReference readPKValue;
+    private final PrimaryKeyTupleReference readPKValue;
     private final SimpleTupleReference readNewValue;
     private final SimpleTupleReference readOldValue;
     private final CRC32 checksumGen;
@@ -104,7 +103,7 @@
     public LogRecord() {
         isFlushed = new AtomicBoolean(false);
         tupleWriter = new SimpleTupleWriter();
-        readPKValue = (SimpleTupleReference) tupleWriter.createTupleReference();
+        readPKValue = new PrimaryKeyTupleReference();
         readNewValue = (SimpleTupleReference) tupleWriter.createTupleReference();
         readOldValue = (SimpleTupleReference) tupleWriter.createTupleReference();
         checksumGen = new CRC32();
@@ -115,10 +114,9 @@
         int beginOffset = buffer.position();
         buffer.put(logType);
         buffer.putInt(jobId);
-        if (logType != LogType.JOB_COMMIT) {
+        if (logType == LogType.UPDATE || logType == LogType.ENTITY_COMMIT) {
             buffer.putInt(datasetId);
             buffer.putInt(PKHashValue);
-            buffer.putInt(PKFieldCnt);
             if (PKValueSize <= 0) {
                 throw new IllegalStateException("Primary Key Size is less than or equal to 0");
             }
@@ -173,13 +171,12 @@
         try {
             logType = buffer.get();
             jobId = buffer.getInt();
-            if (logType == LogType.JOB_COMMIT) {
+            if (logType == LogType.JOB_COMMIT || logType == LogType.ABORT) {
                 datasetId = -1;
                 PKHashValue = -1;
             } else {
-                datasetId = buffer.getInt();    
+                datasetId = buffer.getInt();
                 PKHashValue = buffer.getInt();
-                PKFieldCnt = buffer.getInt();
                 PKValueSize = buffer.getInt();
                 if (PKValueSize <= 0) {
                     throw new IllegalStateException("Primary Key Size is less than or equal to 0");
@@ -217,12 +214,20 @@
         }
         return true;
     }
-    
+
     private ITupleReference readPKValue(ByteBuffer buffer) {
-        return readTuple(buffer, readPKValue, PKFieldCnt, PKValueSize);
+        if (buffer.position() + PKValueSize > buffer.limit()) {
+            throw new BufferUnderflowException();
+        }
+        readPKValue.reset(buffer.array(), buffer.position(), PKValueSize);
+        buffer.position(buffer.position() + PKValueSize);
+        return readPKValue;
     }
 
     private ITupleReference readTuple(ByteBuffer srcBuffer, SimpleTupleReference destTuple, int fieldCnt, int size) {
+        if (srcBuffer.position() + size > srcBuffer.limit()) {
+            throw new BufferUnderflowException();
+        }
         destTuple.setFieldCount(fieldCnt);
         destTuple.resetByTupleOffset(srcBuffer, srcBuffer.position());
         srcBuffer.position(srcBuffer.position() + size);
@@ -230,9 +235,9 @@
     }
 
     @Override
-    public void formJobCommitLogRecord(ITransactionContext txnCtx) {
+    public void formJobTerminateLogRecord(ITransactionContext txnCtx, boolean isCommit) {
         this.txnCtx = txnCtx;
-        this.logType = LogType.JOB_COMMIT;
+        this.logType = isCommit ? LogType.JOB_COMMIT : LogType.ABORT;
         this.jobId = txnCtx.getJobId().getId();
         this.datasetId = -1;
         this.PKHashValue = -1;
@@ -281,7 +286,8 @@
                 setUpdateLogSize();
                 break;
             case LogType.JOB_COMMIT:
-                logSize = JOB_COMMIT_LOG_SIZE;
+            case LogType.ABORT:
+                logSize = JOB_TERMINATE_LOG_SIZE;
                 break;
             case LogType.ENTITY_COMMIT:
                 logSize = ENTITY_COMMIT_LOG_BASE_SIZE + PKValueSize;
@@ -298,7 +304,7 @@
         builder.append(" LogType : ").append(LogType.toString(logType));
         builder.append(" LogSize : ").append(logSize);
         builder.append(" JobId : ").append(jobId);
-        if (logType != LogType.JOB_COMMIT) {
+        if (logType == LogType.ENTITY_COMMIT || logType == LogType.UPDATE) {
             builder.append(" DatasetId : ").append(datasetId);
             builder.append(" PKHashValue : ").append(PKHashValue);
             builder.append(" PKFieldCnt : ").append(PKFieldCnt);
@@ -496,17 +502,17 @@
     public void setLSN(long LSN) {
         this.LSN = LSN;
     }
-    
+
     @Override
     public int getPKValueSize() {
         return PKValueSize;
     }
-    
+
     @Override
     public ITupleReference getPKValue() {
         return PKValue;
     }
-    
+
     @Override
     public void setPKFields(int[] primaryKeyFields) {
         PKFields = primaryKeyFields;
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java
index 823c8d3..f9e9304 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java
@@ -19,11 +19,14 @@
     public static final byte UPDATE = 0;
     public static final byte JOB_COMMIT = 1;
     public static final byte ENTITY_COMMIT = 2;
+    public static final byte ABORT = 3;
     private static final String STRING_UPDATE = "UPDATE";
     private static final String STRING_JOB_COMMIT = "JOB_COMMIT";
     private static final String STRING_ENTITY_COMMIT = "ENTITY_COMMIT";
+    private static final String STRING_ABORT = "ABORT";
     private static final String STRING_INVALID_LOG_TYPE = "INVALID_LOG_TYPE";
 
+
     public static String toString(byte logType) {
         switch (logType) {
             case LogType.UPDATE:
@@ -32,6 +35,8 @@
                 return STRING_JOB_COMMIT;
             case LogType.ENTITY_COMMIT:
                 return STRING_ENTITY_COMMIT;
+            case LogType.ABORT:
+                return STRING_ABORT;
             default:
                 return STRING_INVALID_LOG_TYPE;
         }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/PrimaryKeyTupleReference.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/PrimaryKeyTupleReference.java
new file mode 100644
index 0000000..d45b209
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/PrimaryKeyTupleReference.java
@@ -0,0 +1,36 @@
+package edu.uci.ics.asterix.transaction.management.service.logging;
+
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+public class PrimaryKeyTupleReference implements ITupleReference {
+    private byte[] fieldData;
+    private int start;
+    private int length;
+
+    public void reset(byte[] fieldData, int start, int length) {
+        this.fieldData = fieldData;
+        this.start = start;
+        this.length = length;
+    }
+    
+    @Override
+    public int getFieldCount() {
+        return 1;
+    }
+
+    @Override
+    public byte[] getFieldData(int fIdx) {
+        return fieldData;
+    }
+
+    @Override
+    public int getFieldStart(int fIdx) {
+        return start;
+    }
+
+    @Override
+    public int getFieldLength(int fIdx) {
+        return length;
+    }
+
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
index 2ad3055..f31c164 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -77,6 +77,7 @@
     private final TransactionSubsystem txnSubsystem;
     private final LogManager logMgr;
     private final int checkpointHistory;
+    private final long SHARP_CHECKPOINT_LSN = -1;
 
     /**
      * A file at a known location that contains the LSN of the last log record
@@ -115,18 +116,23 @@
             return state;
         }
 
-        //#. if minMCTFirstLSN is equal to -1 && 
-        //   checkpointLSN in the checkpoint file is equal to the lastLSN in the log file,
-        //   then return healthy state. Otherwise, return corrupted.
-        if ((checkpointObject.getMinMCTFirstLsn() == -2 && logMgr.getAppendLSN() == 0)
-                || (checkpointObject.getMinMCTFirstLsn() == -1 && checkpointObject.getCheckpointLsn() == logMgr
-                        .getAppendLSN())) {
+        long readableSmallestLSN = logMgr.getReadableSmallestLSN();
+        if (logMgr.getAppendLSN() == readableSmallestLSN) {
+            if (checkpointObject.getMinMCTFirstLsn() != SHARP_CHECKPOINT_LSN) {
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("[Warning] ---------------------------------------------------");
+                    LOGGER.info("[Warning] Some(or all) of transaction log files are lost.");
+                    LOGGER.info("[Warning] ---------------------------------------------------");
+                    //No choice but continuing when the log files are lost. 
+                }
+            }
+            state = SystemState.HEALTHY;
+            return state;
+        } else if (checkpointObject.getCheckpointLsn() == logMgr.getAppendLSN()
+                && checkpointObject.getMinMCTFirstLsn() == SHARP_CHECKPOINT_LSN) {
             state = SystemState.HEALTHY;
             return state;
         } else {
-            if (logMgr.getAppendLSN() == 0) {
-                throw new IllegalStateException("Transaction log files are lost.");
-            }
             state = SystemState.CORRUPTED;
             return state;
         }
@@ -154,10 +160,11 @@
         TxnId winnerEntity = null;
 
         //#. read checkpoint file and set lowWaterMark where anaylsis and redo start
+        long readableSmallestLSN = logMgr.getReadableSmallestLSN();
         CheckpointObject checkpointObject = readCheckpoint();
-        long lowWaterMarkLsn = checkpointObject.getMinMCTFirstLsn();
-        if (lowWaterMarkLsn == -1 || lowWaterMarkLsn == -2) {
-            lowWaterMarkLsn = 0;
+        long lowWaterMarkLSN = checkpointObject.getMinMCTFirstLsn();
+        if (lowWaterMarkLSN < readableSmallestLSN) {
+            lowWaterMarkLSN = readableSmallestLSN;
         }
         int maxJobId = checkpointObject.getMaxJobId();
 
@@ -171,11 +178,11 @@
 
         //#. set log reader to the lowWaterMarkLsn
         ILogReader logReader = logMgr.getLogReader(true);
-        logReader.initializeScan(lowWaterMarkLsn);
+        logReader.initializeScan(lowWaterMarkLSN);
         ILogRecord logRecord = logReader.next();
         while (logRecord != null) {
             if (IS_DEBUG_MODE) {
-                System.out.println(logRecord.getLogRecordForDisplay());
+                LOGGER.info(logRecord.getLogRecordForDisplay());
             }
             //update max jobId
             if (logRecord.getJobId() > maxJobId) {
@@ -183,16 +190,12 @@
             }
             switch (logRecord.getLogType()) {
                 case LogType.UPDATE:
-                    if (IS_DEBUG_MODE) {
-                        updateLogCount++;
-                    }
+                    updateLogCount++;
                     break;
                 case LogType.JOB_COMMIT:
                     winnerJobSet.add(Integer.valueOf(logRecord.getJobId()));
                     jobId2WinnerEntitiesMap.remove(Integer.valueOf(logRecord.getJobId()));
-                    if (IS_DEBUG_MODE) {
-                        jobCommitLogCount++;
-                    }
+                    jobCommitLogCount++;
                     break;
                 case LogType.ENTITY_COMMIT:
                     jobId = logRecord.getJobId();
@@ -205,9 +208,10 @@
                         winnerEntitySet = jobId2WinnerEntitiesMap.get(Integer.valueOf(jobId));
                     }
                     winnerEntitySet.add(winnerEntity);
-                    if (IS_DEBUG_MODE) {
-                        entityCommitLogCount++;
-                    }
+                    entityCommitLogCount++;
+                    break;
+                case LogType.ABORT:
+                    //ignore
                     break;
                 default:
                     throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
@@ -239,11 +243,11 @@
         ILocalResourceRepository localResourceRepository = appRuntimeContext.getLocalResourceRepository();
 
         //#. set log reader to the lowWaterMarkLsn again.
-        logReader.initializeScan(lowWaterMarkLsn);
+        logReader.initializeScan(lowWaterMarkLSN);
         logRecord = logReader.next();
         while (logRecord != null) {
-            if (LogManager.IS_DEBUG_MODE) {
-                System.out.println(logRecord.getLogRecordForDisplay());
+            if (IS_DEBUG_MODE) {
+                LOGGER.info(logRecord.getLogRecordForDisplay());
             }
             LSN = logRecord.getLSN();
             jobId = logRecord.getJobId();
@@ -310,15 +314,14 @@
 
                         if (LSN > maxDiskLastLsn) {
                             redo(logRecord);
-                            if (IS_DEBUG_MODE) {
-                                redoCount++;
-                            }
+                            redoCount++;
                         }
                     }
                     break;
 
                 case LogType.JOB_COMMIT:
                 case LogType.ENTITY_COMMIT:
+                case LogType.ABORT:
                     //do nothing
                     break;
 
@@ -338,9 +341,7 @@
 
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("[RecoveryMgr] recovery is completed.");
-        }
-        if (IS_DEBUG_MODE) {
-            System.out.println("[RecoveryMgr] Count: Update/EntityCommit/JobCommit/Redo = " + updateLogCount + "/"
+            LOGGER.info("[RecoveryMgr] Count: Update/EntityCommit/JobCommit/Redo = " + updateLogCount + "/"
                     + entityCommitLogCount + "/" + jobCommitLogCount + "/" + redoCount);
         }
     }
@@ -393,7 +394,7 @@
                     throw new ACIDException(e);
                 }
             }
-            minMCTFirstLSN = -2;
+            minMCTFirstLSN = SHARP_CHECKPOINT_LSN;
         } else {
             long firstLSN;
             minMCTFirstLSN = Long.MAX_VALUE;
@@ -403,7 +404,7 @@
                     minMCTFirstLSN = Math.min(minMCTFirstLSN, firstLSN);
                 }
             } else {
-                minMCTFirstLSN = -1;
+                minMCTFirstLSN = SHARP_CHECKPOINT_LSN;
             }
         }
         CheckpointObject checkpointObject = new CheckpointObject(logMgr.getAppendLSN(), minMCTFirstLSN,
@@ -580,7 +581,7 @@
                 break;
             } else {
                 if (IS_DEBUG_MODE) {
-                    System.out.println(logRecord.getLogRecordForDisplay());
+                    LOGGER.info(logRecord.getLogRecordForDisplay());
                 }
                 currentLSN = logRecord.getLSN();
             }
@@ -600,9 +601,9 @@
                         loserTxnTable.put(loserEntity, undoLSNSet);
                     }
                     undoLSNSet.add(Long.valueOf(currentLSN));
+                    updateLogCount++;
                     if (IS_DEBUG_MODE) {
-                        updateLogCount++;
-                        System.out.println("" + Thread.currentThread().getId() + "======> update[" + currentLSN + "]:"
+                        LOGGER.info("" + Thread.currentThread().getId() + "======> update[" + currentLSN + "]:"
                                 + tempKeyTxnId);
                     }
                     break;
@@ -611,14 +612,21 @@
                     throw new ACIDException("Unexpected LogType(" + logRecord.getLogType() + ") during abort.");
 
                 case LogType.ENTITY_COMMIT:
-                    loserTxnTable.remove(tempKeyTxnId);
+                    undoLSNSet = loserTxnTable.remove(tempKeyTxnId);
+                    if (undoLSNSet == null) {
+                        undoLSNSet = loserTxnTable.remove(tempKeyTxnId);
+                    }
+                    entityCommitLogCount++;
                     if (IS_DEBUG_MODE) {
-                        entityCommitLogCount++;
-                        System.out.println("" + Thread.currentThread().getId() + "======> entity_commit[" + currentLSN + "]"
+                        LOGGER.info("" + Thread.currentThread().getId() + "======> entity_commit[" + currentLSN + "]"
                                 + tempKeyTxnId);
                     }
                     break;
 
+                case LogType.ABORT:
+                    //ignore
+                    break;
+
                 default:
                     throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
             }
@@ -650,23 +658,17 @@
                     throw new ACIDException("IllegalState exception during abort( " + txnContext.getJobId() + ")");
                 }
                 if (IS_DEBUG_MODE) {
-                    System.out.println(logRecord.getLogRecordForDisplay());
+                    LOGGER.info(logRecord.getLogRecordForDisplay());
                 }
                 undo(logRecord);
-                if (IS_DEBUG_MODE) {
-                    undoCount++;
-                }
+                undoCount++;
             }
         }
-
         logReader.close();
-
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info(" undone loser transaction's effect");
-        }
-        if (IS_DEBUG_MODE) {
-            System.out.println("UpdateLogCount/CommitLogCount/UndoCount:" + updateLogCount + "/" + entityCommitLogCount
-                    + "/" + undoCount);
+            LOGGER.info("UpdateLogCount/CommitLogCount/UndoCount:" + updateLogCount + "/" + entityCommitLogCount + "/"
+                    + undoCount);
         }
     }
 
@@ -765,6 +767,7 @@
         this.datasetId = datasetId;
         this.pkHashValue = pkHashValue;
         this.tupleReferencePKValue = pkValue;
+        this.pkSize = pkSize;
         isByteArrayPKValue = false;
     }
 
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
index 678956b..0b0a6f5 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
@@ -221,4 +221,10 @@
     public LogRecord getLogRecord() {
         return logRecord;
     }
+
+    public void cleanupForAbort() {
+        if (primaryIndexOpTracker != null) {
+            primaryIndexOpTracker.cleanupNumActiveOperationsForAbortedJob(primaryIndexCallback);
+        }
+    }
 }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
index 01b38c2..07fc152 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -53,7 +53,12 @@
             txnCtx.setTxnState(ITransactionManager.ABORTED);
         }
         try {
-            txnSubsystem.getRecoveryManager().rollbackTransaction(txnCtx);
+            if (txnCtx.isWriteTxn()) {
+                LogRecord logRecord = ((TransactionContext) txnCtx).getLogRecord();
+                logRecord.formJobTerminateLogRecord(txnCtx, false);
+                txnSubsystem.getLogManager().log(logRecord);
+                txnSubsystem.getRecoveryManager().rollbackTransaction(txnCtx);
+            }
         } catch (Exception ae) {
             String msg = "Could not complete rollback! System is in an inconsistent state";
             if (LOGGER.isLoggable(Level.SEVERE)) {
@@ -62,6 +67,7 @@
             ae.printStackTrace();
             throw new ACIDException(msg, ae);
         } finally {
+            ((TransactionContext) txnCtx).cleanupForAbort();
             txnSubsystem.getLockManager().releaseLocks(txnCtx);
             transactionContextRepository.remove(txnCtx.getJobId());
         }
@@ -69,20 +75,24 @@
 
     @Override
     public ITransactionContext beginTransaction(JobId jobId) throws ACIDException {
-        return getTransactionContext(jobId);
+        return getTransactionContext(jobId, true);
     }
 
     @Override
-    public ITransactionContext getTransactionContext(JobId jobId) throws ACIDException {
+    public ITransactionContext getTransactionContext(JobId jobId, boolean createIfNotExist) throws ACIDException {
         setMaxJobId(jobId.getId());
         ITransactionContext txnCtx = transactionContextRepository.get(jobId);
         if (txnCtx == null) {
-            synchronized (this) {
-                txnCtx = transactionContextRepository.get(jobId);
-                if (txnCtx == null) {
-                    txnCtx = new TransactionContext(jobId, txnSubsystem);
-                    transactionContextRepository.put(jobId, txnCtx);
+            if (createIfNotExist) {
+                synchronized (this) {
+                    txnCtx = transactionContextRepository.get(jobId);
+                    if (txnCtx == null) {
+                        txnCtx = new TransactionContext(jobId, txnSubsystem);
+                        transactionContextRepository.put(jobId, txnCtx);
+                    }
                 }
+            } else {
+                throw new ACIDException("TransactionContext of " + jobId + " doesn't exist.");
             }
         }
         return txnCtx;
@@ -94,7 +104,7 @@
         try {
             if (txnCtx.isWriteTxn()) {
                 LogRecord logRecord = ((TransactionContext) txnCtx).getLogRecord();
-                logRecord.formJobCommitLogRecord(txnCtx);
+                logRecord.formJobTerminateLogRecord(txnCtx, true);
                 txnSubsystem.getLogManager().log(logRecord);
             }
         } catch (Exception ae) {