changed log record format to deal with undo/redo operation properly

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_lsm_stabilization@763 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-app/src/test/resources/metadata-transactions/check-state-queries.txt b/asterix-app/src/test/resources/metadata-transactions/check-state-queries.txt
index 6618ddd..5cee728 100644
--- a/asterix-app/src/test/resources/metadata-transactions/check-state-queries.txt
+++ b/asterix-app/src/test/resources/metadata-transactions/check-state-queries.txt
@@ -1,6 +1,6 @@
-check_dataset.aql
-check_datatype.aql
-check_dataverse.aql
-check_index.aql
-check_node.aql
-check_nodegroup.aql
+//check_dataset.aql
+//check_datatype.aql
+//check_dataverse.aql
+//check_index.aql
+//check_node.aql
+//check_nodegroup.aql
diff --git a/asterix-app/src/test/resources/metadata-transactions/init-state-queries.txt b/asterix-app/src/test/resources/metadata-transactions/init-state-queries.txt
index 5359017..5fb1c42 100644
--- a/asterix-app/src/test/resources/metadata-transactions/init-state-queries.txt
+++ b/asterix-app/src/test/resources/metadata-transactions/init-state-queries.txt
@@ -1 +1 @@
-customers_orders.aql
\ No newline at end of file
+//customers_orders.aql
diff --git a/asterix-app/src/test/resources/metadata-transactions/queries.txt b/asterix-app/src/test/resources/metadata-transactions/queries.txt
index 0762364..5f1589e 100644
--- a/asterix-app/src/test/resources/metadata-transactions/queries.txt
+++ b/asterix-app/src/test/resources/metadata-transactions/queries.txt
@@ -1,20 +1,20 @@
-create_duplicate_dataset.aql
-create_duplicate_dataverse.aql
-create_duplicate_index.aql
-create_duplicate_nodegroup.aql
-create_duplicate_type.aql
-drop_nonexistent_dataset.aql
-drop_nonexistent_datatype.aql
-drop_nonexistent_dataverse.aql
-drop_nonexistent_index.aql
-drop_nonexistent_nodegroup.aql
-rollback_drop_dataset.aql
-rollback_drop_datatype.aql
-rollback_drop_dataverse.aql
-rollback_drop_index.aql
-rollback_drop_nodegroup.aql
-rollback_new_dataset.aql
-rollback_new_datatype.aql
-rollback_new_dataverse.aql
-rollback_new_index.aql
-rollback_new_nodegroup.aql
+//create_duplicate_dataset.aql
+//create_duplicate_dataverse.aql
+//create_duplicate_index.aql
+//create_duplicate_nodegroup.aql
+//create_duplicate_type.aql
+//drop_nonexistent_dataset.aql
+//drop_nonexistent_datatype.aql
+//drop_nonexistent_dataverse.aql
+//drop_nonexistent_index.aql
+//drop_nonexistent_nodegroup.aql
+//rollback_drop_dataset.aql
+//rollback_drop_datatype.aql
+//rollback_drop_dataverse.aql
+//rollback_drop_index.aql
+//rollback_drop_nodegroup.aql
+//rollback_new_dataset.aql
+//rollback_new_datatype.aql
+//rollback_new_dataverse.aql
+//rollback_new_index.aql
+//rollback_new_nodegroup.aql
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
index 98d6ed3..c572ed6 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/MetadataNode.java
@@ -78,6 +78,7 @@
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
 import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
 import edu.uci.ics.hyracks.storage.am.lsm.btree.impls.LSMBTreeRangeSearchCursor;
+import edu.uci.ics.hyracks.storage.am.lsm.btree.tuples.LSMBTreeTupleReference;
 
 public class MetadataNode implements IMetadataNode {
     private static final long serialVersionUID = 1L;
@@ -263,7 +264,9 @@
         transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.X, txnCtx);
         // TODO: fix exceptions once new BTree exception model is in hyracks.
         indexAccessor.insert(tuple);
-        index.getTreeLogger().generateLogRecord(transactionProvider, txnCtx, IndexOperation.INSERT, tuple);
+        //TODO: extract the key from the tuple and get the PKHashValue from the key.
+        //index.getIndexLogger().generateLogRecord(transactionProvider, txnCtx, index.getDatasetId().getId(), null,
+        //        resourceID, IndexOperation.INSERT, tuple, null, null);
         indexLifecycleManager.close(resourceID);
     }
 
@@ -515,7 +518,10 @@
         // regular waiters in the LockManager.
         transactionProvider.getLockManager().lock(index.getDatasetId(), -1, LockMode.X, txnCtx);
         indexAccessor.delete(tuple);
