Remove Log-Based Remote Recovery

This change removes the code that supports log-based remote recovery.
Remote recovery was replaced by the failback process which depends on
copying LSM disk component instead of logs.

Change-Id: I86e3b5832b52207e36c8409a072ccbda564d78b5
Reviewed-on: https://asterix-gerrit.ics.uci.edu/871
Reviewed-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index 643bb16..0a6d62d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -123,18 +123,13 @@
             }
 
             //do not attempt to perform remote recovery if this is a virtual NC
-            if (replicationEnabled && !virtualNC) {
+            if (autoFailover && !virtualNC) {
                 if (systemState == SystemState.NEW_UNIVERSE || systemState == SystemState.CORRUPTED) {
-                    //Try to perform remote recovery
+                    //Start failback process
                     IRemoteRecoveryManager remoteRecoveryMgr = runtimeContext.getRemoteRecoveryManager();
-                    if (autoFailover) {
-                        remoteRecoveryMgr.startFailbackProcess();
-                        systemState = SystemState.RECOVERING;
-                        pendingFailbackCompletion = true;
-                    } else {
-                        remoteRecoveryMgr.performRemoteRecovery();
-                        systemState = SystemState.HEALTHY;
-                    }
+                    remoteRecoveryMgr.startFailbackProcess();
+                    systemState = SystemState.RECOVERING;
+                    pendingFailbackCompletion = true;
                 }
             } else {
                 //recover if the system is corrupted by checking system state.
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
index 921fd37..a2738e8 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
@@ -25,12 +25,8 @@
 public interface IRemoteRecoveryManager {
 
     /**
-     * Attempts to perform the remote recovery process from an active remote replica.
-     */
-    public void performRemoteRecovery();
-
-    /**
      * Performs the partitions takeover process from the {@code failedNode}
+     *
      * @param failedNode
      * @param partitions
      * @throws IOException
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
index 3fc2af0..755fbbd 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
@@ -18,11 +18,9 @@
  */
 package org.apache.asterix.common.replication;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.Set;
 
-import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.ILogRecord;
 import org.apache.hyracks.api.replication.IIOReplicationManager;
 
@@ -46,23 +44,6 @@
     public boolean hasBeenReplicated(ILogRecord logRecord);
 
     /**
-     * Requests txns logs from a remote replica.
-     *
-     * @param remoteReplicaId
-     *            The replica id to send the request to.
-     * @param replicasDataToRecover
-     *            Get logs that belong to those replicas.
-     * @param fromLSN
-     *            Low water mark for logs to be requested.
-     * @param recoveryLogsFile
-     *            a temporary file to store the logs required for recovery
-     * @throws IOException
-     * @throws ACIDException
-     */
-    public void requestReplicaLogs(String remoteReplicaId, Set<String> replicasDataToRecover, long fromLSN,
-            File recoveryLogsFile) throws IOException, ACIDException;
-
-    /**
      * Requests LSM components files from a remote replica.
      *
      * @param remoteReplicaId
@@ -127,16 +108,6 @@
     public void reportReplicaEvent(ReplicaEvent event);
 
     /**
-     * Requests the current minimum LSN of a remote replica.
-     *
-     * @param replicaId
-     *            The replica to send the request to.
-     * @return The returned minimum LSN from the remote replica.
-     * @throws IOException
-     */
-    public long requestReplicaMinLSN(String replicaId) throws IOException;
-
-    /**
      * Sends a request to remote replicas to flush indexes that have LSN less than nonSharpCheckpointTargetLSN
      *
      * @param nonSharpCheckpointTargetLSN
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
index c35b155..a16eef8 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
@@ -560,10 +560,6 @@
             //serialized node id String
             serilizedSize += Integer.BYTES + nodeId.length();
         }
-        if (logSource == LogSource.REMOTE_RECOVERY) {
-            //for LSN;
-            serilizedSize += Long.BYTES;
-        }
         serilizedSize -= CHKSUM_LEN;
         return serilizedSize;
     }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogSource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogSource.java
index 4f3d5df..75cf6ba 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogSource.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogSource.java
@@ -22,11 +22,9 @@
 
     public static final byte LOCAL = 0;
     public static final byte REMOTE = 1;
-    public static final byte REMOTE_RECOVERY = 2;
 
     private static final String STRING_LOCAL = "LOCAL";
     private static final String STRING_REMOTE = "REMOTE";
-    private static final String STRING_REMOTE_RECOVERY = "REMOTE_RECOVERY";
 
     private static final String STRING_INVALID_LOG_SOURCE = "INVALID_LOG_SOURCE";
 
@@ -36,8 +34,6 @@
                 return STRING_LOCAL;
             case REMOTE:
                 return STRING_REMOTE;
-            case REMOTE_RECOVERY:
-                return STRING_REMOTE_RECOVERY;
             default:
                 return STRING_INVALID_LOG_SOURCE;
         }
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java
index 1ff6cc4..608e442 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java
@@ -27,7 +27,6 @@
 import java.nio.channels.SocketChannel;
 
 import org.apache.asterix.common.replication.ReplicaEvent;
-import org.apache.asterix.common.transactions.ILogRecord;
 import org.apache.asterix.replication.management.NetworkingUtil;
 import org.apache.asterix.replication.storage.LSMComponentProperties;
 import org.apache.asterix.replication.storage.LSMIndexFileProperties;
@@ -35,7 +34,7 @@
 public class ReplicationProtocol {
 
     /**
-     * All replication messages start with ReplicationFunctions (4 bytes), then the length of the request in bytes
+     * All replication messages start with ReplicationRequestType (4 bytes), then the length of the request in bytes
      */
     public static final String JOB_REPLICATION_ACK = "$";
 
@@ -48,9 +47,7 @@
      * REPLICATE_FILE: replicate a file(s)
      * DELETE_FILE: delete a file(s)
      * GET_REPLICA_FILES: used during remote recovery to request lost LSM Components
-     * GET_REPLICA_LOGS: used during remote recovery to request lost txn logs
      * GET_REPLICA_MAX_LSN: used during remote recovery initialize a log manager LSN
-     * GET_REPLICA_MIN_LSN: used during remote recovery to specify the low water mark per replica
      * GOODBYE: used to notify replicas that the replication request has been completed
      * REPLICA_EVENT: used to notify replicas about a remote replica split/merge.
      * LSM_COMPONENT_PROPERTIES: used to send the properties of an LSM Component before its physical files are sent
@@ -62,9 +59,7 @@
         REPLICATE_FILE,
         DELETE_FILE,
         GET_REPLICA_FILES,
-        GET_REPLICA_LOGS,
         GET_REPLICA_MAX_LSN,
-        GET_REPLICA_MIN_LSN,
         GOODBYE,
         REPLICA_EVENT,
         LSM_COMPONENT_PROPERTIES,
@@ -90,21 +85,20 @@
     public static ByteBuffer writeLSMComponentPropertiesRequest(LSMComponentProperties lsmCompProp, ByteBuffer buffer)
             throws IOException {
         ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        DataOutputStream oos = new DataOutputStream(outputStream);
-        lsmCompProp.serialize(oos);
-        oos.close();
-
-        int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
-        if (buffer.capacity() < requestSize) {
-            buffer = ByteBuffer.allocate(requestSize);
-        } else {
-            buffer.clear();
+        try (DataOutputStream oos = new DataOutputStream(outputStream)) {
+            lsmCompProp.serialize(oos);
+            int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
+            if (buffer.capacity() < requestSize) {
+                buffer = ByteBuffer.allocate(requestSize);
+            } else {
+                buffer.clear();
+            }
+            buffer.putInt(ReplicationRequestType.LSM_COMPONENT_PROPERTIES.ordinal());
+            buffer.putInt(oos.size());
+            buffer.put(outputStream.toByteArray());
+            buffer.flip();
+            return buffer;
         }
-        buffer.putInt(ReplicationRequestType.LSM_COMPONENT_PROPERTIES.ordinal());
-        buffer.putInt(oos.size());
-        buffer.put(outputStream.toByteArray());
-        buffer.flip();
-        return buffer;
     }
 
     public static ReplicationRequestType getRequestType(SocketChannel socketChannel, ByteBuffer byteBuffer)
@@ -136,47 +130,23 @@
         return bb;
     }
 
-    public static void writeRemoteRecoveryLogRequest(ByteBuffer requestBuffer, ILogRecord logRecord) {
-        requestBuffer.clear();
-        //put request type (4 bytes)
-        requestBuffer.putInt(ReplicationRequestType.REPLICATE_LOG.ordinal());
-        //leave space for log size
-        requestBuffer.position(requestBuffer.position() + Integer.BYTES);
-        int logSize = logRecord.writeRemoteRecoveryLog(requestBuffer);
-        //put request size (4 bytes)
-        requestBuffer.putInt(4, logSize);
-        requestBuffer.flip();
-    }
-
-    public static void writeReplicateLogRequest(ByteBuffer requestBuffer, byte[] serializedLog) {
-        requestBuffer.clear();
-        //put request type (4 bytes)
-        requestBuffer.putInt(ReplicationRequestType.REPLICATE_LOG.ordinal());
-        //length of the log
-        requestBuffer.putInt(serializedLog.length);
-        //the log itself
-        requestBuffer.put(serializedLog);
-        requestBuffer.flip();
-    }
-
     public static ByteBuffer writeFileReplicationRequest(ByteBuffer requestBuffer, LSMIndexFileProperties afp,
             ReplicationRequestType requestType) throws IOException {
         ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        DataOutputStream oos = new DataOutputStream(outputStream);
-        afp.serialize(oos);
-        oos.close();
-
-        int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
-        if (requestBuffer.capacity() < requestSize) {
-            requestBuffer = ByteBuffer.allocate(requestSize);
-        } else {
-            requestBuffer.clear();
+        try (DataOutputStream oos = new DataOutputStream(outputStream)) {
+            afp.serialize(oos);
+            int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
+            if (requestBuffer.capacity() < requestSize) {
+                requestBuffer = ByteBuffer.allocate(requestSize);
+            } else {
+                requestBuffer.clear();
+            }
+            requestBuffer.putInt(requestType.ordinal());
+            requestBuffer.putInt(oos.size());
+            requestBuffer.put(outputStream.toByteArray());
+            requestBuffer.flip();
+            return requestBuffer;
         }
-        requestBuffer.putInt(requestType.ordinal());
-        requestBuffer.putInt(oos.size());
-        requestBuffer.put(outputStream.toByteArray());
-        requestBuffer.flip();
-        return requestBuffer;
     }
 
     public static LSMIndexFileProperties readFileReplicationRequest(ByteBuffer buffer) throws IOException {
@@ -185,44 +155,17 @@
         return LSMIndexFileProperties.create(dis);
     }
 
-    public static ReplicaLogsRequest readReplicaLogsRequest(ByteBuffer buffer) throws IOException {
-        ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit());
-        DataInputStream dis = new DataInputStream(bais);
-        return ReplicaLogsRequest.create(dis);
-    }
-
-    public static ByteBuffer writeGetReplicaLogsRequest(ByteBuffer requestBuffer, ReplicaLogsRequest request)
-            throws IOException {
-        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        DataOutputStream oos = new DataOutputStream(outputStream);
-        request.serialize(oos);
-        oos.close();
-
-        int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
-        if (requestBuffer.capacity() < requestSize) {
-            requestBuffer = ByteBuffer.allocate(requestSize);
-        } else {
-            requestBuffer.clear();
-        }
-        requestBuffer.putInt(ReplicationRequestType.GET_REPLICA_LOGS.ordinal());
-        requestBuffer.putInt(oos.size());
-        requestBuffer.put(outputStream.toByteArray());
-        requestBuffer.flip();
-        return requestBuffer;
-    }
-
     public static ByteBuffer writeReplicaEventRequest(ReplicaEvent event) throws IOException {
         ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        DataOutputStream oos = new DataOutputStream(outputStream);
-        event.serialize(oos);
-        oos.close();
-
-        ByteBuffer buffer = ByteBuffer.allocate(REPLICATION_REQUEST_HEADER_SIZE + oos.size());
-        buffer.putInt(ReplicationRequestType.REPLICA_EVENT.ordinal());
-        buffer.putInt(oos.size());
-        buffer.put(outputStream.toByteArray());
-        buffer.flip();
-        return buffer;
+        try (DataOutputStream oos = new DataOutputStream(outputStream)) {
+            event.serialize(oos);
+            ByteBuffer buffer = ByteBuffer.allocate(REPLICATION_REQUEST_HEADER_SIZE + oos.size());
+            buffer.putInt(ReplicationRequestType.REPLICA_EVENT.ordinal());
+            buffer.putInt(oos.size());
+            buffer.put(outputStream.toByteArray());
+            buffer.flip();
+            return buffer;
+        }
     }
 
     public static ReplicaEvent readReplicaEventRequest(ByteBuffer buffer) throws IOException {
@@ -255,21 +198,20 @@
     public static ByteBuffer writeGetReplicaIndexFlushRequest(ByteBuffer buffer, ReplicaIndexFlushRequest request)
             throws IOException {
         ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        DataOutputStream oos = new DataOutputStream(outputStream);
-        request.serialize(oos);
-        oos.close();
-
-        int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
-        if (buffer.capacity() < requestSize) {
-            buffer = ByteBuffer.allocate(requestSize);
-        } else {
-            buffer.clear();
+        try (DataOutputStream oos = new DataOutputStream(outputStream)) {
+            request.serialize(oos);
+            int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
+            if (buffer.capacity() < requestSize) {
+                buffer = ByteBuffer.allocate(requestSize);
+            } else {
+                buffer.clear();
+            }
+            buffer.putInt(ReplicationRequestType.FLUSH_INDEX.ordinal());
+            buffer.putInt(oos.size());
+            buffer.put(outputStream.toByteArray());
+            buffer.flip();
+            return buffer;
         }
-        buffer.putInt(ReplicationRequestType.FLUSH_INDEX.ordinal());
-        buffer.putInt(oos.size());
-        buffer.put(outputStream.toByteArray());
-        buffer.flip();
-        return buffer;
     }
 
     public static ReplicaFilesRequest readReplicaFileRequest(ByteBuffer buffer) throws IOException {
@@ -290,12 +232,6 @@
         requestBuffer.flip();
     }
 
-    public static void writeMinLSNRequest(ByteBuffer requestBuffer) {
-        requestBuffer.clear();
-        requestBuffer.putInt(ReplicationRequestType.GET_REPLICA_MIN_LSN.ordinal());
-        requestBuffer.flip();
-    }
-
     public static int getJobIdFromLogAckMessage(String msg) {
         return Integer.parseInt(msg.substring((msg.indexOf(JOB_REPLICATION_ACK) + 1)));
     }
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
index 331116b..a152f6c 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
@@ -56,15 +56,12 @@
 import org.apache.asterix.common.replication.ReplicaEvent;
 import org.apache.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
 import org.apache.asterix.common.transactions.ILogManager;
-import org.apache.asterix.common.transactions.ILogReader;
-import org.apache.asterix.common.transactions.ILogRecord;
 import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.transactions.LogSource;
 import org.apache.asterix.common.transactions.LogType;
 import org.apache.asterix.common.utils.TransactionUtil;
 import org.apache.asterix.replication.functions.ReplicaFilesRequest;
 import org.apache.asterix.replication.functions.ReplicaIndexFlushRequest;
-import org.apache.asterix.replication.functions.ReplicaLogsRequest;
 import org.apache.asterix.replication.functions.ReplicationProtocol;
 import org.apache.asterix.replication.functions.ReplicationProtocol.ReplicationRequestType;
 import org.apache.asterix.replication.logging.RemoteLogMapping;
@@ -77,6 +74,8 @@
 import org.apache.hyracks.storage.am.common.api.IMetaDataPageManager;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
+import org.apache.hyracks.util.StorageUtil;
+import org.apache.hyracks.util.StorageUtil.StorageUnit;
 
 /**
  * This class is used to receive and process replication requests from remote replicas or replica events from CC
@@ -93,7 +92,7 @@
     private final IReplicationManager replicationManager;
     private final AsterixReplicationProperties replicationProperties;
     private final IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider;
-    private final static int INTIAL_BUFFER_SIZE = 4000; //4KB
+    private final static int INTIAL_BUFFER_SIZE = StorageUtil.getSizeInBytes(4, StorageUnit.KILOBYTE);
     private final LinkedBlockingQueue<LSMComponentLSNSyncTask> lsmComponentRemoteLSN2LocalLSNMappingTaskQ;
     private final LinkedBlockingQueue<LogRecord> pendingNotificationRemoteLogsQ;
     private final Map<String, LSMComponentProperties> lsmComponentId2PropertiesMap;
@@ -195,18 +194,6 @@
         }
     }
 
-    private static void sendRemoteRecoveryLog(ILogRecord logRecord, SocketChannel socketChannel, ByteBuffer outBuffer)
-            throws IOException {
-        logRecord.setLogSource(LogSource.REMOTE_RECOVERY);
-        if (logRecord.getSerializedLogSize() > outBuffer.capacity()) {
-            int requestSize = logRecord.getSerializedLogSize() + ReplicationProtocol.REPLICATION_REQUEST_HEADER_SIZE;
-            outBuffer = ByteBuffer.allocate(requestSize);
-        }
-        //set log source to REMOTE_RECOVERY to avoid re-logging on the recipient side
-        ReplicationProtocol.writeRemoteRecoveryLogRequest(outBuffer, logRecord);
-        NetworkingUtil.transferBufferToChannel(socketChannel, outBuffer);
-    }
-
     /**
      * A replication thread is created per received replication request.
      */
@@ -249,15 +236,9 @@
                         case GET_REPLICA_MAX_LSN:
                             handleGetReplicaMaxLSN();
                             break;
-                        case GET_REPLICA_MIN_LSN:
-                            handleGetReplicaMinLSN();
-                            break;
                         case GET_REPLICA_FILES:
                             handleGetReplicaFiles();
                             break;
-                        case GET_REPLICA_LOGS:
-                            handleGetRemoteLogs();
-                            break;
                         case FLUSH_INDEX:
                             handleFlushIndex();
                             break;
@@ -373,15 +354,6 @@
             NetworkingUtil.transferBufferToChannel(socketChannel, outBuffer);
         }
 
-        private void handleGetReplicaMinLSN() throws IOException {
-            long minLSN = asterixAppRuntimeContextProvider.getAppContext().getTransactionSubsystem()
-                    .getRecoveryManager().getMinFirstLSN();
-            outBuffer.clear();
-            outBuffer.putLong(minLSN);
-            outBuffer.flip();
-            NetworkingUtil.transferBufferToChannel(socketChannel, outBuffer);
-        }
-
         private void handleGetReplicaFiles() throws IOException {
             inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
             ReplicaFilesRequest request = ReplicationProtocol.readReplicaFileRequest(inBuffer);
@@ -426,75 +398,6 @@
             ReplicationProtocol.sendGoodbye(socketChannel);
         }
 
-        private void handleGetRemoteLogs() throws IOException, ACIDException {
-            inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
-            ReplicaLogsRequest request = ReplicationProtocol.readReplicaLogsRequest(inBuffer);
-
-            Set<String> replicaIds = request.getReplicaIds();
-            //get list of partitions that belong to the replicas in the request
-            Set<Integer> requestedPartitions = new HashSet<>();
-            Map<String, ClusterPartition[]> nodePartitions = ((IAsterixPropertiesProvider) asterixAppRuntimeContextProvider
-                    .getAppContext()).getMetadataProperties().getNodePartitions();
-            for (String replicaId : replicaIds) {
-                //get replica partitions
-                ClusterPartition[] replicaPatitions = nodePartitions.get(replicaId);
-                for (ClusterPartition partition : replicaPatitions) {
-                    requestedPartitions.add(partition.getPartitionId());
-                }
-            }
-
-            long fromLSN = request.getFromLSN();
-            long minLocalFirstLSN = asterixAppRuntimeContextProvider.getAppContext().getTransactionSubsystem()
-                    .getRecoveryManager().getLocalMinFirstLSN();
-
-            //get Log reader
-            ILogReader logReader = logManager.getLogReader(true);
-            try {
-                if (fromLSN < logManager.getReadableSmallestLSN()) {
-                    fromLSN = logManager.getReadableSmallestLSN();
-                }
-
-                logReader.initializeScan(fromLSN);
-                ILogRecord logRecord = logReader.next();
-                Set<Integer> requestedPartitionsJobs = new HashSet<>();
-                while (logRecord != null) {
-                    //we should not send any local log which has already been converted to disk component
-                    if (logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLSN() < minLocalFirstLSN) {
-                        logRecord = logReader.next();
-                        continue;
-                    }
-                    //send only logs that belong to the partitions of the request and required for recovery
-                    switch (logRecord.getLogType()) {
-                        case LogType.UPDATE:
-                        case LogType.ENTITY_COMMIT:
-                        case LogType.UPSERT_ENTITY_COMMIT:
-                            if (requestedPartitions.contains(logRecord.getResourcePartition())) {
-                                sendRemoteRecoveryLog(logRecord, socketChannel, outBuffer);
-                                requestedPartitionsJobs.add(logRecord.getJobId());
-                            }
-                            break;
-                        case LogType.JOB_COMMIT:
-                            if (requestedPartitionsJobs.contains(logRecord.getJobId())) {
-                                sendRemoteRecoveryLog(logRecord, socketChannel, outBuffer);
-                                requestedPartitionsJobs.remove(logRecord.getJobId());
-                            }
-                            break;
-                        case LogType.ABORT:
-                        case LogType.FLUSH:
-                            break;
-                        default:
-                            throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
-                    }
-                    logRecord = logReader.next();
-                }
-            } finally {
-                logReader.close();
-            }
-
-            //send goodbye (end of logs)
-            ReplicationProtocol.sendGoodbye(socketChannel);
-        }
-
         private void handleReplicaEvent() throws IOException {
             inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
             ReplicaEvent event = ReplicationProtocol.readReplicaEventRequest(inBuffer);
@@ -691,4 +594,4 @@
             }
         }
     }
-}
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
index d983a62..ee872a5 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
@@ -55,7 +55,6 @@
 import org.apache.asterix.common.config.AsterixReplicationProperties;
 import org.apache.asterix.common.config.IAsterixPropertiesProvider;
 import org.apache.asterix.common.dataflow.AsterixLSMIndexUtil;
-import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.replication.AsterixReplicationJob;
 import org.apache.asterix.common.replication.IReplicaResourcesManager;
 import org.apache.asterix.common.replication.IReplicationManager;
@@ -65,12 +64,10 @@
 import org.apache.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
 import org.apache.asterix.common.transactions.ILogManager;
 import org.apache.asterix.common.transactions.ILogRecord;
-import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.transactions.LogType;
 import org.apache.asterix.event.schema.cluster.Node;
 import org.apache.asterix.replication.functions.ReplicaFilesRequest;
 import org.apache.asterix.replication.functions.ReplicaIndexFlushRequest;
-import org.apache.asterix.replication.functions.ReplicaLogsRequest;
 import org.apache.asterix.replication.functions.ReplicationProtocol;
 import org.apache.asterix.replication.functions.ReplicationProtocol.ReplicationRequestType;
 import org.apache.asterix.replication.logging.ReplicationLogBuffer;
@@ -1039,91 +1036,6 @@
         }
     }
 
