[ASTERIXDB-2198][REPL] Introduce Dynamic Replica Placement

- user model changes: no
- storage format changes: no
- interface changes: yes
  - Add IReplicationMessage and IReplicaTask.
  - Add notifyMetadataNodeChange to IFaultToleranceStrategy.
  - Add register to IReplicationManager to allow registering
    replicas at runtime.

Details:
- Add cluster APIs for:
  - changing partition master node.
  - changing metadata node.
- Add NC storage management API for promoting a partition replica
  to master replica.
- Implement changing metadata node at runtime in
  MetadataNodeFaultToleranceStrategy.
- Allow MetadataNodeFaultToleranceStrategy to have zero replica
  at initialization.
- Add a flag to LangExecutionUtil to skip storage distribution
  check at the end of each test.
- Add test case for metadata node failover as follows:
  1- start with nc1 as metadata node.
  2- add replica for metadata partition on nc2 at runtime.
  3- performs metadata transactions on nc1.
  4- promote metadata partition on nc2.
  5- failover metadata node to nc2.
  6- ensure the effects of the metadata transactions on (2) exists.
  7- performs more metadata transactions on nc2.
  8- ensure the effects of the metadata transactions on (7) exists.

Change-Id: I11f82efcad29d2c37324fe9d3c11d872b0348f49
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2215
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java
index 6826b26..18e837a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java
@@ -30,6 +30,7 @@
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.hyracks.api.config.IOption;
 import org.apache.hyracks.api.config.Section;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.control.common.config.ConfigUtils;
 import org.apache.hyracks.control.common.controllers.ControllerConfig;
 import org.apache.hyracks.http.api.IServletRequest;
@@ -90,6 +91,21 @@
         responseWriter.flush();
     }
 
+    @Override
+    protected void post(IServletRequest request, IServletResponse response) throws Exception {
+        switch (localPath(request)) {
+            case "/partition/master":
+                processPartitionMaster(request, response);
+                break;
+            case "/metadataNode":
+                processMetadataNode(request, response);
+                break;
+            default:
+                sendError(response, HttpResponseStatus.NOT_FOUND);
+                break;
+        }
+    }
+
     protected ObjectNode getClusterStateSummaryJSON() {
         return appCtx.getClusterStateManager().getClusterStateSummary();
     }
@@ -143,4 +159,16 @@
                 && option != ControllerConfig.Option.CONFIG_FILE_URL;
     }
 
+    private void processPartitionMaster(IServletRequest request, IServletResponse response) {
+        final String partition = request.getParameter("partition");
+        final String node = request.getParameter("node");
+        appCtx.getClusterStateManager().updateClusterPartition(Integer.valueOf(partition), node, true);
+        response.setStatus(HttpResponseStatus.OK);
+    }
+
+    private void processMetadataNode(IServletRequest request, IServletResponse response) throws HyracksDataException {
+        final String node = request.getParameter("node");
+        appCtx.getFaultToleranceStrategy().notifyMetadataNodeChange(node);
+        response.setStatus(HttpResponseStatus.OK);
+    }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
index 8e73405..c2cda4e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
@@ -33,6 +33,7 @@
 import org.apache.asterix.common.replication.IPartitionReplica;
 import org.apache.asterix.common.storage.IReplicaManager;
 import org.apache.asterix.common.storage.ReplicaIdentifier;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.http.api.IServletRequest;
 import org.apache.hyracks.http.api.IServletResponse;
 import org.apache.hyracks.http.server.AbstractServlet;
@@ -91,6 +92,9 @@
             case "/removeReplica":
                 processRemoveReplica(request, response);
                 break;
+            case "/promote":
+                processPromote(request, response);
+                break;
             default:
                 sendError(response, HttpResponseStatus.NOT_FOUND);
                 break;
@@ -160,4 +164,14 @@
         final InetSocketAddress replicaAddress = new InetSocketAddress(host, Integer.valueOf(port));
         return ReplicaIdentifier.of(Integer.valueOf(partition), replicaAddress);
     }
+
+    private void processPromote(IServletRequest request, IServletResponse response) throws HyracksDataException {
+        final String partition = request.getParameter("partition");
+        if (partition == null) {
+            response.setStatus(HttpResponseStatus.BAD_REQUEST);
+            return;
+        }
+        appCtx.getReplicaManager().promote(Integer.valueOf(partition));
+        response.setStatus(HttpResponseStatus.OK);
+    }
 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 75159af..81b232a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -213,7 +213,7 @@
         final ClusterPartition[] nodePartitions = metadataProperties.getNodePartitions().get(nodeId);
         final Set<Integer> nodePartitionsIds = Arrays.stream(nodePartitions).map(ClusterPartition::getPartitionId)
                 .collect(Collectors.toSet());
-        replicaManager = new ReplicaManager(nodePartitionsIds);
+        replicaManager = new ReplicaManager(this, nodePartitionsIds);
         isShuttingdown = false;
         activeManager = new ActiveManager(threadExecutor, getServiceContext().getNodeId(),
                 activeProperties.getMemoryComponentGlobalBudget(), compilerProperties.getFrameSize(),
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index f0ed5e9..22fa459 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -375,8 +375,8 @@
                                 } else {
                                     maxDiskLastLsn = resourceId2MaxLSNMap.get(resourceId);
                                 }