-        index.getTreeLogger().generateLogRecord(transactionProvider, txnCtx, IndexOperation.DELETE, tuple);
+        //TODO: extract the key from the tuple and get the PKHashValue from the key.
+        //check how to get the oldValue.
+        //index.getIndexLogger().generateLogRecord(transactionProvider, txnCtx, index.getDatasetId().getId(), null,
+        //        resourceID, IndexOperation.DELETE, tuple, operation, null);
         indexLifecycleManager.close(resourceID);
     }
 
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataIndex.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataIndex.java
index 88dac70..8554ddd 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataIndex.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataIndex.java
@@ -19,14 +19,14 @@
 
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
-import edu.uci.ics.asterix.transaction.management.service.logging.TreeLogger;
+import edu.uci.ics.asterix.transaction.management.service.logging.IndexLogger;
 import edu.uci.ics.asterix.transaction.management.service.transaction.DatasetId;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
 
 /**
  * Descriptor interface for a primary or secondary index on metadata datasets.
@@ -66,7 +66,7 @@
 
     public void setFileId(int fileId);
 
-    public void initTreeLogger(ITreeIndex treeIndex) throws ACIDException;
+    public void initIndexLogger(IIndex index) throws ACIDException;
 
     public int getFileId();
 
@@ -74,9 +74,7 @@
 
     public long getResourceID();
 
-    public byte[] getResourceId();
-
-    public TreeLogger getTreeLogger();
+    public IndexLogger getIndexLogger();
     
     public DatasetId getDatasetId();
 }
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
index 3b5af14..a1a510c 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -44,8 +44,8 @@
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
 import edu.uci.ics.asterix.transaction.management.resource.TransactionalResourceRepository;
-import edu.uci.ics.asterix.transaction.management.service.logging.DataUtil;
-import edu.uci.ics.asterix.transaction.management.service.logging.TreeResourceManager;
+import edu.uci.ics.asterix.transaction.management.service.logging.IndexResourceManager;
+import edu.uci.ics.asterix.transaction.management.service.transaction.IResourceManager.ResourceType;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
 import edu.uci.ics.hyracks.api.application.INCApplicationContext;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -54,8 +54,8 @@
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.io.IIOManager;
 import edu.uci.ics.hyracks.storage.am.common.api.IInMemoryFreePageManager;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
 import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
 import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrameFactory;
 import edu.uci.ics.hyracks.storage.am.common.frames.LIFOMetaDataFrameFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.btree.impls.LSMBTree;
@@ -128,8 +128,8 @@
         boolean isNewUniverse = true;
         TransactionalResourceRepository resourceRepository = runtimeContext.getTransactionProvider()
                 .getTransactionalResourceRepository();
-        resourceRepository.registerTransactionalResourceManager(TreeResourceManager.ID, new TreeResourceManager(
-                runtimeContext.getTransactionProvider()));
+        resourceRepository.registerTransactionalResourceManager(ResourceType.LSM_BTREE, new IndexResourceManager(
+                ResourceType.LSM_BTREE, runtimeContext.getTransactionProvider()));
 
         metadataNodeName = asterixProperties.getMetadataNodeName();
         isNewUniverse = asterixProperties.isNewUniverse();
@@ -205,13 +205,12 @@
         }
     }
 
-    private static void registerTransactionalResource(IMetadataIndex index,
+    private static void registerTransactionalResource(IMetadataIndex metadataIndex,
             TransactionalResourceRepository resourceRepository) throws ACIDException {
-        long resourceID = index.getResourceID();
-        ITreeIndex treeIndex = (ITreeIndex) indexLifecycleManager.getIndex(resourceID);
-        byte[] resourceId = DataUtil.longToByteArray(resourceID);
-        resourceRepository.registerTransactionalResource(resourceId, treeIndex);
-        index.initTreeLogger(treeIndex);
+        long resourceId = metadataIndex.getResourceID();
+        IIndex index = indexLifecycleManager.getIndex(resourceId);
+        resourceRepository.registerTransactionalResource(resourceId, index);
+        metadataIndex.initIndexLogger(index);
     }
 
     public static void insertInitialDataverses(MetadataTransactionContext mdTxnCtx) throws Exception {
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataIndex.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataIndex.java
index d1e377e..c069d0d 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataIndex.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataIndex.java
@@ -28,16 +28,16 @@
 import edu.uci.ics.asterix.om.types.ARecordType;
 import edu.uci.ics.asterix.om.types.IAType;
 import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
-import edu.uci.ics.asterix.transaction.management.service.logging.DataUtil;
-import edu.uci.ics.asterix.transaction.management.service.logging.TreeLogger;
+import edu.uci.ics.asterix.transaction.management.service.logging.IndexLogger;
 import edu.uci.ics.asterix.transaction.management.service.transaction.DatasetId;
+import edu.uci.ics.asterix.transaction.management.service.transaction.IResourceManager.ResourceType;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
 
 /**
  * Descriptor for a primary or secondary index on metadata datasets.
@@ -67,11 +67,10 @@
     protected FileReference file;
     // Identifier of file BufferCache backing this metadata btree index.
     protected int fileId;
-    protected long resourceID;
     // Resource id of this index for use in transactions.
-    protected byte[] indexResourceId;
+    protected long resourceId;
     // Logger for tree indexes.
-    private TreeLogger treeLogger;
+    private IndexLogger indexLogger;
     // datasetId
     private final DatasetId datasetId;
 
@@ -205,12 +204,11 @@
     @Override
     public void setFileId(int fileId) {
         this.fileId = fileId;
-        this.indexResourceId = DataUtil.intToByteArray(fileId);
     }
 
     @Override
-    public void initTreeLogger(ITreeIndex treeIndex) throws ACIDException {
-        this.treeLogger = new TreeLogger(indexResourceId, treeIndex);
+    public void initIndexLogger(IIndex index) throws ACIDException {
+        this.indexLogger = new IndexLogger(resourceId, ResourceType.LSM_BTREE, index);
     }
 
     @Override
@@ -223,13 +221,8 @@
         return payloadType;
     }
 
-    @Override
-    public byte[] getResourceId() {
-        return indexResourceId;
-    }
-
-    public TreeLogger getTreeLogger() {
-        return treeLogger;
+    public IndexLogger getIndexLogger() {
+        return indexLogger;
     }
 
     @Override
@@ -244,13 +237,12 @@
 
     @Override
     public void setResourceID(long resourceID) {
-        this.resourceID = resourceID;
-        this.indexResourceId = DataUtil.longToByteArray(resourceID);
+        this.resourceId = resourceID;
     }
 
     @Override
     public long getResourceID() {
-        return resourceID;
+        return resourceId;
     }
 
     @Override
diff --git a/asterix-transactions/pom.xml b/asterix-transactions/pom.xml
index c440c2a..cb7d4e2 100644
--- a/asterix-transactions/pom.xml
+++ b/asterix-transactions/pom.xml
@@ -32,5 +32,10 @@
   		<type>jar</type>
   		<scope>compile</scope>
   	</dependency>
+	    <dependency>
+	    	<groupId>edu.uci.ics.hyracks</groupId>
+	    	<artifactId>hyracks-storage-am-lsm-common</artifactId>
+	    	<version>0.2.2-SNAPSHOT</version>
+	    </dependency>
 	</dependencies>
 </project>
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/TransactionalResourceRepository.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/TransactionalResourceRepository.java
index 25309e1..5997bf1 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/TransactionalResourceRepository.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/TransactionalResourceRepository.java
@@ -20,6 +20,7 @@
 import java.util.Map;
 
 import edu.uci.ics.asterix.transaction.management.service.transaction.IResourceManager;
+import edu.uci.ics.asterix.transaction.management.service.transaction.MutableResourceId;
 
 /**
  * Represents a repository containing Resource Managers and Resources in the
@@ -32,17 +33,18 @@
  */
 public class TransactionalResourceRepository {
 
-    private Map<ByteBuffer, Object> resourceRepository = new HashMap<ByteBuffer, Object>(); // repository
+    private Map<MutableResourceId, Object> resourceRepository = new HashMap<MutableResourceId, Object>(); // repository
 
     private Map<Byte, IResourceManager> resourceMgrRepository = new HashMap<Byte, IResourceManager>(); // repository
+    
+    private MutableResourceId mutableResourceId = new MutableResourceId(0);
 
-    public void registerTransactionalResource(byte[] resourceBytes, Object resource) {
-        // convert to ByteBuffer so that a byte[] can be used as a key in a hash map.
-        ByteBuffer resourceId = ByteBuffer.wrap(resourceBytes); // need to
-
+    public void registerTransactionalResource(long resourceId, Object resource) {
         synchronized (resourceRepository) {
+            mutableResourceId.setId(resourceId);
             if (resourceRepository.get(resourceId) == null) {
-                resourceRepository.put(resourceId, resource);
+                MutableResourceId newMutableResourceId = new MutableResourceId(resourceId);
+                resourceRepository.put(newMutableResourceId, resource);
                 
                 // wake up threads waiting for the resource
                 resourceRepository.notifyAll();
@@ -61,10 +63,10 @@
         }
     }
 
-    public Object getTransactionalResource(byte[] resourceIdBytes) {
-        ByteBuffer buffer = ByteBuffer.wrap(resourceIdBytes);
+    public Object getTransactionalResource(long resourceId) {
         synchronized (resourceRepository) {
-            while (resourceRepository.get(buffer) == null) {
+            mutableResourceId.setId(resourceId);
+            while (resourceRepository.get(mutableResourceId) == null) {
                 try {
                     resourceRepository.wait();
                 } catch (InterruptedException ie) {
@@ -73,7 +75,7 @@
                     // failures occurring elsewhere, break from the loop
                 }
             }
-            return resourceRepository.get(buffer);
+            return resourceRepository.get(mutableResourceId);
         }
     }
 
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/DataUtil.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/DataUtil.java
index 4b8d62f..91ce7ed5 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/DataUtil.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/DataUtil.java
@@ -47,6 +47,13 @@
         return bytes;
     }
 
+    public static long byteArrayToLong(byte[] bytes, int offset) {
+        return ((bytes[offset] & 0xff) << 56) + ((bytes[offset + 1] & 0xff) << 48) + ((bytes[offset + 2] & 0xff) << 40)
+                + ((bytes[offset + 3] & 0xff) << 32) + ((bytes[offset + 4] & 0xff) << 24)
+                + ((bytes[offset + 5] & 0xff) << 16) + ((bytes[offset + 6] & 0xff) << 8)
+                + ((bytes[offset + 7] & 0xff) << 0);
+    }
+
     public static byte[] longToByteArray(long value) {
         byte[] bytes = new byte[8];
         bytes[0] = (byte) ((value >>> 56) & 0xFF);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogManager.java
index 1158029..40f716c 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogManager.java
@@ -15,51 +15,30 @@
 package edu.uci.ics.asterix.transaction.management.service.logging;
 
 import java.io.IOException;
-import java.util.Map;
 
 import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
+import edu.uci.ics.asterix.transaction.management.service.logging.IndexLogger.ReusableLogContentObject;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionProvider;
 
 public interface ILogManager {
 
     /**
-     * An API to write a log record.
-     * 
-     * @param logicalLogLocator
-     *            A reusable object passed in by the caller. When the call
-     *            returns, this object has the physical location of the log
-     *            record that was written.
-     * @param context
-     *            the transaction context associated with the transaction that
-     *            is writing the log record
-     * @param resourceMgrId
-     *            the unique identifier of the resource manager that would be
-     *            handling (interpreting) the log record if there is a need to
-     *            process and apply the log record during a redo/undo task.
-     * @param pageId
-     *            the unique identifier of the page where the operation
-     *            corresponding to the log record is applied
      * @param logType
-     *            the type of log record (@see LogType)
-     * @param logActionType
-     *            the action that needs to be taken when processing the log
-     *            record (@see LogActionType)
-     * @param length
-     *            the length of the content inside the log record. This does not
-     *            include the header or the checksum size.
+     * @param context
+     * @param datasetId
+     * @param PKHashValue
+     * @param resourceId
+     * @param resourceMgrId
+     * @param logContentSize
+     * @param reusableLogContentObject
      * @param logger
-     *            an implementation of the @see ILogger interface that is
-     *            invoked by the ILogManager instance to get the actual content
-     *            for the log record.
-     * @param loggerArguments
-     *            Represent any additional arguments that needs to be passed
-     *            back in the call the to ILogger interface APIs.
+     * @param logicalLogLocator
      * @throws ACIDException
      */
-    public void log(LogicalLogLocator logicalLogLocator, TransactionContext context, byte resourceMgrId, long pageId,
-            byte logType, byte logActionType, int length, ILogger logger, Map<Object, Object> loggerArguments)
-            throws ACIDException;
+    void log(byte logType, TransactionContext context, int datasetId, int PKHashValue, long resourceId,
+            byte resourceMgrId, int logContentSize, ReusableLogContentObject reusableLogContentObject, ILogger logger,
+            LogicalLogLocator logicalLogLocator) throws ACIDException;
 
     /**
      * @param physicalLogLocator
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogRecordHelper.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogRecordHelper.java
index ab4bfea..4f28c7d 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogRecordHelper.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogRecordHelper.java
@@ -25,38 +25,41 @@
 
 public interface ILogRecordHelper {
 
-    public byte getLogType(LogicalLogLocator logicalLogLocator);
+    byte getLogType(LogicalLogLocator logicalLogLocator);
 
-    public int getLogLength(LogicalLogLocator logicalLogLocator);
+    int getJobId(LogicalLogLocator logicalLogLocator);
 
-    public long getLogTimestamp(LogicalLogLocator logicalLogLocator);
+    int getDatasetId(LogicalLogLocator logicalLogLocator);
 
-    public long getLogChecksum(LogicalLogLocator logicalLogLocator);
+    int getPKHashValue(LogicalLogLocator logicalLogLocator);
 
-    public long getLogTransactionId(LogicalLogLocator logicalLogLocator);
+    PhysicalLogLocator getPrevLSN(LogicalLogLocator logicalLogLocator);
 
-    public byte getResourceMgrId(LogicalLogLocator logicalLogLocator);
+    boolean getPrevLSN(PhysicalLogLocator physicalLogLocator, LogicalLogLocator logicalLogLocator);
+    
+    long getResourceId(LogicalLogLocator logicalLogLocator);
+    
+    byte getResourceMgrId(LogicalLogLocator logicalLogLocater);
 
-    public long getPageId(LogicalLogLocator logicalLogLocator);
+    int getLogRecordSize(LogicalLogLocator logicalLogLocater);
 
-    public int getLogContentBeginPos(LogicalLogLocator logicalLogLocator);
+    long getLogChecksum(LogicalLogLocator logicalLogLocator);
 
-    public int getLogContentEndPos(LogicalLogLocator logicalLogLocator);
+    int getLogContentBeginPos(LogicalLogLocator logicalLogLocator);
 
-    public String getLogRecordForDisplay(LogicalLogLocator logicalLogLocator);
+    int getLogContentEndPos(LogicalLogLocator logicalLogLocator);
 
-    public byte getLogActionType(LogicalLogLocator logicalLogLocator);
+    String getLogRecordForDisplay(LogicalLogLocator logicalLogLocator);
 
-    public PhysicalLogLocator getPreviousLsnByTransaction(LogicalLogLocator logicalLogLocator);
+    void writeLogHeader(LogicalLogLocator logicalLogLocator, byte logType, TransactionContext context, int datasetId,
+            int PKHashValue, long prevLogicalLogLocator, long resourceId, byte resourceMgrId, int logRecordSize);
 
-    public boolean getPreviousLsnByTransaction(PhysicalLogLocator physicalLogLocator,
-            LogicalLogLocator logicalLogLocator);
+    boolean validateLogRecord(LogicalLogLocator logicalLogLocator);
 
-    public void writeLogHeader(TransactionContext context, LogicalLogLocator logicalLogLocator, byte resourceMgrId,
-            long pageId, byte logType, byte logActionType, int logContentSize, long prevLsnValue);
+    int getLogRecordSize(byte logType, int logBodySize);
 
-    public void writeLogTail(LogicalLogLocator logicalLogLocator, ILogManager logManager);
+    int getLogHeaderSize(byte logType);
 
-    public boolean validateLogRecord(LogManagerProperties logManagerProperties, LogicalLogLocator logicalLogLocator);
+    int getLogChecksumSize();
 
 }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogger.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogger.java
index 92984cb..e26a3cc 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogger.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogger.java
@@ -14,9 +14,9 @@
  */
 package edu.uci.ics.asterix.transaction.management.service.logging;
 
-import java.util.Map;
 
 import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
+import edu.uci.ics.asterix.transaction.management.service.logging.IndexLogger.ReusableLogContentObject;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
 
 /**
@@ -25,11 +25,11 @@
  */
 public interface ILogger {
 
-    public void preLog(TransactionContext context, Map<Object, Object> loggerArguments) throws ACIDException;
+    public void preLog(TransactionContext context, ReusableLogContentObject reusableLogContentObject) throws ACIDException;
 
-    public void log(TransactionContext context, final LogicalLogLocator logicalLogLocator, int logRecordSize,
-            Map<Object, Object> loggerArguments) throws ACIDException;
+    public void log(TransactionContext context, final LogicalLogLocator logicalLogLocator, int logContentSize,
+            ReusableLogContentObject reusableLogContentObject) throws ACIDException;
 
-    public void postLog(TransactionContext context, Map<Object, Object> loggerArguments) throws ACIDException;
+    public void postLog(TransactionContext context, ReusableLogContentObject reusableLogContentObject) throws ACIDException;
 
 }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexLogger.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexLogger.java
new file mode 100644
index 0000000..48a9dbf
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexLogger.java
@@ -0,0 +1,259 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.transaction.management.service.logging;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
+import edu.uci.ics.asterix.transaction.management.resource.ICloseable;
+import edu.uci.ics.asterix.transaction.management.service.transaction.IResourceManager.ResourceType;
+import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
+import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionProvider;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
+import edu.uci.ics.hyracks.storage.am.common.tuples.SimpleTupleWriter;
+
+
+
+public class IndexLogger implements ILogger, ICloseable {
+
+    private final Map<Object, Object> jobId2ReusableLogContentObjectRepositoryMap = new ConcurrentHashMap<Object, Object>();
+
+    public static final String TREE_INDEX = "TREE_INDEX";
+    public static final String TUPLE_REFERENCE = "TUPLE_REFERENCE";
+    public static final String TUPLE_WRITER = "TUPLE_WRITER";
+    public static final String INDEX_OPERATION = "INDEX_OPERATION";
+    public static final String RESOURCE_ID = "RESOURCE_ID";
+
+    private final long resourceId;
+    private final byte resourceType;
+    private final SimpleTupleWriter tupleWriter;
+
+    public class BTreeOperationCodes {
+        public static final byte INSERT = 0;
+        public static final byte DELETE = 1;
+    }
+
+    public IndexLogger(long resourceId, byte resourceType, IIndex index) {
+        this.resourceId = resourceId;
+        this.resourceType = resourceType;
+        this.tupleWriter = new SimpleTupleWriter();
+    }
+
+    public synchronized void close(TransactionContext context) {
+        ReusableLogContentObjectRepository txnThreadStateRepository = (ReusableLogContentObjectRepository) jobId2ReusableLogContentObjectRepositoryMap
+                .get(context.getJobId());
+        txnThreadStateRepository.remove(Thread.currentThread().getId());
+        jobId2ReusableLogContentObjectRepositoryMap.remove(context.getJobId());
+    }
+
+    public void generateLogRecord(TransactionProvider provider, TransactionContext context, int datasetId,
+            int PKHashValue, long resourceId, IndexOperation newOperation, ITupleReference newValue,
+            IndexOperation oldOperation, ITupleReference oldValue) throws ACIDException {
+
+        if (this.resourceId != resourceId) {
+            throw new ACIDException("IndexLogger mistach");
+        }
+
+        context.addCloseableResource(this); // the close method would be called
+        // on this TreeLogger instance at
+        // the time of transaction
+        // commit/abort.
+        if (newOperation != IndexOperation.INSERT && newOperation != IndexOperation.DELETE) {
+            throw new ACIDException("Loging for Operation " + newOperation + " not supported");
+        }
+
+        ReusableLogContentObject reusableLogContentObject = null;
+        ReusableLogContentObjectRepository reusableLogContentObjectRepository = null;
+        reusableLogContentObjectRepository = (ReusableLogContentObjectRepository) jobId2ReusableLogContentObjectRepositoryMap
+                .get(context.getJobId());
+        if (reusableLogContentObjectRepository == null) {
+            synchronized (context) { // threads belonging to different
+                // transaction do not need to
+                // synchronize amongst them.
+                if (reusableLogContentObjectRepository == null) {
+                    reusableLogContentObjectRepository = new ReusableLogContentObjectRepository();
+                    jobId2ReusableLogContentObjectRepositoryMap.put(context.getJobId(),
+                            reusableLogContentObjectRepository);
+                }
+            }
+        }
+
+        reusableLogContentObject = reusableLogContentObjectRepository.getObject(Thread.currentThread().getId());
+        if (reusableLogContentObject == null) {
+            LogicalLogLocator logicalLogLocator = LogUtil.getDummyLogicalLogLocator(provider.getLogManager());
+            reusableLogContentObject = new ReusableLogContentObject(logicalLogLocator, newOperation, newValue,
+                    oldOperation, oldValue);
+            reusableLogContentObjectRepository.putObject(Thread.currentThread().getId(), reusableLogContentObject);
+        } else {
+            reusableLogContentObject.setNewOperation(newOperation);
+            reusableLogContentObject.setNewValue(newValue);
+            reusableLogContentObject.setOldOperation(oldOperation);
+            reusableLogContentObject.setOldValue(oldValue);
+        }
+
+        int logContentSize = 4/*TupleFieldCount*/+ 1/*NewOperation*/+ 4/*newValueLength*/+ tupleWriter
+                .bytesRequired(newValue);
+        logContentSize += 1/*OldOperation*/+ 4/*oldValueLength*/+ tupleWriter.bytesRequired(oldValue);
+
+        provider.getLogManager().log(LogType.UPDATE, context, datasetId, PKHashValue, resourceId, resourceType,
+                logContentSize, reusableLogContentObject, this, reusableLogContentObject.getLogicalLogLocator());
+    }
+
+    @Override
+    public void log(TransactionContext context, LogicalLogLocator logicalLogLocator, int logContentSize,
+            ReusableLogContentObject reusableLogContentObject) throws ACIDException {
+        int offset = 0;
+        int tupleSize = 0;
+
+        //tuple field count
+        (logicalLogLocator.getBuffer()).writeInt(logicalLogLocator.getMemoryOffset() + offset, reusableLogContentObject
+                .getNewValue().getFieldCount());
+        offset += 4;
+
+        //new operation
+        (logicalLogLocator.getBuffer()).put(logicalLogLocator.getMemoryOffset() + offset,
+                (byte) reusableLogContentObject.getNewOperation().ordinal());
+        offset += 1;
+
+        //new tuple size
+        tupleSize = tupleWriter.bytesRequired(reusableLogContentObject.getNewValue());
+        (logicalLogLocator.getBuffer()).writeInt(logicalLogLocator.getMemoryOffset() + offset, tupleSize);
+        offset += 4;
+
+        //new tuple
+        tupleWriter.writeTuple(reusableLogContentObject.getNewValue(), logicalLogLocator.getBuffer().getArray(),
+                logicalLogLocator.getMemoryOffset() + offset);
+        offset += tupleSize;
+
+        if (resourceType == ResourceType.LSM_BTREE) {
+            //old operation
+            (logicalLogLocator.getBuffer()).put(logicalLogLocator.getMemoryOffset() + offset,
+                    (byte) reusableLogContentObject.getOldOperation().ordinal());
+            offset += 1;
+
+            if (reusableLogContentObject.getOldOperation() != IndexOperation.NOOP) {
+                //old tuple size
+                tupleSize = tupleWriter.bytesRequired(reusableLogContentObject.getOldValue());
+                (logicalLogLocator.getBuffer()).writeInt(logicalLogLocator.getMemoryOffset() + offset, tupleSize);
+                offset += 4;
+
+                //old tuple
+                tupleWriter.writeTuple(reusableLogContentObject.getNewValue(),
+                        logicalLogLocator.getBuffer().getArray(), logicalLogLocator.getMemoryOffset() + offset);
+            }
+        }
+    }
+
+    @Override
+    public void postLog(TransactionContext context, ReusableLogContentObject reusableLogContentObject)
+            throws ACIDException {
+    }
+
+    @Override
+    public void preLog(TransactionContext context, ReusableLogContentObject reusableLogContentObject)
+            throws ACIDException {
+    }
+    
+    /**
+     * Represents a utility class for generating log records corresponding to
+     * operations on a ITreeIndex implementation. A TreeLogger instance is thread
+     * safe and can be shared across multiple threads that may belong to same or
+     * different transactions.
+     */
+    public class ReusableLogContentObjectRepository {
+
+        private final Map<Long, ReusableLogContentObject> id2Object = new HashMap<Long, ReusableLogContentObject>();
+
+        public synchronized ReusableLogContentObject getObject(long threadId) {
+            return id2Object.get(threadId);
+        }
+
+        public synchronized void putObject(long threadId, ReusableLogContentObject reusableLogContentObject) {
+            this.id2Object.put(threadId, reusableLogContentObject);
+        }
+
+        public synchronized void remove(long threadId) {
+            id2Object.remove(threadId);
+        }
+    }
+
+    /**
+     * Represents the state of a transaction thread. The state contains information
+     * that includes the tuple being operated, the operation and the location of the
+     * log record corresponding to the operation.
+     */
+    public class ReusableLogContentObject {
+
+        private LogicalLogLocator logicalLogLocator;
+        private IndexOperation newOperation;
+        private ITupleReference newValue;
+        private IndexOperation oldOperation;
+        private ITupleReference oldValue;
+
+        public ReusableLogContentObject(LogicalLogLocator logicalLogLocator, IndexOperation newOperation,
+                ITupleReference newValue, IndexOperation oldOperation, ITupleReference oldValue) {
+            this.logicalLogLocator = logicalLogLocator;
+            this.newOperation = newOperation;
+            this.newValue = newValue;
+            this.oldOperation = oldOperation;
+            this.oldValue = oldValue;
+        }
+
+        public synchronized LogicalLogLocator getLogicalLogLocator() {
+            return logicalLogLocator;
+        }
+
+        public synchronized void setLogicalLogLocator(LogicalLogLocator logicalLogLocator) {
+            this.logicalLogLocator = logicalLogLocator;
+        }
+
+        public synchronized void setNewOperation(IndexOperation newOperation) {
+            this.newOperation = newOperation;
+        }
+
+        public synchronized IndexOperation getNewOperation() {
+            return newOperation;
+        }
+
+        public synchronized void setNewValue(ITupleReference newValue) {
+            this.newValue = newValue;
+        }
+
+        public synchronized ITupleReference getNewValue() {
+            return newValue;
+        }
+
+        public synchronized void setOldOperation(IndexOperation oldOperation) {
+            this.oldOperation = oldOperation;
+        }
+
+        public synchronized IndexOperation getOldOperation() {
+            return oldOperation;
+        }
+
+        public synchronized void setOldValue(ITupleReference oldValue) {
+            this.oldValue = oldValue;
+        }
+
+        public synchronized ITupleReference getOldValue() {
+            return oldValue;
+        }
+    }
+
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexLoggerRepository.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexLoggerRepository.java
new file mode 100644
index 0000000..54790b3
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexLoggerRepository.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.transaction.management.service.logging;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import edu.uci.ics.asterix.transaction.management.service.transaction.MutableResourceId;
+import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
+
+public class IndexLoggerRepository {
+
+    private final Map<MutableResourceId, IndexLogger> loggers = new HashMap<MutableResourceId, IndexLogger>();
+    private final TransactionProvider provider;
+    private MutableResourceId mutableResourceId;
+
+    public IndexLoggerRepository(TransactionProvider provider) {
+        this.provider = provider;
+        mutableResourceId = new MutableResourceId(0);
+    }
+
+    public synchronized IndexLogger getIndexLogger(long resourceId, byte resourceType) {
+        mutableResourceId.setId(resourceId);
+        IndexLogger logger = loggers.get(mutableResourceId);
+        if (logger == null) {
+            MutableResourceId newMutableResourceId = new MutableResourceId(resourceId);
+            IIndex index = (IIndex) provider.getTransactionalResourceRepository().getTransactionalResource(resourceId);
+            logger = new IndexLogger(resourceId, resourceType, index);
+            loggers.put(newMutableResourceId, logger);
+        }
+        return logger;
+    }
+}
\ No newline at end of file
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexResourceManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexResourceManager.java
new file mode 100644
index 0000000..19f055c
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexResourceManager.java
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2009-2011 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.asterix.transaction.management.service.logging;
+
+import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
+import edu.uci.ics.asterix.transaction.management.service.transaction.IResourceManager;
+import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionProvider;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
+import edu.uci.ics.hyracks.storage.am.common.tuples.SimpleTupleReference;
+import edu.uci.ics.hyracks.storage.am.common.tuples.SimpleTupleWriter;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+
+public class IndexResourceManager implements IResourceManager {
+
+    public final byte resourceType;
+
+    private final TransactionProvider provider;
+
+    public IndexResourceManager(byte resourceType, TransactionProvider provider) {
+        this.resourceType = resourceType;
+        this.provider = provider;
+    }
+
+    public byte getResourceManagerId() {
+        return resourceType;
+    }
+
+    public void undo(ILogRecordHelper logRecordHelper, LogicalLogLocator logLocator) throws ACIDException {
+        long resourceId = logRecordHelper.getResourceId(logLocator);
+        int offset = logRecordHelper.getLogContentBeginPos(logLocator);
+
+        /*
+        byte[] logBufferContent = logLocator.getBuffer().getArray();
+        // read the length of resource id byte array
+        int resourceIdLength = DataUtil.byteArrayToInt(logBufferContent, logContentBeginPos);
+        byte[] resourceIdBytes = new byte[resourceIdLength];
+
+        // copy the resource if bytes
+        System.arraycopy(logBufferContent, logContentBeginPos + 4, resourceIdBytes, 0, resourceIdLength);
+        */
+
+        // look up the repository to obtain the resource object
+        IIndex index = (IIndex) provider.getTransactionalResourceRepository().getTransactionalResource(resourceId);
+
+        /* field count */
+        int fieldCount = logLocator.getBuffer().readInt(logLocator.getMemoryOffset() + offset);
+        offset += 4;
+
+        /* new operation */
+        byte newOperation = logLocator.getBuffer().getByte(logLocator.getMemoryOffset() + offset);
+        offset += 1;
+
+        /* new value size */
+        int newValueSize = logLocator.getBuffer().readInt(logLocator.getMemoryOffset() + offset);
+        offset += 4;
+
+        /* new value */
+        SimpleTupleWriter tupleWriter = new SimpleTupleWriter();
+        SimpleTupleReference newTuple = (SimpleTupleReference) tupleWriter.createTupleReference();
+        newTuple.setFieldCount(fieldCount);
+        newTuple.resetByTupleOffset(logLocator.getBuffer().getByteBuffer(), offset);
+        offset += newValueSize;
+
+        ILSMIndexAccessor indexAccessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
+                NoOpOperationCallback.INSTANCE);
+
+        try {
+            if (resourceType == ResourceType.LSM_BTREE) {
+
+                /* old operation */
+                byte oldOperation = logLocator.getBuffer().getByte(logLocator.getMemoryOffset() + offset);
+                offset += 1;
+
+                if (oldOperation != (byte) IndexOperation.NOOP.ordinal()) {
+                    /* old value size */
+                    int oldValueSize = logLocator.getBuffer().readInt(logLocator.getMemoryOffset() + offset);
+                    offset += 4;
+
+                    /* old value */
+                    SimpleTupleReference oldTuple = (SimpleTupleReference) tupleWriter.createTupleReference();
+                    oldTuple.setFieldCount(fieldCount);
+                    oldTuple.resetByTupleOffset(logLocator.getBuffer().getByteBuffer(), offset);
+                    offset += oldValueSize;
+
+                    if (oldOperation == (byte) IndexOperation.DELETE.ordinal()) {
+                        indexAccessor.delete(oldTuple);
+                    } else {
+                        indexAccessor.insert(oldTuple);
+                    }
+                } else {
+                    indexAccessor.physicalDelete(newTuple);
+                }
+            } else {
+                //For LSMRtree and LSMInvertedIndex
+                //delete --> physical delete
+                //insert --> logical delete
+                if (newOperation == (byte) IndexOperation.DELETE.ordinal()) {
+                    indexAccessor.physicalDelete(newTuple);
+                } else {
+                    indexAccessor.delete(newTuple);
+                }
+            }
+        } catch (Exception e) {
+            throw new ACIDException("Undo failed", e);
+        }
+    }
+
+    public void redo(ILogRecordHelper logRecordHelper, LogicalLogLocator logicalLogLocator) throws ACIDException {
+        throw new UnsupportedOperationException(" Redo logic will be implemented as part of crash recovery feature");
+    }
+
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java
index 7ba9130..8e89301 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java
@@ -77,8 +77,11 @@
         int integerRead = -1;
         boolean logRecordBeginPosFound = false;
         long bytesSkipped = 0;
+        
+        //check whether the currentOffset has enough space to have new log record by comparing
+        //the smallest log record type(which is commit)'s log header.
         while (logicalLogLocator.getMemoryOffset() <= readOnlyBuffer.getSize()
-                - logManager.getLogManagerProperties().getLogHeaderSize()) {
+                - logManager.getLogRecordHelper().getLogHeaderSize(LogType.COMMIT)) {
             integerRead = readOnlyBuffer.readInt(logicalLogLocator.getMemoryOffset());
             if (integerRead == logManager.getLogManagerProperties().logMagicNumber) {
                 logRecordBeginPosFound = true;
@@ -109,8 +112,8 @@
             }
         }
 
-        int logLength = logManager.getLogRecordHelper().getLogLength(logicalLogLocator);
-        if (logManager.getLogRecordHelper().validateLogRecord(logManager.getLogManagerProperties(), logicalLogLocator)) {
+        int logLength = logManager.getLogRecordHelper().getLogRecordSize(logicalLogLocator);
+        if (logManager.getLogRecordHelper().validateLogRecord(logicalLogLocator)) {
             if (nextLogicalLogLocator == null) {
                 nextLogicalLogLocator = new LogicalLogLocator(0, readOnlyBuffer, -1, logManager);
             }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
index 1d1e067..5736105 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
@@ -21,7 +21,6 @@
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
-import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -30,6 +29,7 @@
 import java.util.logging.Logger;
 
 import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
+import edu.uci.ics.asterix.transaction.management.service.logging.IndexLogger.ReusableLogContentObject;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionProvider;
@@ -495,33 +495,33 @@
         }
     }
 
-    public void log(LogicalLogLocator logLocator, TransactionContext context, byte resourceMgrId, long pageId,
-            byte logType, byte logActionType, int requestedSpaceForLog, ILogger logger,
-            Map<Object, Object> loggerArguments) throws ACIDException {
+    @Override
+    public void log(byte logType, TransactionContext context, int datasetId, int PKHashValue, long resourceId,
+            byte resourceMgrId, int logContentSize, ReusableLogContentObject reusableLogContentObject,
+            ILogger logger, LogicalLogLocator logicalLogLocator) throws ACIDException {
         /*
          * logLocator is a re-usable object that is appropriately set in each
          * invocation. If the reference is null, the log manager must throw an
          * exception
          */
-        if (logLocator == null) {
+        if (logicalLogLocator == null) {
             throw new ACIDException(
                     " you need to pass in a non-null logLocator, if you dont have it, then pass in a dummy so that the +"
                             + "log manager can set it approporiately for you");
         }
 
         // compute the total log size including the header and the checksum.
-        int totalLogSize = logManagerProperties.getLogHeaderSize() + requestedSpaceForLog
-                + logManagerProperties.getLogChecksumSize();
+        int totalLogSize = logRecordHelper.getLogRecordSize(logType, logContentSize);
 
         // check for the total space requirement to be less than a log page.
         if (totalLogSize > logManagerProperties.getLogPageSize()) {
             throw new ACIDException(
                     " Maximum Log Content Size is "
-                            + (logManagerProperties.getLogPageSize() - logManagerProperties.getLogHeaderSize() - logManagerProperties
+                            + (logManagerProperties.getLogPageSize() - logRecordHelper.getLogHeaderSize(LogType.UPDATE) - logRecordHelper
                                     .getLogChecksumSize()));
         }
 
-        // all constraints checked and we are goot to go and acquire a lsn.
+        // all constraints checked and we are good to go and acquire a lsn.
         long previousLogLocator = -1;
         long myLogLocator; // the will be set to the location (a long value)
         // where the log record needs to be placed.
@@ -537,7 +537,7 @@
             previousLogLocator = context.getLastLogLocator().getLsn();
             myLogLocator = getLsn(totalLogSize, logType);
             context.getLastLogLocator().setLsn(myLogLocator);
-            logLocator.setLsn(myLogLocator);
+            logicalLogLocator.setLsn(myLogLocator);
         }
 
         /*
@@ -564,43 +564,43 @@
 
         try {
 
-            logLocator.setBuffer(logPages[pageIndex]);
+            logicalLogLocator.setBuffer(logPages[pageIndex]);
             int pageOffset = getLogPageOffset(myLogLocator);
-            logLocator.setMemoryOffset(pageOffset);
+            logicalLogLocator.setMemoryOffset(pageOffset);
 
             /*
              * write the log header.
              */
-            logRecordHelper.writeLogHeader(context, logLocator, resourceMgrId, pageId, logType, logActionType,
-                    requestedSpaceForLog, previousLogLocator);
-
+            logRecordHelper.writeLogHeader(logicalLogLocator, logType, context, datasetId, PKHashValue, previousLogLocator,
+                    resourceId, resourceMgrId, logContentSize);
+            
             // increment the offset so that the transaction can fill up the
             // content in the correct region of the allocated space.
-            logLocator.increaseMemoryOffset(logManagerProperties.getLogHeaderSize());
+            logicalLogLocator.increaseMemoryOffset(logRecordHelper.getLogHeaderSize(logType));
 
             // a COMMIT log record does not have any content
             // and hence the logger (responsible for putting the log content) is
             // not invoked.
-            if (requestedSpaceForLog != 0) {
-                logger.preLog(context, loggerArguments);
+            if (logContentSize != 0) {
+                logger.preLog(context, reusableLogContentObject);
             }
 
-            if (requestedSpaceForLog != 0) {
+            if (logContentSize != 0) {
                 // call the logger implementation and ask to fill in the log
                 // record content at the allocated space.
-                logger.log(context, logLocator, requestedSpaceForLog, loggerArguments);
-                logger.postLog(context, loggerArguments);
+                logger.log(context, logicalLogLocator, logContentSize, reusableLogContentObject);
+                logger.postLog(context, reusableLogContentObject);
             }
 
             /*
              * The log record has been written. For integrity checks, compute
              * the checksum and put it at the end of the log record.
              */
-            int startPosChecksum = logLocator.getMemoryOffset() - logManagerProperties.getLogHeaderSize();
-            int length = totalLogSize - logManagerProperties.getLogChecksumSize();
+            int startPosChecksum = logicalLogLocator.getMemoryOffset() - logRecordHelper.getLogHeaderSize(logType);
+            int length = totalLogSize - logRecordHelper.getLogChecksumSize();
             long checksum = DataUtil.getChecksum(logPages[pageIndex], startPosChecksum, length);
-            logPages[pageIndex].writeLong(pageOffset + logManagerProperties.getLogHeaderSize() + requestedSpaceForLog,
-                    checksum);
+            logPages[pageIndex].writeLong(pageOffset + logRecordHelper.getLogHeaderSize(logType)
+                    + logContentSize, checksum);
 
             /*
              * release the ownership as the log record has been placed in
@@ -704,7 +704,7 @@
             buffer.limit(buffer.getInt(4));
             MemBasedBuffer memBuffer = new MemBasedBuffer(buffer.slice());
             logicalLogLocator = new LogicalLogLocator(physicalLogLocator.getLsn(), memBuffer, 0, this);
-            if (!logRecordHelper.validateLogRecord(logManagerProperties, logicalLogLocator)) {
+            if (!logRecordHelper.validateLogRecord(logicalLogLocator)) {
                 throw new ACIDException(" invalid log record at lsn " + physicalLogLocator.getLsn());
             }
         } catch (Exception fnfe) {
@@ -761,7 +761,7 @@
                     logLocator = new LogicalLogLocator(lsnValue, memBuffer, 0, this);
                     try {
                         // validate the log record by comparing checksums
-                        if (!logRecordHelper.validateLogRecord(logManagerProperties, logLocator)) {
+                        if (!logRecordHelper.validateLogRecord(logLocator)) {
                             throw new ACIDException(" invalid log record at lsn " + physicalLogLocator);
                         }
                     } catch (Exception e) {
@@ -779,6 +779,7 @@
         return readDiskLog(physicalLogLocator);
     }
 
+    @Override
     public ILogRecordHelper getLogRecordHelper() {
         return logRecordHelper;
     }
@@ -789,6 +790,7 @@
      * logic to event based when log manager support is integrated with the
      * Buffer Manager.
      */
+    @Override
     public synchronized void flushLog(LogicalLogLocator logicalLogLocator) throws ACIDException {
         if (logicalLogLocator.getLsn() > lsn.get()) {
             throw new ACIDException(" invalid lsn " + logicalLogLocator.getLsn());
@@ -828,6 +830,7 @@
         return pageNo == 0 ? numLogPages - 1 : pageNo - 1;
     }
 
+    @Override
     public LogManagerProperties getLogManagerProperties() {
         return logManagerProperties;
     }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManagerProperties.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManagerProperties.java
index 14b45b6..fe48743 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManagerProperties.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManagerProperties.java
@@ -48,14 +48,6 @@
     // flushing.
     private int logBufferSize = logPageSize * numLogPages;
 
-    private final int logHeaderSize = 43; /*
-                                           * ( magic number(4) + (length(4) +
-                                           * type(1) + actionType(1) +
-                                           * timestamp(8) + transacitonId(8) +
-                                           * resourceMgrId(1) + pageId(8) +
-                                           * prevLSN(8)
-                                           */
-    private int logTailSize = 8; /* checksum(8) */
     public int logMagicNumber = 123456789;
 
     public static final String LOG_PARTITION_SIZE_KEY = "log_partition_size";
@@ -104,18 +96,6 @@
         return logDir;
     }
 
-    public int getLogHeaderSize() {
-        return logHeaderSize;
-    }
-
-    public int getLogChecksumSize() {
-        return logTailSize;
-    }
-
-    public int getTotalLogRecordLength(int logContentSize) {
-        return logContentSize + logHeaderSize + logTailSize;
-    }
-
     public int getLogPageSize() {
         return logPageSize;
     }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java
index 7575b45..ef8fc54 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java
@@ -21,84 +21,83 @@
  * for writing/reading of log header and checksum as well as validating log
  * record by checksum comparison. Every ILogManager implementation has an
  * associated ILogRecordHelper implementation.
+ * == LogRecordFormat ==
+ * [Header]
+ * --------------------------- Header part1(17) : Both COMMIT and UPDATE log type have part1 fields
+ * LogMagicNumber(4)
+ * LogType(1)
+ * JobId(4)
+ * DatasetId(4) //stored in dataset_dataset in Metadata Node
+ * PKHashValue(4)
+ * --------------------------- Header part2(21) : Only UPDATE log type has part2 fields
+ * PrevLSN(8) //only for UPDATE
+ * ResourceId(8) //stored in .metadata of the corresponding index in NC node
+ * ResourceMgrId(1)
+ * LogRecordSize(4)
+ * --------------------------- COMMIT doesn't have Body fields.
+ * [Body] The Body size is given through the parameter reusableLogContentObjectLength
+ * NewOp(1)
+ * NewValueLength(4)
+ * NewValue(NewValueLength)
+ * OldOp(1)
+ * OldValueLength(4)
+ * OldValue(OldValueLength)
+ * --------------------------- Both COMMIT and UPDATE have tail fields.
+ * [Tail]
+ * Checksum(8)
  */
 public class LogRecordHelper implements ILogRecordHelper {
 
-    private final int BEGIN_MAGIC_NO_POS = 0;
-    private final int BEGING_LENGTH_POS = 4;
-    private final int BEGIN_TYPE_POS = 8;
-    private final int BEGIN_ACTION_TYPE_POS = 9;
-    private final int BEGIN_TIMESTAMP_POS = 10;
-    private final int BEGIN_TRANSACTION_ID_POS = 18;
-    /*
-    private final int BEGIN_RESOURCE_MGR_ID_POS = 26;
-    private final int BEGIN_PAGE_ID_POS = 27;
-    private final int BEGIN_PREV_LSN_POS = 35;
-    */
-    private final int BEGIN_RESOURCE_MGR_ID_POS = 22;
-    private final int BEGIN_PAGE_ID_POS = 23;
-    private final int BEGIN_PREV_LSN_POS = 31;
+    private final int LOG_CHECKSUM_SIZE = 8;
 
-    
+    private final int MAGIC_NO_POS = 0;
+    private final int LOG_TYPE_POS = 4;
+    private final int JOB_ID_POS = 5;
+    private final int DATASET_ID_POS = 9;
+    private final int PK_HASH_VALUE_POS = 13;
+    private final int PREV_LSN_POS = 17;
+    private final int RESOURCE_ID_POS = 25;
+    private final int RESOURCE_MGR_ID_POS = 33;
+    private final int LOG_RECORD_SIZE_POS = 34;
+
     private ILogManager logManager;
 
     public LogRecordHelper(ILogManager logManager) {
         this.logManager = logManager;
     }
 
+    @Override
     public byte getLogType(LogicalLogLocator logicalLogLocator) {
-        return logicalLogLocator.getBuffer().getByte(logicalLogLocator.getMemoryOffset() + BEGIN_TYPE_POS);
+        return logicalLogLocator.getBuffer().getByte(logicalLogLocator.getMemoryOffset() + LOG_TYPE_POS);
     }
 
-    public byte getLogActionType(LogicalLogLocator logicalLogLocator) {
-        return logicalLogLocator.getBuffer().getByte(logicalLogLocator.getMemoryOffset() + BEGIN_ACTION_TYPE_POS);
+    @Override
+    public int getJobId(LogicalLogLocator logicalLogLocator) {
+        return logicalLogLocator.getBuffer().readInt(logicalLogLocator.getMemoryOffset() + JOB_ID_POS);
     }
 
-    public int getLogLength(LogicalLogLocator logicalLogLocator) {
-        return (logicalLogLocator.getBuffer()).readInt(logicalLogLocator.getMemoryOffset() + BEGING_LENGTH_POS);
+    @Override
+    public int getDatasetId(LogicalLogLocator logicalLogLocator) {
+        return logicalLogLocator.getBuffer().readInt(logicalLogLocator.getMemoryOffset() + DATASET_ID_POS);
     }
 
-    public long getLogTimestamp(LogicalLogLocator logicalLogLocator) {
-        return (logicalLogLocator.getBuffer()).readLong(logicalLogLocator.getMemoryOffset() + BEGIN_TIMESTAMP_POS);
+    @Override
+    public int getPKHashValue(LogicalLogLocator logicalLogLocator) {
+        return logicalLogLocator.getBuffer().readInt(logicalLogLocator.getMemoryOffset() + PK_HASH_VALUE_POS);
     }
 
-    public long getLogChecksum(LogicalLogLocator logicalLogLocator) {
-        return (logicalLogLocator.getBuffer()).readLong(logicalLogLocator.getMemoryOffset()
-                + getLogLength(logicalLogLocator) - 8);
-    }
-
-    public long getLogTransactionId(LogicalLogLocator logicalLogLocator) {
-        return (logicalLogLocator.getBuffer()).readInt(logicalLogLocator.getMemoryOffset() + BEGIN_TRANSACTION_ID_POS);
-    }
-
-    public byte getResourceMgrId(LogicalLogLocator logicalLogLocator) {
-        return (logicalLogLocator.getBuffer()).getByte(logicalLogLocator.getMemoryOffset() + BEGIN_RESOURCE_MGR_ID_POS);
-    }
-
-    public long getPageId(LogicalLogLocator logicalLogLocator) {
-        return (logicalLogLocator.getBuffer()).readLong(logicalLogLocator.getMemoryOffset() + BEGIN_PAGE_ID_POS);
-    }
-
-    public int getLogContentBeginPos(LogicalLogLocator logicalLogLocator) {
-        return logicalLogLocator.getMemoryOffset() + logManager.getLogManagerProperties().getLogHeaderSize();
-    }
-
-    public int getLogContentEndPos(LogicalLogLocator logicalLogLocator) {
-        return logicalLogLocator.getMemoryOffset() + getLogLength(logicalLogLocator)
-                - logManager.getLogManagerProperties().getLogChecksumSize();
-    }
-
-    public PhysicalLogLocator getPreviousLsnByTransaction(LogicalLogLocator logicalLogLocator) {
-        long prevLsnValue = (logicalLogLocator.getBuffer()).readLong(logicalLogLocator.getMemoryOffset()
-                + BEGIN_PREV_LSN_POS);
+    @Override
+    public PhysicalLogLocator getPrevLSN(LogicalLogLocator logicalLogLocator) {
+        long prevLsnValue = (logicalLogLocator.getBuffer())
+                .readLong(logicalLogLocator.getMemoryOffset() + PREV_LSN_POS);
         PhysicalLogLocator previousLogLocator = new PhysicalLogLocator(prevLsnValue, logManager);
         return previousLogLocator;
     }
 
-    public boolean getPreviousLsnByTransaction(PhysicalLogLocator physicalLogLocator,
-            LogicalLogLocator logicalLogLocator) {
-        long prevLsnValue = (logicalLogLocator.getBuffer()).readLong(logicalLogLocator.getMemoryOffset()
-                + BEGIN_PREV_LSN_POS);
+    @Override
+    public boolean getPrevLSN(PhysicalLogLocator physicalLogLocator, LogicalLogLocator logicalLogLocator) {
+        long prevLsnValue = (logicalLogLocator.getBuffer())
+                .readLong(logicalLogLocator.getMemoryOffset() + PREV_LSN_POS);
         if (prevLsnValue == -1) {
             return false;
         }
@@ -106,6 +105,38 @@
         return true;
     }
 
+    @Override
+    public long getResourceId(LogicalLogLocator logicalLogLocator) {
+        return logicalLogLocator.getBuffer().readLong(logicalLogLocator.getMemoryOffset() + RESOURCE_ID_POS);
+    }
+
+    @Override
+    public byte getResourceMgrId(LogicalLogLocator logicalLogLocater) {
+        return logicalLogLocater.getBuffer().getByte(logicalLogLocater.getMemoryOffset() + RESOURCE_MGR_ID_POS);
+    }
+
+    @Override
+    public int getLogRecordSize(LogicalLogLocator logicalLogLocater) {
+        return logicalLogLocater.getBuffer().readInt(logicalLogLocater.getMemoryOffset() + LOG_RECORD_SIZE_POS);
+    }
+
+    @Override
+    public long getLogChecksum(LogicalLogLocator logicalLogLocator) {
+        return (logicalLogLocator.getBuffer()).readLong(logicalLogLocator.getMemoryOffset()
+                + getLogRecordSize(logicalLogLocator) - LOG_CHECKSUM_SIZE);
+    }
+
+    @Override
+    public int getLogContentBeginPos(LogicalLogLocator logicalLogLocator) {
+        return logicalLogLocator.getMemoryOffset() + getLogHeaderSize(getLogType(logicalLogLocator));
+    }
+
+    @Override
+    public int getLogContentEndPos(LogicalLogLocator logicalLogLocator) {
+        return logicalLogLocator.getMemoryOffset() + getLogRecordSize(logicalLogLocator) - LOG_CHECKSUM_SIZE;
+    }
+
+    @Override
     public String getLogRecordForDisplay(LogicalLogLocator logicalLogLocator) {
         StringBuilder builder = new StringBuilder();
         byte logType = new Byte(getLogType(logicalLogLocator));
@@ -118,66 +149,92 @@
                 logTypeDisplay = "UPDATE";
                 break;
         }
-        builder.append(" Log Type :" + logTypeDisplay);
-        builder.append(" Log Length :" + getLogLength(logicalLogLocator));
-        builder.append(" Log Timestamp:" + getLogTimestamp(logicalLogLocator));
-        builder.append(" Log Transaction Id:" + getLogTransactionId(logicalLogLocator));
-        builder.append(" Log Resource Mgr Id:" + getResourceMgrId(logicalLogLocator));
-        builder.append(" Page Id:" + getPageId(logicalLogLocator));
-        builder.append(" Log Checksum:" + getLogChecksum(logicalLogLocator));
-        builder.append(" Log Previous lsn: " + getPreviousLsnByTransaction(logicalLogLocator));
-        return new String(builder);
-    }
-
-    public void writeLogHeader(TransactionContext context, LogicalLogLocator logicalLogLocator, byte resourceMgrId,
-            long pageId, byte logType, byte logActionType, int logContentSize, long prevLogicalLogLocator) {
-        /* magic no */
-        (logicalLogLocator.getBuffer()).writeInt(logicalLogLocator.getMemoryOffset() + BEGIN_MAGIC_NO_POS,
-                logManager.getLogManagerProperties().logMagicNumber);
-
-        /* length */
-        int length = logManager.getLogManagerProperties().getTotalLogRecordLength(logContentSize);
-        (logicalLogLocator.getBuffer()).writeInt(logicalLogLocator.getMemoryOffset() + BEGING_LENGTH_POS, length);
-
-        /* log type */
-        (logicalLogLocator.getBuffer()).put(logicalLogLocator.getMemoryOffset() + BEGIN_TYPE_POS, logType);
-
-        /* log action type */
-        (logicalLogLocator.getBuffer()).put(logicalLogLocator.getMemoryOffset() + BEGIN_ACTION_TYPE_POS, logActionType);
-
-        /* timestamp */
-        long timestamp = System.currentTimeMillis();
-        (logicalLogLocator.getBuffer()).writeLong(logicalLogLocator.getMemoryOffset() + BEGIN_TIMESTAMP_POS, timestamp);
-
-        /* transaction id */
-        (logicalLogLocator.getBuffer()).writeInt(logicalLogLocator.getMemoryOffset() + BEGIN_TRANSACTION_ID_POS,
-                context.getJobId().getId());
-
-        /* resource Mgr id */
-        (logicalLogLocator.getBuffer()).put(logicalLogLocator.getMemoryOffset() + BEGIN_RESOURCE_MGR_ID_POS,
-                resourceMgrId);
-
-        /* page id */
-        (logicalLogLocator.getBuffer()).writeLong(logicalLogLocator.getMemoryOffset() + BEGIN_PAGE_ID_POS, pageId);
-
-        /* previous LSN's File Id by the transaction */
-        (logicalLogLocator.getBuffer()).writeLong(logicalLogLocator.getMemoryOffset() + BEGIN_PREV_LSN_POS,
-                prevLogicalLogLocator);
-    }
-
-    public void writeLogTail(LogicalLogLocator logicalLogLocator, ILogManager logManager) {
-        (logicalLogLocator.getBuffer()).writeInt(logicalLogLocator.getMemoryOffset(),
-                logManager.getLogManagerProperties().logMagicNumber);
+        builder.append(" Log Type : ").append(logTypeDisplay);
+        builder.append(" Job Id : ").append(getJobId(logicalLogLocator));
+        builder.append(" Dataset Id : ").append(getDatasetId(logicalLogLocator));
+        builder.append(" PK Hash Value : ").append(getPKHashValue(logicalLogLocator));
+        builder.append(" PrevLSN : ").append(getPrevLSN(logicalLogLocator));
+        builder.append(" Resource Id : ").append(getResourceId(logicalLogLocator));
+        builder.append(" ResourceMgr Id : ").append(getResourceMgrId(logicalLogLocator));
+        builder.append(" Log Record Size : ").append(getLogRecordSize(logicalLogLocator));
+        return builder.toString();
     }
 
     @Override
-    public boolean validateLogRecord(LogManagerProperties logManagerProperties, LogicalLogLocator logicalLogLocator) {
-        int logLength = this.getLogLength(logicalLogLocator);
+    public void writeLogHeader(LogicalLogLocator logicalLogLocator, byte logType, TransactionContext context,
+            int datasetId, int PKHashValue, long prevLogicalLogLocator, long resourceId, byte resourceMgrId,
+            int logRecordSize) {
+
+        /* magic no */
+        (logicalLogLocator.getBuffer()).writeInt(logicalLogLocator.getMemoryOffset() + MAGIC_NO_POS,
+                logManager.getLogManagerProperties().logMagicNumber);
+
+        /* log type */
+        (logicalLogLocator.getBuffer()).put(logicalLogLocator.getMemoryOffset() + LOG_TYPE_POS, logType);
+
+        /* jobId */
+        (logicalLogLocator.getBuffer()).writeInt(logicalLogLocator.getMemoryOffset() + JOB_ID_POS, context.getJobId()
+                .getId());
+
+        /* datasetId */
+        (logicalLogLocator.getBuffer()).writeInt(logicalLogLocator.getMemoryOffset() + DATASET_ID_POS, datasetId);
+
+        /* PK hash value */
+        (logicalLogLocator.getBuffer()).writeInt(logicalLogLocator.getMemoryOffset() + PK_HASH_VALUE_POS, PKHashValue);
+
+        if (logType == LogType.UPDATE) {
+            /* prevLSN */
+            (logicalLogLocator.getBuffer()).writeLong(logicalLogLocator.getMemoryOffset() + PREV_LSN_POS,
+                    prevLogicalLogLocator);
+
+            /* resourceId */
+            (logicalLogLocator.getBuffer())
+                    .writeLong(logicalLogLocator.getMemoryOffset() + RESOURCE_ID_POS, resourceId);
+
+            /* resourceMgr id */
+            (logicalLogLocator.getBuffer()).put(logicalLogLocator.getMemoryOffset() + RESOURCE_MGR_ID_POS,
+                    resourceMgrId);
+
+            /* log record size */
+            (logicalLogLocator.getBuffer()).writeInt(logicalLogLocator.getMemoryOffset() + LOG_RECORD_SIZE_POS,
+                    logRecordSize);
+        }
+    }
+
+    @Override
+    public boolean validateLogRecord(LogicalLogLocator logicalLogLocator) {
+        int logLength = this.getLogRecordSize(logicalLogLocator);
         long expectedChecksum = DataUtil.getChecksum(logicalLogLocator.getBuffer(),
-                logicalLogLocator.getMemoryOffset(), logLength - logManagerProperties.getLogChecksumSize());
-        long actualChecksum = logicalLogLocator.getBuffer().readLong(
-                logicalLogLocator.getMemoryOffset() + logLength - logManagerProperties.getLogChecksumSize());
+                logicalLogLocator.getMemoryOffset(), logLength - LOG_CHECKSUM_SIZE);
+        long actualChecksum = getLogChecksum(logicalLogLocator);
         return expectedChecksum == actualChecksum;
     }
 
+    /**
+     * @param logType
+     * @param logBodySize
+     * @return
+     */
+    @Override
+    public int getLogRecordSize(byte logType, int logBodySize) {
+        if (logType == LogType.UPDATE) {
+            return 46 + logBodySize;
+        } else {
+            return 25;
+        }
+    }
+
+    @Override
+    public int getLogHeaderSize(byte logType) {
+        if (logType == LogType.UPDATE) {
+            return 38;
+        } else {
+            return 17;
+        }
+    }
+
+    @Override
+    public int getLogChecksumSize() {
+        return LOG_CHECKSUM_SIZE;
+    }
 }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java
index 72b393d..dd276c6 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogType.java
@@ -18,8 +18,5 @@
 
     public static final byte UPDATE = 0;
     public static final byte COMMIT = 1;
-    public static final byte CLR = 2;
-    public static final byte BGN_CHPKT = 3;
-    public static final byte END_CHPKT = 4;
 
 }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/TreeLogger.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/TreeLogger.java
deleted file mode 100644
index b742ada..0000000
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/TreeLogger.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Copyright 2009-2011 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.transaction.management.service.logging;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
-import edu.uci.ics.asterix.transaction.management.resource.ICloseable;
-import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
-import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionProvider;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
-import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
-
-/**
- * Represents a utility class for generating log records corresponding to
- * operations on a ITreeIndex implementation. A TreeLogger instance is thread
- * safe and can be shared across multiple threads that may belong to same or
- * different transactions.
- */
-class TransactionState {
-
-    private final Map<Long, TxnThreadState> transactionThreads = new HashMap<Long, TxnThreadState>();
-
-    public synchronized TxnThreadState getTransactionThreadState(long threadId) {
-        return transactionThreads.get(threadId);
-    }
-
-    public synchronized void putTransactionThreadState(long threadId, TxnThreadState txnThreadState) {
-        this.transactionThreads.put(threadId, txnThreadState);
-    }
-
-    public synchronized void remove(long threadId) {
-        transactionThreads.remove(threadId);
-    }
-}
-
-/**
- * Represents the state of a transaction thread. The state contains information
- * that includes the tuple being operated, the operation and the location of the
- * log record corresponding to the operation.
- */
-class TxnThreadState {
-
-    private ITupleReference tuple;
-    private IndexOperation indexOperation;
-    private LogicalLogLocator logicalLogLocator;
-
-    public TxnThreadState(LogicalLogLocator logicalLogLocator, IndexOperation indexOperation, ITupleReference tupleReference) {
-        this.tuple = tupleReference;
-        this.indexOperation = indexOperation;
-        this.logicalLogLocator = logicalLogLocator;
-    }
-
-    public synchronized ITupleReference getTuple() {
-        return tuple;
-    }
-
-    public synchronized void setTuple(ITupleReference tuple) {
-        this.tuple = tuple;
-    }
-
-    public synchronized IndexOperation getIndexOperation() {
-        return indexOperation;
-    }
-
-    public synchronized void setIndexOperation(IndexOperation indexOperation) {
-        this.indexOperation = indexOperation;
-    }
-
-    public synchronized LogicalLogLocator getLogicalLogLocator() {
-        return logicalLogLocator;
-    }
-
-    public synchronized void setLogicalLogLocator(LogicalLogLocator logicalLogLocator) {
-        this.logicalLogLocator = logicalLogLocator;
-    }
-
-}
-
-public class TreeLogger implements ILogger, ICloseable {
-
-    private static final byte resourceMgrId = TreeResourceManager.ID;
-    private final Map<Object, Object> arguments = new ConcurrentHashMap<Object, Object>();
-
-    public static final String TREE_INDEX = "TREE_INDEX";
-    public static final String TUPLE_REFERENCE = "TUPLE_REFERENCE";
-    public static final String TUPLE_WRITER = "TUPLE_WRITER";
-    public static final String INDEX_OPERATION = "INDEX_OPERATION";
-    public static final String RESOURCE_ID = "RESOURCE_ID";
-
-    private final ITreeIndex treeIndex;
-    private final ITreeIndexTupleWriter treeIndexTupleWriter;
-    private final byte[] resourceIdBytes;
-    private final byte[] resourceIdLengthBytes;
-
-    public class BTreeOperationCodes {
-        public static final byte INSERT = 0;
-        public static final byte DELETE = 1;
-    }
-
-    public TreeLogger(byte[] resourceIdBytes, ITreeIndex treeIndex) {
-        this.resourceIdBytes = resourceIdBytes;
-        this.treeIndex = treeIndex;
-        treeIndexTupleWriter = treeIndex.getLeafFrameFactory().getTupleWriterFactory().createTupleWriter();
-        this.resourceIdLengthBytes = DataUtil.intToByteArray(resourceIdBytes.length);
-    }
-
-    public synchronized void close(TransactionContext context) {
-        TransactionState txnState = (TransactionState) arguments.get(context.getJobId());
-        txnState.remove(Thread.currentThread().getId());
-        arguments.remove(context.getJobId());
-    }
-
-    public void generateLogRecord(TransactionProvider provider, TransactionContext context, IndexOperation operation,
-            ITupleReference tuple) throws ACIDException {
-        context.addCloseableResource(this); // the close method would be called
-        // on this TreeLogger instance at
-        // the time of transaction
-        // commit/abort.
-        if (operation != IndexOperation.INSERT && operation != IndexOperation.DELETE) {
-            throw new ACIDException("Loging for Operation " + operation + " not supported");
-
-        }
-
-        TxnThreadState txnThreadState = null;
-        TransactionState txnState;
-        txnState = (TransactionState) arguments.get(context.getJobId());
-        if (txnState == null) {
-            synchronized (context) { // threads belonging to different
-                // transaction do not need to
-                // synchronize amongst them.
-                if (txnState == null) {
-                    txnState = new TransactionState();
-                    arguments.put(context.getJobId(), txnState);
-                }
-            }
-        }
-
-        txnThreadState = txnState.getTransactionThreadState(Thread.currentThread().getId());
-        if (txnThreadState == null) {
-            LogicalLogLocator logicalLogLocator = LogUtil.getDummyLogicalLogLocator(provider.getLogManager());
-            txnThreadState = new TxnThreadState(logicalLogLocator, operation, tuple);
-            txnState.putTransactionThreadState(Thread.currentThread().getId(), txnThreadState);
-        }
-        txnThreadState.setIndexOperation(operation);
-        txnThreadState.setTuple(tuple);
-        int tupleSize = treeIndexTupleWriter.bytesRequired(tuple);
-        // Below 4 is for the int representing the length of resource id and 1
-        // is for
-        // the byte representing the operation
-        int logContentLength = 4 + resourceIdBytes.length + 1 + tupleSize;
-        provider.getLogManager().log(txnThreadState.getLogicalLogLocator(), context, resourceMgrId, 0L, LogType.UPDATE,
-                LogActionType.REDO_UNDO, logContentLength, (ILogger) this, arguments);
-    }
-
-    @Override
-    public void log(TransactionContext context, LogicalLogLocator logicalLogLocator, int logRecordSize,
-            Map<Object, Object> loggerArguments) throws ACIDException {
-        TransactionState txnState = (TransactionState) loggerArguments.get(context.getJobId());
-        TxnThreadState state = (TxnThreadState) txnState.getTransactionThreadState(Thread.currentThread().getId());
-        int count = 0;
-        byte[] logBuffer = logicalLogLocator.getBuffer().getArray();
-        System.arraycopy(resourceIdLengthBytes, 0, logBuffer, logicalLogLocator.getMemoryOffset(), 4);
-        count += 4; // count is incremented by 4 because we wrote the length
-                    // that is an int and hence 4 bytes
-        System.arraycopy(resourceIdBytes, 0, logBuffer, logicalLogLocator.getMemoryOffset() + count,
-                resourceIdBytes.length);
-        count += resourceIdBytes.length;
-        logBuffer[logicalLogLocator.getMemoryOffset() + count] = (byte) state.getIndexOperation().ordinal();
-        count += 1; // count is incremented by 1 to account for the byte
-                    // written.
-        treeIndexTupleWriter.writeTuple(state.getTuple(), logicalLogLocator.getBuffer().getArray(),
-                logicalLogLocator.getMemoryOffset() + count);
-    }
-
-    @Override
-    public void postLog(TransactionContext context, Map<Object, Object> loggerArguments) throws ACIDException {
-    }
-
-    @Override
-    public void preLog(TransactionContext context, Map<Object, Object> loggerArguments) throws ACIDException {
-    }
-
-}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/TreeLoggerRepository.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/TreeLoggerRepository.java
deleted file mode 100644
index 34e1466..0000000
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/TreeLoggerRepository.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Copyright 2009-2011 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.transaction.management.service.logging;
-
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
-
-import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionProvider;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
-
-public class TreeLoggerRepository {
-
-    private final Map<ByteBuffer, TreeLogger> loggers = new HashMap<ByteBuffer, TreeLogger>();
-    private final TransactionProvider provider;
-
-    public TreeLoggerRepository(TransactionProvider provider) {
-        this.provider = provider;
-    }
-
-    public synchronized TreeLogger getTreeLogger(byte[] resourceIdBytes) {
-        ByteBuffer resourceId = ByteBuffer.wrap(resourceIdBytes);
-        TreeLogger logger = loggers.get(resourceId);
-        if (logger == null) {
-            ITreeIndex treeIndex = (ITreeIndex) provider.getTransactionalResourceRepository().getTransactionalResource(
-                    resourceIdBytes);
-            logger = new TreeLogger(resourceIdBytes, treeIndex);
-            loggers.put(resourceId, logger);
-        }
-        return logger;
-    }
-}
\ No newline at end of file
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/TreeResourceManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/TreeResourceManager.java
deleted file mode 100644
index 4e6ad80..0000000
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/TreeResourceManager.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Copyright 2009-2011 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.asterix.transaction.management.service.logging;
-
-import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
-import edu.uci.ics.asterix.transaction.management.service.transaction.IResourceManager;
-import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionProvider;
-import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
-import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleReference;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
-
-public class TreeResourceManager implements IResourceManager {
-
-    public static final byte ID = (byte) 1;
-
-    private final TransactionProvider provider;
-
-    public TreeResourceManager(TransactionProvider provider) {
-        this.provider = provider;
-    }
-
-    public byte getResourceManagerId() {
-        return ID;
-    }
-
-    public void undo(ILogRecordHelper logRecordHelper, LogicalLogLocator logLocator) throws ACIDException {
-        int logContentBeginPos = logRecordHelper.getLogContentBeginPos(logLocator);
-        byte[] logBufferContent = logLocator.getBuffer().getArray();
-        // read the length of resource id byte array
-        int resourceIdLength = DataUtil.byteArrayToInt(logBufferContent, logContentBeginPos);
-        byte[] resourceIdBytes = new byte[resourceIdLength];
-
-        // copy the resource if bytes
-        System.arraycopy(logBufferContent, logContentBeginPos + 4, resourceIdBytes, 0, resourceIdLength);
-
-        // look up the repository to obtain the resource object
-        ITreeIndex treeIndex = (ITreeIndex) provider.getTransactionalResourceRepository().getTransactionalResource(
-                resourceIdBytes);
-        int operationOffset = logContentBeginPos + 4 + resourceIdLength;
-        int tupleBeginPos = operationOffset + 1;
-
-        ITreeIndexTupleReference tupleReference = treeIndex.getLeafFrameFactory().getTupleWriterFactory()
-                .createTupleWriter().createTupleReference();
-        // TODO: remove this call.
-        tupleReference.setFieldCount(tupleReference.getFieldCount());
-        tupleReference.resetByTupleOffset(logLocator.getBuffer().getByteBuffer(), tupleBeginPos);
-        byte operation = logBufferContent[operationOffset];
-        IIndexAccessor treeIndexAccessor = treeIndex.createAccessor(NoOpOperationCallback.INSTANCE,
-                NoOpOperationCallback.INSTANCE);
-        try {
-            switch (operation) {
-                case TreeLogger.BTreeOperationCodes.INSERT:
-                    treeIndexAccessor.delete(tupleReference);
-                    break;
-                case TreeLogger.BTreeOperationCodes.DELETE:
-                    treeIndexAccessor.insert(tupleReference);
-                    break;
-            }
-        } catch (Exception e) {
-            throw new ACIDException(" could not rollback ", e);
-        }
-    }
-
-    public void redo(ILogRecordHelper logRecordHelper, LogicalLogLocator logicalLogLocator) throws ACIDException {
-        throw new UnsupportedOperationException(" Redo logic will be implemented as part of crash recovery feature");
-    }
-
-}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
index e96cc73..c810170 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -115,15 +115,7 @@
                 if (resourceMgr == null) {
                     throw new ACIDException("unknown resource mgr with id " + resourceMgrId);
                 } else {
-                    byte actionType = parser.getLogActionType(memLSN);
-                    switch (actionType) {
-                        case LogActionType.REDO:
-                            resourceMgr.redo(parser, memLSN);
-                            break;
-                        case LogActionType.UNDO: /* skip these records */
-                            break;
-                        default: /* do nothing */
-                    }
+                    resourceMgr.redo(parser, memLSN);
                 }
                 writeCheckpointRecord(memLSN.getLsn());
             }
@@ -204,7 +196,7 @@
             byte logType = parser.getLogType(logLocator);
             if (LOGGER.isLoggable(Level.FINE)) {
                 LOGGER.fine(" reading LSN value inside rollback transaction method " + txnContext.getLastLogLocator()
-                        + " txn id " + parser.getLogTransactionId(logLocator) + " log type  " + logType);
+                        + " jodId " + parser.getJobId(logLocator) + " log type  " + logType);
             }
 
             switch (logType) {
@@ -222,23 +214,13 @@
                     if (resourceMgr == null) {
                         throw new ACIDException(txnContext, " unknown resource manager " + resourceMgrId);
                     } else {
-                        byte actionType = parser.getLogActionType(logLocator);
-                        switch (actionType) {
-                            case LogActionType.REDO: // no need to do anything
-                                break;
-                            case LogActionType.UNDO: // undo the log record
-                                resourceMgr.undo(parser, logLocator);
-                                break;
-                            case LogActionType.REDO_UNDO: // undo log record
-                                resourceMgr.undo(parser, logLocator);
-                                break;
-                            default:
-                        }
+                        resourceMgr.undo(parser, logLocator);
                     }
-                case LogType.CLR: // skip the CLRs as they are not undone
                     break;
                 case LogType.COMMIT:
                     throw new ACIDException(txnContext, " cannot rollback commmitted transaction");
+                default:
+                    throw new ACIDException("Unsupported LogType: " + logType);
 
             }
 
@@ -248,7 +230,7 @@
             // the logLocator object has been
             // appropriately set to the location of the next log record to be
             // processed as part of the roll back
-            boolean moreLogs = parser.getPreviousLsnByTransaction(lsn, logLocator);
+            boolean moreLogs = parser.getPrevLSN(lsn, logLocator);
             if (!moreLogs) {
                 // no more logs to process
                 break;
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/IResourceManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/IResourceManager.java
index 200527f..f7715e8 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/IResourceManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/IResourceManager.java
@@ -22,6 +22,12 @@
  * Provides APIs for undo or redo of an operation on a resource.
  */
 public interface IResourceManager {
+    
+    public class ResourceType {
+        public static final byte LSM_BTREE = 1;
+        public static final byte LSM_RTREE = 2;
+        public static final byte LSM_INVERTED_INDEX = 3;
+    }
 
     /**
      * Returns the unique identifier for the resource manager.
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/MutableResourceId.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/MutableResourceId.java
new file mode 100644
index 0000000..5552930
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/MutableResourceId.java
@@ -0,0 +1,30 @@
+package edu.uci.ics.asterix.transaction.management.service.transaction;
+
+public class MutableResourceId{
+    long id;
+
+    public MutableResourceId(long id) {
+        this.id = id;
+    }
+
+    public void setId(long id) {
+        this.id = id;
+    }
+
+    public long getId() {
+        return id;
+    }
+
+    @Override
+    public int hashCode() {
+        return (int)id;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if ((o == null) || !(o instanceof MutableResourceId)) {
+            return false;
+        }
+        return ((MutableResourceId) o).id == this.id;
+    }
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
index 63deb13..a6ef028 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -20,7 +20,6 @@
 import java.util.logging.Logger;
 
 import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
-import edu.uci.ics.asterix.transaction.management.service.logging.LogActionType;
 import edu.uci.ics.asterix.transaction.management.service.logging.LogType;
 
 /**
@@ -90,13 +89,9 @@
             }
 
             try {
-                if (txnContext.getTransactionType().equals(TransactionContext.TransactionType.READ_WRITE)) { // conditionally
-                    // write
-                    // commit
-                    // log
-                    // record
-                    transactionProvider.getLogManager().log(txnContext.getLastLogLocator(), txnContext, (byte) (-1), 0,
-                            LogType.COMMIT, LogActionType.NO_OP, 0, null, null);
+                if (txnContext.getTransactionType().equals(TransactionContext.TransactionType.READ_WRITE)) { 
+                    transactionProvider.getLogManager().log(LogType.COMMIT, txnContext, -1, -1, -1, (byte) 0, 0, null,
+                            null, txnContext.getLastLogLocator());
                 }
             } catch (ACIDException ae) {
                 if (LOGGER.isLoggable(Level.SEVERE)) {
@@ -125,5 +120,4 @@
     public TransactionProvider getTransactionProvider() {
         return transactionProvider;
     }
-
 }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionProvider.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionProvider.java
index ef843f4..84414b5 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionProvider.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionProvider.java
@@ -20,7 +20,7 @@
 import edu.uci.ics.asterix.transaction.management.service.locking.LockManager;
 import edu.uci.ics.asterix.transaction.management.service.logging.ILogManager;
 import edu.uci.ics.asterix.transaction.management.service.logging.LogManager;
-import edu.uci.ics.asterix.transaction.management.service.logging.TreeLoggerRepository;
+import edu.uci.ics.asterix.transaction.management.service.logging.IndexLoggerRepository;
 import edu.uci.ics.asterix.transaction.management.service.recovery.IRecoveryManager;
 import edu.uci.ics.asterix.transaction.management.service.recovery.RecoveryManager;
 
@@ -35,7 +35,7 @@
     private final ITransactionManager transactionManager;
     private final IRecoveryManager recoveryManager;
     private final TransactionalResourceRepository resourceRepository;
-    private final TreeLoggerRepository loggerRepository;
+    private final IndexLoggerRepository loggerRepository;
 
     public TransactionProvider(String id) throws ACIDException {
         this.id = id;
@@ -43,7 +43,7 @@
         this.logManager = new LogManager(this);
         this.lockManager = new LockManager(this);
         this.recoveryManager = new RecoveryManager(this);
-        this.loggerRepository = new TreeLoggerRepository(this);
+        this.loggerRepository = new IndexLoggerRepository(this);
         this.resourceRepository = new TransactionalResourceRepository();
     }
 
@@ -67,7 +67,7 @@
         return resourceRepository;
     }
     
-    public TreeLoggerRepository getTreeLoggerRepository() {
+    public IndexLoggerRepository getTreeLoggerRepository() {
         return loggerRepository;
     }
 
diff --git a/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/logging/BasicLogger.java b/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/logging/BasicLogger.java
index 3e95d6a..c9d01a0 100644
--- a/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/logging/BasicLogger.java
+++ b/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/logging/BasicLogger.java
@@ -14,12 +14,12 @@
  */
 package edu.uci.ics.asterix.transaction.management.logging;
 
-import java.util.Map;
 import java.util.Random;
 
 import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
 import edu.uci.ics.asterix.transaction.management.service.logging.IBuffer;
 import edu.uci.ics.asterix.transaction.management.service.logging.ILogger;
+import edu.uci.ics.asterix.transaction.management.service.logging.IndexLogger.ReusableLogContentObject;
 import edu.uci.ics.asterix.transaction.management.service.logging.LogicalLogLocator;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
 
@@ -29,7 +29,7 @@
     private static long count = 0;
 
     public void log(TransactionContext context, LogicalLogLocator wMemLSN, int length,
-            Map<Object, Object> loggerArguments) throws ACIDException {
+            ReusableLogContentObject reusableLogContentObject) throws ACIDException {
 
         byte[] logContent = getRandomBytes(length);
         try {
@@ -66,12 +66,12 @@
         return averageContentCreationTime;
     }
 
-    public void postLog(TransactionContext context, Map<Object, Object> loggerArguments) throws ACIDException {
+    public void postLog(TransactionContext context, ReusableLogContentObject reusableLogContentObject) throws ACIDException {
         // TODO Auto-generated method stub
 
     }
 
-    public void preLog(TransactionContext context, Map<Object, Object> loggerArguments) throws ACIDException {
+    public void preLog(TransactionContext context, ReusableLogContentObject reusableLogContentObject) throws ACIDException {
         // TODO Auto-generated method stub
 
     }
diff --git a/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/logging/test/TransactionWorkloadSimulator.java b/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/logging/test/TransactionWorkloadSimulator.java
index cc266fe..8977b04 100644
--- a/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/logging/test/TransactionWorkloadSimulator.java
+++ b/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/logging/test/TransactionWorkloadSimulator.java
@@ -29,6 +29,7 @@
 import edu.uci.ics.asterix.transaction.management.service.logging.LogicalLogLocator;
 import edu.uci.ics.asterix.transaction.management.service.transaction.DatasetId;
 import edu.uci.ics.asterix.transaction.management.service.transaction.IResourceManager;
+import edu.uci.ics.asterix.transaction.management.service.transaction.IResourceManager.ResourceType;
 import edu.uci.ics.asterix.transaction.management.service.transaction.JobIdFactory;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionProvider;
@@ -180,14 +181,14 @@
                 }
                 tempDatasetId.setId(resourceID);
                 TransactionWorkloadSimulator.lockManager.lock(tempDatasetId, -1, lockMode, context);
-                TransactionWorkloadSimulator.logManager.log(memLSN, context, ResourceMgrInfo.BTreeResourceMgrId,
-                        pageId, logType, logActionType, logSize, logger, null);
+                TransactionWorkloadSimulator.logManager.log(logType, context, resourceID,
+                        -1, resourceID, ResourceType.LSM_BTREE, logSize, null, logger, memLSN);
                 retry = false;
                 Thread.currentThread().sleep(TransactionWorkloadSimulator.workload.thinkTime);
                 logCount.incrementAndGet();
                 logByteCount.addAndGet(logSize
-                        + TransactionWorkloadSimulator.logManager.getLogManagerProperties().getLogHeaderSize()
-                        + TransactionWorkloadSimulator.logManager.getLogManagerProperties().getLogChecksumSize());
+                        + TransactionWorkloadSimulator.logManager.getLogRecordHelper().getLogHeaderSize(logType)
+                        + TransactionWorkloadSimulator.logManager.getLogRecordHelper().getLogChecksumSize());
                 myLogCount++;
             }
         } catch (ACIDException acide) {
diff --git a/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/FileLogger.java b/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/FileLogger.java
index 16c5d6b..54bb0b8 100644
--- a/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/FileLogger.java
+++ b/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/FileLogger.java
@@ -14,11 +14,11 @@
  */
 package edu.uci.ics.asterix.transaction.management.test;
 
-import java.util.Map;
 
 import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
 import edu.uci.ics.asterix.transaction.management.logging.IResource;
 import edu.uci.ics.asterix.transaction.management.service.logging.ILogger;
+import edu.uci.ics.asterix.transaction.management.service.logging.IndexLogger.ReusableLogContentObject;
 import edu.uci.ics.asterix.transaction.management.service.logging.LogicalLogLocator;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
 
@@ -39,14 +39,14 @@
     }
 
     @Override
-    public void preLog(TransactionContext context, Map<Object, Object> loggerArguments) throws ACIDException {
+    public void preLog(TransactionContext context, ReusableLogContentObject reusableLogContentObject) throws ACIDException {
         // TODO Auto-generated method stub
 
     }
 
     @Override
-    public void log(TransactionContext context, final LogicalLogLocator memLSN, int logRecordSize,
-            Map<Object, Object> loggerArguments) throws ACIDException {
+    public void log(TransactionContext context, final LogicalLogLocator memLSN, int logContentSize,
+            ReusableLogContentObject reusableLogContentObject) throws ACIDException {
         byte[] buffer = memLSN.getBuffer().getArray();
         byte[] content = logRecordContent.getBytes();
         for (int i = 0; i < resource.getId().length; i++) {
@@ -58,7 +58,7 @@
     }
 
     @Override
-    public void postLog(TransactionContext context, Map<Object, Object> loggerArguments) throws ACIDException {
+    public void postLog(TransactionContext context, ReusableLogContentObject reusableLogContentObject) throws ACIDException {
         // TODO Auto-generated method stub
 
     }
diff --git a/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/TransactionSimulator.java b/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/TransactionSimulator.java
index 4476955..f6e6d3a 100644
--- a/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/TransactionSimulator.java
+++ b/asterix-transactions/src/test/java/edu/uci/ics/asterix/transaction/management/test/TransactionSimulator.java
@@ -19,21 +19,20 @@
 
 import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
 import edu.uci.ics.asterix.transaction.management.logging.IResource;
-import edu.uci.ics.asterix.transaction.management.resource.TransactionalResourceRepository;
 import edu.uci.ics.asterix.transaction.management.service.locking.ILockManager;
 import edu.uci.ics.asterix.transaction.management.service.logging.ILogManager;
 import edu.uci.ics.asterix.transaction.management.service.logging.ILogger;
-import edu.uci.ics.asterix.transaction.management.service.logging.LogActionType;
 import edu.uci.ics.asterix.transaction.management.service.logging.LogType;
 import edu.uci.ics.asterix.transaction.management.service.logging.LogUtil;
 import edu.uci.ics.asterix.transaction.management.service.logging.LogicalLogLocator;
 import edu.uci.ics.asterix.transaction.management.service.recovery.IRecoveryManager;
 import edu.uci.ics.asterix.transaction.management.service.recovery.IRecoveryManager.SystemState;
 import edu.uci.ics.asterix.transaction.management.service.transaction.IResourceManager;
+import edu.uci.ics.asterix.transaction.management.service.transaction.IResourceManager.ResourceType;
 import edu.uci.ics.asterix.transaction.management.service.transaction.ITransactionManager;
 import edu.uci.ics.asterix.transaction.management.service.transaction.JobId;
-import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
 import edu.uci.ics.asterix.transaction.management.service.transaction.JobIdFactory;
+import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionProvider;
 
 public class TransactionSimulator {
@@ -78,15 +77,13 @@
             case INCREMENT:
                 finalValue = currentValue + 1;
                 int logRecordLength = ((FileLogger) logger).generateLogRecordContent(currentValue, finalValue);
-                logManager.log(memLSN, txnContext, FileResourceManager.id, 0, LogType.UPDATE, LogActionType.REDO_UNDO,
-                        logRecordLength, logger, null);
+                logManager.log(LogType.UPDATE, txnContext, 1, -1, 1, ResourceType.LSM_BTREE, 0, null, logger, memLSN);
                 ((FileResource) resource).increment();
                 break;
             case DECREMENT:
                 finalValue = currentValue - 1;
                 logRecordLength = ((FileLogger) logger).generateLogRecordContent(currentValue, finalValue);
-                logManager.log(memLSN, txnContext, FileResourceManager.id, 0, LogType.UPDATE, LogActionType.REDO_UNDO,
-                        logRecordLength, logger, null);
+                logManager.log(LogType.UPDATE, txnContext, 1, -1, 1, ResourceType.LSM_BTREE, 0, null, logger, memLSN);
                 ((FileResource) resource).decrement();
                 break;
         }