merged from master to master_abort_recovery_fix2
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java
index 6dd11bd..32bfa58 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java
@@ -72,7 +72,7 @@
     @Override
     public void open() throws HyracksDataException {
         try {
-            transactionContext = transactionManager.getTransactionContext(jobId);
+            transactionContext = transactionManager.getTransactionContext(jobId, false);
             transactionContext.setWriteTxn(isWriteTransaction);
         } catch (ACIDException e) {
             throw new HyracksDataException(e);
@@ -89,7 +89,11 @@
             pkHash = computePrimaryKeyHashValue(frameTupleReference, primaryKeyFields);
             logRecord.formEntityCommitLogRecord(transactionContext, datasetId, pkHash, frameTupleReference,
                     primaryKeyFields);
-            logMgr.log(logRecord);
+            try {
+                logMgr.log(logRecord);
+            } catch (ACIDException e) {
+                throw new HyracksDataException(e);
+            }
         }
     }
 
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/InvertedIndexPOperator.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/InvertedIndexPOperator.java
index 56fc0e7..af503a6 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/InvertedIndexPOperator.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/InvertedIndexPOperator.java
@@ -220,17 +220,15 @@
                 dataflowHelperFactory = new LSMInvertedIndexDataflowHelperFactory(
                         new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
                         AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, new SecondaryIndexOperationTrackerProvider(
-                                LSMInvertedIndexIOOperationCallbackFactory.INSTANCE, dataset.getDatasetId()),
-                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                                dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                        LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
                         storageProperties.getBloomFilterFalsePositiveRate());
             } else {
                 dataflowHelperFactory = new PartitionedLSMInvertedIndexDataflowHelperFactory(
                         new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
                         AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, new SecondaryIndexOperationTrackerProvider(
-                                LSMInvertedIndexIOOperationCallbackFactory.INSTANCE, dataset.getDatasetId()),
-                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                                dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                        LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
                         storageProperties.getBloomFilterFalsePositiveRate());
             }
             LSMInvertedIndexSearchOperatorDescriptor invIndexSearchOp = new LSMInvertedIndexSearchOperatorDescriptor(
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContextProviderForRecovery.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContextProviderForRecovery.java
index ee9dfae..b528381 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContextProviderForRecovery.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/AsterixAppRuntimeContextProviderForRecovery.java
@@ -18,10 +18,8 @@
 
 import edu.uci.ics.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
 import edu.uci.ics.asterix.common.transactions.ITransactionSubsystem;
-import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
 import edu.uci.ics.hyracks.api.io.IIOManager;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
@@ -95,26 +93,6 @@
     }
 
     @Override
-    public ILSMIOOperationCallbackProvider getNoOpIOOperationCallbackProvider() {
-        return AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER;
-    }
-
-    @Override
-    public ILSMIOOperationCallbackProvider getLSMBTreeIOOperationCallbackProvider(boolean isPrimary) {
-        return AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER;
-    }
-
-    @Override
-    public ILSMIOOperationCallbackProvider getLSMRTreeIOOperationCallbackProvider() {
-        return AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER;
-    }
-
-    @Override
-    public ILSMIOOperationCallbackProvider getLSMInvertedIndexIOOperationCallbackProvider() {
-        return AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER;
-    }
-
-    @Override
     public ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID) {
         return asterixAppRuntimeContext.getLSMBTreeOperationTracker(datasetID);
     }
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
index 3e41a77..ccc5e85 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
@@ -30,6 +30,7 @@
 import edu.uci.ics.asterix.common.context.AsterixVirtualBufferCacheProvider;
 import edu.uci.ics.asterix.common.exceptions.ACIDException;
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
 import edu.uci.ics.asterix.formats.base.IDataFormat;
 import edu.uci.ics.asterix.metadata.MetadataManager;
 import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
@@ -127,8 +128,7 @@
                 splitsAndConstraint.first, new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
                         dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
                         new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
-                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
                         storageProperties.getBloomFilterFalsePositiveRate()));
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(specPrimary, primaryBtreeDrop,
                 splitsAndConstraint.second);
@@ -180,7 +180,7 @@
                 new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
                         AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, new PrimaryIndexOperationTrackerProvider(
                                 dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, storageProperties
+                        LSMBTreeIOOperationCallbackFactory.INSTANCE, storageProperties
                                 .getBloomFilterFalsePositiveRate()), localResourceFactoryProvider,
                 NoOpOperationCallbackFactory.INSTANCE);
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexCreateOp,
@@ -267,7 +267,7 @@
                             AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
                             new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
                             AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                            LSMBTreeIOOperationCallbackFactory.INSTANCE,
                             storageProperties.getBloomFilterFalsePositiveRate()), NoOpOperationCallbackFactory.INSTANCE);
             AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, btreeBulkLoad,
                     splitsAndConstraint.second);
@@ -293,7 +293,7 @@
                             AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
                             new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
                             AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                            LSMBTreeIOOperationCallbackFactory.INSTANCE,
                             storageProperties.getBloomFilterFalsePositiveRate()), NoOpOperationCallbackFactory.INSTANCE);
             AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, btreeBulkLoad,
                     splitsAndConstraint.second);
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
index af56894..bf31db7 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
@@ -71,9 +71,8 @@
                 AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
                 splitsAndConstraint.first, new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
                         dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                        new SecondaryIndexOperationTrackerProvider(LSMBTreeIOOperationCallbackFactory.INSTANCE, dataset
-                                .getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                        new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
                         storageProperties.getBloomFilterFalsePositiveRate()));
         AlgebricksPartitionConstraintHelper
                 .setPartitionConstraintInJobSpec(spec, btreeDrop, splitsAndConstraint.second);
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java
index e3832d4..3ae5f6d 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryBTreeCreator.java
@@ -66,9 +66,8 @@
                 secondaryFileSplitProvider, secondaryRecDesc.getTypeTraits(), secondaryComparatorFactories,
                 secondaryBloomFilterKeyFields, new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
                         dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                        new SecondaryIndexOperationTrackerProvider(LSMBTreeIOOperationCallbackFactory.INSTANCE, dataset
-                                .getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                        new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
                         storageProperties.getBloomFilterFalsePositiveRate()), localResourceFactoryProvider,
                 NoOpOperationCallbackFactory.INSTANCE);
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, secondaryIndexCreateOp,
@@ -107,9 +106,8 @@
                 numSecondaryKeys,
                 new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
                         AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, new SecondaryIndexOperationTrackerProvider(
-                                LSMBTreeIOOperationCallbackFactory.INSTANCE, dataset.getDatasetId()),
-                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, storageProperties
+                                dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                        LSMBTreeIOOperationCallbackFactory.INSTANCE, storageProperties
                                 .getBloomFilterFalsePositiveRate()), BTree.DEFAULT_FILL_FACTOR);
 
         // Connect the operators.
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
index a9b3881..9f80568 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
@@ -26,6 +26,7 @@
 import edu.uci.ics.asterix.common.context.ITransactionSubsystemProvider;
 import edu.uci.ics.asterix.common.context.TransactionSubsystemProvider;
 import edu.uci.ics.asterix.common.exceptions.AsterixException;
+import edu.uci.ics.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
 import edu.uci.ics.asterix.common.transactions.IRecoveryManager.ResourceType;
 import edu.uci.ics.asterix.common.transactions.JobId;
 import edu.uci.ics.asterix.formats.nontagged.AqlBinaryBooleanInspectorImpl;
@@ -296,7 +297,7 @@
                 new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
                         AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, new PrimaryIndexOperationTrackerProvider(
                                 dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                        LSMBTreeIOOperationCallbackFactory.INSTANCE,
                         storageProperties.getBloomFilterFalsePositiveRate()), false, searchCallbackFactory);
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, primarySearchOp,
                 primaryPartitionConstraint);
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java
index 40e0aa9..72d0d70 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryInvertedIndexCreator.java
@@ -283,16 +283,16 @@
         if (!isPartitioned) {
             return new LSMInvertedIndexDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
                     dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                    new SecondaryIndexOperationTrackerProvider(LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
-                            dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                    new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
                     AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                    LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
                     storageProperties.getBloomFilterFalsePositiveRate());
         } else {
             return new PartitionedLSMInvertedIndexDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(
                     dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                    new SecondaryIndexOperationTrackerProvider(LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
-                            dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                    new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
                     AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                    LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
                     storageProperties.getBloomFilterFalsePositiveRate());
         }
     }
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
index ec62068..4d8118d 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
@@ -91,10 +91,9 @@
                 new LSMRTreeDataflowHelperFactory(valueProviderFactories, RTreePolicyType.RTREE,
                         primaryComparatorFactories, new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
                         AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, new SecondaryIndexOperationTrackerProvider(
-                                LSMRTreeIOOperationCallbackFactory.INSTANCE, dataset.getDatasetId()),
-                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AqlMetadataProvider.proposeLinearizer(
-                                keyType, secondaryComparatorFactories.length), storageProperties
+                                dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                        LSMRTreeIOOperationCallbackFactory.INSTANCE, AqlMetadataProvider.proposeLinearizer(keyType,
+                                secondaryComparatorFactories.length), storageProperties
                                 .getBloomFilterFalsePositiveRate()), localResourceFactoryProvider,
                 NoOpOperationCallbackFactory.INSTANCE);
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, secondaryIndexCreateOp,
@@ -171,17 +170,14 @@
 
         AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
         // Create secondary RTree bulk load op.
-        TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = createTreeIndexBulkLoadOp(
-                spec,
-                numNestedSecondaryKeyFields,
-                new LSMRTreeDataflowHelperFactory(valueProviderFactories, RTreePolicyType.RTREE,
-                        primaryComparatorFactories, new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
-                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, new SecondaryIndexOperationTrackerProvider(
-                                LSMRTreeIOOperationCallbackFactory.INSTANCE, dataset.getDatasetId()),
-                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AqlMetadataProvider.proposeLinearizer(
-                                keyType, secondaryComparatorFactories.length), storageProperties
-                                .getBloomFilterFalsePositiveRate()), BTree.DEFAULT_FILL_FACTOR);
+        TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = createTreeIndexBulkLoadOp(spec,
+                numNestedSecondaryKeyFields, new LSMRTreeDataflowHelperFactory(valueProviderFactories,
+                        RTreePolicyType.RTREE, primaryComparatorFactories, new AsterixVirtualBufferCacheProvider(
+                                dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                        new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMRTreeIOOperationCallbackFactory.INSTANCE,
+                        AqlMetadataProvider.proposeLinearizer(keyType, secondaryComparatorFactories.length),
+                        storageProperties.getBloomFilterFalsePositiveRate()), BTree.DEFAULT_FILL_FACTOR);
 
         // Connect the operators.
         spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/BaseOperationTracker.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/BaseOperationTracker.java
index 3610478..7e7ffd9 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/BaseOperationTracker.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/BaseOperationTracker.java
@@ -17,52 +17,18 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMOperationType;
-import edu.uci.ics.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallback;
 
 public class BaseOperationTracker implements ILSMOperationTracker {
 
     protected final DatasetLifecycleManager datasetLifecycleManager;
-    protected final ILSMIOOperationCallback ioOpCallback;
-    protected long lastLSN;
-    protected long firstLSN;
     protected final int datasetID;
 
-    public BaseOperationTracker(DatasetLifecycleManager datasetLifecycleManager,
-            ILSMIOOperationCallbackFactory ioOpCallbackFactory, int datasetID) {
+    public BaseOperationTracker(DatasetLifecycleManager datasetLifecycleManager, int datasetID) {
         this.datasetLifecycleManager = datasetLifecycleManager;
-        this.ioOpCallback = ioOpCallbackFactory == null ? NoOpIOOperationCallback.INSTANCE : ioOpCallbackFactory
-                .createIOOperationCallback(this);
         this.datasetID = datasetID;
-        resetLSNs();
-    }
-
-    public ILSMIOOperationCallback getIOOperationCallback() {
-        return ioOpCallback;
-    }
-
-    public long getLastLSN() {
-        return lastLSN;
-    }
-
-    public long getFirstLSN() {
-        return firstLSN;
-    }
-
-    public void updateLastLSN(long lastLSN) {
-        if (firstLSN == -1) {
-            firstLSN = lastLSN;
-        }
-        this.lastLSN = Math.max(this.lastLSN, lastLSN);
-    }
-
-    public void resetLSNs() {
-        lastLSN = -1;
-        firstLSN = -1;
     }
 
     @Override
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/ConstantMergePolicy.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/ConstantMergePolicy.java
index cf69bfe..dee652e 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/ConstantMergePolicy.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/ConstantMergePolicy.java
@@ -43,7 +43,7 @@
         if (!ctx.isShuttingdown() && immutableComponents.size() >= threshold) {
             ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
                     NoOpOperationCallback.INSTANCE);
-            accessor.scheduleMerge(NoOpIOOperationCallback.INSTANCE);
+            accessor.scheduleMerge(index.getIOOperationCallback());
         }
     }
 }
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java
index 617b6ff..6335fb1 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/DatasetLifecycleManager.java
@@ -26,7 +26,6 @@
 
 import edu.uci.ics.asterix.common.api.ILocalResourceMetadata;
 import edu.uci.ics.asterix.common.config.AsterixStorageProperties;