-    //Recovery Method
-    @Override
-    public long requestReplicaMinLSN(String selectedReplicaId) throws IOException {
-        long minLSN = 0;
-        ReplicationProtocol.writeMinLSNRequest(dataBuffer);
-        try (SocketChannel socketChannel = getReplicaSocket(selectedReplicaId);) {
-            //transfer request
-            NetworkingUtil.transferBufferToChannel(socketChannel, dataBuffer);
-
-            //read response
-            NetworkingUtil.readBytes(socketChannel, dataBuffer, Long.BYTES);
-            minLSN = dataBuffer.getLong();
-
-            //send goodbye
-            ReplicationProtocol.sendGoodbye(socketChannel);
-        }
-
-        return minLSN;
-    }
-
-    //Recovery Method
-    @Override
-    public void requestReplicaLogs(String remoteNode, Set<String> nodeIdsToRecoverFor, long fromLSN,
-            File recoveryLogsFile) throws IOException, ACIDException {
-        ReplicaLogsRequest request = new ReplicaLogsRequest(nodeIdsToRecoverFor, fromLSN);
-        dataBuffer = ReplicationProtocol.writeGetReplicaLogsRequest(dataBuffer, request);
-        try (SocketChannel socketChannel = getReplicaSocket(remoteNode)) {
-            //transfer request
-            NetworkingUtil.transferBufferToChannel(socketChannel, dataBuffer);
-
-            //read response type
-            ReplicationRequestType responseType = ReplicationProtocol.getRequestType(socketChannel, dataBuffer);
-
-            ILogRecord logRecord = new LogRecord();
-            Set<Integer> nodePartitions = ((PersistentLocalResourceRepository) asterixAppRuntimeContextProvider
-                    .getLocalResourceRepository()).getNodeOrignalPartitions();
-            Set<Integer> nodePartitionsJobs = new HashSet<>();
-            try (RandomAccessFile raf = new RandomAccessFile(recoveryLogsFile, "rw");
-                    FileChannel fileChannel = raf.getChannel();) {
-                while (responseType != ReplicationRequestType.GOODBYE) {
-                    dataBuffer = ReplicationProtocol.readRequest(socketChannel, dataBuffer);
-                    logRecord.readRemoteLog(dataBuffer, true);
-                    switch (logRecord.getLogType()) {
-                        case LogType.UPDATE:
-                        case LogType.ENTITY_COMMIT:
-                        case LogType.UPSERT_ENTITY_COMMIT:
-                            if (nodePartitions.contains(logRecord.getResourcePartition())) {
-                                nodePartitionsJobs.add(logRecord.getJobId());
-                                dataBuffer.flip();
-                                while (dataBuffer.hasRemaining()) {
-                                    //store log in temp file to replay it for recovery
-                                    fileChannel.write(dataBuffer);
-                                }
-                            } else {
-                                //send log to log manager as a remote recovery log
-                                logManager.log(logRecord);
-                            }
-                            break;
-                        case LogType.JOB_COMMIT:
-                            if (nodePartitionsJobs.contains(logRecord.getJobId())) {
-                                nodePartitionsJobs.remove(logRecord.getJobId());
-                                dataBuffer.flip();
-                                while (dataBuffer.hasRemaining()) {
-                                    //store log in temp file to replay it for recovery
-                                    fileChannel.write(dataBuffer);
-                                }
-                                break;
-                            }
-                            logManager.log(logRecord);
-                            break;
-                        case LogType.ABORT:
-                        case LogType.FLUSH:
-                            break;
-                        default:
-                            throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
-                    }
-                    responseType = ReplicationProtocol.getRequestType(socketChannel, dataBuffer);
-                }
-            }
-
-            //send goodbye
-            ReplicationProtocol.sendGoodbye(socketChannel);
-        }
-    }
-
     public int getLogPageSize() {
         return logManager.getLogPageSize();
     }
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
index 3c6fc0e..47e60b2 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
@@ -18,10 +18,7 @@
  */
 package org.apache.asterix.replication.recovery;
 