-
-                                if (lsn > maxDiskLastLsn) {
+                                // lsn @ maxDiskLastLsn is either a flush log or a master replica log
+                                if (lsn >= maxDiskLastLsn) {
                                     redo(logRecord, datasetLifecycleManager);
                                     redoCount++;
                                 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
index 0c84a6e..bf17a5b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
@@ -25,14 +25,19 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
+import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.replication.IPartitionReplica;
+import org.apache.asterix.common.replication.IRemoteRecoveryManager;
 import org.apache.asterix.common.storage.IReplicaManager;
 import org.apache.asterix.common.storage.ReplicaIdentifier;
 import org.apache.asterix.replication.storage.PartitionReplica;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class ReplicaManager implements IReplicaManager {
 
+    private final INcApplicationContext appCtx;
     /**
      * the partitions to which the current node is master
      */
@@ -42,7 +47,8 @@
      */
     private final Map<ReplicaIdentifier, PartitionReplica> replicas = new HashMap<>();
 
-    public ReplicaManager(Set<Integer> partitions) {
+    public ReplicaManager(INcApplicationContext appCtx, Set<Integer> partitions) {
+        this.appCtx = appCtx;
         this.partitions.addAll(partitions);
     }
 
@@ -52,7 +58,7 @@
             throw new IllegalStateException(
                     "This node is not the current master of partition(" + id.getPartition() + ")");
         }
-        replicas.computeIfAbsent(id, key -> new PartitionReplica(key));
+        replicas.computeIfAbsent(id, k -> new PartitionReplica(k, appCtx));
         replicas.get(id).sync();
     }
 
@@ -74,4 +80,11 @@
     public Set<Integer> getPartitions() {
         return Collections.unmodifiableSet(partitions);
     }
+
+    @Override
+    public synchronized void promote(int partition) throws HyracksDataException {
+        final IRemoteRecoveryManager remoteRecoveryManager = appCtx.getRemoteRecoveryManager();
+        remoteRecoveryManager.replayReplicaPartitionLogs(Stream.of(partition).collect(Collectors.toSet()), true);
+        partitions.add(partition);
+    }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
index 23f225e..e4a6f4b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
@@ -45,8 +45,8 @@
 import org.apache.asterix.app.replication.message.PreparePartitionsFailbackResponseMessage;
 import org.apache.asterix.app.replication.message.RegistrationTasksRequestMessage;
 import org.apache.asterix.app.replication.message.RegistrationTasksResponseMessage;
-import org.apache.asterix.app.replication.message.TakeoverMetadataNodeRequestMessage;
-import org.apache.asterix.app.replication.message.TakeoverMetadataNodeResponseMessage;
+import org.apache.asterix.app.replication.message.MetadataNodeRequestMessage;
+import org.apache.asterix.app.replication.message.MetadataNodeResponseMessage;
 import org.apache.asterix.app.replication.message.TakeoverPartitionsRequestMessage;
 import org.apache.asterix.app.replication.message.TakeoverPartitionsResponseMessage;
 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
@@ -337,7 +337,7 @@
         validateClusterState();
     }
 
-    public synchronized void process(TakeoverMetadataNodeResponseMessage response) throws HyracksDataException {
+    public synchronized void process(MetadataNodeResponseMessage response) throws HyracksDataException {
         currentMetadataNode = response.getNodeId();
         metadataNodeActive = true;
         clusterManager.updateMetadataNode(currentMetadataNode, metadataNodeActive);
@@ -403,7 +403,7 @@
         ICcApplicationContext appCtx = (ICcApplicationContext) serviceCtx.getApplicationContext();
         ClusterPartition metadataPartiton = appCtx.getMetadataProperties().getMetadataPartition();
         //request the metadataPartition node to register itself as the metadata node
-        TakeoverMetadataNodeRequestMessage takeoverRequest = new TakeoverMetadataNodeRequestMessage();
+        MetadataNodeRequestMessage takeoverRequest = new MetadataNodeRequestMessage(true);
         try {
             messageBroker.sendApplicationMessageToNC(takeoverRequest, metadataPartiton.getActiveNodeId());
             // Since the metadata node will be changed, we need to rebind the proxy object
@@ -440,8 +440,8 @@
             case TAKEOVER_PARTITION_RESPONSE:
                 process((TakeoverPartitionsResponseMessage) message);
                 break;
-            case TAKEOVER_METADATA_NODE_RESPONSE:
-                process((TakeoverMetadataNodeResponseMessage) message);
+            case METADATA_NODE_RESPONSE:
+                process((MetadataNodeResponseMessage) message);
                 break;
             case PREPARE_FAILBACK_RESPONSE:
                 process((PreparePartitionsFailbackResponseMessage) message);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java
index 3341813..3470d7b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java
@@ -38,6 +38,8 @@
 import org.apache.asterix.app.nc.task.ReportLocalCountersTask;
 import org.apache.asterix.app.nc.task.StartLifecycleComponentsTask;
 import org.apache.asterix.app.nc.task.StartReplicationServiceTask;
+import org.apache.asterix.app.replication.message.MetadataNodeRequestMessage;
+import org.apache.asterix.app.replication.message.MetadataNodeResponseMessage;
 import org.apache.asterix.app.replication.message.NCLifecycleTaskReportMessage;
 import org.apache.asterix.app.replication.message.RegistrationTasksRequestMessage;
 import org.apache.asterix.app.replication.message.ReplayPartitionLogsRequestMessage;
@@ -132,6 +134,9 @@
             case REPLAY_LOGS_RESPONSE:
                 process((ReplayPartitionLogsResponseMessage) message);
                 break;
+            case METADATA_NODE_RESPONSE:
+                process((MetadataNodeResponseMessage) message);
+                break;
             default:
                 throw new RuntimeDataException(ErrorCode.UNSUPPORTED_MESSAGE_TYPE, message.getType().name());
         }
@@ -143,6 +148,26 @@
         this.metadataNodeId = clusterManager.getCurrentMetadataNodeId();
     }
 
+    @Override
+    public void notifyMetadataNodeChange(String node) throws HyracksDataException {
+        if (metadataNodeId.equals(node)) {
+            return;
+        }
+        // if current metadata node is active, we need to unbind its metadata proxy object
+        if (clusterManager.isMetadataNodeActive()) {
+            MetadataNodeRequestMessage msg = new MetadataNodeRequestMessage(false);
+            try {
+                messageBroker.sendApplicationMessageToNC(msg, metadataNodeId);
+                // when the current node responses, we will bind to the new one
+                metadataNodeId = node;
+            } catch (Exception e) {
+                throw HyracksDataException.create(e);
+            }
+        } else {
+            requestMetadataNodeTakeover(node);
+        }
+    }
+
     private synchronized void process(ReplayPartitionLogsResponseMessage msg) {
         hotStandbyMetadataReplica.add(msg.getNodeId());
         if (LOGGER.isLoggable(Level.INFO)) {
@@ -256,4 +281,20 @@
         recoveryPlan.put(hotStandbyMetadataReplica.iterator().next(), metadataPartition);
         return new RemoteRecoveryTask(recoveryPlan);
     }
+
+    private void process(MetadataNodeResponseMessage response) throws HyracksDataException {
+        clusterManager.updateMetadataNode(response.getNodeId(), response.isExported());
+        if (!response.isExported()) {
+            requestMetadataNodeTakeover(metadataNodeId);
+        }
+    }
+
+    private void requestMetadataNodeTakeover(String node) throws HyracksDataException {
+        MetadataNodeRequestMessage msg = new MetadataNodeRequestMessage(true);
+        try {
+            messageBroker.sendApplicationMessageToNC(msg, node);
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
+        }
+    }
 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverMetadataNodeRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/MetadataNodeRequestMessage.java
similarity index 73%
rename from asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverMetadataNodeRequestMessage.java
rename to asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/MetadataNodeRequestMessage.java
index 2137924..bebd133 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverMetadataNodeRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/MetadataNodeRequestMessage.java
@@ -27,24 +27,33 @@
 import org.apache.asterix.common.replication.INCLifecycleMessage;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
-public class TakeoverMetadataNodeRequestMessage implements INCLifecycleMessage, INcAddressedMessage {
+public class MetadataNodeRequestMessage implements INCLifecycleMessage, INcAddressedMessage {
 
     private static final long serialVersionUID = 1L;
-    private static final Logger LOGGER = Logger.getLogger(TakeoverMetadataNodeRequestMessage.class.getName());
+    private static final Logger LOGGER = Logger.getLogger(MetadataNodeRequestMessage.class.getName());
+    private final boolean export;
+
+    public MetadataNodeRequestMessage(boolean export) {
+        this.export = export;
+    }
 
     @Override
     public void handle(INcApplicationContext appContext) throws HyracksDataException, InterruptedException {
         INCMessageBroker broker = (INCMessageBroker) appContext.getServiceContext().getMessageBroker();
         HyracksDataException hde = null;
         try {
-            appContext.initializeMetadata(false);
-            appContext.exportMetadataNodeStub();
+            if (export) {
+                appContext.initializeMetadata(false);
+                appContext.exportMetadataNodeStub();
+            } else {
+                appContext.unexportMetadataNodeStub();
+            }
         } catch (Exception e) {
             LOGGER.log(Level.SEVERE, "Failed taking over metadata", e);
             hde = HyracksDataException.create(e);
         } finally {
-            TakeoverMetadataNodeResponseMessage reponse =
-                    new TakeoverMetadataNodeResponseMessage(appContext.getTransactionSubsystem().getId());
+            MetadataNodeResponseMessage reponse =
+                    new MetadataNodeResponseMessage(appContext.getTransactionSubsystem().getId(), export);
             try {
                 broker.sendMessageToCC(reponse);
             } catch (Exception e) {
@@ -59,11 +68,11 @@
 
     @Override
     public String toString() {
-        return TakeoverMetadataNodeRequestMessage.class.getSimpleName();
+        return MetadataNodeRequestMessage.class.getSimpleName();
     }
 
     @Override
     public MessageType getType() {
-        return MessageType.TAKEOVER_METADATA_NODE_REQUEST;
+        return MessageType.METADATA_NODE_REQUEST;
     }
 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverMetadataNodeResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/MetadataNodeResponseMessage.java
similarity index 80%
rename from asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverMetadataNodeResponseMessage.java
rename to asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/MetadataNodeResponseMessage.java
index ff1b2d2..ebde9b9 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverMetadataNodeResponseMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/MetadataNodeResponseMessage.java
@@ -24,13 +24,15 @@
 import org.apache.asterix.runtime.utils.CcApplicationContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
-public class TakeoverMetadataNodeResponseMessage implements INCLifecycleMessage, ICcAddressedMessage {
+public class MetadataNodeResponseMessage implements INCLifecycleMessage, ICcAddressedMessage {
 
     private static final long serialVersionUID = 1L;
     private final String nodeId;
+    private final boolean exported;
 
-    public TakeoverMetadataNodeResponseMessage(String nodeId) {
+    public MetadataNodeResponseMessage(String nodeId, boolean exported) {
         this.nodeId = nodeId;
+        this.exported = exported;
     }
 
     public String getNodeId() {
@@ -44,11 +46,15 @@
 
     @Override
     public String toString() {
-        return TakeoverMetadataNodeResponseMessage.class.getSimpleName();
+        return MetadataNodeResponseMessage.class.getSimpleName();
     }
 
     @Override
     public MessageType getType() {
-        return MessageType.TAKEOVER_METADATA_NODE_RESPONSE;
+        return MessageType.METADATA_NODE_RESPONSE;
+    }
+
+    public boolean isExported() {
+        return exported;
     }
 }
diff --git a/asterixdb/asterix-app/src/main/resources/cluster.xml b/asterixdb/asterix-app/src/main/resources/cluster.xml
index 7b7d52a..41be696 100644
--- a/asterixdb/asterix-app/src/main/resources/cluster.xml
+++ b/asterixdb/asterix-app/src/main/resources/cluster.xml
@@ -30,9 +30,6 @@
     </data_replication>
     <fault_tolerance>
        <strategy>metadata_node</strategy>
-       <replica>
-         <node_id>nc2</node_id>
-       </replica>
     </fault_tolerance>
   </high_availability>
 
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
index 9d73407..09761a0 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
@@ -32,7 +32,6 @@
 import java.util.List;
 
 import org.apache.asterix.app.external.ExternalUDFLibrarian;
-import org.apache.asterix.common.config.ClusterProperties;
 import org.apache.asterix.common.library.ILibraryManager;
 import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.asterix.test.common.TestExecutor;
@@ -40,8 +39,8 @@
 import org.apache.commons.lang.SystemUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hyracks.api.io.IODeviceHandle;
-import org.apache.hyracks.util.ThreadDumpUtil;
 import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.util.ThreadDumpUtil;
 
 /**
  * Utils for running SQL++ or AQL runtime tests.
@@ -59,6 +58,7 @@
 
     private static ExternalUDFLibrarian librarian;
     private static final int repeat = Integer.getInteger("test.repeat", 1);
+    private static boolean checkStorageDistribution = true;
 
     public static void setUp(String configFile, TestExecutor executor) throws Exception {
         testExecutor = executor;
@@ -126,7 +126,9 @@
                 testExecutor.executeTest(PATH_ACTUAL, tcCtx, null, false, ExecutionTestUtil.FailedGroup);
 
                 try {
-                    checkStorageFiles();
+                    if (checkStorageDistribution) {
+                        checkStorageFiles();
+                    }
                 } finally {
                     testExecutor.cleanup(tcCtx.toString(), badTestCases);
                 }
@@ -223,6 +225,10 @@
         }
     }
 
+    public static void setCheckStorageDistribution(boolean checkStorageDistribution) {
+        LangExecutionUtil.checkStorageDistribution = checkStorageDistribution;
+    }
+
     private static void outputLeakedOpenFiles(String processId) throws IOException {
         Process process =
                 Runtime.getRuntime().exec(new String[] { "bash", "-c", "lsof -p " + processId + "|grep waf" });
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ReplicationExecutionTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ReplicationExecutionTest.java
index 56c7bc0..32b756b 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ReplicationExecutionTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ReplicationExecutionTest.java
@@ -25,10 +25,12 @@
 import java.util.Map;
 
 import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.config.ClusterProperties;
 import org.apache.asterix.test.common.TestExecutor;
 import org.apache.asterix.testframework.context.TestCaseContext;
 import org.apache.hyracks.control.nc.NodeControllerService;
-import org.junit.AfterClass;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -39,28 +41,38 @@
 public class ReplicationExecutionTest {
     protected static final String TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml";
     private static final TestExecutor testExecutor = new TestExecutor();
+    private static boolean configured = false;
 
     @BeforeClass
-    public static void setUp() throws Exception {
-        LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, testExecutor);
-        final NodeControllerService[] ncs = ExecutionTestUtil.integrationUtil.ncs;
-        Map<String, InetSocketAddress> ncEndPoints = new HashMap<>();
-        Map<String, InetSocketAddress> replicationAddress = new HashMap<>();
-        final String ip = InetAddress.getLoopbackAddress().getHostAddress();
-        for (NodeControllerService nc : ncs) {
-            final String nodeId = nc.getId();
-            final INcApplicationContext appCtx = (INcApplicationContext) nc.getApplicationContext();
-            int apiPort = appCtx.getExternalProperties().getNcApiPort();
-            int replicationPort = appCtx.getReplicationProperties().getDataReplicationPort(nodeId);
-            ncEndPoints.put(nodeId, InetSocketAddress.createUnresolved(ip, apiPort));
-            replicationAddress.put(nodeId, InetSocketAddress.createUnresolved(ip, replicationPort));
-        }
-        testExecutor.setNcEndPoints(ncEndPoints);
-        testExecutor.setNcReplicationAddress(replicationAddress);
+    public static void setUp() {
+        ClusterProperties.INSTANCE.getCluster().getHighAvailability().setEnabled(String.valueOf(true));
+        LangExecutionUtil.setCheckStorageDistribution(false);
     }
 
-    @AfterClass
-    public static void tearDown() throws Exception {
+    @Before
+    public void before() throws Exception {
+        LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, testExecutor);
+        if (!configured) {
+            final NodeControllerService[] ncs = ExecutionTestUtil.integrationUtil.ncs;
+            Map<String, InetSocketAddress> ncEndPoints = new HashMap<>();
+            Map<String, InetSocketAddress> replicationAddress = new HashMap<>();
+            final String ip = InetAddress.getLoopbackAddress().getHostAddress();
+            for (NodeControllerService nc : ncs) {
+                final String nodeId = nc.getId();
+                final INcApplicationContext appCtx = (INcApplicationContext) nc.getApplicationContext();
+                int apiPort = appCtx.getExternalProperties().getNcApiPort();
+                int replicationPort = appCtx.getReplicationProperties().getDataReplicationPort(nodeId);
+                ncEndPoints.put(nodeId, InetSocketAddress.createUnresolved(ip, apiPort));
+                replicationAddress.put(nodeId, InetSocketAddress.createUnresolved(ip, replicationPort));
+            }
+            testExecutor.setNcEndPoints(ncEndPoints);
+            testExecutor.setNcReplicationAddress(replicationAddress);
+            configured = true;
+        }
+    }
+
+    @After
+    public void after() throws Exception {
         LangExecutionUtil.tearDown();
     }
 
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.pollget.http
similarity index 97%
rename from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
rename to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.pollget.http
index d287fad..6867a5d 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.pollget.http
@@ -16,4 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-nc:asterix_nc1 /admin/storage/partition/0
+//polltimeoutsecs=30
+
+nc:asterix_nc1 /admin/storage/partition/0
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.1.sto.cmd
similarity index 94%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.1.sto.cmd
index d287fad..7ddaa20 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.1.sto.cmd
@@ -16,4 +16,4 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-nc:asterix_nc1 /admin/storage/partition/0
+nc:asterix_nc1 /addReplica 0 asterix_nc2
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.10.ddl.sqlpp
similarity index 94%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.10.ddl.sqlpp
index d287fad..f96d5a8 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.10.ddl.sqlpp
@@ -16,4 +16,4 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-nc:asterix_nc1 /admin/storage/partition/0
+CREATE DATASET ds_2(MyType) PRIMARY KEY id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.11.query.sqlpp
similarity index 91%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.11.query.sqlpp
index d287fad..555954d 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.11.query.sqlpp
@@ -16,4 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-nc:asterix_nc1 /admin/storage/partition/0
+SELECT value count(*)
+FROM Metadata.`Dataset`
+WHERE DatasetName = 'ds_2';
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.2.pollget.http
similarity index 96%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.2.pollget.http
index d287fad..6867a5d 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.2.pollget.http
@@ -16,4 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-nc:asterix_nc1 /admin/storage/partition/0
+//polltimeoutsecs=30
+
+nc:asterix_nc1 /admin/storage/partition/0
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.3.ddl.sqlpp
similarity index 90%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.3.ddl.sqlpp
index d287fad..15bc3c5 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.3.ddl.sqlpp
@@ -16,4 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-nc:asterix_nc1 /admin/storage/partition/0
+CREATE TYPE MyType AS {
+  id : int
+};
+
+CREATE DATASET ds_1(MyType) PRIMARY KEY id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.4.sto.cmd
similarity index 94%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.4.sto.cmd
index d287fad..a5753f0 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.4.sto.cmd
@@ -16,4 +16,4 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-nc:asterix_nc1 /admin/storage/partition/0
+nc:asterix_nc2 /promote 0 asterix_nc2
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.5.get.http
similarity index 94%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.5.get.http
index d287fad..4a53aed 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.5.get.http
@@ -16,4 +16,4 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-nc:asterix_nc1 /admin/storage/partition/0
+nc:asterix_nc2 /admin/storage
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.6.post.http
similarity index 92%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.6.post.http
index d287fad..2e8fc63 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.6.post.http
@@ -16,4 +16,4 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-nc:asterix_nc1 /admin/storage/partition/0
+/admin/cluster/partition/master?partition=0&node=asterix_nc2
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.7.post.http
similarity index 94%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.7.post.http
index d287fad..e8dca0b 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.7.post.http
@@ -16,4 +16,4 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-nc:asterix_nc1 /admin/storage/partition/0
+/admin/cluster/metadataNode?node=asterix_nc2
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.8.pollget.http
similarity index 94%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.8.pollget.http
index d287fad..32e2f78 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.8.pollget.http
@@ -16,4 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-nc:asterix_nc1 /admin/storage/partition/0
+//polltimeoutsecs=30
+
+/admin/cluster/summary
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.9.query.sqlpp
similarity index 91%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.9.query.sqlpp
index d287fad..a612cbb 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.9.query.sqlpp
@@ -16,4 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-nc:asterix_nc1 /admin/storage/partition/0
+SELECT value count(*)
+FROM Metadata.`Dataset`
+WHERE DatasetName = 'ds_1';
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/replication.xml b/asterixdb/asterix-app/src/test/resources/runtimets/replication.xml
index a635676..c3dfb3c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/replication.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/replication.xml
@@ -24,5 +24,10 @@
         <output-dir compare="Text">add_replica</output-dir>
       </compilation-unit>
     </test-case>
+    <test-case FilePath="replication">
+      <compilation-unit name="metadata_failover">
+        <output-dir compare="Text">metadata_failover</output-dir>
+      </compilation-unit>
+    </test-case>
   </test-group>
 </test-suite>
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/add_replica/add_replica.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/add_replica/add_replica.2.adm
index 3553d9c..6836f71 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/add_replica/add_replica.2.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/add_replica/add_replica.2.adm
@@ -2,6 +2,6 @@
   "partition" : 0,
   "replicas" : [ {
     "location" : "127.0.0.1:2017",
-    "status" : "DISCONNECTED"
+    "status" : "IN_SYNC"
   } ]
 } ]
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/metadata_failover/metadata_failover.11.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/metadata_failover/metadata_failover.11.adm
new file mode 100644
index 0000000..56a6051
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/metadata_failover/metadata_failover.11.adm
@@ -0,0 +1 @@
+1
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/metadata_failover/metadata_failover.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/metadata_failover/metadata_failover.2.adm
new file mode 100644
index 0000000..6836f71
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/metadata_failover/metadata_failover.2.adm
@@ -0,0 +1,7 @@
+[ {
+  "partition" : 0,
+  "replicas" : [ {
+    "location" : "127.0.0.1:2017",
+    "status" : "IN_SYNC"
+  } ]
+} ]
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/metadata_failover/metadata_failover.5.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/metadata_failover/metadata_failover.5.adm
new file mode 100644
index 0000000..3d3204d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/metadata_failover/metadata_failover.5.adm
@@ -0,0 +1,10 @@
+[ {
+  "partition" : 0,
+  "replicas" : [ ]
+}, {
+  "partition" : 2,
+  "replicas" : [ ]
+}, {
+  "partition" : 3,
+  "replicas" : [ ]
+} ]
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/metadata_failover/metadata_failover.6.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/metadata_failover/metadata_failover.6.adm
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/metadata_failover/metadata_failover.6.adm
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/metadata_failover/metadata_failover.7.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/metadata_failover/metadata_failover.7.adm
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/metadata_failover/metadata_failover.7.adm
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/metadata_failover/metadata_failover.8.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/metadata_failover/metadata_failover.8.adm
new file mode 100644
index 0000000..fa5cfb4
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/metadata_failover/metadata_failover.8.adm
@@ -0,0 +1,38 @@
+{
+  "metadata_node" : "asterix_nc2",
+  "partitions" : {
+    "0" : {
+      "active" : true,
+      "activeNodeId" : "asterix_nc2",
+      "iodeviceNum" : 0,
+      "nodeId" : "asterix_nc1",
+      "partitionId" : 0,
+      "pendingActivation" : false
+    },
+    "1" : {
+      "active" : true,
+      "activeNodeId" : "asterix_nc1",
+      "iodeviceNum" : 1,
+      "nodeId" : "asterix_nc1",
+      "partitionId" : 1,
+      "pendingActivation" : false
+    },
+    "2" : {
+      "active" : true,
+      "activeNodeId" : "asterix_nc2",
+      "iodeviceNum" : 0,
+      "nodeId" : "asterix_nc2",
+      "partitionId" : 2,
+      "pendingActivation" : false
+    },
+    "3" : {
+      "active" : true,
+      "activeNodeId" : "asterix_nc2",
+      "iodeviceNum" : 1,
+      "nodeId" : "asterix_nc2",
+      "partitionId" : 3,
+      "pendingActivation" : false
+    }
+  },
+  "state" : "ACTIVE"
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/metadata_failover/metadata_failover.9.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/metadata_failover/metadata_failover.9.adm
new file mode 100644
index 0000000..56a6051
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/metadata_failover/metadata_failover.9.adm
@@ -0,0 +1 @@
+1
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IFaultToleranceStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IFaultToleranceStrategy.java
index 5c286cc..e871374 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IFaultToleranceStrategy.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IFaultToleranceStrategy.java
@@ -64,4 +64,12 @@
      */
     IFaultToleranceStrategy from(ICCServiceContext serviceCtx, IReplicationStrategy replicationStrategy);
 
+    /**
+     * Performs the required steps to change the metadata node to {@code node}
+     *
+     * @param node
+     */
+    default void notifyMetadataNodeChange(String node) throws HyracksDataException {
+        throw new UnsupportedOperationException(getClass() + " does not support metadata node change");
+    }
 }
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INCLifecycleMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INCLifecycleMessage.java
index cb9fa8f..372a88a 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INCLifecycleMessage.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INCLifecycleMessage.java
@@ -34,8 +34,8 @@
         REGISTRATION_TASKS_RESULT,
         TAKEOVER_PARTITION_REQUEST,
         TAKEOVER_PARTITION_RESPONSE,
-        TAKEOVER_METADATA_NODE_REQUEST,
-        TAKEOVER_METADATA_NODE_RESPONSE
+        METADATA_NODE_REQUEST,
+        METADATA_NODE_RESPONSE
     }
 
     /**
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java
index 1c3f030..72a7f9d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.common.replication;
 
+import java.util.List;
 import java.util.Set;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -30,4 +31,6 @@
      * @throws HyracksDataException
      */
     long getPartitionsMinLSN(Set<Integer> partitions) throws HyracksDataException;
+
+    List<String> getPartitionIndexesFiles(int partition, boolean relativePath) throws HyracksDataException;
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
index b969bef..04c4437 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
@@ -127,4 +127,11 @@
      * @param buffer
      */
     public void replicateTxnLogBatch(ByteBuffer buffer);
+
+    /**
+     * Registers {@code replica}. After registration, the replica will be included in all replication events
+     *
+     * @param replica
+     */
+    void register(IPartitionReplica replica);
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationThread.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationThread.java
index a88b82a..5b9d4fa 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationThread.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationThread.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.common.replication;
 
+import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
 
 import org.apache.asterix.common.transactions.LogRecord;
@@ -27,13 +28,19 @@
     /**
      * Sends a notification to this thread that logRecord has been flushed.
      *
-     * @param logRecord
-     *            The log that has been flushed.
+     * @param logRecord The log that has been flushed.
      */
-    public void notifyLogReplicationRequester(LogRecord logRecord);
+    void notifyLogReplicationRequester(LogRecord logRecord);
 
     /**
-     * @return The replication client socket channel.
+     * @return The replication socket channel.
      */
-    public SocketChannel getReplicationClientSocket();
+    SocketChannel getChannel();
+
+    /**
+     * Gets a reusable buffer that can be used to send data
+     *
+     * @return the reusable buffer
+     */
+    ByteBuffer getReusableBuffer();
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
index bd4b32f..f0bba41 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
@@ -69,21 +69,16 @@
         if (metadataNode == null) {
             throw new IllegalStateException("Invalid metadata node specified");
         }
-
-        if (cluster.getHighAvailability().getFaultTolerance().getReplica() == null
-                || cluster.getHighAvailability().getFaultTolerance().getReplica().getNodeId() == null
-                || cluster.getHighAvailability().getFaultTolerance().getReplica().getNodeId().isEmpty()) {
-            throw new RuntimeDataException(ErrorCode.INVALID_CONFIGURATION,
-                    "One or more replicas must be specified for metadata node.");
-        }
-
         final Set<Replica> replicas = new HashSet<>();
-        for (String nodeId : cluster.getHighAvailability().getFaultTolerance().getReplica().getNodeId()) {
-            Node node = ClusterProperties.INSTANCE.getNodeById(nodeId);
-            if (node == null) {
-                throw new RuntimeDataException(ErrorCode.INVALID_CONFIGURATION, "Invalid replica specified: " + nodeId);
+        if (cluster.getHighAvailability().getFaultTolerance().getReplica() != null) {
+            for (String nodeId : cluster.getHighAvailability().getFaultTolerance().getReplica().getNodeId()) {
+                Node node = ClusterProperties.INSTANCE.getNodeById(nodeId);
+                if (node == null) {
+                    throw new RuntimeDataException(ErrorCode.INVALID_CONFIGURATION,
+                            "Invalid replica specified: " + nodeId);
+                }
+                replicas.add(new Replica(node));
             }
-            replicas.add(new Replica(node));
         }
         MetadataOnlyReplicationStrategy st = new MetadataOnlyReplicationStrategy();
         st.metadataNodeId = cluster.getMetadataNode();
@@ -91,4 +86,9 @@
         st.metadataNodeReplicas = replicas;
         return st;
     }
+
+    @Override
+    public boolean isParticipant(String nodeId) {
+        return true;
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
index a3b2b50..19eee8f 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
@@ -22,6 +22,7 @@
 import java.util.Set;
 
 import org.apache.asterix.common.replication.IPartitionReplica;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public interface IReplicaManager {
 
@@ -54,4 +55,11 @@
      * @return The list of partition
      */
     Set<Integer> getPartitions();
+
+    /**
+     * Promotes a partition by making this node its master replica
+     *
+     * @param partition
+     */
+    void promote(int partition) throws HyracksDataException;
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/PartitionReplica.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/PartitionReplica.java
deleted file mode 100644
index 18733ce..0000000
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/PartitionReplica.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.storage;
-
-import static org.apache.asterix.common.storage.PartitionReplica.PartitionReplicaStatus.CATCHING_UP;
-import static org.apache.asterix.common.storage.PartitionReplica.PartitionReplicaStatus.DISCONNECTED;
-import static org.apache.asterix.common.storage.PartitionReplica.PartitionReplicaStatus.IN_SYNC;
-
-import org.apache.hyracks.util.JSONUtil;
-import org.apache.hyracks.util.annotations.ThreadSafe;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
-@ThreadSafe
-public class PartitionReplica {
-
-    public enum PartitionReplicaStatus {
-        /* replica is in-sync with master */
-        IN_SYNC,
-        /* replica is still catching up with master */
-        CATCHING_UP,
-        /* replica is not connected with master */
-        DISCONNECTED
-    }
-
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-    private final ReplicaIdentifier id;
-    private PartitionReplicaStatus status = DISCONNECTED;
-
-    public PartitionReplica(ReplicaIdentifier id) {
-        this.id = id;
-    }
-
-    public synchronized PartitionReplicaStatus getStatus() {
-        return status;
-    }
-
-    public ReplicaIdentifier getIdentifier() {
-        return id;
-    }
-
-    public synchronized void sync() {
-        if (status == IN_SYNC || status == CATCHING_UP) {
-            return;
-        }
-        //TODO complete implementation
-    }
-
-    public JsonNode asJson() {
-        ObjectNode json = OBJECT_MAPPER.createObjectNode();
-        json.put("id", id.toString());
-        json.put("state", status.name());
-        return json;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        PartitionReplica that = (PartitionReplica) o;
-        return id.equals(that.id);
-    }
-
-    @Override
-    public int hashCode() {
-        return id.hashCode();
-    }
-
-    @Override
-    public String toString() {
-        try {
-            return JSONUtil.convertNode(asJson());
-        } catch (JsonProcessingException e) {
-            throw new IllegalStateException(e);
-        }
-    }
-}
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
index 4aa6982..8cc11a8 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
@@ -71,6 +71,10 @@
         return Paths.get(root, partition, dataverse, dataset, rebalance, index);
     }
 
+    public Path getFileRelativePath() {
+        return Paths.get(root, partition, dataverse, dataset, rebalance, index, name);
+    }
+
     protected static void parse(ResourceReference ref, String path) {
         // format: root/partition/dataverse/dataset/rebalanceCount/index/fileName
         final String[] tokens = path.split(File.separator);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
index 220b089..f59914d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
@@ -35,6 +35,7 @@
      */
     public static final String INDEX_CHECKPOINT_FILE_PREFIX = ".idx_checkpoint_";
     public static final String METADATA_FILE_NAME = ".metadata";
+    public static final String MASK_FILE_PREFIX = ".mask_";
     public static final String LEGACY_DATASET_INDEX_NAME_SEPARATOR = "_idx_";
 
     /**
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
index b93ccb5..d2c5ad7 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@ -90,13 +90,21 @@
 
     /**
      * @param fileAbsolutePath
-     * @return the file relative path starting from the partition directory
+     * @return the file's index relative path starting from the storage directory
      */
     public static String getIndexFileRelativePath(String fileAbsolutePath) {
         return ResourceReference.of(fileAbsolutePath).getRelativePath().toString();
     }
 
     /**
+     * @param fileAbsolutePath
+     * @return the file's relative path starting from the storage directory
+     */
+    public static String getFileRelativePath(String fileAbsolutePath) {
+        return ResourceReference.of(fileAbsolutePath).getFileRelativePath().toString();
+    }
+
+    /**
      * Create a file
      * Note: this method is not thread safe. It is the responsibility of the caller to ensure no path conflict when
      * creating files simultaneously
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicaTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicaTask.java
new file mode 100644
index 0000000..001d41f
--- /dev/null
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicaTask.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.api;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.replication.IReplicationThread;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface IReplicaTask extends IReplicationMessage {
+
+    /**
+     * Performs the task on the replica
+     *
+     * @param appCtx
+     * @param worker
+     * @throws HyracksDataException
+     */
+    void perform(INcApplicationContext appCtx, IReplicationThread worker) throws HyracksDataException;
+}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationMessage.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationMessage.java
new file mode 100644
index 0000000..2e1cb8a
--- /dev/null
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationMessage.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.api;
+
+import java.io.OutputStream;
+
+import org.apache.asterix.replication.functions.ReplicationProtocol;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface IReplicationMessage {
+
+    /**
+     * @return the message type
+     */
+    ReplicationProtocol.ReplicationRequestType getMessageType();
+
+    /**
+     * Serializes {@link IReplicationMessage} to {@code out}
+     *
+     * @param out
+     * @throws HyracksDataException
+     */
+    void serialize(OutputStream out) throws HyracksDataException;
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java
index 8a52529..8094548 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java
@@ -25,10 +25,18 @@
 import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
 
+import org.apache.asterix.common.exceptions.ReplicationException;
 import org.apache.asterix.common.replication.ReplicaEvent;
+import org.apache.asterix.replication.api.IReplicationMessage;
 import org.apache.asterix.replication.management.NetworkingUtil;
+import org.apache.asterix.replication.messaging.CheckpointPartitionIndexesTask;
+import org.apache.asterix.replication.messaging.DeleteFileTask;
+import org.apache.asterix.replication.messaging.PartitionResourcesListResponse;
+import org.apache.asterix.replication.messaging.PartitionResourcesListTask;
+import org.apache.asterix.replication.messaging.ReplicateFileTask;
 import org.apache.asterix.replication.storage.LSMComponentProperties;
 import org.apache.asterix.replication.storage.LSMIndexFileProperties;
+import org.apache.asterix.replication.storage.PartitionReplica;
 import org.apache.hyracks.data.std.util.ExtendedByteArrayOutputStream;
 
 public class ReplicationProtocol {
@@ -38,8 +46,8 @@
      */
     public static final String JOB_REPLICATION_ACK = "$";
 
-    public final static int REPLICATION_REQUEST_TYPE_SIZE = Integer.BYTES;
-    public final static int REPLICATION_REQUEST_HEADER_SIZE = REPLICATION_REQUEST_TYPE_SIZE + Integer.BYTES;
+    public static final  int REPLICATION_REQUEST_TYPE_SIZE = Integer.BYTES;
+    private static final  int REPLICATION_REQUEST_HEADER_SIZE = REPLICATION_REQUEST_TYPE_SIZE + Integer.BYTES;
 
     /*
      * ReplicationRequestType:
@@ -64,7 +72,12 @@
         REPLICA_EVENT,
         LSM_COMPONENT_PROPERTIES,
         ACK,
-        FLUSH_INDEX
+        FLUSH_INDEX,
+        PARTITION_RESOURCES_REQUEST,
+        PARTITION_RESOURCES_RESPONSE,
+        REPLICATE_RESOURCE_FILE,
+        DELETE_RESOURCE_FILE,
+        CHECKPOINT_PARTITION
     }
 
     public static ByteBuffer readRequest(SocketChannel socketChannel, ByteBuffer dataBuffer) throws IOException {
@@ -256,4 +269,86 @@
         ByteBuffer ackBuffer = ReplicationProtocol.getAckBuffer();
         NetworkingUtil.transferBufferToChannel(socketChannel, ackBuffer);
     }
-}
+
+    public static void sendAck(SocketChannel socketChannel, ByteBuffer buf) {
+        try {
+            buf.clear();
+            buf.putInt(ReplicationRequestType.ACK.ordinal());
+            buf.flip();
+            NetworkingUtil.transferBufferToChannel(socketChannel, buf);
+        } catch (IOException e) {
+            throw new ReplicationException(e);
+        }
+    }
+
+    public static void waitForAck(PartitionReplica replica) throws IOException {
+        final SocketChannel channel = replica.getChannel();
+        final ByteBuffer buf = replica.gerReusableBuffer();
+        ReplicationRequestType responseFunction = ReplicationProtocol.getRequestType(channel, buf);
+        if (responseFunction != ReplicationRequestType.ACK) {
+            throw new IllegalStateException("Unexpected response while waiting for ack.");
+        }
+    }
+
+    public static void sendTo(PartitionReplica replica, IReplicationMessage task) {
+        final SocketChannel channel = replica.getChannel();
+        final ByteBuffer buf = replica.gerReusableBuffer();
+        sendTo(channel, task, buf);
+    }
+
+    public static void sendTo(SocketChannel channel, IReplicationMessage task, ByteBuffer buf) {
+        ExtendedByteArrayOutputStream outputStream = new ExtendedByteArrayOutputStream();
+        try (DataOutputStream oos = new DataOutputStream(outputStream)) {
+            task.serialize(oos);
+            final int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
+            final ByteBuffer requestBuffer = ensureSize(buf, requestSize);
+            requestBuffer.putInt(task.getMessageType().ordinal());
+            requestBuffer.putInt(oos.size());
+            requestBuffer.put(outputStream.getByteArray(), 0, outputStream.getLength());
+            requestBuffer.flip();
+            NetworkingUtil.transferBufferToChannel(channel, requestBuffer);
+        } catch (IOException e) {
+            throw new ReplicationException(e);
+        }
+    }
+
+    public static IReplicationMessage read(SocketChannel socketChannel, ByteBuffer buffer) throws IOException {
+        final ReplicationRequestType type = getRequestType(socketChannel, buffer);
+        return readMessage(type, socketChannel, buffer);
+    }
+
+    public static IReplicationMessage readMessage(ReplicationRequestType type, SocketChannel socketChannel,
+            ByteBuffer buffer) {
+        try {
+            ReplicationProtocol.readRequest(socketChannel, buffer);
+            final ByteArrayInputStream input =
+                    new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit());
+            try (DataInputStream dis = new DataInputStream(input)) {
+                switch (type) {
+                    case PARTITION_RESOURCES_REQUEST:
+                        return PartitionResourcesListTask.create(dis);
+                    case PARTITION_RESOURCES_RESPONSE:
+                        return PartitionResourcesListResponse.create(dis);
+                    case REPLICATE_RESOURCE_FILE:
+                        return ReplicateFileTask.create(dis);
+                    case DELETE_RESOURCE_FILE:
+                        return DeleteFileTask.create(dis);
+                    case CHECKPOINT_PARTITION:
+                        return CheckpointPartitionIndexesTask.create(dis);
+                    default:
+                        throw new IllegalStateException("Unrecognized replication message");
+                }
+            }
+        } catch (IOException e) {
+            throw new ReplicationException(e);
+        }
+    }
+
+    private static ByteBuffer ensureSize(ByteBuffer buffer, int size) {
+        if (buffer.capacity() < size) {
+            return ByteBuffer.allocate(size);
+        }
+        buffer.clear();
+        return buffer;
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
index dfc23d3..ba24e07 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
@@ -24,10 +24,10 @@
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousCloseException;
 import java.nio.channels.FileChannel;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -71,6 +71,10 @@
 import org.apache.asterix.replication.functions.ReplicationProtocol;
 import org.apache.asterix.replication.functions.ReplicationProtocol.ReplicationRequestType;
 import org.apache.asterix.replication.logging.RemoteLogMapping;
+import org.apache.asterix.replication.messaging.CheckpointPartitionIndexesTask;
+import org.apache.asterix.replication.messaging.DeleteFileTask;
+import org.apache.asterix.replication.messaging.PartitionResourcesListTask;
+import org.apache.asterix.replication.messaging.ReplicateFileTask;
 import org.apache.asterix.replication.storage.LSMComponentLSNSyncTask;
 import org.apache.asterix.replication.storage.LSMComponentProperties;
 import org.apache.asterix.replication.storage.LSMIndexFileProperties;
@@ -108,12 +112,12 @@
     private final Map<Long, RemoteLogMapping> localLsn2RemoteMapping;
     private final Map<String, RemoteLogMapping> replicaUniqueLSN2RemoteMapping;
     private final LSMComponentsSyncService lsmComponentLSNMappingService;
-    private final Set<Integer> nodeHostedPartitions;
     private final ReplicationNotifier replicationNotifier;
     private final Object flushLogslock = new Object();
     private final IDatasetLifecycleManager dsLifecycleManager;
     private final PersistentLocalResourceRepository localResourceRep;
     private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
+    private final INcApplicationContext appCtx;
 
     public ReplicationChannel(String nodeId, ReplicationProperties replicationProperties, ILogManager logManager,
             IReplicaResourcesManager replicaResoucesManager, IReplicationManager replicationManager,
@@ -135,19 +139,8 @@
         lsmComponentLSNMappingService = new LSMComponentsSyncService();
         replicationNotifier = new ReplicationNotifier();
         replicationThreads = Executors.newCachedThreadPool(ncServiceContext.getThreadFactory());
-        Map<String, ClusterPartition[]> nodePartitions =
-                asterixAppRuntimeContextProvider.getAppContext().getMetadataProperties().getNodePartitions();
-        Set<String> nodeReplicationClients = replicationProperties.getRemotePrimaryReplicasIds(nodeId);
-        List<Integer> clientsPartitions = new ArrayList<>();
-        for (String clientId : nodeReplicationClients) {
-            for (ClusterPartition clusterPartition : nodePartitions.get(clientId)) {
-                clientsPartitions.add(clusterPartition.getPartitionId());
-            }
-        }
-        nodeHostedPartitions = new HashSet<>(clientsPartitions.size());
-        nodeHostedPartitions.addAll(clientsPartitions);
-        this.indexCheckpointManagerProvider =
-                ((INcApplicationContext) ncServiceContext.getApplicationContext()).getIndexCheckpointManagerProvider();
+        this.appCtx = (INcApplicationContext) ncServiceContext.getApplicationContext();
+        this.indexCheckpointManagerProvider = appCtx.getIndexCheckpointManagerProvider();
     }
 
     @Override
@@ -167,12 +160,14 @@
             LOGGER.log(Level.INFO, "opened Replication Channel @ IP Address: " + nodeIP + ":" + dataPort);
 
             //start accepting replication requests
-            while (true) {
+            while (serverSocketChannel.isOpen()) {
                 SocketChannel socketChannel = serverSocketChannel.accept();
                 socketChannel.configureBlocking(true);
                 //start a new thread to handle the request
                 replicationThreads.execute(new ReplicationThread(socketChannel));
             }
+        } catch (AsynchronousCloseException e) {
+            LOGGER.log(Level.WARNING, "Replication channel closed", e);
         } catch (IOException e) {
             throw new IllegalStateException(
                     "Could not open replication channel @ IP Address: " + nodeIP + ":" + dataPort, e);
@@ -209,10 +204,8 @@
 
     @Override
     public void close() throws IOException {
-        if (!serverSocketChannel.isOpen()) {
-            serverSocketChannel.close();
-            LOGGER.log(Level.INFO, "Replication channel closed.");
-        }
+        serverSocketChannel.close();
+        LOGGER.log(Level.INFO, "Replication channel closed.");
     }
 
     /**
@@ -263,6 +256,18 @@
                         case FLUSH_INDEX:
                             handleFlushIndex();
                             break;
+                        case PARTITION_RESOURCES_REQUEST:
+                            handleGetPartitionResources();
+                            break;
+                        case REPLICATE_RESOURCE_FILE:
+                            handleReplicateResourceFile();
+                            break;
+                        case DELETE_RESOURCE_FILE:
+                            handleDeleteResourceFile();
+                            break;
+                        case CHECKPOINT_PARTITION:
+                            handleCheckpointPartition();
+                            break;
                         default:
                             throw new IllegalStateException("Unknown replication request");
                     }
@@ -476,10 +481,7 @@
                 switch (remoteLog.getLogType()) {
                     case LogType.UPDATE:
                     case LogType.ENTITY_COMMIT:
-                        //if the log partition belongs to a partitions hosted on this node, replicated it
-                        if (nodeHostedPartitions.contains(remoteLog.getResourcePartition())) {
-                            logManager.log(remoteLog);
-                        }
+                        logManager.log(remoteLog);
                         break;
                     case LogType.JOB_COMMIT:
                     case LogType.ABORT:
@@ -542,10 +544,15 @@
         }
 
         @Override
-        public SocketChannel getReplicationClientSocket() {
+        public SocketChannel getChannel() {
             return socketChannel;
         }
 
+        @Override
+        public ByteBuffer getReusableBuffer() {
+            return outBuffer;
+        }
+
         private void checkpointReplicaIndexes(RemoteLogMapping remoteLogMapping, int datasetId) {
             try {
                 Predicate<LocalResource> replicaIndexesPredicate = lr -> {
@@ -568,6 +575,30 @@
                 LOGGER.log(Level.SEVERE, "Failed to checkpoint replica indexes", e);
             }
         }
+
+        private void handleGetPartitionResources() throws IOException {
+            final PartitionResourcesListTask task = (PartitionResourcesListTask) ReplicationProtocol
+                    .readMessage(ReplicationRequestType.PARTITION_RESOURCES_REQUEST, socketChannel, inBuffer);
+            task.perform(appCtx, this);
+        }
+
+        private void handleReplicateResourceFile() throws HyracksDataException {
+            ReplicateFileTask task = (ReplicateFileTask) ReplicationProtocol
+                    .readMessage(ReplicationRequestType.REPLICATE_RESOURCE_FILE, socketChannel, inBuffer);
+            task.perform(appCtx, this);
+        }
+
+        private void handleDeleteResourceFile() throws HyracksDataException {
+            DeleteFileTask task = (DeleteFileTask) ReplicationProtocol
+                    .readMessage(ReplicationRequestType.DELETE_RESOURCE_FILE, socketChannel, inBuffer);
+            task.perform(appCtx, this);
+        }
+
+        private void handleCheckpointPartition() throws HyracksDataException {
+            CheckpointPartitionIndexesTask task = (CheckpointPartitionIndexesTask) ReplicationProtocol
+                    .readMessage(ReplicationRequestType.CHECKPOINT_PARTITION, socketChannel, inBuffer);
+            task.perform(appCtx, this);
+        }
     }
 
     /**
@@ -581,7 +612,7 @@
                 try {
                     LogRecord logRecord = pendingNotificationRemoteLogsQ.take();
                     //send ACK to requester
-                    logRecord.getReplicationThread().getReplicationClientSocket().socket().getOutputStream()
+                    logRecord.getReplicationThread().getChannel().socket().getOutputStream()
                             .write((localNodeID + ReplicationProtocol.JOB_REPLICATION_ACK + logRecord.getTxnId()
                                     + System.lineSeparator()).getBytes());
                 } catch (InterruptedException e) {
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
index 5cf7eab..b933db8 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
@@ -40,6 +40,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
@@ -53,7 +54,9 @@
 import java.util.stream.Collectors;
 
 import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.config.ClusterProperties;
 import org.apache.asterix.common.config.ReplicationProperties;
+import org.apache.asterix.common.replication.IPartitionReplica;
 import org.apache.asterix.common.replication.IReplicaResourcesManager;
 import org.apache.asterix.common.replication.IReplicationManager;
 import org.apache.asterix.common.replication.IReplicationStrategy;
@@ -1177,6 +1180,25 @@
         buffer.position(buffer.limit());
     }
 
+    @Override
+    public void register(IPartitionReplica replica) {
+        // find the replica node based on ip and replication port
+        final Optional<Node> replicaNode = ClusterProperties.INSTANCE.getCluster().getNode().stream()
+                .filter(node -> node.getClusterIp().equals(replica.getIdentifier().getLocation().getHostString())
+                        && node.getReplicationPort().intValue() == replica.getIdentifier().getLocation().getPort())
+                .findAny();
+        if (!replicaNode.isPresent()) {
+            throw new IllegalStateException("Couldn't find node for replica: " + replica);
+        }
+        Replica replicaRef = new Replica(replicaNode.get());
+        final String replicaId = replicaRef.getId();
+        replicas.putIfAbsent(replicaId, replicaRef);
+        replica2PartitionsMap.computeIfAbsent(replicaId, k -> new HashSet<>());
+        replica2PartitionsMap.get(replicaId).add(replica.getIdentifier().getPartition());
+        updateReplicaInfo(replicaRef);
+        checkReplicaState(replicaId, false, true);
+    }
+
     //supporting classes
     /**
      * This class is responsible for processing replica events.
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
new file mode 100644
index 0000000..2c1937b
--- /dev/null
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.messaging;
+
+import java.io.DataInput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Collection;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.replication.IReplicationThread;
+import org.apache.asterix.common.storage.DatasetResourceReference;
+import org.apache.asterix.common.storage.IIndexCheckpointManager;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
+import org.apache.asterix.replication.api.IReplicaTask;
+import org.apache.asterix.replication.functions.ReplicationProtocol;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.common.LocalResource;
+
+/**
+ * A task to initialize the checkpoints for all indexes in a partition with the replica's current LSN
+ */
+public class CheckpointPartitionIndexesTask implements IReplicaTask {
+
+    private final int partition;
+
+    public CheckpointPartitionIndexesTask(int partition) {
+        this.partition = partition;
+    }
+
+    @Override
+    public void perform(INcApplicationContext appCtx, IReplicationThread worker) throws HyracksDataException {
+        final IIndexCheckpointManagerProvider indexCheckpointManagerProvider =
+                appCtx.getIndexCheckpointManagerProvider();
+        PersistentLocalResourceRepository resRepo =
+                (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
+        final Collection<LocalResource> partitionResources = resRepo.getPartitionResources(partition).values();
+        final long currentLSN = appCtx.getTransactionSubsystem().getLogManager().getAppendLSN();
+        for (LocalResource ls : partitionResources) {
+            final IIndexCheckpointManager indexCheckpointManager =
+                    indexCheckpointManagerProvider.get(DatasetResourceReference.of(ls));
+            indexCheckpointManager.delete();
+            indexCheckpointManager.init(currentLSN);
+        }
+        ReplicationProtocol.sendAck(worker.getChannel(), worker.getReusableBuffer());
+    }
+
+    @Override
+    public ReplicationProtocol.ReplicationRequestType getMessageType() {
+        return ReplicationProtocol.ReplicationRequestType.CHECKPOINT_PARTITION;
+    }
+
+    @Override
+    public void serialize(OutputStream out) throws HyracksDataException {
+        try {
+            DataOutputStream dos = new DataOutputStream(out);
+            dos.writeInt(partition);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    public static CheckpointPartitionIndexesTask create(DataInput input) throws HyracksDataException {
+        try {
+            int partition = input.readInt();
+            return new CheckpointPartitionIndexesTask(partition);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java
new file mode 100644
index 0000000..ea43ee9
--- /dev/null
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.messaging;
+
+import java.io.DataInput;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.exceptions.ReplicationException;
+import org.apache.asterix.common.replication.IReplicationThread;
+import org.apache.asterix.replication.api.IReplicaTask;
+import org.apache.asterix.replication.functions.ReplicationProtocol;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IIOManager;
+
+/**
+ * A task to delete a file on a replica if exists
+ */
+public class DeleteFileTask implements IReplicaTask {
+
+    private static final Logger LOGGER = Logger.getLogger(DeleteFileTask.class.getName());
+    private final String file;
+
+    public DeleteFileTask(String file) {
+        this.file = file;
+    }
+
+    @Override
+    public void perform(INcApplicationContext appCtx, IReplicationThread worker) {
+        try {
+            final IIOManager ioManager = appCtx.getIoManager();
+            final File localFile = ioManager.resolve(file).getFile();
+            if (localFile.exists()) {
+                Files.delete(localFile.toPath());
+                LOGGER.info(() -> "Deleted file: " + localFile.getAbsolutePath());
+            } else {
+                LOGGER.warning(() -> "Requested to delete a non-existing file: " + localFile.getAbsolutePath());
+            }
+            ReplicationProtocol.sendAck(worker.getChannel(), worker.getReusableBuffer());
+        } catch (IOException e) {
+            throw new ReplicationException(e);
+        }
+    }
+
+    @Override
+    public ReplicationProtocol.ReplicationRequestType getMessageType() {
+        return ReplicationProtocol.ReplicationRequestType.DELETE_RESOURCE_FILE;
+    }
+
+    @Override
+    public void serialize(OutputStream out) throws HyracksDataException {
+        try {
+            DataOutputStream dos = new DataOutputStream(out);
+            dos.writeUTF(file);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    public static DeleteFileTask create(DataInput input) throws IOException {
+        return new DeleteFileTask(input.readUTF());
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListResponse.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListResponse.java
new file mode 100644
index 0000000..85b7bb9
--- /dev/null
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListResponse.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.messaging;
+
+import java.io.DataInput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.replication.api.IReplicationMessage;
+import org.apache.asterix.replication.functions.ReplicationProtocol;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class PartitionResourcesListResponse implements IReplicationMessage {
+
+    private final int partition;
+    private final List<String> resources;
+
+    public PartitionResourcesListResponse(int partition, List<String> resources) {
+        this.partition = partition;
+        this.resources = resources;
+    }
+
+    @Override
+    public ReplicationProtocol.ReplicationRequestType getMessageType() {
+        return ReplicationProtocol.ReplicationRequestType.PARTITION_RESOURCES_RESPONSE;
+    }
+
+    @Override
+    public void serialize(OutputStream out) throws HyracksDataException {
+        try {
+            DataOutputStream dos = new DataOutputStream(out);
+            dos.writeInt(partition);
+            dos.writeInt(resources.size());
+            for (String file : resources) {
+                dos.writeUTF(file);
+            }
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    public List<String> getResources() {
+        return resources;
+    }
+
+    public static PartitionResourcesListResponse create(DataInput input) throws IOException {
+        int partition = input.readInt();
+        int size = input.readInt();
+        List<String> resources = new ArrayList<>();
+        for (int i = 0; i < size; i++) {
+            resources.add(input.readUTF());
+        }
+        return new PartitionResourcesListResponse(partition, resources);
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
new file mode 100644
index 0000000..b2b8ac6
--- /dev/null
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.messaging;
+
+import java.io.DataInput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.replication.IReplicationThread;
+import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.asterix.replication.api.IReplicaTask;
+import org.apache.asterix.replication.functions.ReplicationProtocol;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * A task to get the list of the files in a partition on a replica
+ */
+public class PartitionResourcesListTask implements IReplicaTask {
+
+    private final int partition;
+
+    public PartitionResourcesListTask(int partition) {
+        this.partition = partition;
+    }
+
+    @Override
+    public void perform(INcApplicationContext appCtx, IReplicationThread worker) throws HyracksDataException {
+        //TODO delete any invalid files with masks
+        final List<String> partitionResources =
+                appCtx.getReplicaResourcesManager().getPartitionIndexesFiles(partition, false).stream()
+                        .map(StoragePathUtil::getFileRelativePath).collect(Collectors.toList());
+        final PartitionResourcesListResponse response =
+                new PartitionResourcesListResponse(partition, partitionResources);
+        ReplicationProtocol.sendTo(worker.getChannel(), response, worker.getReusableBuffer());
+    }
+
+    @Override
+    public ReplicationProtocol.ReplicationRequestType getMessageType() {
+        return ReplicationProtocol.ReplicationRequestType.PARTITION_RESOURCES_REQUEST;
+    }
+
+    @Override
+    public void serialize(OutputStream out) throws HyracksDataException {
+        try {
+            DataOutputStream dos = new DataOutputStream(out);
+            dos.writeInt(partition);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    public static PartitionResourcesListTask create(DataInput input) throws HyracksDataException {
+        try {
+            int partition = input.readInt();
+            return new PartitionResourcesListTask(partition);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
new file mode 100644
index 0000000..14e9180
--- /dev/null
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.messaging;
+
+import java.io.DataInput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.exceptions.ReplicationException;
+import org.apache.asterix.common.replication.IReplicationThread;
+import org.apache.asterix.common.utils.StorageConstants;
+import org.apache.asterix.replication.api.IReplicaTask;
+import org.apache.asterix.replication.functions.ReplicationProtocol;
+import org.apache.asterix.replication.management.NetworkingUtil;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+
+/**
+ * A task to replicate a file from a master replica
+ */
+public class ReplicateFileTask implements IReplicaTask {
+
+    private static final Logger LOGGER = Logger.getLogger(DeleteFileTask.class.getName());
+    private final String file;
+    private final long size;
+
+    public ReplicateFileTask(String file, long size) {
+        this.file = file;
+        this.size = size;
+    }
+
+    @Override
+    public void perform(INcApplicationContext appCtx, IReplicationThread worker) throws HyracksDataException {
+        try {
+            final IIOManager ioManager = appCtx.getIoManager();
+            // resolve path
+            final FileReference localPath = ioManager.resolve(file);
+            final Path resourceDir = Files.createDirectories(localPath.getFile().getParentFile().toPath());
+            // create mask
+            final Path maskPath = Paths.get(resourceDir.toString(),
+                    StorageConstants.MASK_FILE_PREFIX + localPath.getFile().getName());
+            Files.createFile(maskPath);
+
+            // receive actual file
+            final Path filePath = Paths.get(resourceDir.toString(), localPath.getFile().getName());
+            Files.createFile(filePath);
+            try (RandomAccessFile fileOutputStream = new RandomAccessFile(filePath.toFile(),
+                    "rw"); FileChannel fileChannel = fileOutputStream.getChannel()) {
+                fileOutputStream.setLength(size);
+                NetworkingUtil.downloadFile(fileChannel, worker.getChannel());
+                fileChannel.force(true);
+            }
+            //delete mask
+            Files.delete(maskPath);
+            LOGGER.info(() -> "Replicated file: " + localPath);
+            ReplicationProtocol.sendAck(worker.getChannel(), worker.getReusableBuffer());
+        } catch (IOException e) {
+            throw new ReplicationException(e);
+        }
+    }
+
+    @Override
+    public ReplicationProtocol.ReplicationRequestType getMessageType() {
+        return ReplicationProtocol.ReplicationRequestType.REPLICATE_RESOURCE_FILE;
+    }
+
+    @Override
+    public void serialize(OutputStream out) throws HyracksDataException {
+        try {
+            DataOutputStream dos = new DataOutputStream(out);
+            dos.writeUTF(file);
+            dos.writeLong(size);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    public static ReplicateFileTask create(DataInput input) throws IOException {
+        final String s = input.readUTF();
+        final long i = input.readLong();
+        return new ReplicateFileTask(s, i);
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/FileSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/FileSynchronizer.java
new file mode 100644
index 0000000..8aa4487
--- /dev/null
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/FileSynchronizer.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.recovery;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.nio.channels.SocketChannel;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.exceptions.ReplicationException;
+import org.apache.asterix.replication.functions.ReplicationProtocol;
+import org.apache.asterix.replication.management.NetworkingUtil;
+import org.apache.asterix.replication.messaging.DeleteFileTask;
+import org.apache.asterix.replication.messaging.ReplicateFileTask;
+import org.apache.asterix.replication.storage.PartitionReplica;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+
+public class FileSynchronizer {
+
+    private final INcApplicationContext appCtx;
+    private final PartitionReplica replica;
+
+    public FileSynchronizer(INcApplicationContext appCtx, PartitionReplica replica) {
+        this.appCtx = appCtx;
+        this.replica = replica;
+    }
+
+    public void replicate(String file) {
+        try {
+            final IIOManager ioManager = appCtx.getIoManager();
+            final SocketChannel channel = replica.getChannel();
+            final FileReference filePath = ioManager.resolve(file);
+            ReplicateFileTask task = new ReplicateFileTask(file, filePath.getFile().length());
+            ReplicationProtocol.sendTo(replica, task);
+            // send the file itself
+            try (RandomAccessFile fromFile = new RandomAccessFile(filePath.getFile(),
+                    "r"); FileChannel fileChannel = fromFile.getChannel()) {
+                NetworkingUtil.sendFile(fileChannel, channel);
+            }
+            ReplicationProtocol.waitForAck(replica);
+        } catch (IOException e) {
+            throw new ReplicationException(e);
+        }
+    }
+
+    public void delete(String file) {
+        try {
+            final DeleteFileTask task = new DeleteFileTask(file);
+            ReplicationProtocol.sendTo(replica, task);
+            ReplicationProtocol.waitForAck(replica);
+        } catch (IOException e) {
+            throw new ReplicationException(e);
+        }
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/ReplicaFilesSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/ReplicaFilesSynchronizer.java
new file mode 100644
index 0000000..2021cee
--- /dev/null
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/ReplicaFilesSynchronizer.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.recovery;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.asterix.replication.functions.ReplicationProtocol;
+import org.apache.asterix.replication.messaging.PartitionResourcesListResponse;
+import org.apache.asterix.replication.messaging.PartitionResourcesListTask;
+import org.apache.asterix.replication.storage.PartitionReplica;
+
+/**
+ * Ensures that the files between master and a replica are synchronized
+ */
+public class ReplicaFilesSynchronizer {
+
+    private final PartitionReplica replica;
+    private final INcApplicationContext appCtx;
+
+    public ReplicaFilesSynchronizer(INcApplicationContext appCtx, PartitionReplica replica) {
+        this.appCtx = appCtx;
+        this.replica = replica;
+    }
+
+    public void sync() throws IOException {
+        final int partition = replica.getIdentifier().getPartition();
+        final Set<String> replicaFiles = getReplicaFiles(partition);
+        final Set<String> masterFiles =
+                appCtx.getReplicaResourcesManager().getPartitionIndexesFiles(partition, false).stream()
+                        .map(StoragePathUtil::getFileRelativePath).collect(Collectors.toSet());
+        // find files on master and not on replica
+        final List<String> replicaMissingFiles =
+                masterFiles.stream().filter(file -> !replicaFiles.contains(file)).collect(Collectors.toList());
+        replicateMissingFiles(replicaMissingFiles);
+        // find files on replica and not on master
+        final List<String> replicaInvalidFiles =
+                replicaFiles.stream().filter(file -> !masterFiles.contains(file)).collect(Collectors.toList());
+        deleteInvalidFiles(replicaInvalidFiles);
+    }
+
+    private Set<String> getReplicaFiles(int partition) throws IOException {
+        final PartitionResourcesListTask replicaFilesRequest = new PartitionResourcesListTask(partition);
+        final SocketChannel channel = replica.getChannel();
+        final ByteBuffer reusableBuffer = replica.gerReusableBuffer();
+        ReplicationProtocol.sendTo(replica, replicaFilesRequest);
+        final PartitionResourcesListResponse response =
+                (PartitionResourcesListResponse) ReplicationProtocol.read(channel, reusableBuffer);
+        return new HashSet<>(response.getResources());
+    }
+
+    private void replicateMissingFiles(List<String> files) {
+        final FileSynchronizer sync = new FileSynchronizer(appCtx, replica);
+        files.forEach(sync::replicate);
+    }
+
+    private void deleteInvalidFiles(List<String> files) {
+        final FileSynchronizer sync = new FileSynchronizer(appCtx, replica);
+        files.forEach(sync::delete);
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/ReplicaSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/ReplicaSynchronizer.java
new file mode 100644
index 0000000..5c88460
--- /dev/null
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/ReplicaSynchronizer.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.recovery;
+
+import java.io.IOException;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.config.ReplicationProperties;
+import org.apache.asterix.common.replication.IReplicationStrategy;
+import org.apache.asterix.replication.functions.ReplicationProtocol;
+import org.apache.asterix.replication.messaging.CheckpointPartitionIndexesTask;
+import org.apache.asterix.replication.storage.PartitionReplica;
+
+/**
+ * Performs the steps required to ensure any newly added replica
+ * will be in-sync with master
+ */
+public class ReplicaSynchronizer {
+
+    private final INcApplicationContext appCtx;
+    private final PartitionReplica replica;
+
+    public ReplicaSynchronizer(INcApplicationContext appCtx, PartitionReplica replica) {
+        this.appCtx = appCtx;
+        this.replica = replica;
+    }
+
+    public void sync() throws IOException {
+        syncFiles();
+        checkpointReplicaIndexes();
+        appCtx.getReplicationManager().register(replica);
+    }
+
+    private void syncFiles() throws IOException {
+        final ReplicaFilesSynchronizer fileSync = new ReplicaFilesSynchronizer(appCtx, replica);
+        fileSync.sync();
+        // flush replicated dataset to generate disk component for any remaining in-memory components
+        final ReplicationProperties repl = appCtx.getReplicationProperties();
+        final IReplicationStrategy replStrategy = repl.getReplicationStrategy();
+        appCtx.getDatasetLifecycleManager().flushDataset(replStrategy);
+        // sync any newly generated files
+        fileSync.sync();
+    }
+
+    private void checkpointReplicaIndexes() throws IOException {
+        CheckpointPartitionIndexesTask task =
+                new CheckpointPartitionIndexesTask(replica.getIdentifier().getPartition());
+        ReplicationProtocol.sendTo(replica, task);
+        ReplicationProtocol.waitForAck(replica);
+    }
+}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/PartitionReplica.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/PartitionReplica.java
index c6d1b60..d9ce75e 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/PartitionReplica.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/PartitionReplica.java
@@ -22,12 +22,20 @@
 import static org.apache.asterix.common.replication.IPartitionReplica.PartitionReplicaStatus.DISCONNECTED;
 import static org.apache.asterix.common.replication.IPartitionReplica.PartitionReplicaStatus.IN_SYNC;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
+import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.exceptions.ReplicationException;
 import org.apache.asterix.common.replication.IPartitionReplica;
 import org.apache.asterix.common.storage.ReplicaIdentifier;
+import org.apache.asterix.replication.functions.ReplicationProtocol;
+import org.apache.asterix.replication.recovery.ReplicaSynchronizer;
 import org.apache.hyracks.util.JSONUtil;
+import org.apache.hyracks.util.StorageUtil;
 import org.apache.hyracks.util.annotations.ThreadSafe;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
@@ -38,12 +46,18 @@
 @ThreadSafe
 public class PartitionReplica implements IPartitionReplica {
 
+    private static final Logger LOGGER = Logger.getLogger(PartitionReplica.class.getName());
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final int INITIAL_BUFFER_SIZE = StorageUtil.getIntSizeInBytes(4, StorageUtil.StorageUnit.KILOBYTE);
+    private final INcApplicationContext appCtx;
     private final ReplicaIdentifier id;
+    private ByteBuffer reusbaleBuf;
     private PartitionReplicaStatus status = DISCONNECTED;
+    private SocketChannel sc;
 
-    public PartitionReplica(ReplicaIdentifier id) {
+    public PartitionReplica(ReplicaIdentifier id, INcApplicationContext appCtx) {
         this.id = id;
+        this.appCtx = appCtx;
     }
 
     @Override
@@ -60,9 +74,53 @@
         if (status == IN_SYNC || status == CATCHING_UP) {
             return;
         }
+        setStatus(CATCHING_UP);
+        appCtx.getThreadExecutor().execute(() -> {
+            try {
+                new ReplicaSynchronizer(appCtx, this).sync();
+                setStatus(IN_SYNC);
+            } catch (Exception e) {
+                LOGGER.log(Level.SEVERE, e, () -> "Failed to sync replica " + this);
+                setStatus(DISCONNECTED);
+            } finally {
+                close();
+            }
+        });
     }
 
-    public JsonNode asJson() {
+    public synchronized SocketChannel getChannel() {
+        try {
+            if (sc == null || !sc.isOpen() || !sc.isConnected()) {
+                sc = SocketChannel.open();
+                sc.configureBlocking(true);
+                sc.connect(id.getLocation());
+            }
+            return sc;
+        } catch (IOException e) {
+            throw new ReplicationException(e);
+        }
+    }
+
+    public synchronized void close() {
+        try {
+            if (sc != null && sc.isOpen()) {
+                ReplicationProtocol.sendGoodbye(sc);
+                sc.close();
+                sc = null;
+            }
+        } catch (IOException e) {
+            throw new ReplicationException(e);
+        }
+    }
+
+    public synchronized ByteBuffer gerReusableBuffer() {
+        if (reusbaleBuf == null) {
+            reusbaleBuf = ByteBuffer.allocate(INITIAL_BUFFER_SIZE);
+        }
+        return reusbaleBuf;
+    }
+
+    private JsonNode asJson() {
         ObjectNode json = OBJECT_MAPPER.createObjectNode();
         json.put("id", id.toString());
         json.put("state", status.name());
@@ -94,4 +152,9 @@
             throw new ReplicationException(e);
         }
     }
+
+    private synchronized void setStatus(PartitionReplicaStatus status) {
+        LOGGER.info(() -> "Replica " + this + " status changing: " + this.status + " -> " + status);
+        this.status = status;
+    }
 }
\ No newline at end of file
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
index 2ff74a8..398f97d 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
@@ -182,6 +182,7 @@
      * @param partition
      * @return Absolute paths to all partition files
      */
+    @Override
     public List<String> getPartitionIndexesFiles(int partition, boolean relativePath) throws HyracksDataException {
         List<String> partitionFiles = new ArrayList<String>();
         Set<File> partitionIndexes = localRepository.getPartitionIndexes(partition);