Adapt Replication to Append Only LSM Components

This change includes the following:
- Make LSN recording in a single tree in LSM components.
- Pass LSN byte offset with every tree index file being replicated.
- Add LSN to remote recovery logs to check which logs should be replayed.
- Update ILogRecord method names to better describe their operation.

Change-Id: I8bd2656746e1c293b981d5f43e80928314ccbad0
Reviewed-on: https://asterix-gerrit.ics.uci.edu/561
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
Reviewed-by: Ian Maxon <imaxon@apache.org>
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMIndexUtil.java b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMIndexUtil.java
index 4259a10..c7b7e3e 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMIndexUtil.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMIndexUtil.java
@@ -18,17 +18,16 @@
  */
 package org.apache.asterix.common.dataflow;
 
-import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
 import org.apache.asterix.common.transactions.ILogManager;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
 
 public class AsterixLSMIndexUtil {
 
     public static void checkAndSetFirstLSN(AbstractLSMIndex lsmIndex, ILogManager logManager)
-            throws HyracksDataException, AsterixException {
-
+            throws HyracksDataException {
         // If the index has an empty memory component, we need to set its first LSN (For soft checkpoint)
         if (lsmIndex.isCurrentMutableComponentEmpty()) {
             //prevent transactions from incorrectly setting the first LSN on a modified component by checking the index is still empty
@@ -42,9 +41,10 @@
         }
     }
 
-    public static boolean lsmComponentFileHasLSN(AbstractLSMIndex lsmIndex, String componentFilePath) {
+    public static long getComponentFileLSNOffset(AbstractLSMIndex lsmIndex, ILSMComponent lsmComponent,
+            String componentFilePath) throws HyracksDataException {
         AbstractLSMIOOperationCallback ioOpCallback = (AbstractLSMIOOperationCallback) lsmIndex
                 .getIOOperationCallback();
-        return ioOpCallback.componentFileHasLSN(componentFilePath);
+        return ioOpCallback.getComponentFileLSNOffset(lsmComponent, componentFilePath);
     }
 }
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java b/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
index b3cae06..64201e2 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
@@ -21,13 +21,12 @@
 
 import java.util.List;
 
-import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.common.api.IMetaDataPageManager;
 import org.apache.hyracks.storage.am.common.api.ITreeIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
-import org.apache.hyracks.storage.common.file.BufferedFileHandle;
 
 // A single LSMIOOperationCallback per LSM index used to perform actions around Flush and Merge operations
 public abstract class AbstractLSMIOOperationCallback implements ILSMIOOperationCallback {
@@ -96,7 +95,7 @@
         treeIndex.getMetaManager().setLSN(componentLSN);
     }
 
-    protected long getTreeIndexLSN(ITreeIndex treeIndex) throws HyracksDataException {
+    public static long getTreeIndexLSN(ITreeIndex treeIndex) throws HyracksDataException {
         return treeIndex.getMetaManager().getLSN();
     }
 
@@ -104,7 +103,7 @@
         mutableLastLSNs[writeIndex] = lastLSN;
     }
 
-    public void setFirstLSN(long firstLSN) throws AsterixException {
+    public void setFirstLSN(long firstLSN) {
         // We make sure that this method is only called on an empty component so the first LSN is not set incorrectly
         firstLSNs[writeIndex] = firstLSN;
     }
@@ -124,7 +123,13 @@
         }
         return false;
     }
-    
-    public abstract boolean componentFileHasLSN(String componentFilePath);
 
+    /**
+     * @param component
+     * @param componentFilePath
+     * @return The LSN byte offset in the LSM disk component if the index is valid, otherwise {@link IMetaDataPageManager#INVALID_LSN_OFFSET}.
+     * @throws HyracksDataException
+     */
+    public abstract long getComponentFileLSNOffset(ILSMComponent component, String componentFilePath)
+            throws HyracksDataException;
 }
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java b/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
index 8b4fa01..b522687 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
@@ -23,6 +23,7 @@
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.btree.impls.BTree;
+import org.apache.hyracks.storage.am.common.api.IMetaDataPageManager;
 import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeDiskComponent;
 import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeFileManager;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