-import edu.uci.ics.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.lifecycle.ILifeCycleComponent;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
@@ -253,7 +252,7 @@
         if (iInfo.isOpen) {
             ILSMIndexAccessor accessor = (ILSMIndexAccessor) iInfo.index.createAccessor(NoOpOperationCallback.INSTANCE,
                     NoOpOperationCallback.INSTANCE);
-            accessor.scheduleFlush(((BaseOperationTracker) iInfo.index.getOperationTracker()).getIOOperationCallback());
+            accessor.scheduleFlush(iInfo.index.getIOOperationCallback());
         }
         // Wait for the above flush op.
         while (dsInfo.numActiveIOOps > 0) {
@@ -316,8 +315,7 @@
         synchronized (datasetOpTrackers) {
             ILSMOperationTracker opTracker = datasetOpTrackers.get(datasetID);
             if (opTracker == null) {
-                opTracker = new PrimaryIndexOperationTracker(this, datasetID,
-                        LSMBTreeIOOperationCallbackFactory.INSTANCE);
+                opTracker = new PrimaryIndexOperationTracker(this, datasetID);
                 datasetOpTrackers.put(datasetID, opTracker);
             }
 
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
index 2ed4b0ec..ef58937 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -18,11 +18,11 @@
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import edu.uci.ics.asterix.common.transactions.AbstractOperationCallback;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallback;
 import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexInternal;
@@ -33,9 +33,8 @@
     // Number of active operations on an ILSMIndex instance.
     private final AtomicInteger numActiveOperations;
 
-    public PrimaryIndexOperationTracker(DatasetLifecycleManager datasetLifecycleManager, int datasetID,
-            ILSMIOOperationCallbackFactory ioOpCallbackFactory) {
-        super(datasetLifecycleManager, ioOpCallbackFactory, datasetID);
+    public PrimaryIndexOperationTracker(DatasetLifecycleManager datasetLifecycleManager, int datasetID) {
+        super(datasetLifecycleManager, datasetID);
         this.numActiveOperations = new AtomicInteger();
     }
 
@@ -43,7 +42,7 @@
     public void beforeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
             IModificationOperationCallback modificationCallback) throws HyracksDataException {
         if (opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION) {
-            numActiveOperations.incrementAndGet();
+            incrementNumActiveOperations(modificationCallback);
         } else if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE) {
             datasetLifecycleManager.declareActiveIOOperation(datasetID);
         }
@@ -62,14 +61,11 @@
     public void completeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
             IModificationOperationCallback modificationCallback) throws HyracksDataException {
         if (opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION) {
-            numActiveOperations.decrementAndGet();
+            decrementNumActiveOperations(modificationCallback);
+            flushIfFull();
         } else if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE) {
             datasetLifecycleManager.undeclareActiveIOOperation(datasetID);
         }
-
-        if (opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION) {
-            flushIfFull();
-        }
     }
 
     private void flushIfFull() throws HyracksDataException {
@@ -88,8 +84,7 @@
                 for (ILSMIndex lsmIndex : indexes) {
                     ILSMIndexAccessor accessor = (ILSMIndexAccessor) lsmIndex.createAccessor(
                             NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
-                    accessor.scheduleFlush(((BaseOperationTracker) lsmIndex.getOperationTracker())
-                            .getIOOperationCallback());
+                    accessor.scheduleFlush(lsmIndex.getIOOperationCallback());
                 }
             }
         }
@@ -103,4 +98,27 @@
     public int getNumActiveOperations() {
         return numActiveOperations.get();
     }
+
+    private void incrementNumActiveOperations(IModificationOperationCallback modificationCallback) {
+        //modificationCallback can be NoOpOperationCallback when redo/undo operations are executed. 
+        if (modificationCallback != NoOpOperationCallback.INSTANCE) {
+            numActiveOperations.incrementAndGet();
+            ((AbstractOperationCallback) modificationCallback).incrementLocalNumActiveOperations();
+        }
+    }
+
+    private void decrementNumActiveOperations(IModificationOperationCallback modificationCallback) {
+        //modificationCallback can be NoOpOperationCallback when redo/undo operations are executed.
+        if (modificationCallback != NoOpOperationCallback.INSTANCE) {
+            numActiveOperations.decrementAndGet();
+            ((AbstractOperationCallback) modificationCallback).decrementLocalNumActiveOperations();
+        }
+    }
+
+    public void cleanupNumActiveOperationsForAbortedJob(AbstractOperationCallback callback) {
+        int delta = callback.getLocalNumActiveOperations() * -1;
+        numActiveOperations.getAndAdd(delta);
+        callback.resetLocalNumActiveOperations();
+    }
+
 }
\ No newline at end of file
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
index da08cd8..684068b 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
@@ -17,34 +17,51 @@
 
 import java.util.List;
 
-import edu.uci.ics.asterix.common.context.BaseOperationTracker;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrame;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMOperationType;
 import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
 import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
 import edu.uci.ics.hyracks.storage.common.file.BufferedFileHandle;
 
 public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationCallback {
 
-    protected final BaseOperationTracker opTracker;
+    protected long firstLSN;
+    protected long lastLSN;
+    protected long[] immutableLastLSNs;
+    protected int readIndex;
+    protected int writeIndex;
 
-    public AbstractLSMIOOperationCallback(BaseOperationTracker opTracker) {
-        this.opTracker = opTracker;
+    public AbstractLSMIOOperationCallback() {
+        resetLSNs();
     }
 
     @Override
-    public void beforeOperation() {
+    public void setNumOfMutableComponents(int count) {
+        immutableLastLSNs = new long[count];
+        readIndex = 0;
+        writeIndex = 0;
+    }
+
+    @Override
+    public void beforeOperation(LSMOperationType opType) {
+        if (opType == LSMOperationType.FLUSH) {
+            synchronized (this) {
+                immutableLastLSNs[writeIndex] = lastLSN;
+                writeIndex = (writeIndex + 1) % immutableLastLSNs.length;
+                resetLSNs();
+            }
+        }
+    }
+
+    @Override
+    public void afterFinalize(LSMOperationType opType, ILSMComponent newComponent) {
         // Do nothing.
     }
 
-    @Override
-    public void afterFinalize(ILSMComponent newComponent) {
-        opTracker.resetLSNs();
-    }
-
     public abstract long getComponentLSN(List<ILSMComponent> oldComponents) throws HyracksDataException;
 
     protected void putLSNIntoMetadata(ITreeIndex treeIndex, List<ILSMComponent> oldComponents)
@@ -80,4 +97,25 @@
             bufferCache.unpin(metadataPage);
         }
     }
+
+    protected void resetLSNs() {
+        firstLSN = -1;
+        lastLSN = -1;
+    }
+
+    public void updateLastLSN(long lastLSN) {
+        if (firstLSN == -1) {
+            firstLSN = lastLSN;
+        }
+        this.lastLSN = Math.max(this.lastLSN, lastLSN);
+    }
+
+    public long getFirstLSN() {
+        return firstLSN;
+    }
+
+    public long getLastLSN() {
+        return lastLSN;
+    }
+
 }
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
index b6025cb..8e9b44e 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
@@ -17,22 +17,22 @@
 
 import java.util.List;
 