-import java.io.File;
 import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.channels.FileChannel;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -40,21 +37,17 @@
 import org.apache.asterix.common.replication.IRemoteRecoveryManager;
 import org.apache.asterix.common.replication.IReplicationManager;
 import org.apache.asterix.common.transactions.ILogManager;
-import org.apache.asterix.common.transactions.ILogReader;
 import org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.asterix.om.util.AsterixClusterProperties;
 import org.apache.asterix.replication.storage.ReplicaResourcesManager;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
-import org.apache.asterix.transaction.management.service.logging.RemoteLogReader;
 
 public class RemoteRecoveryManager implements IRemoteRecoveryManager {
 
-    private static final String RECOVERY_LOGS_FILE_NAME = "recoveryLogs";
     private final IReplicationManager replicationManager;
     private static final Logger LOGGER = Logger.getLogger(RemoteRecoveryManager.class.getName());
     private final IAsterixAppRuntimeContext runtimeContext;
     private final AsterixReplicationProperties replicationProperties;
-    private final static int REMOTE_RECOVERY_JOB_ID = -1;
     private Map<String, Set<String>> failbackRecoveryReplicas;
 
     public RemoteRecoveryManager(IReplicationManager replicationManager, IAsterixAppRuntimeContext runtimeContext,
@@ -64,93 +57,6 @@
         this.replicationProperties = replicationProperties;
     }
 
-    @Override
-    public void performRemoteRecovery() {
-        //The whole remote recovery process should be atomic.
-        //Any error happens, we should start the recovery from the start until the recovery is
-        //complete or an illegal state is reached (cannot recover or max attempts exceed).
-        int maxRecoveryAttempts = replicationProperties.getMaxRemoteRecoveryAttempts();
-        PersistentLocalResourceRepository resourceRepository = (PersistentLocalResourceRepository) runtimeContext
-                .getLocalResourceRepository();
-        IRecoveryManager recoveryManager = runtimeContext.getTransactionSubsystem().getRecoveryManager();
-        ILogManager logManager = runtimeContext.getTransactionSubsystem().getLogManager();
-        while (true) {
-            //start recovery steps
-            try {
-                if (maxRecoveryAttempts <= 0) {
-                    //to avoid infinite loop in case of unexpected behavior.
-                    throw new IllegalStateException("Failed to perform remote recovery.");
-                }
-
-                //delete any existing recovery files from previous failed recovery attempts
-                recoveryManager.deleteRecoveryTemporaryFiles();
-
-                //create temporary file to store recovery logs
-                File recoveryLogsFile = recoveryManager.createJobRecoveryFile(REMOTE_RECOVERY_JOB_ID,
-                        RECOVERY_LOGS_FILE_NAME);
-
-                /*** Prepare for Recovery ***/
-                //1. check remote replicas states
-                replicationManager.initializeReplicasState();
-                int activeReplicasCount = replicationManager.getActiveReplicasCount();
-
-                if (activeReplicasCount == 0) {
-                    throw new IllegalStateException("no ACTIVE remote replica(s) exists to perform remote recovery");
-                }
-
-                //2. clean any memory data that could've existed from previous failed recovery attempt
-                IDatasetLifecycleManager datasetLifeCycleManager = runtimeContext.getDatasetLifecycleManager();
-                datasetLifeCycleManager.closeAllDatasets();
-
-                //3. remove any existing storage data and initialize storage metadata
-                resourceRepository.deleteStorageData(true);
-                resourceRepository.initializeNewUniverse(AsterixClusterProperties.INSTANCE.getStorageDirectoryName());
-
-                //4. select remote replicas to recover from per lost replica data
-                Map<String, Set<String>> selectedRemoteReplicas = constructRemoteRecoveryPlan();
-
-                //5. get max LSN from selected remote replicas
-                long maxRemoteLSN = replicationManager.getMaxRemoteLSN(selectedRemoteReplicas.keySet());
-
-                //6. force LogManager to start from a partition > maxLSN in selected remote replicas
-                logManager.renewLogFilesAndStartFromLSN(maxRemoteLSN);
-
-                /*** Start Recovery Per Lost Replica ***/
-                for (Entry<String, Set<String>> remoteReplica : selectedRemoteReplicas.entrySet()) {
-                    String replicaId = remoteReplica.getKey();
-                    Set<String> replicasDataToRecover = remoteReplica.getValue();
-
-                    //Request indexes metadata and LSM components
-                    replicationManager.requestReplicaFiles(replicaId, replicasDataToRecover, new HashSet<String>());
-
-                    //Get min LSN to start requesting logs from
-                    long minLSN = replicationManager.requestReplicaMinLSN(replicaId);
-
-                    //Request remote logs from selected remote replicas
-                    replicationManager.requestReplicaLogs(replicaId, replicasDataToRecover, minLSN, recoveryLogsFile);
-
-                    //Replay remote logs using recovery manager
-                    if (replicasDataToRecover.contains(logManager.getNodeId())) {
-                        //replay logs for local partitions only
-                        Set<Integer> nodePartitions = resourceRepository.getNodeOrignalPartitions();
-                        try (RandomAccessFile raf = new RandomAccessFile(recoveryLogsFile, "r");
-                                FileChannel fileChannel = raf.getChannel();) {
-                            ILogReader logReader = new RemoteLogReader(fileChannel, fileChannel.size(),
-                                    logManager.getLogPageSize());
-                            recoveryManager.replayPartitionsLogs(nodePartitions, logReader, 0);
-                        }
-                    }
-                }
-                LOGGER.log(Level.INFO, "Completed remote recovery successfully!");
-                break;
-            } catch (Exception e) {
-                e.printStackTrace();
-                LOGGER.log(Level.WARNING, "Failed during remote recovery. Attempting again...");
-                maxRecoveryAttempts--;
-            }
-        }
-    }
-
     private Map<String, Set<String>> constructRemoteRecoveryPlan() {
         //1. identify which replicas reside in this node
         String localNodeId = runtimeContext.getTransactionSubsystem().getId();
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/RemoteLogReader.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/RemoteLogReader.java
deleted file mode 100644
index 4dc1700..0000000
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/RemoteLogReader.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.transaction.management.service.logging;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-
-import org.apache.asterix.common.exceptions.ACIDException;
-import org.apache.asterix.common.transactions.ILogReader;
-import org.apache.asterix.common.transactions.ILogRecord;
-import org.apache.asterix.common.transactions.ILogRecord.RecordReadStatus;
-import org.apache.asterix.common.transactions.LogRecord;
-
-public class RemoteLogReader implements ILogReader {
-
-    private final FileChannel fileChannel;
-    private final ILogRecord logRecord;
-    private final ByteBuffer readBuffer;
-    private long readLSN;
-    private final int logPageSize;
-
-    public RemoteLogReader(FileChannel fileChannel, long logFileSize, int logPageSize) {
-        this.fileChannel = fileChannel;
-        this.logPageSize = logPageSize;
-        logRecord = new LogRecord();
-        readBuffer = ByteBuffer.allocate(logPageSize);
-    }
-
-    @Override
-    public void initializeScan(long beginLSN) throws ACIDException {
-        readLSN = beginLSN;
-        fillLogReadBuffer();
-    }
-
-    private boolean fillLogReadBuffer() throws ACIDException {
-        return fillLogReadBuffer(logPageSize, readBuffer);
-    }
-
-    private boolean fillLogReadBuffer(int pageSize, ByteBuffer readBuffer) throws ACIDException {
-        int size = 0;
-        int read = 0;
-        readBuffer.position(0);
-        readBuffer.limit(logPageSize);
-        try {
-            fileChannel.position(readLSN);
-            //We loop here because read() may return 0, but this simply means we are waiting on IO.
-            //Therefore we want to break out only when either the buffer is full, or we reach EOF.
-            while (size < pageSize && read != -1) {
-                read = fileChannel.read(readBuffer);
-                if (read > 0) {
-                    size += read;
-                }
-            }
-        } catch (IOException e) {
-            throw new ACIDException(e);
-        }
-        readBuffer.position(0);
-        readBuffer.limit(size);
-        if (size == 0 && read == -1) {
-            return false; //EOF
-        }
-        return true;
-    }
-
-    @Override
-    public ILogRecord read(long LSN) throws ACIDException {
-        throw new UnsupportedOperationException("Random read is not supported.");
-    }
-
-    @Override
-    public ILogRecord next() throws ACIDException {
-        if (readBuffer.position() == readBuffer.limit()) {
-            boolean hasRemaining = fillLogReadBuffer();
-            if (!hasRemaining) {
-                return null;
-            }
-        }
-        ByteBuffer readBuffer = this.readBuffer;
-        boolean refilled = false;
-
-        while (true) {
-            RecordReadStatus status = logRecord.readRemoteLog(readBuffer, true);
-            switch (status) {
-                case TRUNCATED: {
-                    if (!refilled) {
-                        //we may have just read off the end of the buffer, so try refiling it
-                        if (!fillLogReadBuffer()) {
-                            return null;
-                        }
-                        refilled = true;
-                        //now see what we have in the refilled buffer
-                        continue;
-                    }
-                    return null;
-                }
-                case LARGE_RECORD: {
-                    readBuffer = ByteBuffer.allocate(logRecord.getLogSize());
-                    fillLogReadBuffer(logRecord.getLogSize(), readBuffer);
-                    //now see what we have in the expanded buffer
-                    continue;
-                }
-                case BAD_CHKSUM: {
-                    return null;
-                }
-                case OK:
-                    break;
-            }
-            break;
-        }
-
-        readLSN += logRecord.getSerializedLogSize();
-        return logRecord;
-    }
-
-    @Override
-    public void close() throws ACIDException {
-        try {
-            if (fileChannel != null) {
-                if (fileChannel.isOpen()) {
-                    fileChannel.close();
-                }
-            }
-        } catch (IOException e) {
-            throw new ACIDException(e);
-        }
-    }
-
-}