@@ -57,17 +58,18 @@
         long maxLSN = -1;
         for (ILSMComponent c : diskComponents) {
             BTree btree = ((LSMBTreeDiskComponent) c).getBTree();
-            maxLSN = Math.max(getTreeIndexLSN(btree), maxLSN);
+            maxLSN = Math.max(AbstractLSMIOOperationCallback.getTreeIndexLSN(btree), maxLSN);
         }
         return maxLSN;
     }
 
     @Override
-    public boolean componentFileHasLSN(String componentFilePath) {
-        if (componentFilePath.endsWith(LSMBTreeFileManager.BTREE_STRING)) {
-            return true;
+    public long getComponentFileLSNOffset(ILSMComponent diskComponent, String diskComponentFilePath)
+            throws HyracksDataException {
+        if (diskComponentFilePath.endsWith(LSMBTreeFileManager.BTREE_STRING)) {
+            LSMBTreeDiskComponent btreeComponent = (LSMBTreeDiskComponent) diskComponent;
+            return btreeComponent.getBTree().getMetaManager().getLSNOffset();
         }
-
-        return false;
+        return IMetaDataPageManager.INVALID_LSN_OFFSET;
     }
 }
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java b/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java
index 229ccd6..1d0c0a5 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java
@@ -21,6 +21,7 @@
 import java.util.List;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.common.api.IMetaDataPageManager;
 import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeWithBuddyDiskComponent;
 import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeWithBuddyFileManager;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
@@ -34,7 +35,6 @@
         if (newComponent != null) {
             LSMBTreeWithBuddyDiskComponent btreeComponent = (LSMBTreeWithBuddyDiskComponent) newComponent;
             putLSNIntoMetadata(btreeComponent.getBTree(), oldComponents);
-            putLSNIntoMetadata(btreeComponent.getBuddyBTree(), oldComponents);
         }
     }
 
@@ -51,19 +51,18 @@
         long maxLSN = -1;
         for (Object o : diskComponents) {
             LSMBTreeWithBuddyDiskComponent btreeComponent = (LSMBTreeWithBuddyDiskComponent) o;
-            maxLSN = Math.max(getTreeIndexLSN(btreeComponent.getBTree()), maxLSN);
+            maxLSN = Math.max(AbstractLSMIOOperationCallback.getTreeIndexLSN(btreeComponent.getBTree()), maxLSN);
         }
         return maxLSN;
     }
 
     @Override
-    public boolean componentFileHasLSN(String componentFilePath) {
-        if (componentFilePath.endsWith(LSMBTreeWithBuddyFileManager.BTREE_STRING)
-                || componentFilePath.endsWith(LSMBTreeWithBuddyFileManager.BUDDY_BTREE_STRING)) {
-            return true;
+    public long getComponentFileLSNOffset(ILSMComponent diskComponent, String diskComponentFilePath)
+            throws HyracksDataException {
+        if (diskComponentFilePath.endsWith(LSMBTreeWithBuddyFileManager.BTREE_STRING)) {
+            LSMBTreeWithBuddyDiskComponent btreeComponent = (LSMBTreeWithBuddyDiskComponent) diskComponent;
+            return btreeComponent.getBTree().getMetaManager().getLSNOffset();
         }
-
-        return false;
+        return IMetaDataPageManager.INVALID_LSN_OFFSET;
     }
-
 }
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java b/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
index 3e4ff04..faa9166 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
@@ -22,6 +22,7 @@
 import java.util.List;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.common.api.IMetaDataPageManager;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
 import org.apache.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndexDiskComponent;
@@ -55,17 +56,18 @@
         long maxLSN = -1;
         for (Object o : diskComponents) {
             LSMInvertedIndexDiskComponent invIndexComponent = (LSMInvertedIndexDiskComponent) o;
-            maxLSN = Math.max(getTreeIndexLSN(invIndexComponent.getDeletedKeysBTree()), maxLSN);
+            maxLSN = Math.max(AbstractLSMIOOperationCallback.getTreeIndexLSN(invIndexComponent.getDeletedKeysBTree()), maxLSN);
         }
         return maxLSN;
     }
 
     @Override