-import edu.uci.ics.asterix.common.context.BaseOperationTracker;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
 import edu.uci.ics.hyracks.storage.am.lsm.btree.impls.LSMBTreeDiskComponent;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMOperationType;
 
 public class LSMBTreeIOOperationCallback extends AbstractLSMIOOperationCallback {
 
-    public LSMBTreeIOOperationCallback(BaseOperationTracker opTracker) {
-        super(opTracker);
+    public LSMBTreeIOOperationCallback() {
+        super();
     }
 
     @Override
-    public void afterOperation(List<ILSMComponent> oldComponents, ILSMComponent newComponent)
+    public void afterOperation(LSMOperationType opType, List<ILSMComponent> oldComponents, ILSMComponent newComponent)
             throws HyracksDataException {
-        if (oldComponents != null && newComponent != null) {
+        if (newComponent != null) {
             LSMBTreeDiskComponent btreeComponent = (LSMBTreeDiskComponent) newComponent;
             putLSNIntoMetadata(btreeComponent.getBTree(), oldComponents);
         }
@@ -42,7 +42,11 @@
     public long getComponentLSN(List<ILSMComponent> diskComponents) throws HyracksDataException {
         if (diskComponents == null) {
             // Implies a flush IO operation.
-            return opTracker.getLastLSN();
+            synchronized (this) {
+                long lsn = immutableLastLSNs[readIndex];
+                readIndex = (readIndex + 1) % immutableLastLSNs.length;
+                return lsn;
+            }
         }
         // Get max LSN from the diskComponents. Implies a merge IO operation or Recovery operation.
         long maxLSN = -1;
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
index 92ba9ec..1028015 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
@@ -15,7 +15,6 @@
 
 package edu.uci.ics.asterix.common.ioopcallbacks;
 
-import edu.uci.ics.asterix.common.context.BaseOperationTracker;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
 
@@ -29,7 +28,7 @@
     }
 
     @Override
-    public ILSMIOOperationCallback createIOOperationCallback(Object syncObj) {
-        return new LSMBTreeIOOperationCallback((BaseOperationTracker) syncObj);
+    public ILSMIOOperationCallback createIOOperationCallback() {
+        return new LSMBTreeIOOperationCallback();
     }
 }
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
index 4f99ae6..5532f97 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
@@ -17,21 +17,21 @@
 
 import java.util.List;
 
-import edu.uci.ics.asterix.common.context.BaseOperationTracker;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMOperationType;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndexDiskComponent;
 
 public class LSMInvertedIndexIOOperationCallback extends AbstractLSMIOOperationCallback {
 
-    public LSMInvertedIndexIOOperationCallback(BaseOperationTracker opTracker) {
-        super(opTracker);
+    public LSMInvertedIndexIOOperationCallback() {
+        super();
     }
 
     @Override
-    public void afterOperation(List<ILSMComponent> oldComponents, ILSMComponent newComponent)
+    public void afterOperation(LSMOperationType opType, List<ILSMComponent> oldComponents, ILSMComponent newComponent)
             throws HyracksDataException {
-        if (oldComponents != null && newComponent != null) {
+        if (newComponent != null) {
             LSMInvertedIndexDiskComponent invIndexComponent = (LSMInvertedIndexDiskComponent) newComponent;
             putLSNIntoMetadata(invIndexComponent.getDeletedKeysBTree(), oldComponents);
         }
@@ -41,7 +41,11 @@
     public long getComponentLSN(List<ILSMComponent> diskComponents) throws HyracksDataException {
         if (diskComponents == null) {
             // Implies a flush IO operation.
-            return opTracker.getLastLSN();
+            synchronized (this) {
+                long lsn = immutableLastLSNs[readIndex];
+                readIndex = (readIndex + 1) % immutableLastLSNs.length;
+                return lsn;
+            }
         }
         // Get max LSN from the diskComponents. Implies a merge IO operation or Recovery operation.
         long maxLSN = -1;
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
index c20cdb3..5dc0c0b 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
@@ -15,7 +15,6 @@
 
 package edu.uci.ics.asterix.common.ioopcallbacks;
 
-import edu.uci.ics.asterix.common.context.BaseOperationTracker;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
 
@@ -29,7 +28,7 @@
     }
 
     @Override
-    public ILSMIOOperationCallback createIOOperationCallback(Object syncObj) {
-        return new LSMInvertedIndexIOOperationCallback((BaseOperationTracker) syncObj);
+    public ILSMIOOperationCallback createIOOperationCallback() {
+        return new LSMInvertedIndexIOOperationCallback();
     }
 }
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
index cd7b7a0..1497e17 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
@@ -17,21 +17,21 @@
 
 import java.util.List;
 
-import edu.uci.ics.asterix.common.context.BaseOperationTracker;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.LSMOperationType;
 import edu.uci.ics.hyracks.storage.am.lsm.rtree.impls.LSMRTreeDiskComponent;
 
 public class LSMRTreeIOOperationCallback extends AbstractLSMIOOperationCallback {
 
-    public LSMRTreeIOOperationCallback(BaseOperationTracker opTracker) {
-        super(opTracker);
+    public LSMRTreeIOOperationCallback() {
+        super();
     }
 
     @Override
-    public void afterOperation(List<ILSMComponent> oldComponents, ILSMComponent newComponent)
+    public void afterOperation(LSMOperationType opType, List<ILSMComponent> oldComponents, ILSMComponent newComponent)
             throws HyracksDataException {
-        if (oldComponents != null && newComponent != null) {
+        if (newComponent != null) {
             LSMRTreeDiskComponent rtreeComponent = (LSMRTreeDiskComponent) newComponent;
             putLSNIntoMetadata(rtreeComponent.getRTree(), oldComponents);
             putLSNIntoMetadata(rtreeComponent.getBTree(), oldComponents);
@@ -42,7 +42,11 @@
     public long getComponentLSN(List<ILSMComponent> diskComponents) throws HyracksDataException {
         if (diskComponents == null) {
             // Implies a flush IO operation.
-            return opTracker.getLastLSN();
+            synchronized (this) {
+                long lsn = immutableLastLSNs[readIndex];
+                readIndex = (readIndex + 1) % immutableLastLSNs.length;
+                return lsn;
+            }
         }
         // Get max LSN from the diskComponents. Implies a merge IO operation or Recovery operation.
         long maxLSN = -1;
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
index 0cd2539..841a1d5 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
@@ -15,7 +15,6 @@
 
 package edu.uci.ics.asterix.common.ioopcallbacks;
 
-import edu.uci.ics.asterix.common.context.BaseOperationTracker;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
 
@@ -29,7 +28,7 @@
     }
 
     @Override
-    public ILSMIOOperationCallback createIOOperationCallback(Object syncObj) {
-        return new LSMRTreeIOOperationCallback((BaseOperationTracker) syncObj);
+    public ILSMIOOperationCallback createIOOperationCallback() {
+        return new LSMRTreeIOOperationCallback();
     }
 }
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/AbstractOperationCallback.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/AbstractOperationCallback.java
index d4b26f7..c549e7d 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/AbstractOperationCallback.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/AbstractOperationCallback.java
@@ -15,6 +15,8 @@
 
 package edu.uci.ics.asterix.common.transactions;
 
+import java.util.concurrent.atomic.AtomicInteger;
+
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 import edu.uci.ics.hyracks.storage.am.bloomfilter.impls.MurmurHash128Bit;
 
@@ -27,6 +29,7 @@
     protected final ITransactionContext txnCtx;
     protected final ILockManager lockManager;
     protected final long[] longHashes;
+    protected final AtomicInteger transactorLocalNumActiveOperations;
 
     public AbstractOperationCallback(int datasetId, int[] primaryKeyFields, ITransactionContext txnCtx,
             ILockManager lockManager) {
@@ -34,6 +37,7 @@
         this.primaryKeyFields = primaryKeyFields;
         this.txnCtx = txnCtx;
         this.lockManager = lockManager;
+        this.transactorLocalNumActiveOperations = new AtomicInteger(0);
         this.longHashes = new long[2];
     }
 
@@ -41,4 +45,21 @@
         MurmurHash128Bit.hash3_x64_128(tuple, primaryKeyFields, SEED, longHashes);
         return Math.abs((int) longHashes[0]);
     }
+    
+    public void resetLocalNumActiveOperations() {
+        transactorLocalNumActiveOperations.set(0);
+    }
+
+    public int getLocalNumActiveOperations() {
+        return transactorLocalNumActiveOperations.get();
+    }
+
+    public void incrementLocalNumActiveOperations() {
+        transactorLocalNumActiveOperations.incrementAndGet();
+    }
+
+    public void decrementLocalNumActiveOperations() {
+        transactorLocalNumActiveOperations.decrementAndGet();
+    }
+
 }
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
index 2f522b9..2638c9f 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/IAsterixAppRuntimeContextProvider.java
@@ -18,7 +18,6 @@
 
 import edu.uci.ics.hyracks.api.io.IIOManager;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
@@ -44,14 +43,6 @@
 
     public ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID);
 
-    public ILSMIOOperationCallbackProvider getLSMBTreeIOOperationCallbackProvider(boolean isPrimary);
-
-    public ILSMIOOperationCallbackProvider getLSMRTreeIOOperationCallbackProvider();
-
-    public ILSMIOOperationCallbackProvider getLSMInvertedIndexIOOperationCallbackProvider();
-
-    public ILSMIOOperationCallbackProvider getNoOpIOOperationCallbackProvider();
-
     public ILSMIOOperationScheduler getLSMIOScheduler();
 
     public ILocalResourceRepository getLocalResourceRepository();
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogManager.java
index 8913f8a..27a91a4 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogManager.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogManager.java
@@ -14,9 +14,11 @@
  */
 package edu.uci.ics.asterix.common.transactions;
 
+import edu.uci.ics.asterix.common.exceptions.ACIDException;
+
 public interface ILogManager {
 
-    public void log(ILogRecord logRecord);
+    public void log(ILogRecord logRecord) throws ACIDException;
 
     public ILogReader getLogReader(boolean isRecoveryMode);
 
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogRecord.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogRecord.java
index d13ef6c..3068867 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogRecord.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ILogRecord.java
@@ -20,15 +20,15 @@
 
 public interface ILogRecord {
 
-    public static final int JOB_COMMIT_LOG_SIZE = 13;
-    public static final int ENTITY_COMMIT_LOG_BASE_SIZE = 29;
-    public static final int UPDATE_LOG_BASE_SIZE = 64;
+    public static final int JOB_TERMINATE_LOG_SIZE = 13; //JOB_COMMIT or ABORT log type
+    public static final int ENTITY_COMMIT_LOG_BASE_SIZE = 25;
+    public static final int UPDATE_LOG_BASE_SIZE = 60;
 
     public boolean readLogRecord(ByteBuffer buffer);
 
     public void writeLogRecord(ByteBuffer buffer);
 
-    public void formJobCommitLogRecord(ITransactionContext txnCtx);
+    public void formJobTerminateLogRecord(ITransactionContext txnCtx, boolean isCommit);
 
     public void formEntityCommitLogRecord(ITransactionContext txnCtx, int datasetId, int PKHashValue,
             ITupleReference tupleReference, int[] primaryKeyFields);
diff --git a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionManager.java b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionManager.java
index 77e960b..ffd4cc2 100644
--- a/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionManager.java
+++ b/asterix-common/src/main/java/edu/uci/ics/asterix/common/transactions/ITransactionManager.java
@@ -52,10 +52,11 @@
      * 
      * @param jobId
      *            a unique value for the transaction id.
+     * @param createIfNotExist TODO
      * @return
      * @throws ACIDException
      */
-    public ITransactionContext getTransactionContext(JobId jobId) throws ACIDException;
+    public ITransactionContext getTransactionContext(JobId jobId, boolean createIfNotExist) throws ACIDException;
 
     /**
      * Commits a transaction.
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
index 8765aae..ee0791b 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
@@ -115,14 +115,15 @@
 
     @Override
     public void commitTransaction(JobId jobId) throws RemoteException, ACIDException {
-        ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId);
+        ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId, false);
         transactionSubsystem.getTransactionManager().commitTransaction(txnCtx, new DatasetId(-1), -1);
     }
 
     @Override
     public void abortTransaction(JobId jobId) throws RemoteException, ACIDException {
         try {
-            ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId);
+            ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId,
+                    false);
             transactionSubsystem.getTransactionManager().abortTransaction(txnCtx, new DatasetId(-1), -1);
         } catch (ACIDException e) {
             e.printStackTrace();
@@ -132,13 +133,13 @@
 
     @Override
     public void lock(JobId jobId, byte lockMode) throws ACIDException, RemoteException {
-        ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId);
+        ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId, false);
         transactionSubsystem.getLockManager().lock(METADATA_DATASET_ID, -1, lockMode, txnCtx);
     }
 
     @Override
     public void unlock(JobId jobId) throws ACIDException, RemoteException {
-        ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId);
+        ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId, false);
         transactionSubsystem.getLockManager().unlock(METADATA_DATASET_ID, -1, txnCtx);
     }
 
@@ -273,7 +274,7 @@
 
         ILSMIndexAccessor indexAccessor = lsmIndex.createAccessor(modCallback, NoOpOperationCallback.INSTANCE);
 
-        ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId);
+        ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId, false);
         txnCtx.setWriteTxn(true);
         txnCtx.registerIndexAndCallback(resourceID, lsmIndex, (AbstractOperationCallback) modCallback,
                 metadataIndex.isPrimaryIndex());
@@ -286,7 +287,7 @@
 
     private IModificationOperationCallback createIndexModificationCallback(JobId jobId, long resourceId,
             IMetadataIndex metadataIndex, ILSMIndex lsmIndex, IndexOperation indexOp) throws Exception {
-        ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId);
+        ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId, false);
 
         if (metadataIndex.isPrimaryIndex()) {
             return new PrimaryIndexModificationOperationCallback(metadataIndex.getDatasetId().getId(),
@@ -581,7 +582,7 @@
                 lsmIndex, IndexOperation.DELETE);
         ILSMIndexAccessor indexAccessor = lsmIndex.createAccessor(modCallback, NoOpOperationCallback.INSTANCE);
 
-        ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId);
+        ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId, false);
         txnCtx.setWriteTxn(true);
         txnCtx.registerIndexAndCallback(resourceID, lsmIndex, (AbstractOperationCallback) modCallback,
                 metadataIndex.isPrimaryIndex());
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
index 4a02bc5..7be4e8e 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -60,7 +60,6 @@
 import edu.uci.ics.asterix.runtime.formats.NonTaggedDataFormat;
 import edu.uci.ics.asterix.transaction.management.resource.LSMBTreeLocalResourceMetadata;
 import edu.uci.ics.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
-import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
 import edu.uci.ics.hyracks.api.application.INCApplicationContext;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -333,14 +332,14 @@
         int[] bloomFilterKeyFields = index.getBloomFilterKeyFields();
         LSMBTree lsmBtree = null;
         long resourceID = -1;
-        AsterixRuntimeComponentsProvider rtcProvider = AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER;
         ILSMOperationTracker opTracker = index.isPrimaryIndex() ? runtimeContext.getLSMBTreeOperationTracker(index
                 .getDatasetId().getId()) : new BaseOperationTracker((DatasetLifecycleManager) indexLifecycleManager,
-                LSMBTreeIOOperationCallbackFactory.INSTANCE, index.getDatasetId().getId());
+                index.getDatasetId().getId());
         if (create) {
             lsmBtree = LSMBTreeUtils.createLSMTree(virtualBufferCaches, file, bufferCache, fileMapProvider, typeTraits,
                     comparatorFactories, bloomFilterKeyFields, runtimeContext.getBloomFilterFalsePositiveRate(),
-                    runtimeContext.getLSMMergePolicy(), opTracker, runtimeContext.getLSMIOScheduler(), rtcProvider);
+                    runtimeContext.getLSMMergePolicy(), opTracker, runtimeContext.getLSMIOScheduler(),
+                    LSMBTreeIOOperationCallbackFactory.INSTANCE.createIOOperationCallback());
             lsmBtree.create();
             resourceID = runtimeContext.getResourceIdFactory().createId();
             ILocalResourceMetadata localResourceMetadata = new LSMBTreeLocalResourceMetadata(typeTraits,
@@ -359,7 +358,7 @@
                         typeTraits, comparatorFactories, bloomFilterKeyFields,
                         runtimeContext.getBloomFilterFalsePositiveRate(), runtimeContext.getLSMMergePolicy(),
                         opTracker, runtimeContext.getLSMIOScheduler(),
-                        AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER);
+                        LSMBTreeIOOperationCallbackFactory.INSTANCE.createIOOperationCallback());
                 indexLifecycleManager.register(resourceID, lsmBtree);
             }
         }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 8b422c8..4909e21 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -579,11 +579,10 @@
                     typeTraits, comparatorFactories, bloomFilterKeyFields, lowKeyFields, highKeyFields,
                     lowKeyInclusive, highKeyInclusive, new LSMBTreeDataflowHelperFactory(
                             new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), rtcProvider,
-                            isSecondary ? new SecondaryIndexOperationTrackerProvider(
-                                    LSMBTreeIOOperationCallbackFactory.INSTANCE, dataset.getDatasetId())
+                            isSecondary ? new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId())
                                     : new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()), rtcProvider,
-                            rtcProvider, storageProperties.getBloomFilterFalsePositiveRate()), retainInput,
-                    searchCallbackFactory);
+                            LSMBTreeIOOperationCallbackFactory.INSTANCE,
+                            storageProperties.getBloomFilterFalsePositiveRate()), retainInput, searchCallbackFactory);
 
             return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeSearchOp, spPc.second);
 
@@ -649,11 +648,11 @@
                             valueProviderFactories, RTreePolicyType.RTREE, primaryComparatorFactories,
                             new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
                             AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                            new SecondaryIndexOperationTrackerProvider(LSMRTreeIOOperationCallbackFactory.INSTANCE,
-                                    dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, proposeLinearizer(
-                                    nestedKeyType.getTypeTag(), comparatorFactories.length),
-                            storageProperties.getBloomFilterFalsePositiveRate()), retainInput, searchCallbackFactory);
+                            new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                            LSMRTreeIOOperationCallbackFactory.INSTANCE, proposeLinearizer(nestedKeyType.getTypeTag(),
+                                    comparatorFactories.length), storageProperties.getBloomFilterFalsePositiveRate()),
+                    retainInput, searchCallbackFactory);
             return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(rtreeSearchOp, spPc.second);
 
         } catch (MetadataException me) {
@@ -809,7 +808,7 @@
                             AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
                             new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
                             AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                            LSMBTreeIOOperationCallbackFactory.INSTANCE,
                             storageProperties.getBloomFilterFalsePositiveRate()), NoOpOperationCallbackFactory.INSTANCE);
             return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad,
                     splitsAndConstraint.second);
@@ -877,7 +876,7 @@
                             AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
                             new PrimaryIndexOperationTrackerProvider(dataset.getDatasetId()),
                             AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, storageProperties
+                            LSMBTreeIOOperationCallbackFactory.INSTANCE, storageProperties
                                     .getBloomFilterFalsePositiveRate()), null, modificationCallbackFactory, true);
 
             return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(insertDeleteOp,
@@ -1071,9 +1070,9 @@
                     comparatorFactories, bloomFilterKeyFields, fieldPermutation, indexOp,
                     new LSMBTreeDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(datasetId),
                             AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                            new SecondaryIndexOperationTrackerProvider(LSMBTreeIOOperationCallbackFactory.INSTANCE,
-                                    dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, storageProperties
+                            new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                            LSMBTreeIOOperationCallbackFactory.INSTANCE, storageProperties
                                     .getBloomFilterFalsePositiveRate()), filterFactory, modificationCallbackFactory,
                     false);
             return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad,
@@ -1199,10 +1198,9 @@
                     invListsTypeTraits, invListComparatorFactories, tokenizerFactory, fieldPermutation, indexOp,
                     new LSMInvertedIndexDataflowHelperFactory(new AsterixVirtualBufferCacheProvider(datasetId),
                             AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                            new SecondaryIndexOperationTrackerProvider(
-                                    LSMInvertedIndexIOOperationCallbackFactory.INSTANCE, dataset.getDatasetId()),
+                            new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
                             AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, storageProperties
+                            LSMInvertedIndexIOOperationCallbackFactory.INSTANCE, storageProperties
                                     .getBloomFilterFalsePositiveRate()), filterFactory, modificationCallbackFactory);
             return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(insertDeleteOp,
                     splitsAndConstraint.second);
@@ -1294,12 +1292,11 @@
                             valueProviderFactories, RTreePolicyType.RTREE, primaryComparatorFactories,
                             new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
                             AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                            new SecondaryIndexOperationTrackerProvider(LSMRTreeIOOperationCallbackFactory.INSTANCE,
-                                    dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
-                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, proposeLinearizer(
-                                    nestedKeyType.getTypeTag(), comparatorFactories.length),
-                            storageProperties.getBloomFilterFalsePositiveRate()), filterFactory,
-                    modificationCallbackFactory, false);
+                            new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
+                            AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                            LSMRTreeIOOperationCallbackFactory.INSTANCE, proposeLinearizer(nestedKeyType.getTypeTag(),
+                                    comparatorFactories.length), storageProperties.getBloomFilterFalsePositiveRate()),
+                    filterFactory, modificationCallbackFactory, false);
             return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(rtreeUpdate, splitsAndConstraint.second);
         } catch (MetadataException | IOException e) {
             throw new AlgebricksException(e);
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/job/listener/JobEventListenerFactory.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/job/listener/JobEventListenerFactory.java
index b44755c..4f4eba2 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/job/listener/JobEventListenerFactory.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/job/listener/JobEventListenerFactory.java
@@ -49,7 +49,7 @@
                 try {
                     ITransactionManager txnManager = ((IAsterixAppRuntimeContext) jobletContext.getApplicationContext()
                             .getApplicationObject()).getTransactionSubsystem().getTransactionManager();
-                    ITransactionContext txnContext = txnManager.getTransactionContext(jobId);
+                    ITransactionContext txnContext = txnManager.getTransactionContext(jobId, false);
                     txnContext.setWriteTxn(transactionalWrite);
                     txnManager.completedTransaction(txnContext, new DatasetId(-1), -1,
                             !(jobStatus == JobStatus.FAILURE));
@@ -62,7 +62,7 @@
             public void jobletStart() {
                 try {
                     ((IAsterixAppRuntimeContext) jobletContext.getApplicationContext().getApplicationObject())
-                            .getTransactionSubsystem().getTransactionManager().getTransactionContext(jobId);
+                            .getTransactionSubsystem().getTransactionManager().getTransactionContext(jobId, true);
                 } catch (ACIDException e) {
                     throw new Error(e);
                 }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
index a6537cd..00f0c4a 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
@@ -41,7 +41,7 @@
             throws HyracksDataException {
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
         try {
-            ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId);
+            ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId, false);
             return new PrimaryIndexInstantSearchOperationCallback(datasetId, primaryKeyFields,
                     txnSubsystem.getLockManager(), txnCtx);
         } catch (ACIDException e) {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
index abeec62..07daee4 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
@@ -58,7 +58,7 @@
         }
 
         try {
-            ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId);
+            ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId, false);
             IModificationOperationCallback modCallback = new PrimaryIndexModificationOperationCallback(datasetId,
                     primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resourceId, resourceType,
                     indexOp);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
index b59eb75..01cb725 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
@@ -41,7 +41,7 @@
             throws HyracksDataException {
         ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
         try {
-            ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId);
+            ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId, false);
             return new PrimaryIndexSearchOperationCallback(datasetId, primaryKeyFields, txnSubsystem.getLockManager(),
                     txnCtx);
         } catch (ACIDException e) {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
index d5bc877..563e9b7 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
@@ -55,7 +55,7 @@
         }
 
         try {
-            ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId);
+            ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId, false);
             IModificationOperationCallback modCallback = new SecondaryIndexModificationOperationCallback(datasetId, primaryKeyFields, txnCtx,
                     txnSubsystem.getLockManager(), txnSubsystem, resourceId, resourceType, indexOp);
             txnCtx.registerIndexAndCallback(resourceId, index, (AbstractOperationCallback) modCallback, false);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerProvider.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerProvider.java
index aec378b..140a8dd 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerProvider.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerProvider.java
@@ -18,7 +18,6 @@
 import edu.uci.ics.asterix.common.context.BaseOperationTracker;
 import edu.uci.ics.asterix.common.context.DatasetLifecycleManager;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerProvider;
 
@@ -27,10 +26,8 @@
     private static final long serialVersionUID = 1L;
 
     private final int datasetID;
-    private final ILSMIOOperationCallbackFactory ioOpCallbackFactory;
 
-    public SecondaryIndexOperationTrackerProvider(ILSMIOOperationCallbackFactory ioOpCallbackFactory, int datasetID) {
-        this.ioOpCallbackFactory = ioOpCallbackFactory;
+    public SecondaryIndexOperationTrackerProvider(int datasetID) {
         this.datasetID = datasetID;
     }
 
@@ -38,7 +35,7 @@
     public ILSMOperationTracker getOperationTracker(IHyracksTaskContext ctx) {
         DatasetLifecycleManager dslcManager = (DatasetLifecycleManager) ((IAsterixAppRuntimeContext) ctx
                 .getJobletContext().getApplicationContext().getApplicationObject()).getIndexLifecycleManager();
-        return new BaseOperationTracker(dslcManager, ioOpCallbackFactory, datasetID);
+        return new BaseOperationTracker(dslcManager, datasetID);
     }
 
 }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
index d243dd2..cf60182 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
@@ -55,10 +55,11 @@
         LSMBTree lsmBTree = LSMBTreeUtils.createLSMTree(virtualBufferCaches, file, runtimeContextProvider
                 .getBufferCache(), runtimeContextProvider.getFileMapManager(), typeTraits, cmpFactories,
                 bloomFilterKeyFields, runtimeContextProvider.getBloomFilterFalsePositiveRate(), runtimeContextProvider
-                        .getLSMMergePolicy(), isPrimary ? runtimeContextProvider.getLSMBTreeOperationTracker(datasetID)
-                        : new BaseOperationTracker((DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager(),
-                                LSMBTreeIOOperationCallbackFactory.INSTANCE, datasetID), runtimeContextProvider
-                        .getLSMIOScheduler(), runtimeContextProvider.getLSMBTreeIOOperationCallbackProvider(isPrimary));
+                        .getLSMMergePolicy(),
+                isPrimary ? runtimeContextProvider.getLSMBTreeOperationTracker(datasetID) : new BaseOperationTracker(
+                        (DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager(), datasetID),
+                runtimeContextProvider.getLSMIOScheduler(), LSMBTreeIOOperationCallbackFactory.INSTANCE
+                        .createIOOperationCallback());
         return lsmBTree;
     }
 
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
index 8482172..5ed7019 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
@@ -59,25 +59,37 @@
         List<IVirtualBufferCache> virtualBufferCaches = runtimeContextProvider.getVirtualBufferCaches(datasetID);
         try {
             if (isPartitioned) {
-                return InvertedIndexUtils.createPartitionedLSMInvertedIndex(virtualBufferCaches, runtimeContextProvider
-                        .getFileMapManager(), invListTypeTraits, invListCmpFactories, tokenTypeTraits,
-                        tokenCmpFactories, tokenizerFactory, runtimeContextProvider.getBufferCache(), filePath,
-                        runtimeContextProvider.getBloomFilterFalsePositiveRate(), runtimeContextProvider
-                                .getLSMMergePolicy(), new BaseOperationTracker(
-                                (DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager(),
-                                LSMInvertedIndexIOOperationCallbackFactory.INSTANCE, datasetID), runtimeContextProvider
-                                .getLSMIOScheduler(), runtimeContextProvider
-                                .getLSMInvertedIndexIOOperationCallbackProvider());
+                return InvertedIndexUtils.createPartitionedLSMInvertedIndex(
+                        virtualBufferCaches,
+                        runtimeContextProvider.getFileMapManager(),
+                        invListTypeTraits,
+                        invListCmpFactories,
+                        tokenTypeTraits,
+                        tokenCmpFactories,
+                        tokenizerFactory,
+                        runtimeContextProvider.getBufferCache(),
+                        filePath,
+                        runtimeContextProvider.getBloomFilterFalsePositiveRate(),
+                        runtimeContextProvider.getLSMMergePolicy(),
+                        new BaseOperationTracker((DatasetLifecycleManager) runtimeContextProvider
+                                .getIndexLifecycleManager(), datasetID), runtimeContextProvider.getLSMIOScheduler(),
+                        LSMInvertedIndexIOOperationCallbackFactory.INSTANCE.createIOOperationCallback());
             } else {
-                return InvertedIndexUtils.createLSMInvertedIndex(virtualBufferCaches, runtimeContextProvider
-                        .getFileMapManager(), invListTypeTraits, invListCmpFactories, tokenTypeTraits,
-                        tokenCmpFactories, tokenizerFactory, runtimeContextProvider.getBufferCache(), filePath,
-                        runtimeContextProvider.getBloomFilterFalsePositiveRate(), runtimeContextProvider
-                                .getLSMMergePolicy(), new BaseOperationTracker(
-                                (DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager(),
-                                LSMInvertedIndexIOOperationCallbackFactory.INSTANCE, datasetID), runtimeContextProvider
-                                .getLSMIOScheduler(), runtimeContextProvider
-                                .getLSMInvertedIndexIOOperationCallbackProvider());
+                return InvertedIndexUtils.createLSMInvertedIndex(
+                        virtualBufferCaches,
+                        runtimeContextProvider.getFileMapManager(),
+                        invListTypeTraits,
+                        invListCmpFactories,
+                        tokenTypeTraits,
+                        tokenCmpFactories,
+                        tokenizerFactory,
+                        runtimeContextProvider.getBufferCache(),
+                        filePath,
+                        runtimeContextProvider.getBloomFilterFalsePositiveRate(),
+                        runtimeContextProvider.getLSMMergePolicy(),
+                        new BaseOperationTracker((DatasetLifecycleManager) runtimeContextProvider
+                                .getIndexLifecycleManager(), datasetID), runtimeContextProvider.getLSMIOScheduler(),
+                        LSMInvertedIndexIOOperationCallbackFactory.INSTANCE.createIOOperationCallback());
             }
         } catch (IndexException e) {
             throw new HyracksDataException(e);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
index bc1e889..dc6b30b 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
@@ -66,10 +66,9 @@
                     runtimeContextProvider.getFileMapManager(), typeTraits, rtreeCmpFactories, btreeCmpFactories,
                     valueProviderFactories, rtreePolicyType, runtimeContextProvider.getBloomFilterFalsePositiveRate(),
                     runtimeContextProvider.getLSMMergePolicy(), new BaseOperationTracker(
-                            (DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager(),
-                            LSMRTreeIOOperationCallbackFactory.INSTANCE, datasetID), runtimeContextProvider
-                            .getLSMIOScheduler(), runtimeContextProvider.getLSMRTreeIOOperationCallbackProvider(),
-                    linearizeCmpFactory);
+                            (DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager(), datasetID),
+                    runtimeContextProvider.getLSMIOScheduler(), LSMRTreeIOOperationCallbackFactory.INSTANCE
+                            .createIOOperationCallback(), linearizeCmpFactory);
         } catch (TreeIndexException e) {
             throw new HyracksDataException(e);
         }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
index 6d86f70..c7df2f2 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
@@ -665,8 +665,6 @@
 
         latchLockTable();
         try {
-            validateJob(txnContext);
-
             if (IS_DEBUG_MODE) {
                 trackLockRequest("Requested", RequestType.UNLOCK, datasetId, entityHashValue, (byte) 0, txnContext,
                         dLockInfo, eLockInfo);
@@ -2212,14 +2210,14 @@
                 if (logRecord.getLogType() == LogType.ENTITY_COMMIT) {
                     tempDatasetIdObj.setId(logRecord.getDatasetId());
                     tempJobIdObj.setId(logRecord.getJobId());
-                    txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(tempJobIdObj);
+                    txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(tempJobIdObj, false);
                     unlock(tempDatasetIdObj, logRecord.getPKHashValue(), txnCtx);
                     txnCtx.notifyOptracker(false);
-                } else if (logRecord.getLogType() == LogType.JOB_COMMIT) {
+                } else if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) {
                     tempJobIdObj.setId(logRecord.getJobId());
-                    txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(tempJobIdObj);
+                    txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(tempJobIdObj, false);
                     txnCtx.notifyOptracker(true);
-                    ((LogPage) logPage).notifyJobCommitter();
+                    ((LogPage) logPage).notifyJobTerminator();
                 }
                 logRecord = logPageReader.next();
             }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
index 4f0bb59..933afcd 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
@@ -38,6 +38,7 @@
 import edu.uci.ics.asterix.common.transactions.ILogReader;
 import edu.uci.ics.asterix.common.transactions.ILogRecord;
 import edu.uci.ics.asterix.common.transactions.ITransactionContext;
+import edu.uci.ics.asterix.common.transactions.ITransactionManager;
 import edu.uci.ics.asterix.common.transactions.LogManagerProperties;
 import edu.uci.ics.asterix.common.transactions.MutableLong;
 import edu.uci.ics.asterix.transaction.management.service.locking.LockManager;
@@ -74,16 +75,16 @@
         logDir = logManagerProperties.getLogDir();
         logFilePrefix = logManagerProperties.getLogFilePrefix();
         flushLSN = new MutableLong();
-        initializeLogManager();
+        initializeLogManager(0);
     }
 
-    private void initializeLogManager() {
+    private void initializeLogManager(long nextLogFileId) {
         emptyQ = new LinkedBlockingQueue<LogPage>(numLogPages);
         flushQ = new LinkedBlockingQueue<LogPage>(numLogPages);
         for (int i = 0; i < numLogPages; i++) {
             emptyQ.offer(new LogPage((LockManager) txnSubsystem.getLockManager(), logPageSize, flushLSN));
         }
-        appendLSN = initializeLogAnchor();
+        appendLSN = initializeLogAnchor(nextLogFileId);
         flushLSN.set(appendLSN);
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("LogManager starts logging in LSN: " + appendLSN);
@@ -95,12 +96,13 @@
     }
 
     @Override
-    public void log(ILogRecord logRecord) {
+    public void log(ILogRecord logRecord) throws ACIDException {
         if (logRecord.getLogSize() > logPageSize) {
             throw new IllegalStateException();
         }
         syncLog(logRecord);
-        if (logRecord.getLogType() == LogType.JOB_COMMIT && !logRecord.isFlushed()) {
+        if ((logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT)
+                && !logRecord.isFlushed()) {
             synchronized (logRecord) {
                 while (!logRecord.isFlushed()) {
                     try {
@@ -113,13 +115,16 @@
         }
     }
 
-    private synchronized void syncLog(ILogRecord logRecord) {
+    private synchronized void syncLog(ILogRecord logRecord) throws ACIDException {
         ITransactionContext txnCtx = logRecord.getTxnCtx();
+        if (txnCtx.getTxnState() == ITransactionManager.ABORTED && logRecord.getLogType() != LogType.ABORT) {
+            throw new ACIDException("Aborted job(" + txnCtx.getJobId() + ") tried to write non-abort type log record.");
+        }
         if (getLogFileOffset(appendLSN) + logRecord.getLogSize() > logFileSize) {
             prepareNextLogFile();
             appendPage.isFull(true);
             getAndInitNewPage();
-        } else if (!appendPage.hasSpace(logRecord.getLogSize(), getLogFileOffset(appendLSN))) {
+        } else if (!appendPage.hasSpace(logRecord.getLogSize())) {
             appendPage.isFull(true);
             getAndInitNewPage();
         }
@@ -141,7 +146,6 @@
         }
         appendPage.reset();
         appendPage.setFileChannel(appendChannel);
-        appendPage.setInitialFlushOffset(getLogFileOffset(appendLSN));
         flushQ.offer(appendPage);
     }
 
@@ -229,7 +233,7 @@
         return flushLSN;
     }
 
-    private long initializeLogAnchor() {
+    private long initializeLogAnchor(long nextLogFileId) {
         long fileId = 0;
         long offset = 0;
         File fileLogDir = new File(logDir);
@@ -237,9 +241,10 @@
             if (fileLogDir.exists()) {
                 List<Long> logFileIds = getLogFileIds();
                 if (logFileIds == null) {
-                    createFileIfNotExists(getLogFilePath(0));
+                    fileId = nextLogFileId;
+                    createFileIfNotExists(getLogFilePath(fileId));
                     if (LOGGER.isLoggable(Level.INFO)) {
-                        LOGGER.info("created a log file: " + getLogFilePath(0));
+                        LOGGER.info("created a log file: " + getLogFilePath(fileId));
                     }
                 } else {
                     fileId = logFileIds.get(logFileIds.size() - 1);
@@ -247,13 +252,14 @@
                     offset = logFile.length();
                 }
             } else {
+                fileId = nextLogFileId;
                 createNewDirectory(logDir);
                 if (LOGGER.isLoggable(Level.INFO)) {
                     LOGGER.info("created the log directory: " + logManagerProperties.getLogDir());
                 }
-                createFileIfNotExists(getLogFilePath(0));
+                createFileIfNotExists(getLogFilePath(fileId));
                 if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("created a log file: " + getLogFilePath(0));
+                    LOGGER.info("created a log file: " + getLogFilePath(fileId));
                 }
             }
         } catch (IOException ioe) {
@@ -267,8 +273,8 @@
 
     public void renewLogFiles() {
         terminateLogFlusher();
-        deleteAllLogFiles();
-        initializeLogManager();
+        long lastMaxLogFileId = deleteAllLogFiles();
+        initializeLogManager(lastMaxLogFileId + 1);
     }
 
     private void terminateLogFlusher() {
@@ -290,7 +296,7 @@
         }
     }
 
-    private void deleteAllLogFiles() {
+    private long deleteAllLogFiles() {
         if (appendChannel != null) {
             try {
                 appendChannel.close();
@@ -305,6 +311,7 @@
                 throw new IllegalStateException("Failed to delete a file: " + file.getAbsolutePath());
             }
         }
+        return logFileIds.get(logFileIds.size() - 1);
     }
 
     private List<Long> getLogFileIds() {
@@ -384,10 +391,16 @@
         }
         return newFileChannel;
     }
+
+    public long getReadableSmallestLSN() {
+        List<Long> logFileIds = getLogFileIds();
+        return logFileIds.get(0) * logFileSize;
+    }
 }
 
 class LogFlusher implements Callable<Boolean> {
-    private final static LogPage POISON_PILL = new LogPage(null, ILogRecord.JOB_COMMIT_LOG_SIZE, null);
+    private static final Logger LOGGER = Logger.getLogger(LogFlusher.class.getName());
+    private final static LogPage POISON_PILL = new LogPage(null, ILogRecord.JOB_TERMINATE_LOG_SIZE, null);
     private final LogManager logMgr;//for debugging
     private final LinkedBlockingQueue<LogPage> emptyQ;
     private final LinkedBlockingQueue<LogPage> flushQ;
@@ -402,13 +415,13 @@
         flushPage = null;
         isStarted = new AtomicBoolean(false);
         terminateFlag = new AtomicBoolean(false);
-        
+
     }
 
     public void terminate() {
         //make sure the LogFlusher thread started before terminating it.
         synchronized (isStarted) {
-            while(!isStarted.get()) {
+            while (!isStarted.get()) {
                 try {
                     isStarted.wait();
                 } catch (InterruptedException e) {
@@ -416,7 +429,7 @@
                 }
             }
         }
-        
+
         terminateFlag.set(true);
         if (flushPage != null) {
             synchronized (flushPage) {
@@ -432,24 +445,34 @@
 
     @Override
     public Boolean call() {
-        synchronized(isStarted) {
+        synchronized (isStarted) {
             isStarted.set(true);
             isStarted.notify();
         }
-        while (true) {
-            flushPage = null;
-            try {
-                flushPage = flushQ.take();
-                if (flushPage == POISON_PILL || terminateFlag.get()) {
-                    return true;
+        try {
+            while (true) {
+                flushPage = null;
+                try {
+                    flushPage = flushQ.take();
+                    if (flushPage == POISON_PILL || terminateFlag.get()) {
+                        return true;
+                    }
+                } catch (InterruptedException e) {
+                    if (flushPage == null) {
+                        continue;
+                    }
                 }
-            } catch (InterruptedException e) {
-                if (flushPage == null) {
-                    continue;
-                }
+                flushPage.flush();
+                emptyQ.offer(flushPage);
             }
-            flushPage.flush();
-            emptyQ.offer(flushPage);
+        } catch (Exception e) {
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("-------------------------------------------------------------------------");
+                LOGGER.info("LogFlusher is terminating abnormally. System is in unusalbe state.");
+                LOGGER.info("-------------------------------------------------------------------------");
+            }
+            e.printStackTrace();
+            throw e;
         }
     }
 }
\ No newline at end of file
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
index edfec69..a3f42a7 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogPage.java
@@ -58,7 +58,7 @@
         appendOffset = 0;
         flushOffset = 0;
         isLastPage = false;
-        syncCommitQ = new LinkedBlockingQueue<ILogRecord>(logPageSize / ILogRecord.JOB_COMMIT_LOG_SIZE);
+        syncCommitQ = new LinkedBlockingQueue<ILogRecord>(logPageSize / ILogRecord.JOB_TERMINATE_LOG_SIZE);
     }
 
     ////////////////////////////////////
@@ -68,15 +68,14 @@
     @Override
     public void append(ILogRecord logRecord, long appendLSN) {
         logRecord.writeLogRecord(appendBuffer);
-        if (logRecord.getLogType() == LogType.UPDATE) {
-            logRecord.getTxnCtx().setLastLSN(logRecord.getResourceId(), appendLSN);
-        }
+        logRecord.getTxnCtx().setLastLSN(logRecord.getLogType() == LogType.UPDATE ? logRecord.getResourceId() : -1,
+                appendLSN);
         synchronized (this) {
             appendOffset += logRecord.getLogSize();
             if (IS_DEBUG_MODE) {
                 LOGGER.info("append()| appendOffset: " + appendOffset);
             }
-            if (logRecord.getLogType() == LogType.JOB_COMMIT) {
+            if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) {
                 logRecord.isFlushed(false);
                 syncCommitQ.offer(logRecord);
             }
@@ -105,7 +104,7 @@
         this.isLastPage = isLastPage;
     }
 
-    public boolean hasSpace(int logSize, long logFileOffset) {
+    public boolean hasSpace(int logSize) {
         return appendOffset + logSize <= logPageSize;
     }
 
@@ -192,7 +191,7 @@
         }
     }
 
-    public void notifyJobCommitter() {
+    public void notifyJobTerminator() {
         ILogRecord logRecord = null;
         while (logRecord == null) {
             try {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecord.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecord.java
index 4b0e1f2..dd81df7 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecord.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecord.java
@@ -34,10 +34,9 @@
  * LogType(1)
  * JobId(4)
  * ---------------------------
- * [Header2] (16 bytes + PKValueSize) : for entity_commit and update log types 
+ * [Header2] (12 bytes + PKValueSize) : for entity_commit and update log types 
  * DatasetId(4) //stored in dataset_dataset in Metadata Node
  * PKHashValue(4)
- * PKFieldCnt(4)
  * PKValueSize(4)
  * PKValue(PKValueSize)
  * ---------------------------
@@ -61,10 +60,10 @@
  * ---------------------------
  * = LogSize =
  * 1) JOB_COMMIT_LOG_SIZE: 13 bytes (5 + 8)
- * 2) ENTITY_COMMIT: 29 + PKSize (5 + 16 + PKSize + 8)
- *    --> ENTITY_COMMIT_LOG_BASE_SIZE = 29
- * 3) UPDATE: 64 + PKSize + New/OldValueSize (5 + 16 + PKSize + 21 + 14 + New/OldValueSize + 8)
- *    --> UPDATE_LOG_BASE_SIZE = 64
+ * 2) ENTITY_COMMIT: 25 + PKSize (5 + 12 + PKSize + 8)
+ *    --> ENTITY_COMMIT_LOG_BASE_SIZE = 25
+ * 3) UPDATE: 64 + PKSize + New/OldValueSize (5 + 12 + PKSize + 21 + 14 + New/OldValueSize + 8)
+ *    --> UPDATE_LOG_BASE_SIZE = 60
  */
 public class LogRecord implements ILogRecord {
 
@@ -73,7 +72,6 @@
     private int jobId;
     private int datasetId;
     private int PKHashValue;
-    private int PKFieldCnt;
     private int PKValueSize;
     private ITupleReference PKValue;
     private long prevLSN;
@@ -90,12 +88,13 @@
     private long checksum;
     //------------- fields in a log record (end) --------------//
 
+    private int PKFieldCnt;
     private static final int CHECKSUM_SIZE = 8;
     private ITransactionContext txnCtx;
     private long LSN;
     private final AtomicBoolean isFlushed;
     private final SimpleTupleWriter tupleWriter;
-    private final SimpleTupleReference readPKValue;
+    private final PrimaryKeyTupleReference readPKValue;
     private final SimpleTupleReference readNewValue;
     private final SimpleTupleReference readOldValue;
     private final CRC32 checksumGen;
@@ -104,7 +103,7 @@
     public LogRecord() {
         isFlushed = new AtomicBoolean(false);
         tupleWriter = new SimpleTupleWriter();
-        readPKValue = (SimpleTupleReference) tupleWriter.createTupleReference();
+        readPKValue = new PrimaryKeyTupleReference();
         readNewValue = (SimpleTupleReference) tupleWriter.createTupleReference();
         readOldValue = (SimpleTupleReference) tupleWriter.createTupleReference();
         checksumGen = new CRC32();
@@ -115,10 +114,9 @@
         int beginOffset = buffer.position();
         buffer.put(logType);
         buffer.putInt(jobId);
-        if (logType != LogType.JOB_COMMIT) {
+        if (logType == LogType.UPDATE || logType == LogType.ENTITY_COMMIT) {
             buffer.putInt(datasetId);
             buffer.putInt(PKHashValue);
-            buffer.putInt(PKFieldCnt);
             if (PKValueSize <= 0) {
                 throw new IllegalStateException("Primary Key Size is less than or equal to 0");
             }
@@ -173,13 +171,12 @@
         try {
             logType = buffer.get();
             jobId = buffer.getInt();
-            if (logType == LogType.JOB_COMMIT) {
+            if (logType == LogType.JOB_COMMIT || logType == LogType.ABORT) {
                 datasetId = -1;
                 PKHashValue = -1;
             } else {
-                datasetId = buffer.getInt();    
+                datasetId = buffer.getInt();
                 PKHashValue = buffer.getInt();
-                PKFieldCnt = buffer.getInt();
                 PKValueSize = buffer.getInt();
                 if (PKValueSize <= 0) {
                     throw new IllegalStateException("Primary Key Size is less than or equal to 0");
@@ -217,12 +214,20 @@
         }
         return true;
     }
-    
+
     private ITupleReference readPKValue(ByteBuffer buffer) {
-        return readTuple(buffer, readPKValue, PKFieldCnt, PKValueSize);
+        if (buffer.position() + PKValueSize > buffer.limit()) {
+            throw new BufferUnderflowException();
+        }
+        readPKValue.reset(buffer.array(), buffer.position(), PKValueSize);
+        buffer.position(buffer.position() + PKValueSize);
+        return readPKValue;
     }
 
     private ITupleReference readTuple(ByteBuffer srcBuffer, SimpleTupleReference destTuple, int fieldCnt, int size) {
+        if (srcBuffer.position() + size > srcBuffer.limit()) {
+            throw new BufferUnderflowException();
+        }
         destTuple.setFieldCount(fieldCnt);
         destTuple.resetByTupleOffset(srcBuffer, srcBuffer.position());
         srcBuffer.position(srcBuffer.position() + size);
@@ -230,9 +235,9 @@
     }
 
     @Override
-    public void formJobCommitLogRecord(ITransactionContext txnCtx) {
+    public void formJobTerminateLogRecord(ITransactionContext txnCtx, boolean isCommit) {
         this.txnCtx = txnCtx;
-        this.logType = LogType.JOB_COMMIT;
+        this.logType = isCommit ? LogType.JOB_COMMIT : LogType.ABORT;
         this.jobId = txnCtx.getJobId().getId();
         this.datasetId = -1;
         this.PKHashValue = -1;
@@ -281,7 +286,8 @@
                 setUpdateLogSize();
                 break;
             case LogType.JOB_COMMIT:
-                logSize = JOB_COMMIT_LOG_SIZE;
+            case LogType.ABORT:
+                logSize = JOB_TERMINATE_LOG_SIZE;
                 break;
             case LogType.ENTITY_COMMIT:
                 logSize = ENTITY_COMMIT_LOG_BASE_SIZE + PKValueSize;
@@ -298,7 +304,7 @@
         builder.append(" LogType : ").append(LogType.toString(logType));
         builder.append(" LogSize : ").append(logSize);
         builder.append(" JobId : ").append(jobId);
-        if (logType != LogType.JOB_COMMIT) {
+        if (logType == LogType.ENTITY_COMMIT || logType == LogType.UPDATE) {
             builder.append(" DatasetId : ").append(datasetId);
             builder.append(" PKHashValue : ").append(PKHashValue);
             builder.append(" PKFieldCnt : ").append(PKFieldCnt);
@@ -496,17 +502,17 @@
     public void setLSN(long LSN) {
         this.LSN = LSN;
     }
-    
+
     @Override
     public int getPKValueSize() {
         return PKValueSize;
     }
-    
+
     @Override
     public ITupleReference getPKValue() {
         return PKValue;
     }
-    
+
     @Override
     public void setPKFields(int[] primaryKeyFields) {
         PKFields = primaryKeyFields;
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java
index 823c8d3..f9e9304 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java
@@ -19,11 +19,14 @@
     public static final byte UPDATE = 0;
     public static final byte JOB_COMMIT = 1;
     public static final byte ENTITY_COMMIT = 2;
+    public static final byte ABORT = 3;
     private static final String STRING_UPDATE = "UPDATE";
     private static final String STRING_JOB_COMMIT = "JOB_COMMIT";
     private static final String STRING_ENTITY_COMMIT = "ENTITY_COMMIT";
+    private static final String STRING_ABORT = "ABORT";
     private static final String STRING_INVALID_LOG_TYPE = "INVALID_LOG_TYPE";
 
+
     public static String toString(byte logType) {
         switch (logType) {
             case LogType.UPDATE:
@@ -32,6 +35,8 @@
                 return STRING_JOB_COMMIT;
             case LogType.ENTITY_COMMIT:
                 return STRING_ENTITY_COMMIT;
+            case LogType.ABORT:
+                return STRING_ABORT;
             default:
                 return STRING_INVALID_LOG_TYPE;
         }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/PrimaryKeyTupleReference.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/PrimaryKeyTupleReference.java
new file mode 100644
index 0000000..d45b209
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/PrimaryKeyTupleReference.java
@@ -0,0 +1,36 @@
+package edu.uci.ics.asterix.transaction.management.service.logging;
+
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+public class PrimaryKeyTupleReference implements ITupleReference {
+    private byte[] fieldData;
+    private int start;
+    private int length;
+
+    public void reset(byte[] fieldData, int start, int length) {
+        this.fieldData = fieldData;
+        this.start = start;
+        this.length = length;
+    }
+    
+    @Override
+    public int getFieldCount() {
+        return 1;
+    }
+
+    @Override
+    public byte[] getFieldData(int fIdx) {
+        return fieldData;
+    }
+
+    @Override
+    public int getFieldStart(int fIdx) {
+        return start;
+    }
+
+    @Override
+    public int getFieldLength(int fIdx) {
+        return length;
+    }
+
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointThread.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointThread.java
index 6f6da4a..dca14d8 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointThread.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/CheckpointThread.java
@@ -16,8 +16,8 @@
 
 import java.util.List;
 
-import edu.uci.ics.asterix.common.context.BaseOperationTracker;
 import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
 import edu.uci.ics.asterix.common.transactions.IRecoveryManager;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
@@ -70,7 +70,8 @@
         long firstLSN;
         if (openIndexList.size() > 0) {
             for (IIndex index : openIndexList) {
-                firstLSN = ((BaseOperationTracker) ((ILSMIndex) index).getOperationTracker()).getFirstLSN();
+                firstLSN = ((AbstractLSMIOOperationCallback) ((ILSMIndex) index).getIOOperationCallback())
+                        .getFirstLSN();
                 minMCTFirstLSN = Math.min(minMCTFirstLSN, firstLSN);
             }
         } else {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
index 2ad3055..95ee767 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -38,7 +38,6 @@
 import java.util.logging.Logger;
 
 import edu.uci.ics.asterix.common.api.ILocalResourceMetadata;
-import edu.uci.ics.asterix.common.context.BaseOperationTracker;
 import edu.uci.ics.asterix.common.exceptions.ACIDException;
 import edu.uci.ics.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
 import edu.uci.ics.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
@@ -77,6 +76,7 @@
     private final TransactionSubsystem txnSubsystem;
     private final LogManager logMgr;
     private final int checkpointHistory;
+    private final long SHARP_CHECKPOINT_LSN = -1;
 
     /**
      * A file at a known location that contains the LSN of the last log record
@@ -115,18 +115,23 @@
             return state;
         }
 
-        //#. if minMCTFirstLSN is equal to -1 && 
-        //   checkpointLSN in the checkpoint file is equal to the lastLSN in the log file,
-        //   then return healthy state. Otherwise, return corrupted.
-        if ((checkpointObject.getMinMCTFirstLsn() == -2 && logMgr.getAppendLSN() == 0)
-                || (checkpointObject.getMinMCTFirstLsn() == -1 && checkpointObject.getCheckpointLsn() == logMgr
-                        .getAppendLSN())) {
+        long readableSmallestLSN = logMgr.getReadableSmallestLSN();
+        if (logMgr.getAppendLSN() == readableSmallestLSN) {
+            if (checkpointObject.getMinMCTFirstLsn() != SHARP_CHECKPOINT_LSN) {
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.info("[Warning] ---------------------------------------------------");
+                    LOGGER.info("[Warning] Some(or all) of transaction log files are lost.");
+                    LOGGER.info("[Warning] ---------------------------------------------------");
+                    //No choice but continuing when the log files are lost. 
+                }
+            }
+            state = SystemState.HEALTHY;
+            return state;
+        } else if (checkpointObject.getCheckpointLsn() == logMgr.getAppendLSN()
+                && checkpointObject.getMinMCTFirstLsn() == SHARP_CHECKPOINT_LSN) {
             state = SystemState.HEALTHY;
             return state;
         } else {
-            if (logMgr.getAppendLSN() == 0) {
-                throw new IllegalStateException("Transaction log files are lost.");
-            }
             state = SystemState.CORRUPTED;
             return state;
         }
@@ -138,6 +143,7 @@
         int entityCommitLogCount = 0;
         int jobCommitLogCount = 0;
         int redoCount = 0;
+        int abortLogCount = 0;
         int jobId = -1;
 
         state = SystemState.RECOVERING;
@@ -154,10 +160,11 @@
         TxnId winnerEntity = null;
 
         //#. read checkpoint file and set lowWaterMark where anaylsis and redo start
+        long readableSmallestLSN = logMgr.getReadableSmallestLSN();
         CheckpointObject checkpointObject = readCheckpoint();
-        long lowWaterMarkLsn = checkpointObject.getMinMCTFirstLsn();
-        if (lowWaterMarkLsn == -1 || lowWaterMarkLsn == -2) {
-            lowWaterMarkLsn = 0;
+        long lowWaterMarkLSN = checkpointObject.getMinMCTFirstLsn();
+        if (lowWaterMarkLSN < readableSmallestLSN) {
+            lowWaterMarkLSN = readableSmallestLSN;
         }
         int maxJobId = checkpointObject.getMaxJobId();
 
@@ -171,11 +178,11 @@
 
         //#. set log reader to the lowWaterMarkLsn
         ILogReader logReader = logMgr.getLogReader(true);
-        logReader.initializeScan(lowWaterMarkLsn);
+        logReader.initializeScan(lowWaterMarkLSN);
         ILogRecord logRecord = logReader.next();
         while (logRecord != null) {
             if (IS_DEBUG_MODE) {
-                System.out.println(logRecord.getLogRecordForDisplay());
+                LOGGER.info(logRecord.getLogRecordForDisplay());
             }
             //update max jobId
             if (logRecord.getJobId() > maxJobId) {
@@ -183,16 +190,12 @@
             }
             switch (logRecord.getLogType()) {
                 case LogType.UPDATE:
-                    if (IS_DEBUG_MODE) {
-                        updateLogCount++;
-                    }
+                    updateLogCount++;
                     break;
                 case LogType.JOB_COMMIT:
                     winnerJobSet.add(Integer.valueOf(logRecord.getJobId()));
                     jobId2WinnerEntitiesMap.remove(Integer.valueOf(logRecord.getJobId()));
-                    if (IS_DEBUG_MODE) {
-                        jobCommitLogCount++;
-                    }
+                    jobCommitLogCount++;
                     break;
                 case LogType.ENTITY_COMMIT:
                     jobId = logRecord.getJobId();
@@ -205,9 +208,10 @@
                         winnerEntitySet = jobId2WinnerEntitiesMap.get(Integer.valueOf(jobId));
                     }
                     winnerEntitySet.add(winnerEntity);
-                    if (IS_DEBUG_MODE) {
-                        entityCommitLogCount++;
-                    }
+                    entityCommitLogCount++;
+                    break;
+                case LogType.ABORT:
+                    abortLogCount++;
                     break;
                 default:
                     throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
@@ -239,11 +243,11 @@
         ILocalResourceRepository localResourceRepository = appRuntimeContext.getLocalResourceRepository();
 
         //#. set log reader to the lowWaterMarkLsn again.
-        logReader.initializeScan(lowWaterMarkLsn);
+        logReader.initializeScan(lowWaterMarkLSN);
         logRecord = logReader.next();
         while (logRecord != null) {
-            if (LogManager.IS_DEBUG_MODE) {
-                System.out.println(logRecord.getLogRecordForDisplay());
+            if (IS_DEBUG_MODE) {
+                LOGGER.info(logRecord.getLogRecordForDisplay());
             }
             LSN = logRecord.getLSN();
             jobId = logRecord.getJobId();
@@ -284,6 +288,7 @@
                              * log record.
                              *******************************************************************/
                             if (localResource == null) {
+                                logRecord = logReader.next();
                                 continue;
                             }
                             /*******************************************************************/
@@ -297,10 +302,8 @@
 
                             //#. get maxDiskLastLSN
                             ILSMIndex lsmIndex = (ILSMIndex) index;
-                            BaseOperationTracker indexOpTracker = (BaseOperationTracker) lsmIndex.getOperationTracker();
-                            AbstractLSMIOOperationCallback abstractLSMIOCallback = (AbstractLSMIOOperationCallback) indexOpTracker
-                                    .getIOOperationCallback();
-                            maxDiskLastLsn = abstractLSMIOCallback.getComponentLSN(index.getImmutableComponents());
+                            maxDiskLastLsn = ((AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback())
+                                    .getComponentLSN(lsmIndex.getImmutableComponents());
 
                             //#. set resourceId and maxDiskLastLSN to the map
                             resourceId2MaxLSNMap.put(Long.valueOf(resourceId), Long.valueOf(maxDiskLastLsn));
@@ -310,15 +313,14 @@
 
                         if (LSN > maxDiskLastLsn) {
                             redo(logRecord);
-                            if (IS_DEBUG_MODE) {
-                                redoCount++;
-                            }
+                            redoCount++;
                         }
                     }
                     break;
 
                 case LogType.JOB_COMMIT:
                 case LogType.ENTITY_COMMIT:
+                case LogType.ABORT:
                     //do nothing
                     break;
 
@@ -338,10 +340,8 @@
 
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("[RecoveryMgr] recovery is completed.");
-        }
-        if (IS_DEBUG_MODE) {
-            System.out.println("[RecoveryMgr] Count: Update/EntityCommit/JobCommit/Redo = " + updateLogCount + "/"
-                    + entityCommitLogCount + "/" + jobCommitLogCount + "/" + redoCount);
+            LOGGER.info("[RecoveryMgr's recovery log count] update/entityCommit/jobCommit/abort/redo = " + updateLogCount + "/"
+                    + entityCommitLogCount + "/" + jobCommitLogCount + "/" + abortLogCount + "/" + redoCount);
         }
     }
 
@@ -375,9 +375,8 @@
                 ILSMIndex lsmIndex = (ILSMIndex) index;
                 ILSMIndexAccessor indexAccessor = lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE,
                         NoOpOperationCallback.INSTANCE);
-                BaseOperationTracker indexOpTracker = (BaseOperationTracker) lsmIndex.getOperationTracker();
                 BlockingIOOperationCallbackWrapper cb = new BlockingIOOperationCallbackWrapper(
-                        indexOpTracker.getIOOperationCallback());
+                        lsmIndex.getIOOperationCallback());
                 callbackList.add(cb);
                 try {
                     indexAccessor.scheduleFlush(cb);
@@ -393,17 +392,18 @@
                     throw new ACIDException(e);
                 }
             }
-            minMCTFirstLSN = -2;
+            minMCTFirstLSN = SHARP_CHECKPOINT_LSN;
         } else {
             long firstLSN;
             minMCTFirstLSN = Long.MAX_VALUE;
             if (openIndexList.size() > 0) {
                 for (IIndex index : openIndexList) {
-                    firstLSN = ((BaseOperationTracker) ((ILSMIndex) index).getOperationTracker()).getFirstLSN();
+                    firstLSN = ((AbstractLSMIOOperationCallback) ((ILSMIndex) index).getIOOperationCallback())
+                            .getFirstLSN();
                     minMCTFirstLSN = Math.min(minMCTFirstLSN, firstLSN);
                 }
             } else {
-                minMCTFirstLSN = -1;
+                minMCTFirstLSN = SHARP_CHECKPOINT_LSN;
             }
         }
         CheckpointObject checkpointObject = new CheckpointObject(logMgr.getAppendLSN(), minMCTFirstLSN,
@@ -580,7 +580,7 @@
                 break;
             } else {
                 if (IS_DEBUG_MODE) {
-                    System.out.println(logRecord.getLogRecordForDisplay());
+                    LOGGER.info(logRecord.getLogRecordForDisplay());
                 }
                 currentLSN = logRecord.getLSN();
             }
@@ -600,9 +600,9 @@
                         loserTxnTable.put(loserEntity, undoLSNSet);
                     }
                     undoLSNSet.add(Long.valueOf(currentLSN));
+                    updateLogCount++;
                     if (IS_DEBUG_MODE) {
-                        updateLogCount++;
-                        System.out.println("" + Thread.currentThread().getId() + "======> update[" + currentLSN + "]:"
+                        LOGGER.info("" + Thread.currentThread().getId() + "======> update[" + currentLSN + "]:"
                                 + tempKeyTxnId);
                     }
                     break;
@@ -611,14 +611,21 @@
                     throw new ACIDException("Unexpected LogType(" + logRecord.getLogType() + ") during abort.");
 
                 case LogType.ENTITY_COMMIT:
-                    loserTxnTable.remove(tempKeyTxnId);
+                    undoLSNSet = loserTxnTable.remove(tempKeyTxnId);
+                    if (undoLSNSet == null) {
+                        undoLSNSet = loserTxnTable.remove(tempKeyTxnId);
+                    }
+                    entityCommitLogCount++;
                     if (IS_DEBUG_MODE) {
-                        entityCommitLogCount++;
-                        System.out.println("" + Thread.currentThread().getId() + "======> entity_commit[" + currentLSN + "]"
+                        LOGGER.info("" + Thread.currentThread().getId() + "======> entity_commit[" + currentLSN + "]"
                                 + tempKeyTxnId);
                     }
                     break;
 
+                case LogType.ABORT:
+                    //ignore
+                    break;
+
                 default:
                     throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
             }
@@ -650,23 +657,17 @@
                     throw new ACIDException("IllegalState exception during abort( " + txnContext.getJobId() + ")");
                 }
                 if (IS_DEBUG_MODE) {
-                    System.out.println(logRecord.getLogRecordForDisplay());
+                    LOGGER.info(logRecord.getLogRecordForDisplay());
                 }
                 undo(logRecord);
-                if (IS_DEBUG_MODE) {
-                    undoCount++;
-                }
+                undoCount++;
             }
         }
-
         logReader.close();
-
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info(" undone loser transaction's effect");
-        }
-        if (IS_DEBUG_MODE) {
-            System.out.println("UpdateLogCount/CommitLogCount/UndoCount:" + updateLogCount + "/" + entityCommitLogCount
-                    + "/" + undoCount);
+            LOGGER.info("[RecoveryManager's rollback log count] update/entityCommit/undo:" + updateLogCount + "/" + entityCommitLogCount + "/"
+                    + undoCount);
         }
     }
 
@@ -721,7 +722,7 @@
             } else {
                 throw new IllegalStateException("Unsupported OperationType: " + logRecord.getNewOp());
             }
-            ((BaseOperationTracker) index.getOperationTracker()).updateLastLSN(logRecord.getLSN());
+            ((AbstractLSMIOOperationCallback) index.getIOOperationCallback()).updateLastLSN(logRecord.getLSN());
         } catch (Exception e) {
             throw new IllegalStateException("Failed to redo", e);
         }
@@ -765,6 +766,7 @@
         this.datasetId = datasetId;
         this.pkHashValue = pkHashValue;
         this.tupleReferencePKValue = pkValue;
+        this.pkSize = pkSize;
         isByteArrayPKValue = false;
     }
 
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java
index 59a8363..5d4806d 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java
@@ -15,15 +15,11 @@
 package edu.uci.ics.asterix.transaction.management.service.transaction;
 
 import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
-import edu.uci.ics.asterix.common.context.BaseOperationTracker;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackProvider;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
-import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
 import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyProvider;
 import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
@@ -33,18 +29,12 @@
 import edu.uci.ics.hyracks.storage.common.file.ResourceIdFactory;
 
 public class AsterixRuntimeComponentsProvider implements IIndexLifecycleManagerProvider, IStorageManagerInterface,
-        ILSMIOOperationSchedulerProvider, ILSMMergePolicyProvider,
-        ILSMIOOperationCallbackProvider {
+        ILSMIOOperationSchedulerProvider, ILSMMergePolicyProvider {
     private static final long serialVersionUID = 1L;
 
     public static final AsterixRuntimeComponentsProvider RUNTIME_PROVIDER = new AsterixRuntimeComponentsProvider();
-    
-    private AsterixRuntimeComponentsProvider() {
-    }
 
-    @Override
-    public ILSMIOOperationCallback getIOOperationCallback(ILSMIndex index) {
-        return ((BaseOperationTracker) index.getOperationTracker()).getIOOperationCallback();
+    private AsterixRuntimeComponentsProvider() {
     }
 
     @Override
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
index 678956b..6fb91c8 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
@@ -21,9 +21,9 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
-import edu.uci.ics.asterix.common.context.BaseOperationTracker;
 import edu.uci.ics.asterix.common.context.PrimaryIndexOperationTracker;
 import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
 import edu.uci.ics.asterix.common.transactions.AbstractOperationCallback;
 import edu.uci.ics.asterix.common.transactions.ITransactionContext;
 import edu.uci.ics.asterix.common.transactions.ITransactionManager;
@@ -71,7 +71,7 @@
 
     //indexMap is concurrently accessed by multiple threads, 
     //so those threads are synchronized on indexMap object itself
-    private Map<MutableLong, BaseOperationTracker> indexMap;
+    private Map<MutableLong, AbstractLSMIOOperationCallback> indexMap;
 
     //TODO: fix ComponentLSNs' issues. 
     //primaryIndex, primaryIndexCallback, and primaryIndexOptracker will be modified accordingly
@@ -97,7 +97,7 @@
         isTimeout = false;
         isWriteTxn = new AtomicBoolean(false);
         isMetadataTxn = false;
-        indexMap = new HashMap<MutableLong, BaseOperationTracker>();
+        indexMap = new HashMap<MutableLong, AbstractLSMIOOperationCallback>();
         primaryIndex = null;
         tempResourceIdForRegister = new MutableLong();
         tempResourceIdForSetLSN = new MutableLong();
@@ -114,7 +114,8 @@
             }
             tempResourceIdForRegister.set(resourceId);
             if (!indexMap.containsKey(tempResourceIdForRegister)) {
-                indexMap.put(new MutableLong(resourceId), ((BaseOperationTracker) index.getOperationTracker()));
+                indexMap.put(new MutableLong(resourceId),
+                        ((AbstractLSMIOOperationCallback) index.getIOOperationCallback()));
             }
         }
     }
@@ -122,16 +123,17 @@
     //[Notice] 
     //This method is called sequentially by the LogAppender threads. 
     //However, the indexMap is concurrently read and modified through this method and registerIndexAndCallback()
-    //TODO: fix issues - 591, 609, 612, and 614.
     @Override
     public void setLastLSN(long resourceId, long LSN) {
         synchronized (indexMap) {
             firstLSN.compareAndSet(-1, LSN);
             lastLSN.set(Math.max(lastLSN.get(), LSN));
-            tempResourceIdForSetLSN.set(resourceId);
-            //TODO; create version number tracker and keep LSNs there. 
-            BaseOperationTracker opTracker = indexMap.get(tempResourceIdForSetLSN);
-            opTracker.updateLastLSN(LSN);
+            if (resourceId != -1) {
+                //Non-update log's resourceId is -1.
+                tempResourceIdForSetLSN.set(resourceId);
+                AbstractLSMIOOperationCallback ioOpCallback = indexMap.get(tempResourceIdForSetLSN);
+                ioOpCallback.updateLastLSN(LSN);
+            }
         }
     }
 
@@ -221,4 +223,10 @@
     public LogRecord getLogRecord() {
         return logRecord;
     }
+
+    public void cleanupForAbort() {
+        if (primaryIndexOpTracker != null) {
+            primaryIndexOpTracker.cleanupNumActiveOperationsForAbortedJob(primaryIndexCallback);
+        }
+    }
 }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
index 01b38c2..07fc152 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -53,7 +53,12 @@
             txnCtx.setTxnState(ITransactionManager.ABORTED);
         }
         try {
-            txnSubsystem.getRecoveryManager().rollbackTransaction(txnCtx);
+            if (txnCtx.isWriteTxn()) {
+                LogRecord logRecord = ((TransactionContext) txnCtx).getLogRecord();
+                logRecord.formJobTerminateLogRecord(txnCtx, false);
+                txnSubsystem.getLogManager().log(logRecord);
+                txnSubsystem.getRecoveryManager().rollbackTransaction(txnCtx);
+            }
         } catch (Exception ae) {
             String msg = "Could not complete rollback! System is in an inconsistent state";
             if (LOGGER.isLoggable(Level.SEVERE)) {
@@ -62,6 +67,7 @@
             ae.printStackTrace();
             throw new ACIDException(msg, ae);
         } finally {
+            ((TransactionContext) txnCtx).cleanupForAbort();
             txnSubsystem.getLockManager().releaseLocks(txnCtx);
             transactionContextRepository.remove(txnCtx.getJobId());
         }
@@ -69,20 +75,24 @@
 
     @Override
     public ITransactionContext beginTransaction(JobId jobId) throws ACIDException {
-        return getTransactionContext(jobId);
+        return getTransactionContext(jobId, true);
     }
 
     @Override
-    public ITransactionContext getTransactionContext(JobId jobId) throws ACIDException {
+    public ITransactionContext getTransactionContext(JobId jobId, boolean createIfNotExist) throws ACIDException {
         setMaxJobId(jobId.getId());
         ITransactionContext txnCtx = transactionContextRepository.get(jobId);
         if (txnCtx == null) {
-            synchronized (this) {
-                txnCtx = transactionContextRepository.get(jobId);
-                if (txnCtx == null) {
-                    txnCtx = new TransactionContext(jobId, txnSubsystem);
-                    transactionContextRepository.put(jobId, txnCtx);
+            if (createIfNotExist) {
+                synchronized (this) {
+                    txnCtx = transactionContextRepository.get(jobId);
+                    if (txnCtx == null) {
+                        txnCtx = new TransactionContext(jobId, txnSubsystem);
+                        transactionContextRepository.put(jobId, txnCtx);
+                    }
                 }
+            } else {
+                throw new ACIDException("TransactionContext of " + jobId + " doesn't exist.");
             }
         }
         return txnCtx;
@@ -94,7 +104,7 @@
         try {
             if (txnCtx.isWriteTxn()) {
                 LogRecord logRecord = ((TransactionContext) txnCtx).getLogRecord();
-                logRecord.formJobCommitLogRecord(txnCtx);
+                logRecord.formJobTerminateLogRecord(txnCtx, true);
                 txnSubsystem.getLogManager().log(logRecord);
             }
         } catch (Exception ae) {