-    public boolean componentFileHasLSN(String componentFilePath) {
-        if (componentFilePath.endsWith(LSMInvertedIndexFileManager.DELETED_KEYS_BTREE_SUFFIX)) {
-            return true;
+    public long getComponentFileLSNOffset(ILSMComponent diskComponent, String diskComponentFilePath)
+            throws HyracksDataException {
+        if (diskComponentFilePath.endsWith(LSMInvertedIndexFileManager.DELETED_KEYS_BTREE_SUFFIX)) {
+            LSMInvertedIndexDiskComponent invIndexComponent = (LSMInvertedIndexDiskComponent) diskComponent;
+            return invIndexComponent.getDeletedKeysBTree().getMetaManager().getLSNOffset();
         }
-
-        return false;
+        return IMetaDataPageManager.INVALID_LSN_OFFSET;
     }
 }
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java b/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
index 7c483f3..4725d7a 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
@@ -22,6 +22,7 @@
 import java.util.List;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.common.api.IMetaDataPageManager;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
 import org.apache.hyracks.storage.am.lsm.rtree.impls.LSMRTreeDiskComponent;
@@ -39,7 +40,6 @@
         if (newComponent != null) {
             LSMRTreeDiskComponent rtreeComponent = (LSMRTreeDiskComponent) newComponent;
             putLSNIntoMetadata(rtreeComponent.getRTree(), oldComponents);
-            putLSNIntoMetadata(rtreeComponent.getBTree(), oldComponents);
         }
     }
 
@@ -56,18 +56,18 @@
         long maxLSN = -1;
         for (Object o : diskComponents) {
             LSMRTreeDiskComponent rtreeComponent = (LSMRTreeDiskComponent) o;
-            maxLSN = Math.max(getTreeIndexLSN(rtreeComponent.getRTree()), maxLSN);
+            maxLSN = Math.max(AbstractLSMIOOperationCallback.getTreeIndexLSN(rtreeComponent.getRTree()), maxLSN);
         }
         return maxLSN;
     }
 
     @Override
-    public boolean componentFileHasLSN(String componentFilePath) {
-        if (componentFilePath.endsWith(LSMRTreeFileManager.RTREE_STRING)
-                || componentFilePath.endsWith(LSMRTreeFileManager.BTREE_STRING)) {
-            return true;
+    public long getComponentFileLSNOffset(ILSMComponent diskComponent, String diskComponentFilePath)
+            throws HyracksDataException {
+        if (diskComponentFilePath.endsWith(LSMRTreeFileManager.RTREE_STRING)) {
+            LSMRTreeDiskComponent rtreeComponent = (LSMRTreeDiskComponent) diskComponent;
+            return rtreeComponent.getRTree().getMetaManager().getLSNOffset();
         }
-
-        return false;
+        return IMetaDataPageManager.INVALID_LSN_OFFSET;
     }
 }
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java b/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
index 9694949..7c76e98 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
@@ -119,9 +119,9 @@
 
     public void setNodeId(String nodeId);
 
-    public int serialize(ByteBuffer buffer);
+    public int writeRemoteRecoveryLog(ByteBuffer buffer);
 
-    public void deserialize(ByteBuffer buffer, boolean remoteRecoveryLog, String localNodeId);
+    public void readRemoteLog(ByteBuffer buffer, boolean remoteRecoveryLog, String localNodeId);
 
     public void setReplicationThread(IReplicationThread replicationThread);
 
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java b/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
index bbbe59e..f837df2 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
@@ -356,7 +356,7 @@
     }
 
     @Override
-    public void deserialize(ByteBuffer buffer, boolean remoteRecoveryLog, String localNodeId) {
+    public void readRemoteLog(ByteBuffer buffer, boolean remoteRecoveryLog, String localNodeId) {
         readLogHeader(buffer);
         if (!remoteRecoveryLog || !nodeId.equals(localNodeId)) {
             readLogBody(buffer, false);
@@ -370,6 +370,11 @@
             LSN = buffer.getLong();
             numOfFlushedIndexes = buffer.getInt();
         }
+
+        //remote recovery logs need to have the LSN to check which should be replayed
+        if (remoteRecoveryLog && nodeId.equals(localNodeId)) {
+            LSN = buffer.getLong();
+        }
     }
 
     private ITupleReference readPKValue(ByteBuffer buffer) {
@@ -499,15 +504,14 @@
     }
 
     @Override
-    public int serialize(ByteBuffer buffer) {
+    public int writeRemoteRecoveryLog(ByteBuffer buffer) {
         int bufferBegin = buffer.position();
         writeLogRecordCommonFields(buffer);
-
         if (logType == LogType.FLUSH) {
             buffer.putLong(LSN);
             buffer.putInt(numOfFlushedIndexes);
         }
-
+        //LSN must be included in all remote recovery logs (not only FLUSH)
         buffer.putLong(LSN);
         return buffer.position() - bufferBegin;
     }
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/functions/AsterixReplicationProtocol.java b/asterix-replication/src/main/java/org/apache/asterix/replication/functions/AsterixReplicationProtocol.java
index f75ed6b..8e020eb 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/functions/AsterixReplicationProtocol.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/functions/AsterixReplicationProtocol.java
@@ -140,14 +140,14 @@
         return bb;
     }
 
-    public static void writeReplicateLogRequest(ByteBuffer requestBuffer, ILogRecord logRecord) {
+    public static void writeRemoteRecoveryLogRequest(ByteBuffer requestBuffer, ILogRecord logRecord) {
         requestBuffer.clear();
         //put request type (4 bytes)
         requestBuffer.putInt(ReplicationRequestType.REPLICATE_LOG.ordinal());
         //leave space for log size
         requestBuffer.position(requestBuffer.position() + Integer.BYTES);
-        int logSize = logRecord.serialize(requestBuffer);
-        //put request type (4 bytes)
+        int logSize = logRecord.writeRemoteRecoveryLog(requestBuffer);
+        //put request size (4 bytes)
         requestBuffer.putInt(4, logSize);
         requestBuffer.flip();
     }
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java b/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
index c0af2c8..38be05e 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
@@ -89,6 +89,7 @@
         full.set(false);
         appendOffset = 0;
         flushOffset = 0;
+        stop = false;
     }
 
     public void flush() {
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
index 5a88626..e6b2ebf 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
@@ -70,6 +70,7 @@
 import org.apache.asterix.replication.storage.LSMComponentProperties;
 import org.apache.asterix.replication.storage.ReplicaResourcesManager;
 import org.apache.hyracks.api.application.INCApplicationContext;
+import org.apache.hyracks.storage.am.common.api.IMetaDataPageManager;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
 
@@ -272,7 +273,8 @@
             Set<Long> requestedIndexesToBeFlushed = request.getLaggingRescouresIds();
 
             //2. check which indexes can be flushed (open indexes) and which cannot be flushed (closed or have empty memory component)
-            IDatasetLifecycleManager datasetLifeCycleManager = asterixAppRuntimeContextProvider.getDatasetLifecycleManager();
+            IDatasetLifecycleManager datasetLifeCycleManager = asterixAppRuntimeContextProvider
+                    .getDatasetLifecycleManager();
             List<IndexInfo> openIndexesInfo = datasetLifeCycleManager.getOpenIndexesInfo();
             Set<Integer> datasetsToForceFlush = new HashSet<Integer>();
             for (IndexInfo iInfo : openIndexesInfo) {
@@ -336,9 +338,9 @@
                 }
                 if (afp.isLSMComponentFile()) {
                     String compoentId = LSMComponentProperties.getLSMComponentID(afp.getFilePath(), afp.getNodeId());
-                    if (afp.isRequireLSNSync()) {
+                    if (afp.getLSNByteOffset() != IMetaDataPageManager.INVALID_LSN_OFFSET) {
                         LSMComponentLSNSyncTask syncTask = new LSMComponentLSNSyncTask(compoentId,
-                                destFile.getAbsolutePath());
+                                destFile.getAbsolutePath(), afp.getLSNByteOffset());
                         lsmComponentRemoteLSN2LocalLSNMappingTaskQ.offer(syncTask);
                     } else {
                         updateLSMComponentRemainingFiles(compoentId);
@@ -384,8 +386,8 @@
                     try (RandomAccessFile fromFile = new RandomAccessFile(filePath, "r");
                             FileChannel fileChannel = fromFile.getChannel();) {
                         long fileSize = fileChannel.size();
-                        fileProperties.initialize(filePath, fileSize, replicaId, false, false, false);
-
+                        fileProperties.initialize(filePath, fileSize, replicaId, false,
+                                IMetaDataPageManager.INVALID_LSN_OFFSET, false);
                         outBuffer = AsterixReplicationProtocol.writeFileReplicationRequest(outBuffer, fileProperties,
                                 ReplicationRequestType.REPLICATE_FILE);
 
@@ -437,7 +439,7 @@
 
                         //set log source to REMOTE_RECOVERY to avoid re-logging on the recipient side
                         logRecord.setLogSource(LogSource.REMOTE_RECOVERY);
-                        AsterixReplicationProtocol.writeReplicateLogRequest(outBuffer, logRecord);
+                        AsterixReplicationProtocol.writeRemoteRecoveryLogRequest(outBuffer, logRecord);
                         NetworkingUtil.transferBufferToChannel(socketChannel, outBuffer);
                     }
                     logRecord = logReader.next();
@@ -475,7 +477,7 @@
             inBuffer = AsterixReplicationProtocol.readRequest(socketChannel, inBuffer);
 
             //Deserialize log
-            remoteLog.deserialize(inBuffer, false, localNodeID);
+            remoteLog.readRemoteLog(inBuffer, false, localNodeID);
             remoteLog.setLogSource(LogSource.REMOTE);
 
             if (remoteLog.getLogType() == LogType.JOB_COMMIT) {
@@ -547,7 +549,7 @@
                     LSMComponentLSNSyncTask syncTask = lsmComponentRemoteLSN2LocalLSNMappingTaskQ.take();
                     LSMComponentProperties lsmCompProp = lsmComponentId2PropertiesMap.get(syncTask.getComponentId());
                     try {
-                        syncLSMComponentFlushLSN(lsmCompProp, syncTask.getComponentFilePath());
+                        syncLSMComponentFlushLSN(lsmCompProp, syncTask);
                         updateLSMComponentRemainingFiles(lsmCompProp.getComponentId());
                     } catch (Exception e) {
                         e.printStackTrace();
@@ -558,7 +560,8 @@
             }
         }
 
-        private void syncLSMComponentFlushLSN(LSMComponentProperties lsmCompProp, String filePath) throws Exception {
+        private void syncLSMComponentFlushLSN(LSMComponentProperties lsmCompProp, LSMComponentLSNSyncTask syncTask)
+                throws Exception {
             long remoteLSN = lsmCompProp.getOriginalLSN();
             //LSN=0 (bulkload) does not need to be updated and there is no flush log corresponding to it
             if (remoteLSN == 0) {
@@ -611,7 +614,7 @@
                 }
             }
 
-            Path path = Paths.get(filePath);
+            Path path = Paths.get(syncTask.getComponentFilePath());
             if (Files.notExists(path)) {
                 /*
                  * This could happen when a merged component arrives and deletes the flushed 
@@ -621,7 +624,7 @@
                 return;
             }
 
-            File destFile = new File(filePath);
+            File destFile = new File(syncTask.getComponentFilePath());
             ByteBuffer metadataBuffer = ByteBuffer.allocate(Long.BYTES);
             metadataBuffer.putLong(lsmCompProp.getReplicaLSN());
             metadataBuffer.flip();
@@ -629,12 +632,12 @@
             //replace the remote LSN value by the local one
             try (RandomAccessFile fileOutputStream = new RandomAccessFile(destFile, "rw");
                     FileChannel fileChannel = fileOutputStream.getChannel()) {
+                long lsnStartOffset = syncTask.getLSNByteOffset();
                 while (metadataBuffer.hasRemaining()) {
-                    fileChannel.write(metadataBuffer, lsmCompProp.getLSNOffset());
+                    lsnStartOffset += fileChannel.write(metadataBuffer, lsnStartOffset);
                 }
                 fileChannel.force(true);
             }
         }
     }
-
 }
\ No newline at end of file
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
index 3e29828..36e5dff 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
@@ -81,6 +81,8 @@
 import org.apache.hyracks.api.replication.IReplicationJob.ReplicationExecutionType;
 import org.apache.hyracks.api.replication.IReplicationJob.ReplicationJobType;
 import org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation;
+import org.apache.hyracks.storage.am.common.api.IMetaDataPageManager;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexReplicationJob;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
 
@@ -296,13 +298,16 @@
                             long fileSize = fileChannel.size();
 
                             if (LSMComponentJob != null) {
-                                boolean requireLSNSync = AsterixLSMIndexUtil.lsmComponentFileHasLSN(
-                                        (AbstractLSMIndex) LSMComponentJob.getLSMIndex(), filePath);
+                                //since this is LSM_COMPONENT REPLICATE job, the job will contain only the component being replicated.
+                                ILSMComponent diskComponent = LSMComponentJob.getLSMIndexOperationContext()
+                                        .getComponentsToBeReplicated().get(0);
+                                long LSNByteOffset = AsterixLSMIndexUtil.getComponentFileLSNOffset(
+                                        (AbstractLSMIndex) LSMComponentJob.getLSMIndex(), diskComponent, filePath);
                                 asterixFileProperties.initialize(filePath, fileSize, nodeId, isLSMComponentFile,
-                                        requireLSNSync, remainingFiles == 0);
+                                        LSNByteOffset, remainingFiles == 0);
                             } else {
-                                asterixFileProperties.initialize(filePath, fileSize, nodeId, isLSMComponentFile, false,
-                                        remainingFiles == 0);
+                                asterixFileProperties.initialize(filePath, fileSize, nodeId, isLSMComponentFile,
+                                        IMetaDataPageManager.INVALID_LSN_OFFSET, remainingFiles == 0);
                             }
 
                             requestBuffer = AsterixReplicationProtocol.writeFileReplicationRequest(requestBuffer,
@@ -343,8 +348,8 @@
             } else if (job.getOperation() == ReplicationOperation.DELETE) {
                 for (String filePath : job.getJobFiles()) {
                     remainingFiles--;
-                    asterixFileProperties.initialize(filePath, -1, nodeId, isLSMComponentFile, false,
-                            remainingFiles == 0);
+                    asterixFileProperties.initialize(filePath, -1, nodeId, isLSMComponentFile,
+                            IMetaDataPageManager.INVALID_LSN_OFFSET, remainingFiles == 0);
                     AsterixReplicationProtocol.writeFileReplicationRequest(requestBuffer, asterixFileProperties,
                             ReplicationRequestType.DELETE_FILE);
 
@@ -1068,7 +1073,7 @@
             ILogRecord logRecord = new LogRecord();
             while (responseType != ReplicationRequestType.GOODBYE) {
                 dataBuffer = AsterixReplicationProtocol.readRequest(socketChannel, dataBuffer);
-                logRecord.deserialize(dataBuffer, true, nodeId);
+                logRecord.readRemoteLog(dataBuffer, true, nodeId);
 
                 if (logRecord.getNodeId().equals(nodeId)) {
                     //store log in memory to replay it for recovery
@@ -1243,5 +1248,4 @@
             }
         }
     }
-
 }
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/AsterixLSMIndexFileProperties.java b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/AsterixLSMIndexFileProperties.java
index 30f2afc..5e0c9b0 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/AsterixLSMIndexFileProperties.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/AsterixLSMIndexFileProperties.java
@@ -24,6 +24,7 @@
 import java.io.IOException;
 import java.io.OutputStream;
 
+import org.apache.hyracks.storage.am.common.api.IMetaDataPageManager;
 import org.apache.hyracks.storage.am.common.util.IndexFileNameUtil;
 
 public class AsterixLSMIndexFileProperties {
@@ -35,36 +36,36 @@
     private int ioDeviceNum;
     private String idxName;
     private boolean lsmComponentFile;
-    private boolean requireLSNSync;
     private String filePath;
     private boolean requiresAck = false;
+    private long LSNByteOffset;
 
     public AsterixLSMIndexFileProperties() {
     }
 
     public AsterixLSMIndexFileProperties(String filePath, long fileSize, String nodeId, boolean lsmComponentFile,
-            boolean requireLSNSync, boolean requiresAck) {
-        initialize(filePath, fileSize, nodeId, lsmComponentFile, requireLSNSync, requiresAck);
+            long LSNByteOffset, boolean requiresAck) {
+        initialize(filePath, fileSize, nodeId, lsmComponentFile, LSNByteOffset, requiresAck);
     }
 
     public AsterixLSMIndexFileProperties(LSMComponentProperties lsmComponentProperties) {
-        initialize(lsmComponentProperties.getComponentId(), -1, lsmComponentProperties.getNodeId(), false, false, false);
+        initialize(lsmComponentProperties.getComponentId(), -1, lsmComponentProperties.getNodeId(), false,
+                IMetaDataPageManager.INVALID_LSN_OFFSET, false);
     }
 
-    public void initialize(String filePath, long fileSize, String nodeId, boolean lsmComponentFile,
-            boolean requireLSNSync, boolean requiresAck) {
+    public void initialize(String filePath, long fileSize, String nodeId, boolean lsmComponentFile, long LSNByteOffset,
+            boolean requiresAck) {
         this.filePath = filePath;
         this.fileSize = fileSize;
         this.nodeId = nodeId;
         String[] tokens = filePath.split(File.separator);
-
         int arraySize = tokens.length;
         this.fileName = tokens[arraySize - 1];
         this.ioDeviceNum = getDeviceIONumFromName(tokens[arraySize - 2]);
         this.idxName = tokens[arraySize - 3];
         this.dataverse = tokens[arraySize - 4];
         this.lsmComponentFile = lsmComponentFile;
-        this.requireLSNSync = requireLSNSync;
+        this.LSNByteOffset = LSNByteOffset;
         this.requiresAck = requiresAck;
     }
 
@@ -78,7 +79,7 @@
         dos.writeUTF(filePath);
         dos.writeLong(fileSize);
         dos.writeBoolean(lsmComponentFile);
-        dos.writeBoolean(requireLSNSync);
+        dos.writeLong(LSNByteOffset);
         dos.writeBoolean(requiresAck);
     }
 
@@ -87,10 +88,10 @@
         String filePath = input.readUTF();
         long fileSize = input.readLong();
         boolean lsmComponentFile = input.readBoolean();
-        boolean requireLSNSync = input.readBoolean();
+        long LSNByteOffset = input.readLong();
         boolean requiresAck = input.readBoolean();
         AsterixLSMIndexFileProperties fileProp = new AsterixLSMIndexFileProperties();
-        fileProp.initialize(filePath, fileSize, nodeId, lsmComponentFile, requireLSNSync, requiresAck);
+        fileProp.initialize(filePath, fileSize, nodeId, lsmComponentFile, LSNByteOffset, requiresAck);
         return fileProp;
     }
 
@@ -158,14 +159,6 @@
         this.lsmComponentFile = lsmComponentFile;
     }
 
-    public boolean isRequireLSNSync() {
-        return requireLSNSync;
-    }
-
-    public void setRequireLSNSync(boolean requireLSNSync) {
-        this.requireLSNSync = requireLSNSync;
-    }
-
     public boolean requiresAck() {
         return requiresAck;
     }
@@ -184,6 +177,15 @@
         sb.append("IDX Name: " + idxName + "  ");
         sb.append("isLSMComponentFile : " + lsmComponentFile + "  ");
         sb.append("Dataverse: " + dataverse);
+        sb.append("LSN Byte Offset: " + LSNByteOffset);
         return sb.toString();
     }
+
+    public long getLSNByteOffset() {
+        return LSNByteOffset;
+    }
+
+    public void setLSNByteOffset(long lSNByteOffset) {
+        LSNByteOffset = lSNByteOffset;
+    }
 }
\ No newline at end of file
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentLSNSyncTask.java b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentLSNSyncTask.java
index 69f7d07..450eb7d 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentLSNSyncTask.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentLSNSyncTask.java
@@ -21,10 +21,12 @@
 public class LSMComponentLSNSyncTask {
     private String componentFilePath;
     private String componentId;
+    private long LSNByteOffset;
 
-    public LSMComponentLSNSyncTask(String componentId, String componentFilePath) {
+    public LSMComponentLSNSyncTask(String componentId, String componentFilePath, long LSNByteOffset) {
         this.componentId = componentId;
         this.componentFilePath = componentFilePath;
+        this.setLSNByteOffset(LSNByteOffset);
     }
 
     public String getComponentFilePath() {
@@ -42,4 +44,12 @@
     public void setComponentId(String componentId) {
         this.componentId = componentId;
     }
+
+    public long getLSNByteOffset() {
+        return LSNByteOffset;
+    }
+
+    public void setLSNByteOffset(long lSNByteOffset) {
+        LSNByteOffset = lSNByteOffset;
+    }
 }
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java
index 84d5dbe..794a6e1 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java
@@ -27,7 +27,6 @@
 
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.common.frames.LIFOMetaDataFrame;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexReplicationJob;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
@@ -50,9 +49,7 @@
         this.nodeId = nodeId;
         componentId = LSMComponentProperties.getLSMComponentID((String) job.getJobFiles().toArray()[0], nodeId);
         numberOfFiles = new AtomicInteger(job.getJobFiles().size());
-        originalLSN = getLSMComponentLSN((AbstractLSMIndex) job.getLSMIndex(), job.getLSMIndexOperationContext());
-        //TODO this should be changed to a dynamic value when append only LSM indexes are implemented
-        LSNOffset = LIFOMetaDataFrame.lsnOff;
+        originalLSN = LSMComponentProperties.getLSMComponentLSN((AbstractLSMIndex) job.getLSMIndex(), job.getLSMIndexOperationContext());
         opType = job.getLSMOpType();
     }
 
@@ -60,11 +57,11 @@
 
     }
 
-    public long getLSMComponentLSN(AbstractLSMIndex lsmIndex, ILSMIndexOperationContext ctx) {
+    public static long getLSMComponentLSN(AbstractLSMIndex lsmIndex, ILSMIndexOperationContext ctx) {
         long componentLSN = -1;
         try {
-            componentLSN = ((AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback()).getComponentLSN(ctx
-                    .getComponentsToBeReplicated());
+            componentLSN = ((AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback())
+                    .getComponentLSN(ctx.getComponentsToBeReplicated());
         } catch (HyracksDataException e) {
             e.printStackTrace();
         }
@@ -149,14 +146,6 @@
         this.componentId = componentId;
     }
 
-    public long getLSNOffset() {
-        return LSNOffset;
-    }
-
-    public void setLSNOffset(long lSNOffset) {
-        LSNOffset = lSNOffset;
-    }
-
     public long getOriginalLSN() {
         return originalLSN;
     }
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
index f4100ee..45ba9bd 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
@@ -179,6 +179,7 @@
         appendOffset = 0;
         flushOffset = 0;
         isLastPage = false;
+        stop = false;
     }
 
     ////////////////////////////////////
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
index 2a40052..1966c39 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
@@ -31,7 +31,7 @@
 
     private IReplicationManager replicationManager;
 
-    public LogManagerWithReplication(TransactionSubsystem txnSubsystem) throws ACIDException {
+    public LogManagerWithReplication(TransactionSubsystem txnSubsystem) {
         super(txnSubsystem);
     }
 
@@ -41,7 +41,8 @@
             throw new IllegalStateException();
         }
 
-        if (logRecord.getLogType() == LogType.FLUSH) {
+        //Remote flush logs do not need to be flushed separately since they may not trigger local flush
+        if (logRecord.getLogType() == LogType.FLUSH && logRecord.getLogSource() == LogSource.LOCAL) {
             flushLogsQ.offer(logRecord);
             return;
         }