Asterix NCs Fault Tolerance

This change includes the following:
- Adapt replication to unique partitions storage.
- Implement auto failover for failing NCs.
- Implement auto failover for metadata node.
- Fix for ASTERIXDB-1251 using proper error message.
- Basic replication test cases using vagrant virtual cluster for:
   1. LSM bulkload components replication.
   2. LSM Memory components replication and recovery.
   3. Metadata node takeover.
These test cases will be part of the cluster test profile.

Change-Id: Ice26d980912a315fcb3efdd571d6ce88717cfea4
Reviewed-on: https://asterix-gerrit.ics.uci.edu/573
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
index 6d0b321..8a40876 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
@@ -19,7 +19,10 @@
 package org.apache.asterix.api.common;
 
 import java.io.IOException;
+import java.rmi.RemoteException;
+import java.rmi.server.UnicastRemoteObject;
 import java.util.List;
+import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.api.AsterixThreadExecutor;
@@ -46,9 +49,14 @@
 import org.apache.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
 import org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
+import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.asterix.external.feed.api.IFeedManager;
 import org.apache.asterix.external.feed.management.FeedManager;
-import org.apache.asterix.common.transactions.ITransactionSubsystem;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataNode;
+import org.apache.asterix.metadata.api.IAsterixStateProxy;
+import org.apache.asterix.metadata.api.IMetadataNode;
+import org.apache.asterix.metadata.bootstrap.MetadataBootstrap;
 import org.apache.asterix.metadata.bootstrap.MetadataIndexImmutableProperties;
 import org.apache.asterix.om.util.AsterixClusterProperties;
 import org.apache.asterix.replication.management.ReplicationChannel;
@@ -85,6 +93,7 @@
 import org.apache.hyracks.storage.common.file.IResourceIdFactory;
 
 public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAsterixPropertiesProvider {
+    private static final Logger LOGGER = Logger.getLogger(AsterixAppRuntimeContext.class.getName());
 
     private static final AsterixPropertiesAccessor ASTERIX_PROPERTIES_ACCESSOR;
 
@@ -128,8 +137,9 @@
     private IReplicationManager replicationManager;
     private IRemoteRecoveryManager remoteRecoveryManager;
     private IReplicaResourcesManager replicaResourcesManager;
+    private final int metadataRmiPort;
 
-    public AsterixAppRuntimeContext(INCApplicationContext ncApplicationContext) {
+    public AsterixAppRuntimeContext(INCApplicationContext ncApplicationContext, int metadataRmiPort) {
         this.ncApplicationContext = ncApplicationContext;
         compilerProperties = new AsterixCompilerProperties(ASTERIX_PROPERTIES_ACCESSOR);
         externalProperties = new AsterixExternalProperties(ASTERIX_PROPERTIES_ACCESSOR);
@@ -140,6 +150,7 @@
         buildProperties = new AsterixBuildProperties(ASTERIX_PROPERTIES_ACCESSOR);
         replicationProperties = new AsterixReplicationProperties(ASTERIX_PROPERTIES_ACCESSOR,
                 AsterixClusterProperties.INSTANCE.getCluster());
+        this.metadataRmiPort = metadataRmiPort;
     }
 
     public void initialize(boolean initialRun) throws IOException, ACIDException, AsterixException {
@@ -159,7 +170,8 @@
         metadataMergePolicyFactory = new PrefixMergePolicyFactory();
 
         ILocalResourceRepositoryFactory persistentLocalResourceRepositoryFactory = new PersistentLocalResourceRepositoryFactory(
-                ioManager, ncApplicationContext.getNodeId());
+                ioManager, ncApplicationContext.getNodeId(), metadataProperties);
+
         localResourceRepository = persistentLocalResourceRepositoryFactory.createRepository();
 
         IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider = new AsterixAppRuntimeContextProdiverForRecovery(
@@ -186,9 +198,7 @@
         if (replicationProperties.isReplicationEnabled()) {
             String nodeId = ncApplicationContext.getNodeId();
 
-            replicaResourcesManager = new ReplicaResourcesManager(ioManager.getIODevices(),
-                    AsterixClusterProperties.INSTANCE.getStorageDirectoryName(), nodeId,
-                    replicationProperties.getReplicationStore());
+            replicaResourcesManager = new ReplicaResourcesManager(localResourceRepository, metadataProperties);
 
             replicationManager = new ReplicationManager(nodeId, replicationProperties, replicaResourcesManager,
                     txnSubsystem.getLogManager(), asterixAppRuntimeContextProvider);
@@ -225,14 +235,14 @@
          * to process any logs that might be generated during stopping these components
          */
         lccm.register((ILifeCycleComponent) txnSubsystem.getLogManager());
-        lccm.register((ILifeCycleComponent) txnSubsystem.getRecoveryManager());
         /**
-         * ReplicationManager must be stopped after indexLifecycleManager so that any logs/files generated
-         * during closing datasets are sent to remote replicas
+         * ReplicationManager must be stopped after indexLifecycleManager and recovery manager
+         * so that any logs/files generated during closing datasets or checkpoints are sent to remote replicas
          */
         if (replicationManager != null) {
             lccm.register(replicationManager);
         }
+        lccm.register((ILifeCycleComponent) txnSubsystem.getRecoveryManager());
         /**
          * Stopping indexLifecycleManager will flush and close all datasets.
          */
@@ -380,4 +390,35 @@
     public void initializeResourceIdFactory() throws HyracksDataException {
         resourceIdFactory = new GlobalResourceIdFactoryProvider(ncApplicationContext).createResourceIdFactory();
     }
+
+    @Override
+    public void initializeMetadata(boolean newUniverse) throws Exception {
+        IAsterixStateProxy proxy = null;
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Bootstrapping metadata");
+        }
+        MetadataNode.INSTANCE.initialize(this);
+
+        proxy = (IAsterixStateProxy) ncApplicationContext.getDistributedState();
+        if (proxy == null) {
+            throw new IllegalStateException("Metadata node cannot access distributed state");
+        }
+
+        // This is a special case, we just give the metadataNode directly.
+        // This way we can delay the registration of the metadataNode until
+        // it is completely initialized.
+        MetadataManager.INSTANCE = new MetadataManager(proxy, MetadataNode.INSTANCE);
+        MetadataBootstrap.startUniverse(this, ncApplicationContext, newUniverse);
+        MetadataBootstrap.startDDLRecovery();
+
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Metadata node bound");
+        }
+    }
+
+    @Override
+    public void exportMetadataNodeStub() throws RemoteException {
+        IMetadataNode stub = (IMetadataNode) UnicastRemoteObject.exportObject(MetadataNode.INSTANCE, metadataRmiPort);
+        ((IAsterixStateProxy) ncApplicationContext.getDistributedState()).setMetadataNode(stub);
+    }
 }
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index 2a7b3e4..f2fc9bf 100644
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -79,7 +79,7 @@
 
     @Override
     public void start(ICCApplicationContext ccAppCtx, String[] args) throws Exception {
-        messageBroker = new CCMessageBroker((ClusterControllerService)ccAppCtx.getControllerService());
+        messageBroker = new CCMessageBroker((ClusterControllerService) ccAppCtx.getControllerService());
         this.appCtx = ccAppCtx;
 
         if (LOGGER.isLoggable(Level.INFO)) {
@@ -87,7 +87,11 @@
         }
 
         appCtx.setThreadFactory(new AsterixThreadFactory(new LifeCycleComponentManager()));
-        AsterixAppContextInfo.initialize(appCtx, getNewHyracksClientConnection());
+        GlobalRecoveryManager.INSTANCE = new GlobalRecoveryManager(
+                (HyracksConnection) getNewHyracksClientConnection());
+
+        AsterixAppContextInfo.initialize(appCtx, getNewHyracksClientConnection(),
+                GlobalRecoveryManager.INSTANCE);
 
         proxy = AsterixStateProxy.registerRemoteObject();
         appCtx.setDistributedState(proxy);
@@ -111,9 +115,7 @@
         centralFeedManager = CentralFeedManager.getInstance();
         centralFeedManager.start();
 
-        AsterixGlobalRecoveryManager.INSTANCE = new AsterixGlobalRecoveryManager(
-                (HyracksConnection) getNewHyracksClientConnection());
-        ClusterManager.INSTANCE.registerSubscriber(AsterixGlobalRecoveryManager.INSTANCE);
+        ClusterManager.INSTANCE.registerSubscriber(GlobalRecoveryManager.INSTANCE);
 
         AsterixReplicationProperties asterixRepliactionProperties = AsterixAppContextInfo.getInstance()
                 .getReplicationProperties();
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
index 9505692..00b7391 100644
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
@@ -95,19 +95,12 @@
             if (LOGGER.isLoggable(Level.INFO)) {
                 LOGGER.info("NC: " + deadNode + " left");
             }
-            AsterixClusterProperties.INSTANCE.removeNCConfiguration(deadNode);
-
             //if metadata node failed, we need to rebind the proxy connection when it joins again.
-            //Note: the format for the NC should be (INSTANCE-NAME)_(NC-ID)
-            if (AsterixClusterProperties.INSTANCE.getCluster() != null) {
-                String instanceName = AsterixClusterProperties.INSTANCE.getCluster().getInstanceName();
-                String metadataNodeName = AsterixClusterProperties.INSTANCE.getCluster().getMetadataNode();
-                String completeMetadataNodeName = instanceName + "_" + metadataNodeName;
-                if (deadNode.equals(completeMetadataNodeName)) {
-                    MetadataManager.INSTANCE.rebindMetadataNode = true;
-                }
+            String metadataNode = AsterixClusterProperties.INSTANCE.getCurrentMetadataNode();
+            if (deadNode.equals(metadataNode)) {
+                MetadataManager.INSTANCE.rebindMetadataNode = true;
             }
-
+            AsterixClusterProperties.INSTANCE.removeNCConfiguration(deadNode);
         }
         updateProgress(ClusterEventType.NODE_FAILURE, deadNodeIds);
         Set<IClusterEventsSubscriber> subscribers = ClusterManager.INSTANCE.getRegisteredClusterEventSubscribers();
@@ -166,7 +159,8 @@
                 case REMOVE_NODE:
                     nodesToRemove.addAll(((RemoveNodeWork) w).getNodesToBeRemoved());
                     nodeRemovalRequests.add(w);
-                    RemoveNodeWorkResponse response = new RemoveNodeWorkResponse((RemoveNodeWork) w, Status.IN_PROGRESS);
+                    RemoveNodeWorkResponse response = new RemoveNodeWorkResponse((RemoveNodeWork) w,
+                            Status.IN_PROGRESS);
                     pendingWorkResponses.add(response);
                     break;
             }
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/AsterixGlobalRecoveryManager.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
similarity index 91%
rename from asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/AsterixGlobalRecoveryManager.java
rename to asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
index 4fae7e9..2bac1cf 100644
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/AsterixGlobalRecoveryManager.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
@@ -23,13 +23,14 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.common.api.IClusterEventsSubscriber;
 import org.apache.asterix.common.api.IClusterManagementWork;
+import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
 import org.apache.asterix.common.api.IClusterManagementWorkResponse;
-import org.apache.asterix.common.config.MetadataConstants;
+import org.apache.asterix.common.cluster.IGlobalRecoveryMaanger;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.DatasetConfig.ExternalDatasetTransactionState;
 import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
+import org.apache.asterix.common.config.MetadataConstants;
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.feed.CentralFeedManager;
 import org.apache.asterix.file.ExternalIndexingOperations;
@@ -41,20 +42,19 @@
 import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
 import org.apache.asterix.metadata.entities.Index;
 import org.apache.asterix.om.util.AsterixClusterProperties;
-import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
 import org.apache.hyracks.api.client.HyracksConnection;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 
-public class AsterixGlobalRecoveryManager implements IClusterEventsSubscriber {
+public class GlobalRecoveryManager implements IGlobalRecoveryMaanger {
 
     private static ClusterState state;
-    private static final Logger LOGGER = Logger.getLogger(AsterixGlobalRecoveryManager.class.getName());
+    private static final Logger LOGGER = Logger.getLogger(GlobalRecoveryManager.class.getName());
     private HyracksConnection hcc;
-    public static AsterixGlobalRecoveryManager INSTANCE;
+    public static GlobalRecoveryManager INSTANCE;
 
-    public AsterixGlobalRecoveryManager(HyracksConnection hcc) throws Exception {
-        state = AsterixClusterProperties.INSTANCE.getState();
+    public GlobalRecoveryManager(HyracksConnection hcc) throws Exception {
+        state = ClusterState.UNUSABLE;
         this.hcc = hcc;
     }
 
@@ -67,6 +67,28 @@
 
     @Override
     public Set<IClusterManagementWork> notifyNodeJoin(String joinedNodeId) {
+        startGlobalRecovery();
+        return null;
+    }
+
+    private void executeHyracksJob(JobSpecification spec) throws Exception {
+        spec.setMaxReattempts(0);
+        JobId jobId = hcc.startJob(spec);
+        hcc.waitForCompletion(jobId);
+    }
+
+    @Override
+    public void notifyRequestCompletion(IClusterManagementWorkResponse response) {
+        // Do nothing
+    }
+
+    @Override
+    public void notifyStateChange(ClusterState previousState, ClusterState newState) {
+        // Do nothing?
+    }
+
+    @Override
+    public void startGlobalRecovery() {
         // perform global recovery if state changed to active
         final ClusterState newState = AsterixClusterProperties.INSTANCE.getState();
         boolean needToRecover = !newState.equals(state) && (newState == ClusterState.ACTIVE);
@@ -84,7 +106,8 @@
                         List<Dataverse> dataverses = MetadataManager.INSTANCE.getDataverses(mdTxnCtx);
                         for (Dataverse dataverse : dataverses) {
                             if (!dataverse.getDataverseName().equals(MetadataConstants.METADATA_DATAVERSE_NAME)) {
-                                AqlMetadataProvider metadataProvider = new AqlMetadataProvider(dataverse, CentralFeedManager.getInstance());
+                                AqlMetadataProvider metadataProvider = new AqlMetadataProvider(dataverse,
+                                        CentralFeedManager.getInstance());
                                 List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx,
                                         dataverse.getDataverseName());
                                 for (Dataset dataset : datasets) {
@@ -110,8 +133,8 @@
                                                 }
                                                 // 2. clean artifacts in NCs
                                                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
-                                                JobSpecification jobSpec = ExternalIndexingOperations.buildAbortOp(
-                                                        dataset, indexes, metadataProvider);
+                                                JobSpecification jobSpec = ExternalIndexingOperations
+                                                        .buildAbortOp(dataset, indexes, metadataProvider);
                                                 executeHyracksJob(jobSpec);
                                                 // 3. correct the dataset state
                                                 ((ExternalDatasetDetails) dataset.getDatasetDetails())
@@ -125,8 +148,8 @@
                                                 // if ready to commit, roll forward
                                                 // 1. commit indexes in NCs
                                                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
-                                                JobSpecification jobSpec = ExternalIndexingOperations.buildRecoverOp(
-                                                        dataset, indexes, metadataProvider);
+                                                JobSpecification jobSpec = ExternalIndexingOperations
+                                                        .buildRecoverOp(dataset, indexes, metadataProvider);
                                                 executeHyracksJob(jobSpec);
                                                 // 2. add pending files in metadata
                                                 for (ExternalFile file : files) {
@@ -134,7 +157,8 @@
                                                         MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file);
                                                         file.setPendingOp(ExternalFilePendingOp.PENDING_NO_OP);
                                                         MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, file);
-                                                    } else if (file.getPendingOp() == ExternalFilePendingOp.PENDING_DROP_OP) {
+                                                    } else if (file
+                                                            .getPendingOp() == ExternalFilePendingOp.PENDING_DROP_OP) {
                                                         // find original file
                                                         for (ExternalFile originalFile : files) {
                                                             if (originalFile.getFileName().equals(file.getFileName())) {
@@ -145,7 +169,8 @@
                                                                 break;
                                                             }
                                                         }
-                                                    } else if (file.getPendingOp() == ExternalFilePendingOp.PENDING_APPEND_OP) {
+                                                    } else if (file
+                                                            .getPendingOp() == ExternalFilePendingOp.PENDING_APPEND_OP) {
                                                         // find original file
                                                         for (ExternalFile originalFile : files) {
                                                             if (originalFile.getFileName().equals(file.getFileName())) {
@@ -196,22 +221,5 @@
             state = newState;
             recoveryThread.start();
         }
-        return null;
-    }
-
-    private void executeHyracksJob(JobSpecification spec) throws Exception {
-        spec.setMaxReattempts(0);
-        JobId jobId = hcc.startJob(spec);
-        hcc.waitForCompletion(jobId);
-    }
-
-    @Override
-    public void notifyRequestCompletion(IClusterManagementWorkResponse response) {
-        // Do nothing
-    }
-
-    @Override
-    public void notifyStateChange(ClusterState previousState, ClusterState newState) {
-        // Do nothing?
     }
 }
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index 3341387..dac9af5 100644
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -20,7 +20,6 @@
 
 import java.io.File;
 import java.io.IOException;
-import java.rmi.server.UnicastRemoteObject;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -42,10 +41,6 @@
 import org.apache.asterix.event.schema.cluster.Cluster;
 import org.apache.asterix.event.schema.cluster.Node;
 import org.apache.asterix.messaging.NCMessageBroker;
-import org.apache.asterix.metadata.MetadataManager;
-import org.apache.asterix.metadata.MetadataNode;
-import org.apache.asterix.metadata.api.IAsterixStateProxy;
-import org.apache.asterix.metadata.api.IMetadataNode;
 import org.apache.asterix.metadata.bootstrap.MetadataBootstrap;
 import org.apache.asterix.om.util.AsterixClusterProperties;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
@@ -102,7 +97,7 @@
             LOGGER.info("Starting Asterix node controller: " + nodeId);
         }
 
-        runtimeContext = new AsterixAppRuntimeContext(ncApplicationContext);
+        runtimeContext = new AsterixAppRuntimeContext(ncApplicationContext, metadataRmiPort);
         AsterixMetadataProperties metadataProperties = ((IAsterixPropertiesProvider) runtimeContext)
                 .getMetadataProperties();
         if (!metadataProperties.getNodeNames().contains(ncApplicationContext.getNodeId())) {
@@ -215,33 +210,12 @@
             localResourceRepository.initializeNewUniverse(AsterixClusterProperties.INSTANCE.getStorageDirectoryName());
         }
 
-        IAsterixStateProxy proxy = null;
         isMetadataNode = nodeId.equals(metadataProperties.getMetadataNodeName());
         if (isMetadataNode) {
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Bootstrapping metadata");
-            }
-            MetadataNode.INSTANCE.initialize(runtimeContext);
-
-            proxy = (IAsterixStateProxy) ncApplicationContext.getDistributedState();
-            if (proxy == null) {
-                throw new IllegalStateException("Metadata node cannot access distributed state");
-            }
-
-            //This is a special case, we just give the metadataNode directly.
-            //This way we can delay the registration of the metadataNode until
-            //it is completely initialized.
-            MetadataManager.INSTANCE = new MetadataManager(proxy, MetadataNode.INSTANCE);
-            MetadataBootstrap.startUniverse(((IAsterixPropertiesProvider) runtimeContext), ncApplicationContext,
-                    systemState == SystemState.NEW_UNIVERSE);
-            MetadataBootstrap.startDDLRecovery();
-
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Metadata node bound");
-            }
+            runtimeContext.initializeMetadata(systemState == SystemState.NEW_UNIVERSE);
         }
-
         ExternalLibraryBootstrap.setUpExternaLibraries(isMetadataNode);
+
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Starting lifecycle components");
         }
@@ -267,9 +241,7 @@
         recoveryMgr.checkpoint(true, RecoveryManager.NON_SHARP_CHECKPOINT_TARGET_LSN);
 
         if (isMetadataNode) {
-            IMetadataNode stub = null;
-            stub = (IMetadataNode) UnicastRemoteObject.exportObject(MetadataNode.INSTANCE, metadataRmiPort);
-            proxy.setMetadataNode(stub);
+            runtimeContext.exportMetadataNodeStub();
         }
 
         //Clean any temporary files
diff --git a/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java b/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
index 095ef1b..aeaef59 100644
--- a/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
+++ b/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
@@ -29,14 +29,17 @@
 import org.apache.asterix.common.messaging.ReportMaxResourceIdRequestMessage;
 import org.apache.asterix.common.messaging.ResourceIdRequestMessage;
 import org.apache.asterix.common.messaging.ResourceIdRequestResponseMessage;
+import org.apache.asterix.common.messaging.TakeoverMetadataNodeResponseMessage;
+import org.apache.asterix.common.messaging.TakeoverPartitionsResponseMessage;
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
 import org.apache.asterix.om.util.AsterixClusterProperties;
 import org.apache.hyracks.api.messages.IMessage;
-import org.apache.hyracks.api.messages.IMessageBroker;
 import org.apache.hyracks.api.util.JavaSerializationUtils;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.NodeControllerState;
 
-public class CCMessageBroker implements IMessageBroker {
+public class CCMessageBroker implements ICCMessageBroker {
 
     private final static Logger LOGGER = Logger.getLogger(CCMessageBroker.class.getName());
     private final AtomicLong globalResourceId = new AtomicLong(0);
@@ -58,6 +61,12 @@
             case REPORT_MAX_RESOURCE_ID_RESPONSE:
                 handleReportResourceMaxIdResponse(message, nodeId);
                 break;
+            case TAKEOVER_PARTITIONS_RESPONSE:
+                handleTakeoverPartitionsResponse(message);
+                break;
+            case TAKEOVER_METADATA_NODE_RESPONSE:
+                handleTakeoverMetadataNodeResponse(message);
+                break;
             default:
                 LOGGER.warning("Unknown message: " + absMessage.getMessageType());
                 break;
@@ -89,7 +98,8 @@
         nodesReportedMaxResourceId.add(nodeId);
     }
 
-    private void sendApplicationMessageToNC(IMessage msg, String nodeId) throws Exception {
+    @Override
+    public void sendApplicationMessageToNC(IApplicationMessage msg, String nodeId) throws Exception {
         Map<String, NodeControllerState> nodeMap = ccs.getNodeMap();
         NodeControllerState state = nodeMap.get(nodeId);
         state.getNodeController().sendApplicationMessageToNC(JavaSerializationUtils.serialize(msg), null, nodeId);
@@ -106,4 +116,14 @@
             }
         }
     }
+
+    private void handleTakeoverPartitionsResponse(IMessage message) {
+        TakeoverPartitionsResponseMessage msg = (TakeoverPartitionsResponseMessage) message;
+        AsterixClusterProperties.INSTANCE.processPartitionTakeoverResponse(msg);
+    }
+
+    private void handleTakeoverMetadataNodeResponse(IMessage message) {
+        TakeoverMetadataNodeResponseMessage msg = (TakeoverMetadataNodeResponseMessage) message;
+        AsterixClusterProperties.INSTANCE.processMetadataNodeTakeoverResponse(msg);
+    }
 }
diff --git a/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
index 001771e..8f8723e 100644
--- a/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
+++ b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
@@ -25,9 +25,13 @@
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.messaging.AbstractApplicationMessage;
 import org.apache.asterix.common.messaging.ReportMaxResourceIdMessage;
+import org.apache.asterix.common.messaging.TakeoverMetadataNodeResponseMessage;
+import org.apache.asterix.common.messaging.TakeoverPartitionsRequestMessage;
+import org.apache.asterix.common.messaging.TakeoverPartitionsResponseMessage;
 import org.apache.asterix.common.messaging.api.IApplicationMessage;
 import org.apache.asterix.common.messaging.api.IApplicationMessageCallback;
 import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.common.replication.IRemoteRecoveryManager;
 import org.apache.asterix.metadata.bootstrap.MetadataIndexImmutableProperties;
 import org.apache.hyracks.api.messages.IMessage;
 import org.apache.hyracks.api.util.JavaSerializationUtils;
@@ -75,11 +79,40 @@
             case REPORT_MAX_RESOURCE_ID_REQUEST:
                 reportMaxResourceId();
                 break;
+            case TAKEOVER_PARTITIONS_REQUEST:
+                handleTakeoverPartitons(message);
+                break;
+            case TAKEOVER_METADATA_NODE_REQUEST:
+                handleTakeoverMetadataNode(message);
+                break;
             default:
                 break;
         }
     }
 
+    private void handleTakeoverPartitons(IMessage message) throws Exception {
+        TakeoverPartitionsRequestMessage msg = (TakeoverPartitionsRequestMessage) message;
+        IAsterixAppRuntimeContext appContext = (IAsterixAppRuntimeContext) ncs.getApplicationContext()
+                .getApplicationObject();
+        IRemoteRecoveryManager remoteRecoeryManager = appContext.getRemoteRecoveryManager();
+        remoteRecoeryManager.takeoverPartitons(msg.getFailedNode(), msg.getPartitions());
+        //send response after takeover is completed
+        TakeoverPartitionsResponseMessage reponse = new TakeoverPartitionsResponseMessage(msg.getRequestId(),
+                appContext.getTransactionSubsystem().getId(), msg.getPartitions());
+        sendMessage(reponse, null);
+    }
+
+    private void handleTakeoverMetadataNode(IMessage message) throws Exception {
+        IAsterixAppRuntimeContext appContext = (IAsterixAppRuntimeContext) ncs.getApplicationContext()
+                .getApplicationObject();
+        appContext.initializeMetadata(false);
+        appContext.exportMetadataNodeStub();
+        //send response after takeover is completed
+        TakeoverMetadataNodeResponseMessage reponse = new TakeoverMetadataNodeResponseMessage(
+                appContext.getTransactionSubsystem().getId());
+        sendMessage(reponse, null);
+    }
+
     @Override
     public void reportMaxResourceId() throws Exception {
         IAsterixAppRuntimeContext appContext = (IAsterixAppRuntimeContext) ncs.getApplicationContext()
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java b/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
index 3386252..975180b 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.common.api;
 
 import java.io.IOException;
+import java.rmi.RemoteException;
 import java.util.List;
 import java.util.concurrent.Executor;
 
@@ -89,4 +90,17 @@
     public IReplicationChannel getReplicationChannel();
 
     public void initializeResourceIdFactory() throws HyracksDataException;
+
+    /**
+     * Exports the metadata node to the metadata RMI port.
+     * @throws RemoteException
+     */
+    public void exportMetadataNodeStub() throws RemoteException;
+
+    /**
+     * Initializes the metadata node and bootstraps the metadata.
+     * @param newUniverse
+     * @throws Exception
+     */
+    public void initializeMetadata(boolean newUniverse) throws Exception;
 }
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalRecoveryMaanger.java b/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalRecoveryMaanger.java
new file mode 100644
index 0000000..48b1e73
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalRecoveryMaanger.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.cluster;
+
+import org.apache.asterix.common.api.IClusterEventsSubscriber;
+
+public interface IGlobalRecoveryMaanger extends IClusterEventsSubscriber {
+
+    /**
+     * Starts the global recovery process if the cluster state changed to ACTIVE.
+     */
+    public void startGlobalRecovery();
+}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
index 8e2c4e7..473a163 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
@@ -65,4 +65,8 @@
     public SortedMap<Integer, ClusterPartition> getClusterPartitions() {
         return accessor.getClusterPartitions();
     }
+
+    public Map<String, String> getTransactionLogDirs() {
+        return accessor.getTransactionLogDirs();
+    }
 }
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
index 1ef7e3e..fa5b503 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
@@ -36,7 +36,6 @@
     private static int REPLICATION_TIME_OUT_DEFAULT = 15;
 
     private static final String NODE_IP_ADDRESS_DEFAULT = "127.0.0.1";
-    private static final String REPLICATION_STORE_DEFAULT = "asterix-replication";
     private final String NODE_NAME_PREFIX;
     private final Cluster cluster;
 
@@ -102,8 +101,8 @@
             }
 
             if (nodeIndex == -1) {
-                LOGGER.log(Level.WARNING, "Could not find node " + getRealCluserNodeID(nodeId)
-                        + " in cluster configurations");
+                LOGGER.log(Level.WARNING,
+                        "Could not find node " + getRealCluserNodeID(nodeId) + " in cluster configurations");
                 return null;
             }
 
@@ -179,13 +178,6 @@
         return replicaIds;
     }
 
-    public String getReplicationStore() {
-        if (cluster != null) {
-            return cluster.getDataReplication().getReplicationStore();
-        }
-        return REPLICATION_STORE_DEFAULT;
-    }
-
     public int getReplicationFactor() {
         if (cluster != null) {
             if (cluster.getDataReplication() == null || cluster.getDataReplication().getReplicationFactor() == null) {
@@ -202,5 +194,4 @@
         }
         return REPLICATION_TIME_OUT_DEFAULT;
     }
-
 }
\ No newline at end of file
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IAsterixApplicationContextInfo.java b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IAsterixApplicationContextInfo.java
index 8dc3efe..a5cd72b 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IAsterixApplicationContextInfo.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IAsterixApplicationContextInfo.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.common.dataflow;
 
+import org.apache.asterix.common.cluster.IGlobalRecoveryMaanger;
 import org.apache.hyracks.api.application.ICCApplicationContext;
 import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
 import org.apache.hyracks.storage.common.IStorageManagerInterface;
@@ -48,4 +49,6 @@
      * @return ICCApplicationContext implementation instance
      */
     public ICCApplicationContext getCCApplicationContext();
+
+    public IGlobalRecoveryMaanger getGlobalRecoveryManager();
 }
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeRequestMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeRequestMessage.java
new file mode 100644
index 0000000..78b86a8
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeRequestMessage.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging;
+
+public class TakeoverMetadataNodeRequestMessage extends AbstractApplicationMessage {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public ApplicationMessageType getMessageType() {
+        return ApplicationMessageType.TAKEOVER_METADATA_NODE_REQUEST;
+    }
+}
\ No newline at end of file
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeResponseMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeResponseMessage.java
new file mode 100644
index 0000000..78f7429
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeResponseMessage.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging;
+
+public class TakeoverMetadataNodeResponseMessage extends AbstractApplicationMessage {
+
+    private static final long serialVersionUID = 1L;
+    private final String nodeId;
+
+    public TakeoverMetadataNodeResponseMessage(String nodeId) {
+        this.nodeId = nodeId;
+    }
+
+    @Override
+    public ApplicationMessageType getMessageType() {
+        return ApplicationMessageType.TAKEOVER_METADATA_NODE_RESPONSE;
+    }
+
+    public String getNodeId() {
+        return nodeId;
+    }
+}
\ No newline at end of file
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsRequestMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsRequestMessage.java
new file mode 100644
index 0000000..abfa7d2
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsRequestMessage.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging;
+
+public class TakeoverPartitionsRequestMessage extends AbstractApplicationMessage {
+
+    private static final long serialVersionUID = 1L;
+    private final Integer[] partitions;
+    private final String failedNode;
+    private final long requestId;
+    private final String nodeId;
+
+    public TakeoverPartitionsRequestMessage(long requestId, String nodeId, String failedNode,
+            Integer[] partitionsToTakeover) {
+        this.requestId = requestId;
+        this.nodeId = nodeId;
+        this.failedNode = failedNode;
+        this.partitions = partitionsToTakeover;
+    }
+
+    @Override
+    public ApplicationMessageType getMessageType() {
+        return ApplicationMessageType.TAKEOVER_PARTITIONS_REQUEST;
+    }
+
+    public Integer[] getPartitions() {
+        return partitions;
+    }
+
+    public long getRequestId() {
+        return requestId;
+    }
+
+    public String getFailedNode() {
+        return failedNode;
+    }
+
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("Request ID: " + requestId);
+        sb.append(" Node ID: " + nodeId);
+        sb.append(" Failed Node: " + failedNode);
+        sb.append(" Partitions: ");
+        for (Integer partitionId : partitions) {
+            sb.append(partitionId + ",");
+        }
+        //remove last comma
+        sb.charAt(sb.length() - 1);
+        return sb.toString();
+    }
+}
\ No newline at end of file
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsResponseMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsResponseMessage.java
new file mode 100644
index 0000000..86eb3cb
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsResponseMessage.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging;
+
+public class TakeoverPartitionsResponseMessage extends AbstractApplicationMessage {
+
+    private static final long serialVersionUID = 1L;
+    private final Integer[] partitions;
+    private final String nodeId;
+    private final long requestId;
+
+    public TakeoverPartitionsResponseMessage(long requestId, String nodeId, Integer[] partitionsToTakeover) {
+        this.requestId = requestId;
+        this.nodeId = nodeId;
+        this.partitions = partitionsToTakeover;
+    }
+
+    @Override
+    public ApplicationMessageType getMessageType() {
+        return ApplicationMessageType.TAKEOVER_PARTITIONS_RESPONSE;
+    }
+
+    public Integer[] getPartitions() {
+        return partitions;
+    }
+
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    public long getRequestId() {
+        return requestId;
+    }
+}
\ No newline at end of file
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
index 61ab7cd..57a0dae 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
@@ -26,7 +26,11 @@
         RESOURCE_ID_REQUEST,
         RESOURCE_ID_RESPONSE,
         REPORT_MAX_RESOURCE_ID_REQUEST,
-        REPORT_MAX_RESOURCE_ID_RESPONSE
+        REPORT_MAX_RESOURCE_ID_RESPONSE,
+        TAKEOVER_PARTITIONS_REQUEST,
+        TAKEOVER_PARTITIONS_RESPONSE,
+        TAKEOVER_METADATA_NODE_REQUEST,
+        TAKEOVER_METADATA_NODE_RESPONSE
     }
 
     public abstract ApplicationMessageType getMessageType();
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
new file mode 100644
index 0000000..7dafbd5
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging.api;
+
+import org.apache.hyracks.api.messages.IMessageBroker;
+
+public interface ICCMessageBroker extends IMessageBroker {
+
+    /**
+     * Sends the passed message to the specified {@code nodeId}
+     * @param msg
+     * @param nodeId
+     * @throws Exception
+     */
+    public void sendApplicationMessageToNC(IApplicationMessage msg, String nodeId) throws Exception;
+}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java b/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
index 63d29a0..ecc9494 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
@@ -18,8 +18,24 @@
  */
 package org.apache.asterix.common.replication;
 
+import java.io.IOException;
+
+import org.apache.asterix.common.exceptions.ACIDException;
+
 public interface IRemoteRecoveryManager {
 
+    /**
+     * Attempts to perform the remote recovery process from an active remote replica.
+     */
     public void performRemoteRecovery();
 
+    /**
+     * Performs the partitions takeover process from the {@code failedNode}
+     * @param failedNode
+     * @param partitions
+     * @throws IOException
+     * @throws ACIDException
+     */
+    public void takeoverPartitons(String failedNode, Integer[] partitions) throws IOException, ACIDException;
+
 }
\ No newline at end of file
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java
index c796f37..f13d300 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java
@@ -22,10 +22,15 @@
 
 public interface IReplicaResourcesManager {
 
-    public String getIndexPath(String backupNodeId, int IODeviceNum, String dataverse, String dataset);
-
-    public String getLocalStorageFolder();
-
+    /**
+     * @param remoteNodes
+     * @return The minimum LSN of all indexes that belong to {@code remoteNodes}.
+     */
     public long getMinRemoteLSN(Set<String> remoteNodes);
 
+    /**
+     * @param partitions
+     * @return the minimum LSN of all indexes that belong to {@code partitions}.
+     */
+    public long getPartitionsMinLSN(Integer[] partitions);
 }
\ No newline at end of file
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java b/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
index 9ea9957..a2b7a82 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
@@ -111,4 +111,14 @@
      * @throws HyracksDataException
      */
     public long getLocalMinFirstLSN() throws HyracksDataException;
+
+    /**
+     * Replay the logs that belong to the passed {@code partitions} starting from the {@code lowWaterMarkLSN}
+     * @param partitions
+     * @param lowWaterMarkLSN
+     * @param failedNode
+     * @throws IOException
+     * @throws ACIDException
+     */
+    public void replayPartitionsLogs(Integer[] partitions, long lowWaterMarkLSN, String failedNode) throws IOException, ACIDException;
 }
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
index acfb9d5..48e42bd 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@ -56,6 +56,14 @@
     }
 
     public static String prepareDataverseIndexName(String dataverseName, String datasetName, String idxName) {
-        return dataverseName + File.separator + datasetName + StoragePathUtil.DATASET_INDEX_NAME_SEPARATOR + idxName;
+        return prepareDataverseIndexName(dataverseName, prepareFullIndexName(datasetName, idxName));
+    }
+
+    public static String prepareDataverseIndexName(String dataverseName, String fullIndexName) {
+        return dataverseName + File.separator + fullIndexName;
+    }
+
+    private static String prepareFullIndexName(String datasetName, String idxName) {
+        return (datasetName + DATASET_INDEX_NAME_SEPARATOR + idxName);
     }
 }
diff --git a/asterix-common/src/main/resources/schema/cluster.xsd b/asterix-common/src/main/resources/schema/cluster.xsd
index 872c959..e0605f0 100644
--- a/asterix-common/src/main/resources/schema/cluster.xsd
+++ b/asterix-common/src/main/resources/schema/cluster.xsd
@@ -47,7 +47,7 @@
 	<xs:element name="enabled" type="xs:boolean" />
 	<xs:element name="replication_port" type="xs:integer" />
 	<xs:element name="replication_factor" type="xs:integer" />
-	<xs:element name="replication_store" type="xs:string" />
+	<xs:element name="auto_failover" type="xs:boolean" />
 	<xs:element name="replication_time_out" type="xs:integer" />
 
 	<!-- definition of complex elements -->
@@ -82,7 +82,7 @@
 				<xs:element ref="cl:enabled" />
 				<xs:element ref="cl:replication_port" />
 				<xs:element ref="cl:replication_factor" />
-				<xs:element ref="cl:replication_store" />
+				<xs:element ref="cl:auto_failover" />
 				<xs:element ref="cl:replication_time_out" />
 			</xs:sequence>
 		</xs:complexType>
diff --git a/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java b/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
index 447e96d..f54db63 100644
--- a/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
+++ b/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
@@ -24,12 +24,14 @@
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.FileReader;
+import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.nio.charset.StandardCharsets;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -54,6 +56,7 @@
 import org.apache.commons.httpclient.methods.StringRequestEntity;
 import org.apache.commons.httpclient.params.HttpMethodParams;
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.json.JSONObject;
 
 public class TestExecutor {
@@ -361,6 +364,14 @@
         return getProcessOutput(p);
     }
 
+    private static String executeVagrantScript(ProcessBuilder pb, String node, String scriptName) throws Exception {
+        pb.command("vagrant", "ssh", node, "--", pb.environment().get("SCRIPT_HOME") + scriptName);
+        Process p = pb.start();
+        p.waitFor();
+        InputStream input = p.getInputStream();
+        return IOUtils.toString(input, StandardCharsets.UTF_8.name());
+    }
+
     private static String getScriptPath(String queryPath, String scriptBasePath, String scriptFileName) {
         String targetWord = "queries" + File.separator;
         int targetWordSize = targetWord.lastIndexOf(File.separator);
@@ -565,6 +576,22 @@
                             }
                             System.err.println("...but that was expected.");
                             break;
+                        case "vagrant_script":
+                            try {
+                                String[] command = statement.trim().split(" ");
+                                if (command.length != 2) {
+                                    throw new Exception("invalid vagrant script format");
+                                }
+                                String nodeId = command[0];
+                                String scriptName = command[1];
+                                String output = executeVagrantScript(pb, nodeId, scriptName);
+                                if (output.contains("ERROR")) {
+                                    throw new Exception(output);
+                                }
+                            } catch (Exception e) {
+                                throw new Exception("Test \"" + testFile + "\" FAILED!\n", e);
+                            }
+                            break;
                         default:
                             throw new IllegalArgumentException("No statements of type " + ctx.getType());
                     }
diff --git a/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java b/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java
index 6085019..ce84cc8 100644
--- a/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java
+++ b/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java
@@ -315,10 +315,6 @@
         patternList.addAll(createRemoveAsterixLogDirPattern(instance).getPattern());
         patternList.addAll(createRemoveAsterixRootMetadata(instance).getPattern());
         patternList.addAll(createRemoveAsterixTxnLogs(instance).getPattern());
-        if (instance.getCluster().getDataReplication() != null
-                && instance.getCluster().getDataReplication().isEnabled()) {
-            patternList.addAll(createRemoveAsterixReplicationPattern(instance).getPattern());
-        }
         Patterns patterns = new Patterns(patternList);
         return patterns;
     }
@@ -647,29 +643,4 @@
         Patterns patterns = new Patterns(patternList);
         return patterns;
     }
-
-    private Patterns createRemoveAsterixReplicationPattern(AsterixInstance instance) throws Exception {
-
-        List<Pattern> patternList = new ArrayList<Pattern>();
-        Cluster cluster = instance.getCluster();
-
-        Nodeid nodeid = null;
-        String pargs = null;
-        Event event = null;
-        for (Node node : cluster.getNode()) {
-            String[] nodeIODevices;
-            String iodevices = node.getIodevices() == null ? cluster.getIodevices() : node.getIodevices();
-            nodeIODevices = iodevices.trim().split(",");
-            for (String nodeIODevice : nodeIODevices) {
-                pargs = nodeIODevice + File.separator + cluster.getDataReplication().getReplicationStore();
-                nodeid = new Nodeid(new Value(null, node.getId()));
-                event = new Event("file_delete", nodeid, pargs);
-                patternList.add(new Pattern(null, 1, null, event));
-            }
-        }
-
-        Patterns patterns = new Patterns(patternList);
-        return patterns;
-    }
-
 }
diff --git a/asterix-installer/pom.xml b/asterix-installer/pom.xml
index 34c3e0a..b257e71 100644
--- a/asterix-installer/pom.xml
+++ b/asterix-installer/pom.xml
@@ -29,6 +29,7 @@
 		<failsafe.test.excludes>**/DmlRecoveryIT.java</failsafe.test.excludes>
         <cluster.test.excludes>**/AsterixClusterLifeCycleIT.java</cluster.test.excludes>
 		<cluster.extest.excludes>**/ClusterExecutionIT.java</cluster.extest.excludes>
+        <replication.test.excludes>**/ReplicationIT.java</replication.test.excludes>
 	</properties>
 
   <licenses>
@@ -123,6 +124,7 @@
 						<exclude>${failsafe.test.excludes}</exclude>
                         <exclude>${cluster.test.excludes}</exclude>
 					    <exclude>${cluster.extest.excludes}</exclude>
+                        <exclude>${replication.test.excludes}</exclude>
 					</excludes>
 				</configuration>
 				<executions>
@@ -194,6 +196,8 @@
                             <forkMode>pertest</forkMode>
                             <excludes>
                                 <exclude>${failsafe.test.excludes}</exclude>
+                                <exclude>${cluster.test.excludes}</exclude>
+                                <exclude>${cluster.extest.excludes}</exclude>
                             </excludes>
                         </configuration>
                         <executions>
diff --git a/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java b/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
index 09c65c8..9559394 100644
--- a/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
+++ b/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
@@ -288,12 +288,6 @@
                 valid = false;
             }
 
-            if (cluster.getDataReplication().getReplicationStore() == null
-                    || cluster.getDataReplication().getReplicationStore().length() == 0) {
-                valid = false;
-                LOGGER.fatal("Replication store not defined. " + ERROR);
-            }
-
             if (cluster.getDataReplication().getReplicationPort() == null
                     || cluster.getDataReplication().getReplicationPort().toString().length() == 0) {
                 valid = false;
diff --git a/asterix-installer/src/test/java/org/apache/asterix/installer/test/ReplicationIT.java b/asterix-installer/src/test/java/org/apache/asterix/installer/test/ReplicationIT.java
new file mode 100644
index 0000000..86c15ae
--- /dev/null
+++ b/asterix-installer/src/test/java/org/apache/asterix/installer/test/ReplicationIT.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.installer.test;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import java.util.logging.Logger;
+
+import org.apache.asterix.test.aql.TestExecutor;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.codehaus.plexus.util.FileUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class ReplicationIT {
+
+    private static final String PATH_BASE = StringUtils
+            .join(new String[] { "src", "test", "resources", "integrationts", "replication" }, File.separator);
+    private static final String CLUSTER_BASE = StringUtils
+            .join(new String[] { "src", "test", "resources", "clusterts" }, File.separator);
+    private static final String PATH_ACTUAL = "repliationtest" + File.separator;
+    private static String managixFolderName;
+    private static final Logger LOGGER = Logger.getLogger(ReplicationIT.class.getName());
+    private static File asterixProjectDir = new File(System.getProperty("user.dir"));
+    private static final String CLUSTER_CC_ADDRESS = "10.10.0.2";
+    private static final int CLUSTER_CC_API_PORT = 19002;
+    private static ProcessBuilder pb;
+    private static Map<String, String> env;
+    private final static TestExecutor testExecutor = new TestExecutor(CLUSTER_CC_ADDRESS, CLUSTER_CC_API_PORT);
+    private static String SCRIPT_HOME;
+    protected TestCaseContext tcCtx;
+
+    public ReplicationIT(TestCaseContext tcCtx) {
+        this.tcCtx = tcCtx;
+    }
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        File outdir = new File(PATH_ACTUAL);
+        outdir.mkdirs();
+
+        // vagrant setup
+        File installerTargetDir = new File(asterixProjectDir, "target");
+        String[] installerFiles = installerTargetDir.list(new FilenameFilter() {
+            @Override
+            public boolean accept(File dir, String name) {
+                return new File(dir, name).isDirectory() && name.startsWith("asterix-installer")
+                        && name.endsWith("binary-assembly");
+            }
+        });
+
+        if (installerFiles == null || installerFiles.length == 0) {
+            throw new Exception("Couldn't find installer binaries");
+        }
+
+        managixFolderName = installerFiles[0];
+
+        //copy tests data
+        FileUtils.copyDirectoryStructure(
+                new File(StringUtils.join(
+                        new String[] { "..", "asterix-replication", "src", "test", "resources", "data" },
+                        File.separator)),
+                new File(StringUtils.join(new String[] { "src", "test", "resources", "clusterts", "data" },
+                        File.separator)));
+
+        //copy tests scripts
+        FileUtils.copyDirectoryStructure(
+                new File(StringUtils.join(
+                        new String[] { "..", "asterix-replication", "src", "test", "resources", "scripts" },
+                        File.separator)),
+                new File(StringUtils.join(new String[] { "src", "test", "resources", "clusterts", "scripts" },
+                        File.separator)));
+
+        invoke("cp", "-r", installerTargetDir.toString() + "/" + managixFolderName,
+                asterixProjectDir + "/" + CLUSTER_BASE);
+
+        remoteInvoke("cp -r /vagrant/" + managixFolderName + " /tmp/asterix");
+
+        SCRIPT_HOME = "/vagrant/scripts/";
+        pb = new ProcessBuilder();
+        env = pb.environment();
+        env.put("SCRIPT_HOME", SCRIPT_HOME);
+        File cwd = new File(asterixProjectDir.toString() + "/" + CLUSTER_BASE);
+        pb.directory(cwd);
+        pb.redirectErrorStream(true);
+
+        //make scripts executable
+        String chmodScriptsCmd = "chmod -R +x " + SCRIPT_HOME;
+        remoteInvoke(chmodScriptsCmd, "cc");
+        remoteInvoke(chmodScriptsCmd, "nc1");
+        remoteInvoke(chmodScriptsCmd, "nc2");
+
+        //managix configure
+        logOutput(managixInvoke("configure").getInputStream());
+
+        //managix validate
+        String validateOutput = IOUtils.toString(managixInvoke("validate").getInputStream(),
+                StandardCharsets.UTF_8.name());
+        if (validateOutput.contains("ERROR")) {
+            throw new Exception("Managix validate error: " + validateOutput);
+        }
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        //remove files
+        remoteInvoke("rm -rf /vagrant/asterix");
+    }
+
+    @Before
+    public void beforeTest() throws Exception {
+        //create instance
+        managixInvoke("create -n vagrant-ssh -c /vagrant/cluster_with_replication.xml").getInputStream();
+    }
+
+    @After
+    public void afterTest() throws Exception {
+        //stop instance
+        managixInvoke("stop -n vagrant-ssh");
+
+        //verify that all processes have been stopped
+        String killProcesses = "kill_cc_and_nc.sh";
+        executeVagrantScript("cc", killProcesses);
+        executeVagrantScript("nc1", killProcesses);
+        executeVagrantScript("nc2", killProcesses);
+
+        //delete storage
+        String deleteStorage = "delete_storage.sh";
+        executeVagrantScript("cc", deleteStorage);
+        executeVagrantScript("nc1", deleteStorage);
+        executeVagrantScript("nc2", deleteStorage);
+
+        //delete instance
+        managixInvoke("delete -n vagrant-ssh");
+    }
+
+    @Test
+    public void test() throws Exception {
+        testExecutor.executeTest(PATH_ACTUAL, tcCtx, pb, false);
+    }
+
+    @Parameters(name = "ReplicationIT {index}: {0}")
+    public static Collection<Object[]> tests() throws Exception {
+        Collection<Object[]> testArgs = buildTestsInXml(TestCaseContext.DEFAULT_TESTSUITE_XML_NAME);
+        if (testArgs.size() == 0) {
+            testArgs = buildTestsInXml(TestCaseContext.DEFAULT_TESTSUITE_XML_NAME);
+        }
+        return testArgs;
+    }
+
+    protected static Collection<Object[]> buildTestsInXml(String xmlfile) throws Exception {
+        Collection<Object[]> testArgs = new ArrayList<Object[]>();
+        TestCaseContext.Builder b = new TestCaseContext.Builder();
+        for (TestCaseContext ctx : b.build(new File(PATH_BASE), xmlfile)) {
+            testArgs.add(new Object[] { ctx });
+        }
+        return testArgs;
+    }
+
+    public static boolean checkOutput(InputStream input, String requiredSubString) {
+        String candidate;
+        try {
+            candidate = IOUtils.toString(input, StandardCharsets.UTF_8.name());
+        } catch (IOException e) {
+            LOGGER.warning("Could not check output of subprocess");
+            return false;
+        }
+        return candidate.contains(requiredSubString);
+    }
+
+    public static boolean checkOutput(String candidate, String requiredSubString) {
+        return candidate.contains(requiredSubString);
+    }
+
+    public static String processOut(Process p) throws IOException {
+        InputStream input = p.getInputStream();
+        return IOUtils.toString(input, StandardCharsets.UTF_8.name());
+    }
+
+    public static void logOutput(InputStream input) {
+        try {
+            LOGGER.info(IOUtils.toString(input, StandardCharsets.UTF_8.name()));
+        } catch (IOException e) {
+            LOGGER.warning("Could not print output of subprocess");
+        }
+    }
+
+    private static Process invoke(String... args) throws Exception {
+        ProcessBuilder pb = new ProcessBuilder(args);
+        pb.redirectErrorStream(true);
+        Process p = pb.start();
+        return p;
+    }
+
+    private static Process remoteInvoke(String cmd) throws Exception {
+        ProcessBuilder pb = new ProcessBuilder("vagrant", "ssh", "cc", "-c", "MANAGIX_HOME=/tmp/asterix/ " + cmd);
+        File cwd = new File(asterixProjectDir.toString() + "/" + CLUSTER_BASE);
+        pb.directory(cwd);
+        pb.redirectErrorStream(true);
+        Process p = pb.start();
+        p.waitFor();
+        return p;
+    }
+
+    private static Process remoteInvoke(String cmd, String node) throws Exception {
+        ProcessBuilder pb = new ProcessBuilder("vagrant", "ssh", node, "--", cmd);
+        File cwd = new File(asterixProjectDir.toString() + "/" + CLUSTER_BASE);
+        pb.directory(cwd);
+        pb.redirectErrorStream(true);
+        Process p = pb.start();
+        p.waitFor();
+        return p;
+    }
+
+    private static Process managixInvoke(String cmd) throws Exception {
+        return remoteInvoke("/tmp/asterix/bin/managix " + cmd);
+    }
+
+    private static String executeVagrantScript(String node, String scriptName) throws Exception {
+        pb.command("vagrant", "ssh", node, "--", SCRIPT_HOME + scriptName);
+        Process p = pb.start();
+        p.waitFor();
+        InputStream input = p.getInputStream();
+        return IOUtils.toString(input, StandardCharsets.UTF_8.name());
+    }
+}
\ No newline at end of file
diff --git a/asterix-installer/src/test/resources/clusterts/cluster_with_replication.xml b/asterix-installer/src/test/resources/clusterts/cluster_with_replication.xml
new file mode 100644
index 0000000..b9f4658
--- /dev/null
+++ b/asterix-installer/src/test/resources/clusterts/cluster_with_replication.xml
@@ -0,0 +1,63 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements.  See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership.  The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License.  You may obtain a copy of the License at
+ !
+ !   http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied.  See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<cluster xmlns="cluster">
+
+    <name>vagrant</name>
+
+    <username>vagrant</username>
+
+    <working_dir>
+        <dir>/vagrant/asterix/managix-working</dir>
+        <NFS>true</NFS>
+    </working_dir>
+
+    <log_dir>/home/vagrant/asterix/logs/</log_dir>
+    <txn_log_dir>/home/vagrant/asterix/tx_logs</txn_log_dir>
+    <iodevices>/home/vagrant/asterix/p1,/home/vagrant/asterix/p2</iodevices>
+
+    <store>storage</store>
+
+    <java_home>/usr/java/latest</java_home>
+    <metadata_node>nc1</metadata_node>
+
+    <data_replication>
+        <enabled>true</enabled>
+        <replication_port>2000</replication_port>
+        <replication_factor>2</replication_factor>
+        <auto_failover>true</auto_failover>
+        <replication_time_out>10</replication_time_out>
+    </data_replication>
+
+    <master_node>
+        <id>cc</id>
+        <client_ip>10.10.0.2</client_ip>
+        <cluster_ip>10.10.0.2</cluster_ip>
+        <client_port>1098</client_port>
+        <cluster_port>1099</cluster_port>
+        <http_port>8888</http_port>
+    </master_node>
+    <node>
+        <id>nc1</id>
+        <cluster_ip>10.10.0.3</cluster_ip>
+    </node>
+    <node>
+        <id>nc2</id>
+        <cluster_ip>10.10.0.4</cluster_ip>
+    </node>
+</cluster>
\ No newline at end of file
diff --git a/asterix-installer/src/test/resources/clusterts/known_hosts b/asterix-installer/src/test/resources/clusterts/known_hosts
index 5ab452a..273f9f3 100644
--- a/asterix-installer/src/test/resources/clusterts/known_hosts
+++ b/asterix-installer/src/test/resources/clusterts/known_hosts
@@ -1,6 +1,6 @@
-nc1,10.10.0.3 ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAQEA4DTPG2/P073Ak5htIlGYxFh9BYo3bwxBW/SZ1+MDtBUGSEFG0wP4dDRNxwzdmc2EB5JbiAb4t2wLtlFJUbhZUuhxxv2WP4Uastt+U2CWe8+/OsSCAieDv9+dvhT2YxkyAwSCFjyv9T+ftE7YyuIqgBoDTcsvCGzhcl80xd/mQuboneYdR8A0Q2cbd47BQ1D+76HW3l0t51N4fsvbds/LdkXtqVqCadiTKuZ/Ki9JkWlIwffF1IvtARdkW5hyA2fkdJcNBGcfIlxcEvXUvEcIi2FVdY+rJTR9014SrspPtfTyPe4mMgwNQGGT1dm9tnjhq/N6WrYOQ8J3HUn+r7tO5w==
-nc2,10.10.0.4 ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAQEA4DTPG2/P073Ak5htIlGYxFh9BYo3bwxBW/SZ1+MDtBUGSEFG0wP4dDRNxwzdmc2EB5JbiAb4t2wLtlFJUbhZUuhxxv2WP4Uastt+U2CWe8+/OsSCAieDv9+dvhT2YxkyAwSCFjyv9T+ftE7YyuIqgBoDTcsvCGzhcl80xd/mQuboneYdR8A0Q2cbd47BQ1D+76HW3l0t51N4fsvbds/LdkXtqVqCadiTKuZ/Ki9JkWlIwffF1IvtARdkW5hyA2fkdJcNBGcfIlxcEvXUvEcIi2FVdY+rJTR9014SrspPtfTyPe4mMgwNQGGT1dm9tnjhq/N6WrYOQ8J3HUn+r7tO5w==
-nc3,10.10.0.5 ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAQEA4DTPG2/P073Ak5htIlGYxFh9BYo3bwxBW/SZ1+MDtBUGSEFG0wP4dDRNxwzdmc2EB5JbiAb4t2wLtlFJUbhZUuhxxv2WP4Uastt+U2CWe8+/OsSCAieDv9+dvhT2YxkyAwSCFjyv9T+ftE7YyuIqgBoDTcsvCGzhcl80xd/mQuboneYdR8A0Q2cbd47BQ1D+76HW3l0t51N4fsvbds/LdkXtqVqCadiTKuZ/Ki9JkWlIwffF1IvtARdkW5hyA2fkdJcNBGcfIlxcEvXUvEcIi2FVdY+rJTR9014SrspPtfTyPe4mMgwNQGGT1dm9tnjhq/N6WrYOQ8J3HUn+r7tO5w==
-127.0.0.1 ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAQEA4DTPG2/P073Ak5htIlGYxFh9BYo3bwxBW/SZ1+MDtBUGSEFG0wP4dDRNxwzdmc2EB5JbiAb4t2wLtlFJUbhZUuhxxv2WP4Uastt+U2CWe8+/OsSCAieDv9+dvhT2YxkyAwSCFjyv9T+ftE7YyuIqgBoDTcsvCGzhcl80xd/mQuboneYdR8A0Q2cbd47BQ1D+76HW3l0t51N4fsvbds/LdkXtqVqCadiTKuZ/Ki9JkWlIwffF1IvtARdkW5hyA2fkdJcNBGcfIlxcEvXUvEcIi2FVdY+rJTR9014SrspPtfTyPe4mMgwNQGGT1dm9tnjhq/N6WrYOQ8J3HUn+r7tO5w==
 ::1 ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAQEA4DTPG2/P073Ak5htIlGYxFh9BYo3bwxBW/SZ1+MDtBUGSEFG0wP4dDRNxwzdmc2EB5JbiAb4t2wLtlFJUbhZUuhxxv2WP4Uastt+U2CWe8+/OsSCAieDv9+dvhT2YxkyAwSCFjyv9T+ftE7YyuIqgBoDTcsvCGzhcl80xd/mQuboneYdR8A0Q2cbd47BQ1D+76HW3l0t51N4fsvbds/LdkXtqVqCadiTKuZ/Ki9JkWlIwffF1IvtARdkW5hyA2fkdJcNBGcfIlxcEvXUvEcIi2FVdY+rJTR9014SrspPtfTyPe4mMgwNQGGT1dm9tnjhq/N6WrYOQ8J3HUn+r7tO5w==
-10.10.0.2 ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAQEA4DTPG2/P073Ak5htIlGYxFh9BYo3bwxBW/SZ1+MDtBUGSEFG0wP4dDRNxwzdmc2EB5JbiAb4t2wLtlFJUbhZUuhxxv2WP4Uastt+U2CWe8+/OsSCAieDv9+dvhT2YxkyAwSCFjyv9T+ftE7YyuIqgBoDTcsvCGzhcl80xd/mQuboneYdR8A0Q2cbd47BQ1D+76HW3l0t51N4fsvbds/LdkXtqVqCadiTKuZ/Ki9JkWlIwffF1IvtARdkW5hyA2fkdJcNBGcfIlxcEvXUvEcIi2FVdY+rJTR9014SrspPtfTyPe4mMgwNQGGT1dm9tnjhq/N6WrYOQ8J3HUn+r7tO5w==
+127.0.0.1 ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAQEA9gfVcSU968DdflnfJkup3em0MFX13uUTxsiaPisyahjcc++d+9yo+OL5Avffznal9Ev9hzptYclUw9Rx1fSs09g6w8qRiaRLHhYw3mgAer6vG9NZxaj2i2TAVUjdPKZDOSgtFzvPZiwWfhdqi6CmzMRi3MO5G3LzIRzB+qjMitgJ4p/R0mLTik40KTBQNxgN5EmegoANjyBDbdvBKf4vQvMwYGvByq4rMDyDCC8JgcR+B1kR8xNKwDYZFokuSrmAw6PXUmjyW6FaWaUFvrqaj64l1yHqLPcZDTbcSzum6+RaR5Gg2upfs3ahICOzzlnwnwtZxznWKU+rNd4T29cHOw==
+nc1,10.10.0.3 ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAQEA9gfVcSU968DdflnfJkup3em0MFX13uUTxsiaPisyahjcc++d+9yo+OL5Avffznal9Ev9hzptYclUw9Rx1fSs09g6w8qRiaRLHhYw3mgAer6vG9NZxaj2i2TAVUjdPKZDOSgtFzvPZiwWfhdqi6CmzMRi3MO5G3LzIRzB+qjMitgJ4p/R0mLTik40KTBQNxgN5EmegoANjyBDbdvBKf4vQvMwYGvByq4rMDyDCC8JgcR+B1kR8xNKwDYZFokuSrmAw6PXUmjyW6FaWaUFvrqaj64l1yHqLPcZDTbcSzum6+RaR5Gg2upfs3ahICOzzlnwnwtZxznWKU+rNd4T29cHOw==
+nc2,10.10.0.4 ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAQEA9gfVcSU968DdflnfJkup3em0MFX13uUTxsiaPisyahjcc++d+9yo+OL5Avffznal9Ev9hzptYclUw9Rx1fSs09g6w8qRiaRLHhYw3mgAer6vG9NZxaj2i2TAVUjdPKZDOSgtFzvPZiwWfhdqi6CmzMRi3MO5G3LzIRzB+qjMitgJ4p/R0mLTik40KTBQNxgN5EmegoANjyBDbdvBKf4vQvMwYGvByq4rMDyDCC8JgcR+B1kR8xNKwDYZFokuSrmAw6PXUmjyW6FaWaUFvrqaj64l1yHqLPcZDTbcSzum6+RaR5Gg2upfs3ahICOzzlnwnwtZxznWKU+rNd4T29cHOw==10.10.0.2 ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAQEA9gfVcSU968DdflnfJkup3em0MFX13uUTxsiaPisyahjcc++d+9yo+OL5Avffznal9Ev9hzptYclUw9Rx1fSs09g6w8qRiaRLHhYw3mgAer6vG9NZxaj2i2TAVUjdPKZDOSgtFzvPZiwWfhdqi6CmzMRi3MO5G3LzIRzB+qjMitgJ4p/R0mLTik40KTBQNxgN5EmegoANjyBDbdvBKf4vQvMwYGvByq4rMDyDCC8JgcR+B1kR8xNKwDYZFokuSrmAw6PXUmjyW6FaWaUFvrqaj64l1yHqLPcZDTbcSzum6+RaR5Gg2upfs3ahICOzzlnwnwtZxznWKU+rNd4T29cHOw==
+10.10.0.4 ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAQEA9gfVcSU968DdflnfJkup3em0MFX13uUTxsiaPisyahjcc++d+9yo+OL5Avffznal9Ev9hzptYclUw9Rx1fSs09g6w8qRiaRLHhYw3mgAer6vG9NZxaj2i2TAVUjdPKZDOSgtFzvPZiwWfhdqi6CmzMRi3MO5G3LzIRzB+qjMitgJ4p/R0mLTik40KTBQNxgN5EmegoANjyBDbdvBKf4vQvMwYGvByq4rMDyDCC8JgcR+B1kR8xNKwDYZFokuSrmAw6PXUmjyW6FaWaUFvrqaj64l1yHqLPcZDTbcSzum6+RaR5Gg2upfs3ahICOzzlnwnwtZxznWKU+rNd4T29cHOw==
+10.10.0.2 ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAQEA9gfVcSU968DdflnfJkup3em0MFX13uUTxsiaPisyahjcc++d+9yo+OL5Avffznal9Ev9hzptYclUw9Rx1fSs09g6w8qRiaRLHhYw3mgAer6vG9NZxaj2i2TAVUjdPKZDOSgtFzvPZiwWfhdqi6CmzMRi3MO5G3LzIRzB+qjMitgJ4p/R0mLTik40KTBQNxgN5EmegoANjyBDbdvBKf4vQvMwYGvByq4rMDyDCC8JgcR+B1kR8xNKwDYZFokuSrmAw6PXUmjyW6FaWaUFvrqaj64l1yHqLPcZDTbcSzum6+RaR5Gg2upfs3ahICOzzlnwnwtZxznWKU+rNd4T29cHOw==
\ No newline at end of file
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.1.ddl.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.1.ddl.aql
new file mode 100644
index 0000000..6a2441e
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.1.ddl.aql
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name  : bulkload.aql
+ * Description     : Check that Bulkload LSM component are replicated correclty.
+                     The test goes as follows:
+                     start 2 nodes, bulkload a dataset, query data, kill one node
+                     and wait until the failover complete, query the data again.
+ * Expected Result : Success
+ * Date            : January 6 2016
+ */
+
+drop dataverse TinySocial if exists;
+create dataverse TinySocial;
+use dataverse TinySocial;
+
+create type EmploymentType as open {
+    organization-name: string,
+    start-date: date,
+    end-date: date?
+}
+
+create type FacebookUserType as closed {
+    id: int,
+    alias: string,
+    name: string,
+    user-since: datetime,
+    friend-ids: {{ int32 }},
+    employment: [EmploymentType]
+}
+
+/********* 2. Create Datasets  ***********/
+use dataverse TinySocial;
+
+drop dataset FacebookUsers if exists;
+
+create dataset FacebookUsers(FacebookUserType)
+primary key id;
\ No newline at end of file
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.2.update.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.2.update.aql
new file mode 100644
index 0000000..ae14ad0
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.2.update.aql
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name  : bulkload.aql
+ * Description     : Check that Bulkload LSM component are replicated correclty.
+                     The test goes as follows:
+                     start 2 nodes, bulkload a dataset, query data, kill one node
+                     and wait until the failover complete, query the data again.
+ * Expected Result : Success
+ * Date            : January 6 2016
+ */
+
+use dataverse TinySocial;
+
+load dataset FacebookUsers using localfs
+(("path"="vagrant-ssh_nc1:///vagrant/data/fbu.adm"),("format"="adm"));
\ No newline at end of file
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.3.txnqbc.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.3.txnqbc.aql
new file mode 100644
index 0000000..9c8cb96
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.3.txnqbc.aql
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name  : bulkload.aql
+ * Description     : Check that Bulkload LSM component are replicated correclty.
+                     The test goes as follows:
+                     start 2 nodes, bulkload a dataset, query data, kill one node
+                     and wait until the failover complete, query the data again.
+ * Expected Result : Success
+ * Date            : January 6 2016
+ */
+
+use dataverse TinySocial;
+
+count (for $x in dataset FacebookUsers return $x);
\ No newline at end of file
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.4.vagrant_script.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.4.vagrant_script.aql
new file mode 100644
index 0000000..5695ed7
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.4.vagrant_script.aql
@@ -0,0 +1 @@
+nc2 kill_cc_and_nc.sh
\ No newline at end of file
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.5.sleep.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.5.sleep.aql
new file mode 100644
index 0000000..51b0e76
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.5.sleep.aql
@@ -0,0 +1 @@
+60000
\ No newline at end of file
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.6.txnqar.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.6.txnqar.aql
new file mode 100644
index 0000000..9c8cb96
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.6.txnqar.aql
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name  : bulkload.aql
+ * Description     : Check that Bulkload LSM component are replicated correclty.
+                     The test goes as follows:
+                     start 2 nodes, bulkload a dataset, query data, kill one node
+                     and wait until the failover complete, query the data again.
+ * Expected Result : Success
+ * Date            : January 6 2016
+ */
+
+use dataverse TinySocial;
+
+count (for $x in dataset FacebookUsers return $x);
\ No newline at end of file
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.1.ddl.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.1.ddl.aql
new file mode 100644
index 0000000..8de2067
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.1.ddl.aql
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name  : mem_component_recovery.aql
+ * Description     : Check that Memory LSM component are replicated and recovered correclty.
+                     The test goes as follows:
+                     start 2 nodes, bulkload a dataset, copy it to in-memory dataset, query
+                     data from memory, kill one node and wait until the failover complete,
+                     query the data again.
+ * Expected Result : Success
+ * Date            : January 6 2016
+ */
+
+drop dataverse TinySocial if exists;
+create dataverse TinySocial;
+use dataverse TinySocial;
+
+create type EmploymentType as open {
+    organization-name: string,
+    start-date: date,
+    end-date: date?
+}
+
+create type FacebookUserType as closed {
+    id: int,
+    alias: string,
+    name: string,
+    user-since: datetime,
+    friend-ids: {{ int32 }},
+    employment: [EmploymentType]
+}
+
+/********* 2. Create Datasets  ***********/
+use dataverse TinySocial;
+
+drop dataset FacebookUsers if exists;
+
+create dataset FacebookUsers(FacebookUserType)
+primary key id;
+
+create dataset FacebookUsersInMemory(FacebookUserType)
+primary key id;
\ No newline at end of file
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.2.update.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.2.update.aql
new file mode 100644
index 0000000..8087689
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.2.update.aql
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name  : mem_component_recovery.aql
+ * Description     : Check that Memory LSM component are replicated and recovered correclty.
+                     The test goes as follows:
+                     start 2 nodes, bulkload a dataset, copy it to in-memory dataset, query
+                     data from memory, kill one node and wait until the failover complete,
+                     query the data again.
+ * Expected Result : Success
+ * Date            : January 6 2016
+ */
+use dataverse TinySocial;
+
+load dataset FacebookUsers using localfs
+(("path"="vagrant-ssh_nc1:///vagrant/data/fbu.adm"),("format"="adm"));
+
+insert into dataset TinySocial.FacebookUsersInMemory(for $x in dataset TinySocial.FacebookUsers return $x);
\ No newline at end of file
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.3.txnqbc.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.3.txnqbc.aql
new file mode 100644
index 0000000..e25e409
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.3.txnqbc.aql
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name  : mem_component_recovery.aql
+ * Description     : Check that Memory LSM component are replicated and recovered correclty.
+                     The test goes as follows:
+                     start 2 nodes, bulkload a dataset, copy it to in-memory dataset, query
+                     data from memory, kill one node and wait until the failover complete,
+                     query the data again.
+ * Expected Result : Success
+ * Date            : January 6 2016
+ */
+
+use dataverse TinySocial;
+
+count (for $x in dataset FacebookUsersInMemory return $x);
\ No newline at end of file
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.4.vagrant_script.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.4.vagrant_script.aql
new file mode 100644
index 0000000..5695ed7
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.4.vagrant_script.aql
@@ -0,0 +1 @@
+nc2 kill_cc_and_nc.sh
\ No newline at end of file
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.5.sleep.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.5.sleep.aql
new file mode 100644
index 0000000..51b0e76
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.5.sleep.aql
@@ -0,0 +1 @@
+60000
\ No newline at end of file
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.6.txnqar.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.6.txnqar.aql
new file mode 100644
index 0000000..e25e409
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.6.txnqar.aql
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name  : mem_component_recovery.aql
+ * Description     : Check that Memory LSM component are replicated and recovered correclty.
+                     The test goes as follows:
+                     start 2 nodes, bulkload a dataset, copy it to in-memory dataset, query
+                     data from memory, kill one node and wait until the failover complete,
+                     query the data again.
+ * Expected Result : Success
+ * Date            : January 6 2016
+ */
+
+use dataverse TinySocial;
+
+count (for $x in dataset FacebookUsersInMemory return $x);
\ No newline at end of file
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.1.ddl.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.1.ddl.aql
new file mode 100644
index 0000000..113d144
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.1.ddl.aql
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name  : metadata_node_recovery.aql
+ * Description     : Check that metadata node failover is done correctly.
+                     The test goes as follows:
+                     start 2 nodes, create a dataset, kill metadata node
+                     and wait until the failover complete, verify the
+                     dataset still exists.
+ * Expected Result : Success
+ * Date            : January 6 2016
+ */
+
+drop dataverse TinySocial if exists;
+create dataverse TinySocial;
+use dataverse TinySocial;
+
+create type EmploymentType as open {
+    organization-name: string,
+    start-date: date,
+    end-date: date?
+}
+
+create type FacebookUserType as closed {
+    id: int,
+    alias: string,
+    name: string,
+    user-since: datetime,
+    friend-ids: {{ int32 }},
+    employment: [EmploymentType]
+}
+
+/********* 2. Create Datasets  ***********/
+use dataverse TinySocial;
+
+drop dataset FacebookUsers if exists;
+
+create dataset FacebookUsers(FacebookUserType)
+primary key id;
\ No newline at end of file
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.2.txnqbc.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.2.txnqbc.aql
new file mode 100644
index 0000000..76bdcfe
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.2.txnqbc.aql
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name  : metadata_node_recovery.aql
+ * Description     : Check that metadata node failover is done correctly.
+                     The test goes as follows:
+                     start 2 nodes, create a dataset, kill metadata node
+                     and wait until the failover complete, verify the
+                     dataset still exists.
+ * Expected Result : Success
+ * Date            : January 6 2016
+ */
+
+for $x in dataset Metadata.Dataset where $x.DatasetName ='FacebookUsers' return $x.DatasetName;
\ No newline at end of file
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.3.vagrant_script.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.3.vagrant_script.aql
new file mode 100644
index 0000000..5eec164
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.3.vagrant_script.aql
@@ -0,0 +1 @@
+nc1 kill_cc_and_nc.sh
\ No newline at end of file
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.4.sleep.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.4.sleep.aql
new file mode 100644
index 0000000..51b0e76
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.4.sleep.aql
@@ -0,0 +1 @@
+60000
\ No newline at end of file
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.5.txnqar.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.5.txnqar.aql
new file mode 100644
index 0000000..ac1c593
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.5.txnqar.aql
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name  : metadata_node_recovery.aql
+ * Description     : Check that metadata node failover is done correctly.
+                     The test goes as follows:
+                     start 2 nodes, create a dataset, kill metadata node
+                     and wait until the failover complete, verify the
+                     dataset still exists.
+ * Expected Result : Success
+ * Date            : January 6 2016
+ */
+
+use dataverse TinySocial;
+
+for $x in dataset Metadata.Dataset where $x.DatasetName ='FacebookUsers' return $x.DatasetName;
\ No newline at end of file
diff --git a/asterix-installer/src/test/resources/integrationts/replication/testsuite.xml b/asterix-installer/src/test/resources/integrationts/replication/testsuite.xml
new file mode 100644
index 0000000..f033086
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/testsuite.xml
@@ -0,0 +1,37 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements.  See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership.  The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License.  You may obtain a copy of the License at
+ !
+ !   http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied.  See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<test-suite xmlns="urn:xml.testframework.asterix.apache.org" ResultOffsetPath="results" QueryOffsetPath="queries" QueryFileExtension=".aql">
+  <test-group name="failover">
+    <test-case FilePath="failover">
+      <compilation-unit name="bulkload">
+        <output-dir compare="Text">bulkload</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="failover">
+      <compilation-unit name="mem_component_recovery">
+        <output-dir compare="Text">mem_component_recovery</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="failover">
+      <compilation-unit name="metadata_node">
+        <output-dir compare="Text">metadata_node</output-dir>
+      </compilation-unit>
+    </test-case>
+  </test-group>
+</test-suite>
\ No newline at end of file
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index aa7f7d5..823e861 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -383,6 +383,12 @@
             dataLifecycleManager.register(absolutePath, lsmBtree);
         } else {
             final LocalResource resource = localResourceRepository.getResourceByPath(absolutePath);
+            if (resource == null) {
+                throw new Exception("Could not find required metadata indexes. Please delete "
+                        + propertiesProvider.getMetadataProperties().getTransactionLogDirs()
+                                .get(runtimeContext.getTransactionSubsystem().getId())
+                        + " to intialize as a new instance. (WARNING: all data will be lost.)");
+            }
             resourceID = resource.getResourceId();
             lsmBtree = (LSMBTree) dataLifecycleManager.getIndex(absolutePath);
             if (lsmBtree == null) {
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
index b6f3c9e..601ce15 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
@@ -142,4 +142,10 @@
         FileSplit[] splits = splitsForFilesIndex(mdTxnCtx, dataverseName, datasetName, targetIdxName, create);
         return StoragePathUtil.splitProviderAndPartitionConstraints(splits);
     }
+
+    public static String getIndexPath(String partitionPath, int partition, String dataverse, String fullIndexName) {
+        String storageDirName = AsterixClusterProperties.INSTANCE.getStorageDirectoryName();
+        return partitionPath + StoragePathUtil.prepareStoragePartitionPath(storageDirName, partition) + File.separator
+                + StoragePathUtil.prepareDataverseIndexName(dataverse, fullIndexName);
+    }
 }
diff --git a/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixAppContextInfo.java b/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixAppContextInfo.java
index 866162b..082618b 100644
--- a/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixAppContextInfo.java
+++ b/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixAppContextInfo.java
@@ -20,10 +20,11 @@
 
 import java.util.logging.Logger;
 
+import org.apache.asterix.common.cluster.IGlobalRecoveryMaanger;
+import org.apache.asterix.common.config.AsterixBuildProperties;
 import org.apache.asterix.common.config.AsterixCompilerProperties;
 import org.apache.asterix.common.config.AsterixExternalProperties;
 import org.apache.asterix.common.config.AsterixFeedProperties;
-import org.apache.asterix.common.config.AsterixBuildProperties;
 import org.apache.asterix.common.config.AsterixMetadataProperties;
 import org.apache.asterix.common.config.AsterixPropertiesAccessor;
 import org.apache.asterix.common.config.AsterixReplicationProperties;
@@ -56,12 +57,13 @@
     private AsterixFeedProperties feedProperties;
     private AsterixBuildProperties buildProperties;
     private AsterixReplicationProperties replicationProperties;
-
+    private final IGlobalRecoveryMaanger globalRecoveryMaanger;
     private IHyracksClientConnection hcc;
 
-    public static void initialize(ICCApplicationContext ccAppCtx, IHyracksClientConnection hcc) throws AsterixException {
+    public static void initialize(ICCApplicationContext ccAppCtx, IHyracksClientConnection hcc,
+            IGlobalRecoveryMaanger globalRecoveryMaanger) throws AsterixException {
         if (INSTANCE == null) {
-            INSTANCE = new AsterixAppContextInfo(ccAppCtx, hcc);
+            INSTANCE = new AsterixAppContextInfo(ccAppCtx, hcc, globalRecoveryMaanger);
         }
         AsterixPropertiesAccessor propertiesAccessor = new AsterixPropertiesAccessor();
         INSTANCE.compilerProperties = new AsterixCompilerProperties(propertiesAccessor);
@@ -77,9 +79,11 @@
         Logger.getLogger("org.apache").setLevel(INSTANCE.externalProperties.getLogLevel());
     }
 
-    private AsterixAppContextInfo(ICCApplicationContext ccAppCtx, IHyracksClientConnection hcc) {
+    private AsterixAppContextInfo(ICCApplicationContext ccAppCtx, IHyracksClientConnection hcc,
+            IGlobalRecoveryMaanger globalRecoveryMaanger) {
         this.appCtx = ccAppCtx;
         this.hcc = hcc;
+        this.globalRecoveryMaanger = globalRecoveryMaanger;
     }
 
     public static AsterixAppContextInfo getInstance() {
@@ -144,4 +148,9 @@
     public AsterixReplicationProperties getReplicationProperties() {
         return replicationProperties;
     }
+
+    @Override
+    public IGlobalRecoveryMaanger getGlobalRecoveryManager() {
+        return globalRecoveryMaanger;
+    }
 }
diff --git a/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java b/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
index 80008c5..c2c28df 100644
--- a/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
+++ b/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
@@ -35,6 +35,12 @@
 
 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
 import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.config.AsterixReplicationProperties;
+import org.apache.asterix.common.messaging.TakeoverMetadataNodeRequestMessage;
+import org.apache.asterix.common.messaging.TakeoverMetadataNodeResponseMessage;
+import org.apache.asterix.common.messaging.TakeoverPartitionsRequestMessage;
+import org.apache.asterix.common.messaging.TakeoverPartitionsResponseMessage;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
 import org.apache.asterix.event.schema.cluster.Cluster;
 import org.apache.asterix.event.schema.cluster.Node;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
@@ -45,6 +51,11 @@
  */
 
 public class AsterixClusterProperties {
+    /**
+     * TODO: currently after instance restarts we require all nodes to join again, otherwise the cluster wont be ACTIVE.
+     * we may overcome this by storing the cluster state before the instance shutdown and using it on startup to identify
+     * the nodes that are expected the join.
+     */
 
     private static final Logger LOGGER = Logger.getLogger(AsterixClusterProperties.class.getName());
 
@@ -63,6 +74,12 @@
 
     private Map<String, ClusterPartition[]> node2PartitionsMap = null;
     private SortedMap<Integer, ClusterPartition> clusterPartitions = null;
+    private Map<Long, TakeoverPartitionsRequestMessage> pendingTakeoverRequests = null;
+
+    private long takeoverRequestId = 0;
+    private String currentMetadataNode = null;
+    private boolean isMetadataNodeActive = false;
+    private boolean autoFailover = false;
 
     private AsterixClusterProperties() {
         InputStream is = this.getClass().getClassLoader().getResourceAsStream(CLUSTER_CONFIGURATION_FILE);
@@ -82,6 +99,13 @@
             if (AsterixAppContextInfo.getInstance().getCCApplicationContext() != null) {
                 node2PartitionsMap = AsterixAppContextInfo.getInstance().getMetadataProperties().getNodePartitions();
                 clusterPartitions = AsterixAppContextInfo.getInstance().getMetadataProperties().getClusterPartitions();
+                currentMetadataNode = AsterixAppContextInfo.getInstance().getMetadataProperties().getMetadataNodeName();
+                if (isAutoFailoverEnabled()) {
+                    autoFailover = cluster.getDataReplication().isAutoFailover();
+                }
+                if (autoFailover) {
+                    pendingTakeoverRequests = new HashMap<>();
+                }
             }
         }
     }
@@ -91,18 +115,24 @@
     public synchronized void removeNCConfiguration(String nodeId) {
         updateNodePartitions(nodeId, false);
         ncConfiguration.remove(nodeId);
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info(" Removing configuration parameters for node id " + nodeId);
+        if (nodeId.equals(currentMetadataNode)) {
+            isMetadataNodeActive = false;
+            LOGGER.info("Metadata node is now inactive");
         }
-        // TODO implement fault tolerance as follows:
-        // 1. collect the partitions of the failed NC
-        // 2. For each partition, request a remote replica to take over.
-        // 3. wait until each remote replica completes the recovery for the lost partitions
-        // 4. update the cluster state
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Removing configuration parameters for node id " + nodeId);
+        }
+        if (autoFailover) {
+            requestPartitionsTakeover(nodeId);
+        }
     }
 
     public synchronized void addNCConfiguration(String nodeId, Map<String, String> configuration) {
         ncConfiguration.put(nodeId, configuration);
+        if (nodeId.equals(currentMetadataNode)) {
+            isMetadataNodeActive = true;
+            LOGGER.info("Metadata node is now active");
+        }
         updateNodePartitions(nodeId, true);
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info(" Registering configuration parameters for node id " + nodeId);
@@ -118,8 +148,6 @@
                 p.setActive(added);
                 if (added) {
                     p.setActiveNodeId(nodeId);
-                } else {
-                    p.setActiveNodeId(null);
                 }
             }
             resetClusterPartitionConstraint();
@@ -135,13 +163,20 @@
                 return;
             }
         }
-        // if all storage partitions are active, then the cluster is active
-        state = ClusterState.ACTIVE;
-        LOGGER.info("Cluster is now ACTIVE");
+        //if all storage partitions are active as well as the metadata node, then the cluster is active
+        if (isMetadataNodeActive) {
+            state = ClusterState.ACTIVE;
+            LOGGER.info("Cluster is now ACTIVE");
+            //start global recovery
+            AsterixAppContextInfo.getInstance().getGlobalRecoveryManager().startGlobalRecovery();
+        } else {
+            requestMetadataNodeTakeover();
+        }
     }
 
     /**
      * Returns the number of IO devices configured for a Node Controller
+     *
      * @param nodeId
      *            unique identifier of the Node Controller
      * @return number of IO devices. -1 if the node id is not valid. A node id
@@ -155,6 +190,7 @@
 
     /**
      * Returns the IO devices configured for a Node Controller
+     *
      * @param nodeId
      *            unique identifier of the Node Controller
      * @return a list of IO devices. null if node id is not valid. A node id is not valid
@@ -257,4 +293,133 @@
         // virtual cluster without cluster config file
         return DEFAULT_STORAGE_DIR_NAME;
     }
-}
+
+    private synchronized void requestPartitionsTakeover(String failedNodeId) {
+        //replica -> list of partitions to takeover
+        Map<String, List<Integer>> partitionRecoveryPlan = new HashMap<>();
+        AsterixReplicationProperties replicationProperties = AsterixAppContextInfo.getInstance()
+                .getReplicationProperties();
+
+        //collect the partitions of the failed NC
+        List<ClusterPartition> lostPartitions = getNodeAssignedPartitions(failedNodeId);
+        for (ClusterPartition partition : lostPartitions) {
+            //find replicas for this partitions
+            Set<String> partitionReplicas = replicationProperties.getNodeReplicasIds(partition.getNodeId());
+            //find a replica that is still active
+            for (String replica : partitionReplicas) {
+                //TODO (mhubail) currently this assigns the partition to the first found active replica.
+                //It needs to be modified to consider load balancing.
+                if (ncConfiguration.containsKey(replica)) {
+                    if (!partitionRecoveryPlan.containsKey(replica)) {
+                        List<Integer> replicaPartitions = new ArrayList<>();
+                        replicaPartitions.add(partition.getPartitionId());
+                        partitionRecoveryPlan.put(replica, replicaPartitions);
+                    } else {
+                        partitionRecoveryPlan.get(replica).add(partition.getPartitionId());
+                    }
+                }
+            }
+        }
+
+        ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.getInstance()
+                .getCCApplicationContext().getMessageBroker();
+        //For each replica, send a request to takeover the assigned partitions
+        for (String replica : partitionRecoveryPlan.keySet()) {
+            Integer[] partitionsToTakeover = partitionRecoveryPlan.get(replica).toArray(new Integer[] {});
+            long requestId = takeoverRequestId++;
+            TakeoverPartitionsRequestMessage takeoverRequest = new TakeoverPartitionsRequestMessage(requestId, replica,
+                    failedNodeId, partitionsToTakeover);
+            pendingTakeoverRequests.put(requestId, takeoverRequest);
+            try {
+                messageBroker.sendApplicationMessageToNC(takeoverRequest, replica);
+            } catch (Exception e) {
+                /**
+                 * if we fail to send the request, it means the NC we tried to send the request to
+                 * has failed. When the failure notification arrives, we will send any pending request
+                 * that belongs to the failed NC to a different active replica.
+                 */
+                LOGGER.warning("Failed to send takeover request: " + takeoverRequest);
+                e.printStackTrace();
+            }
+        }
+    }
+
+    private synchronized List<ClusterPartition> getNodeAssignedPartitions(String nodeId) {
+        List<ClusterPartition> nodePartitions = new ArrayList<>();
+        for (ClusterPartition partition : clusterPartitions.values()) {
+            if (partition.getActiveNodeId().equals(nodeId)) {
+                nodePartitions.add(partition);
+            }
+        }
+        /**
+         * if there is any pending takeover request that this node was supposed to handle,
+         * it needs to be sent to a different replica
+         */
+        List<Long> failedTakeoverRequests = new ArrayList<>();
+        for (TakeoverPartitionsRequestMessage request : pendingTakeoverRequests.values()) {
+            if (request.getNodeId().equals(nodeId)) {
+                for (Integer partitionId : request.getPartitions()) {
+                    nodePartitions.add(clusterPartitions.get(partitionId));
+                }
+                failedTakeoverRequests.add(request.getId());
+            }
+        }
+
+        //remove failed requests
+        for (Long requestId : failedTakeoverRequests) {
+            pendingTakeoverRequests.remove(requestId);
+        }
+
+        return nodePartitions;
+    }
+
+    private synchronized void requestMetadataNodeTakeover() {
+        //need a new node to takeover metadata node
+        ClusterPartition metadataPartiton = AsterixAppContextInfo.getInstance().getMetadataProperties()
+                .getMetadataPartition();
+        //request the metadataPartition node to register itself as the metadata node
+        TakeoverMetadataNodeRequestMessage takeoverRequest = new TakeoverMetadataNodeRequestMessage();
+        ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.getInstance()
+                .getCCApplicationContext().getMessageBroker();
+        try {
+            messageBroker.sendApplicationMessageToNC(takeoverRequest, metadataPartiton.getActiveNodeId());
+        } catch (Exception e) {
+            /**
+             * if we fail to send the request, it means the NC we tried to send the request to
+             * has failed. When the failure notification arrives, a new NC will be assigned to
+             * the metadata partition and a new metadata node takeover request will be sent to it.
+             */
+            LOGGER.warning("Failed to send metadata node takeover request to: " + metadataPartiton.getActiveNodeId());
+            e.printStackTrace();
+        }
+    }
+
+    public synchronized void processPartitionTakeoverResponse(TakeoverPartitionsResponseMessage reponse) {
+        for (Integer partitonId : reponse.getPartitions()) {
+            ClusterPartition partition = clusterPartitions.get(partitonId);
+            partition.setActive(true);
+            partition.setActiveNodeId(reponse.getNodeId());
+        }
+        pendingTakeoverRequests.remove(reponse.getRequestId());
+        resetClusterPartitionConstraint();
+        updateClusterState();
+    }
+
+    public synchronized void processMetadataNodeTakeoverResponse(TakeoverMetadataNodeResponseMessage reponse) {
+        currentMetadataNode = reponse.getNodeId();
+        isMetadataNodeActive = true;
+        LOGGER.info("Current metadata node: " + currentMetadataNode);
+        updateClusterState();
+    }
+
+    public synchronized String getCurrentMetadataNode() {
+        return currentMetadataNode;
+    }
+
+    public boolean isAutoFailoverEnabled() {
+        if (cluster != null && cluster.getDataReplication() != null && cluster.getDataReplication().isEnabled()) {
+            return cluster.getDataReplication().isAutoFailover();
+        }
+        return false;
+    }
+}
\ No newline at end of file
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/functions/AsterixReplicationProtocol.java b/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java
similarity index 95%
rename from asterix-replication/src/main/java/org/apache/asterix/replication/functions/AsterixReplicationProtocol.java
rename to asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java
index 8e020eb..be8f8e3 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/functions/AsterixReplicationProtocol.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java
@@ -30,10 +30,10 @@
 import org.apache.asterix.common.replication.ReplicaEvent;
 import org.apache.asterix.common.transactions.ILogRecord;
 import org.apache.asterix.replication.management.NetworkingUtil;
-import org.apache.asterix.replication.storage.AsterixLSMIndexFileProperties;
 import org.apache.asterix.replication.storage.LSMComponentProperties;
+import org.apache.asterix.replication.storage.LSMIndexFileProperties;
 
-public class AsterixReplicationProtocol {
+public class ReplicationProtocol {
 
     /**
      * All replication messages start with ReplicationFunctions (4 bytes), then the length of the request in bytes
@@ -115,7 +115,7 @@
         //read replication request type
         NetworkingUtil.readBytes(socketChannel, byteBuffer, REPLICATION_REQUEST_TYPE_SIZE);
 
-        ReplicationRequestType requestType = AsterixReplicationProtocol.ReplicationRequestType.values()[byteBuffer
+        ReplicationRequestType requestType = ReplicationProtocol.ReplicationRequestType.values()[byteBuffer
                 .getInt()];
         return requestType;
     }
@@ -163,7 +163,7 @@
         requestBuffer.flip();
     }
 
-    public static ByteBuffer writeFileReplicationRequest(ByteBuffer requestBuffer, AsterixLSMIndexFileProperties afp,
+    public static ByteBuffer writeFileReplicationRequest(ByteBuffer requestBuffer, LSMIndexFileProperties afp,
             ReplicationRequestType requestType) throws IOException {
         ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
         DataOutputStream oos = new DataOutputStream(outputStream);
@@ -183,10 +183,10 @@
         return requestBuffer;
     }
 
-    public static AsterixLSMIndexFileProperties readFileReplicationRequest(ByteBuffer buffer) throws IOException {
+    public static LSMIndexFileProperties readFileReplicationRequest(ByteBuffer buffer) throws IOException {
         ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit());
         DataInputStream dis = new DataInputStream(bais);
-        return AsterixLSMIndexFileProperties.create(dis);
+        return LSMIndexFileProperties.create(dis);
     }
 
     public static ReplicaLogsRequest readReplicaLogsRequest(ByteBuffer buffer) throws IOException {
@@ -335,12 +335,12 @@
      * @throws IOException
      */
     public static void sendGoodbye(SocketChannel socketChannel) throws IOException {
-        ByteBuffer goodbyeBuffer = AsterixReplicationProtocol.getGoodbyeBuffer();
+        ByteBuffer goodbyeBuffer = ReplicationProtocol.getGoodbyeBuffer();
         NetworkingUtil.transferBufferToChannel(socketChannel, goodbyeBuffer);
     }
 
     public static void sendAck(SocketChannel socketChannel) throws IOException {
-        ByteBuffer ackBuffer = AsterixReplicationProtocol.getAckBuffer();
+        ByteBuffer ackBuffer = ReplicationProtocol.getAckBuffer();
         NetworkingUtil.transferBufferToChannel(socketChannel, ackBuffer);
     }
 }
\ No newline at end of file
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java b/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
index 38be05e..a7cfaec 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
@@ -27,7 +27,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.asterix.common.transactions.ILogRecord;
-import org.apache.asterix.replication.functions.AsterixReplicationProtocol;
+import org.apache.asterix.replication.functions.ReplicationProtocol;
 import org.apache.asterix.replication.management.NetworkingUtil;
 import org.apache.asterix.replication.management.ReplicationManager;
 
@@ -53,7 +53,7 @@
     }
 
     public void append(ILogRecord logRecord) {
-        appendBuffer.putInt(AsterixReplicationProtocol.ReplicationRequestType.REPLICATE_LOG.ordinal());
+        appendBuffer.putInt(ReplicationProtocol.ReplicationRequestType.REPLICATE_LOG.ordinal());
         appendBuffer.putInt(logRecord.getSerializedLogSize());
         appendBuffer.put(logRecord.getSerializedLog());
 
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaEventNotifier.java b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaEventNotifier.java
index 633d87a..9915c83 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaEventNotifier.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaEventNotifier.java
@@ -30,7 +30,7 @@
 import org.apache.asterix.common.config.AsterixReplicationProperties;
 import org.apache.asterix.common.replication.Replica;
 import org.apache.asterix.common.replication.ReplicaEvent;
-import org.apache.asterix.replication.functions.AsterixReplicationProtocol;
+import org.apache.asterix.replication.functions.ReplicationProtocol;
 
 public class ReplicaEventNotifier implements Runnable {
 
@@ -61,7 +61,7 @@
 
         ByteBuffer buffer = null;
         try {
-            buffer = AsterixReplicationProtocol.writeReplicaEventRequest(event);
+            buffer = ReplicationProtocol.writeReplicaEventRequest(event);
         } catch (IOException e) {
             e.printStackTrace();
         }
@@ -79,7 +79,7 @@
                     //send replica event
                     connection.write(buffer);
                     //send goodbye
-                    connection.write(AsterixReplicationProtocol.getGoodbyeBuffer());
+                    connection.write(ReplicationProtocol.getGoodbyeBuffer());
                     break;
                 } catch (IOException | UnresolvedAddressException e) {
                     try {
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaStateChecker.java b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaStateChecker.java
index 07ed144..0c94c61 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaStateChecker.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaStateChecker.java
@@ -28,7 +28,7 @@
 import org.apache.asterix.common.config.AsterixReplicationProperties;
 import org.apache.asterix.common.replication.Replica;
 import org.apache.asterix.common.replication.Replica.ReplicaState;
-import org.apache.asterix.replication.functions.AsterixReplicationProtocol;
+import org.apache.asterix.replication.functions.ReplicationProtocol;
 
 public class ReplicaStateChecker implements Callable<Void> {
 
@@ -61,7 +61,7 @@
                 connection = SocketChannel.open();
                 connection.configureBlocking(true);
                 connection.connect(new InetSocketAddress(replicaAddress.getHostString(), replicaAddress.getPort()));
-                ByteBuffer buffer = AsterixReplicationProtocol.getGoodbyeBuffer();
+                ByteBuffer buffer = ReplicationProtocol.getGoodbyeBuffer();
                 connection.write(buffer);
                 replicationManager.updateReplicaState(replica.getId(), ReplicaState.ACTIVE, suspendReplication);
                 return null;
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
index e6b2ebf..c97fe94 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
@@ -42,7 +42,9 @@
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.config.AsterixReplicationProperties;
+import org.apache.asterix.common.config.IAsterixPropertiesProvider;
 import org.apache.asterix.common.context.DatasetLifecycleManager.IndexInfo;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
@@ -59,15 +61,15 @@
 import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.transactions.LogSource;
 import org.apache.asterix.common.transactions.LogType;
-import org.apache.asterix.replication.functions.AsterixReplicationProtocol;
-import org.apache.asterix.replication.functions.AsterixReplicationProtocol.ReplicationRequestType;
+import org.apache.asterix.replication.functions.ReplicationProtocol;
+import org.apache.asterix.replication.functions.ReplicationProtocol.ReplicationRequestType;
 import org.apache.asterix.replication.functions.ReplicaFilesRequest;
 import org.apache.asterix.replication.functions.ReplicaIndexFlushRequest;
 import org.apache.asterix.replication.functions.ReplicaLogsRequest;
 import org.apache.asterix.replication.logging.RemoteLogMapping;
-import org.apache.asterix.replication.storage.AsterixLSMIndexFileProperties;
 import org.apache.asterix.replication.storage.LSMComponentLSNSyncTask;
 import org.apache.asterix.replication.storage.LSMComponentProperties;
+import org.apache.asterix.replication.storage.LSMIndexFileProperties;
 import org.apache.asterix.replication.storage.ReplicaResourcesManager;
 import org.apache.hyracks.api.application.INCApplicationContext;
 import org.apache.hyracks.storage.am.common.api.IMetaDataPageManager;
@@ -210,7 +212,7 @@
         public void run() {
             Thread.currentThread().setName("Replication Thread");
             try {
-                ReplicationRequestType replicationFunction = AsterixReplicationProtocol.getRequestType(socketChannel,
+                ReplicationRequestType replicationFunction = ReplicationProtocol.getRequestType(socketChannel,
                         inBuffer);
                 while (replicationFunction != ReplicationRequestType.GOODBYE) {
                     switch (replicationFunction) {
@@ -251,7 +253,7 @@
                             throw new IllegalStateException("Unknown replication request");
                         }
                     }
-                    replicationFunction = AsterixReplicationProtocol.getRequestType(socketChannel, inBuffer);
+                    replicationFunction = ReplicationProtocol.getRequestType(socketChannel, inBuffer);
                 }
             } catch (Exception e) {
                 e.printStackTrace();
@@ -267,9 +269,9 @@
         }
 
         private void handleFlushIndex() throws IOException {
-            inBuffer = AsterixReplicationProtocol.readRequest(socketChannel, inBuffer);
+            inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
             //1. read which indexes are requested to be flushed from remote replica
-            ReplicaIndexFlushRequest request = AsterixReplicationProtocol.readReplicaIndexFlushRequest(inBuffer);
+            ReplicaIndexFlushRequest request = ReplicationProtocol.readReplicaIndexFlushRequest(inBuffer);
             Set<Long> requestedIndexesToBeFlushed = request.getLaggingRescouresIds();
 
             //2. check which indexes can be flushed (open indexes) and which cannot be flushed (closed or have empty memory component)
@@ -302,26 +304,25 @@
             //the remaining indexes in the requested set are those which cannot be flushed.
             //4. respond back to the requester that those indexes cannot be flushed
             ReplicaIndexFlushRequest laggingIndexesResponse = new ReplicaIndexFlushRequest(requestedIndexesToBeFlushed);
-            outBuffer = AsterixReplicationProtocol.writeGetReplicaIndexFlushRequest(outBuffer, laggingIndexesResponse);
+            outBuffer = ReplicationProtocol.writeGetReplicaIndexFlushRequest(outBuffer, laggingIndexesResponse);
             NetworkingUtil.transferBufferToChannel(socketChannel, outBuffer);
         }
 
         private void handleLSMComponentProperties() throws IOException {
-            inBuffer = AsterixReplicationProtocol.readRequest(socketChannel, inBuffer);
-            LSMComponentProperties lsmCompProp = AsterixReplicationProtocol.readLSMPropertiesRequest(inBuffer);
+            inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
+            LSMComponentProperties lsmCompProp = ReplicationProtocol.readLSMPropertiesRequest(inBuffer);
             //create mask to indicate that this component is not valid yet
             replicaResourcesManager.createRemoteLSMComponentMask(lsmCompProp);
             lsmComponentId2PropertiesMap.put(lsmCompProp.getComponentId(), lsmCompProp);
         }
 
         private void handleReplicateFile() throws IOException {
-            inBuffer = AsterixReplicationProtocol.readRequest(socketChannel, inBuffer);
-            AsterixLSMIndexFileProperties afp = AsterixReplicationProtocol.readFileReplicationRequest(inBuffer);
+            inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
+            LSMIndexFileProperties afp = ReplicationProtocol.readFileReplicationRequest(inBuffer);
 
-            String replicaFolderPath = replicaResourcesManager.getIndexPath(afp.getNodeId(), afp.getIoDeviceNum(),
-                    afp.getDataverse(), afp.getIdxName());
-
-            String replicaFilePath = replicaFolderPath + File.separator + afp.getFileName();
+            //get index path
+            String indexPath = replicaResourcesManager.getIndexPath(afp);
+            String replicaFilePath = indexPath + File.separator + afp.getFileName();
 
             //create file
             File destFile = new File(replicaFilePath);
@@ -334,20 +335,20 @@
                 fileChannel.force(true);
 
                 if (afp.requiresAck()) {
-                    AsterixReplicationProtocol.sendAck(socketChannel);
+                    ReplicationProtocol.sendAck(socketChannel);
                 }
                 if (afp.isLSMComponentFile()) {
-                    String compoentId = LSMComponentProperties.getLSMComponentID(afp.getFilePath(), afp.getNodeId());
+                    String componentId = LSMComponentProperties.getLSMComponentID(afp.getFilePath());
                     if (afp.getLSNByteOffset() != IMetaDataPageManager.INVALID_LSN_OFFSET) {
-                        LSMComponentLSNSyncTask syncTask = new LSMComponentLSNSyncTask(compoentId,
+                        LSMComponentLSNSyncTask syncTask = new LSMComponentLSNSyncTask(componentId,
                                 destFile.getAbsolutePath(), afp.getLSNByteOffset());
                         lsmComponentRemoteLSN2LocalLSNMappingTaskQ.offer(syncTask);
                     } else {
-                        updateLSMComponentRemainingFiles(compoentId);
+                        updateLSMComponentRemainingFiles(componentId);
                     }
                 } else {
                     //index metadata file
-                    replicaResourcesManager.initializeReplicaIndexLSNMap(replicaFolderPath, logManager.getAppendLSN());
+                    replicaResourcesManager.initializeReplicaIndexLSNMap(indexPath, logManager.getAppendLSN());
                 }
             }
         }
@@ -370,43 +371,48 @@
         }
 
         private void handleGetReplicaFiles() throws IOException {
-            inBuffer = AsterixReplicationProtocol.readRequest(socketChannel, inBuffer);
-            ReplicaFilesRequest request = AsterixReplicationProtocol.readReplicaFileRequest(inBuffer);
+            inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
+            ReplicaFilesRequest request = ReplicationProtocol.readReplicaFileRequest(inBuffer);
 
-            AsterixLSMIndexFileProperties fileProperties = new AsterixLSMIndexFileProperties();
+            LSMIndexFileProperties fileProperties = new LSMIndexFileProperties();
 
             List<String> filesList;
             Set<String> replicaIds = request.getReplicaIds();
-
+            Map<String, ClusterPartition[]> nodePartitions = ((IAsterixPropertiesProvider) asterixAppRuntimeContextProvider
+                    .getAppContext()).getMetadataProperties().getNodePartitions();
             for (String replicaId : replicaIds) {
-                filesList = replicaResourcesManager.getResourcesForReplica(replicaId);
+                //get replica partitions
+                ClusterPartition[] replicaPatitions = nodePartitions.get(replicaId);
+                for (ClusterPartition partition : replicaPatitions) {
+                    filesList = replicaResourcesManager.getPartitionIndexesFiles(partition.getPartitionId());
 
-                //start sending files
-                for (String filePath : filesList) {
-                    try (RandomAccessFile fromFile = new RandomAccessFile(filePath, "r");
-                            FileChannel fileChannel = fromFile.getChannel();) {
-                        long fileSize = fileChannel.size();
-                        fileProperties.initialize(filePath, fileSize, replicaId, false,
-                                IMetaDataPageManager.INVALID_LSN_OFFSET, false);
-                        outBuffer = AsterixReplicationProtocol.writeFileReplicationRequest(outBuffer, fileProperties,
-                                ReplicationRequestType.REPLICATE_FILE);
+                    //start sending files
+                    for (String filePath : filesList) {
+                        try (RandomAccessFile fromFile = new RandomAccessFile(filePath, "r");
+                                FileChannel fileChannel = fromFile.getChannel();) {
+                            long fileSize = fileChannel.size();
+                            fileProperties.initialize(filePath, fileSize, replicaId, false,
+                                    IMetaDataPageManager.INVALID_LSN_OFFSET, false);
+                            outBuffer = ReplicationProtocol.writeFileReplicationRequest(outBuffer,
+                                    fileProperties, ReplicationRequestType.REPLICATE_FILE);
 
-                        //send file info
-                        NetworkingUtil.transferBufferToChannel(socketChannel, outBuffer);
+                            //send file info
+                            NetworkingUtil.transferBufferToChannel(socketChannel, outBuffer);
 
-                        //transfer file
-                        NetworkingUtil.sendFile(fileChannel, socketChannel);
+                            //transfer file
+                            NetworkingUtil.sendFile(fileChannel, socketChannel);
+                        }
                     }
                 }
             }
 
             //send goodbye (end of files)
-            AsterixReplicationProtocol.sendGoodbye(socketChannel);
+            ReplicationProtocol.sendGoodbye(socketChannel);
         }
 
         private void handleGetRemoteLogs() throws IOException, ACIDException {
-            inBuffer = AsterixReplicationProtocol.readRequest(socketChannel, inBuffer);
-            ReplicaLogsRequest request = AsterixReplicationProtocol.readReplicaLogsRequest(inBuffer);
+            inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
+            ReplicaLogsRequest request = ReplicationProtocol.readReplicaLogsRequest(inBuffer);
 
             Set<String> replicaIds = request.getReplicaIds();
             long fromLSN = request.getFromLSN();
@@ -433,13 +439,13 @@
                     if (replicaIds.contains(logRecord.getNodeId()) && logRecord.getLogType() != LogType.FLUSH) {
                         if (logRecord.getSerializedLogSize() > outBuffer.capacity()) {
                             int requestSize = logRecord.getSerializedLogSize()
-                                    + AsterixReplicationProtocol.REPLICATION_REQUEST_HEADER_SIZE;
+                                    + ReplicationProtocol.REPLICATION_REQUEST_HEADER_SIZE;
                             outBuffer = ByteBuffer.allocate(requestSize);
                         }
 
                         //set log source to REMOTE_RECOVERY to avoid re-logging on the recipient side
                         logRecord.setLogSource(LogSource.REMOTE_RECOVERY);
-                        AsterixReplicationProtocol.writeRemoteRecoveryLogRequest(outBuffer, logRecord);
+                        ReplicationProtocol.writeRemoteRecoveryLogRequest(outBuffer, logRecord);
                         NetworkingUtil.transferBufferToChannel(socketChannel, outBuffer);
                     }
                     logRecord = logReader.next();
@@ -449,32 +455,32 @@
             }
 
             //send goodbye (end of logs)
-            AsterixReplicationProtocol.sendGoodbye(socketChannel);
+            ReplicationProtocol.sendGoodbye(socketChannel);
         }
 
         private void handleUpdateReplica() throws IOException {
-            inBuffer = AsterixReplicationProtocol.readRequest(socketChannel, inBuffer);
-            Replica replica = AsterixReplicationProtocol.readReplicaUpdateRequest(inBuffer);
+            inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
+            Replica replica = ReplicationProtocol.readReplicaUpdateRequest(inBuffer);
             replicationManager.updateReplicaInfo(replica);
         }
 
         private void handleReplicaEvent() throws IOException {
-            inBuffer = AsterixReplicationProtocol.readRequest(socketChannel, inBuffer);
-            ReplicaEvent event = AsterixReplicationProtocol.readReplicaEventRequest(inBuffer);
+            inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
+            ReplicaEvent event = ReplicationProtocol.readReplicaEventRequest(inBuffer);
             replicationManager.reportReplicaEvent(event);
         }
 
         private void handleDeleteFile() throws IOException {
-            inBuffer = AsterixReplicationProtocol.readRequest(socketChannel, inBuffer);
-            AsterixLSMIndexFileProperties fileProp = AsterixReplicationProtocol.readFileReplicationRequest(inBuffer);
-            replicaResourcesManager.deleteRemoteFile(fileProp);
+            inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
+            LSMIndexFileProperties fileProp = ReplicationProtocol.readFileReplicationRequest(inBuffer);
+            replicaResourcesManager.deleteIndexFile(fileProp);
             if (fileProp.requiresAck()) {
-                AsterixReplicationProtocol.sendAck(socketChannel);
+                ReplicationProtocol.sendAck(socketChannel);
             }
         }
 
         private void handleLogReplication() throws IOException, ACIDException {
-            inBuffer = AsterixReplicationProtocol.readRequest(socketChannel, inBuffer);
+            inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
 
             //Deserialize log
             remoteLog.readRemoteLog(inBuffer, false, localNodeID);
@@ -518,7 +524,7 @@
                 //send ACK to requester
                 try {
                     socketChannel.socket().getOutputStream().write(
-                            (localNodeID + AsterixReplicationProtocol.JOB_COMMIT_ACK + logRecord.getJobId() + "\n")
+                            (localNodeID + ReplicationProtocol.JOB_COMMIT_ACK + logRecord.getJobId() + "\n")
                                     .getBytes());
                     socketChannel.socket().getOutputStream().flush();
                 } catch (IOException e) {
@@ -625,6 +631,7 @@
             }
 
             File destFile = new File(syncTask.getComponentFilePath());
+            //prepare local LSN buffer
             ByteBuffer metadataBuffer = ByteBuffer.allocate(Long.BYTES);
             metadataBuffer.putLong(lsmCompProp.getReplicaLSN());
             metadataBuffer.flip();
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
index 36e5dff..5c35df4 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
@@ -66,17 +66,16 @@
 import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.transactions.LogType;
 import org.apache.asterix.event.schema.cluster.Node;
-import org.apache.asterix.replication.functions.AsterixReplicationProtocol;
-import org.apache.asterix.replication.functions.AsterixReplicationProtocol.ReplicationRequestType;
+import org.apache.asterix.replication.functions.ReplicationProtocol;
+import org.apache.asterix.replication.functions.ReplicationProtocol.ReplicationRequestType;
 import org.apache.asterix.replication.functions.ReplicaFilesRequest;
 import org.apache.asterix.replication.functions.ReplicaIndexFlushRequest;
 import org.apache.asterix.replication.functions.ReplicaLogsRequest;
 import org.apache.asterix.replication.logging.ReplicationLogBuffer;
 import org.apache.asterix.replication.logging.ReplicationLogFlusher;
-import org.apache.asterix.replication.storage.AsterixLSMIndexFileProperties;
 import org.apache.asterix.replication.storage.LSMComponentProperties;
+import org.apache.asterix.replication.storage.LSMIndexFileProperties;
 import org.apache.asterix.replication.storage.ReplicaResourcesManager;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.replication.IReplicationJob;
 import org.apache.hyracks.api.replication.IReplicationJob.ReplicationExecutionType;
 import org.apache.hyracks.api.replication.IReplicationJob.ReplicationJobType;
@@ -129,7 +128,8 @@
     private ReplicationLogFlusher txnlogsReplicator;
     private Future<? extends Object> txnLogReplicatorTask;
     private Map<String, SocketChannel> logsReplicaSockets = null;
-
+    //TODO this class needs to be refactored by moving its private classes to separate files
+    //and possibly using MessageBroker to send/receive remote replicas events.
     public ReplicationManager(String nodeId, AsterixReplicationProperties replicationProperties,
             IReplicaResourcesManager remoteResoucesManager, ILogManager logManager,
             IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider) {
@@ -255,7 +255,7 @@
             throws IOException {
         boolean isLSMComponentFile;
         ByteBuffer responseBuffer = null;
-        AsterixLSMIndexFileProperties asterixFileProperties = new AsterixLSMIndexFileProperties();
+        LSMIndexFileProperties asterixFileProperties = new LSMIndexFileProperties();
         if (requestBuffer == null) {
             requestBuffer = ByteBuffer.allocate(INITIAL_BUFFER_SIZE);
         }
@@ -277,7 +277,7 @@
                         //send LSMComponent properties
                         LSMComponentJob = (ILSMIndexReplicationJob) job;
                         LSMComponentProperties lsmCompProp = new LSMComponentProperties(LSMComponentJob, nodeId);
-                        requestBuffer = AsterixReplicationProtocol.writeLSMComponentPropertiesRequest(lsmCompProp,
+                        requestBuffer = ReplicationProtocol.writeLSMComponentPropertiesRequest(lsmCompProp,
                                 requestBuffer);
                         sendRequest(replicasSockets, requestBuffer);
                     }
@@ -310,7 +310,7 @@
                                         IMetaDataPageManager.INVALID_LSN_OFFSET, remainingFiles == 0);
                             }
 
-                            requestBuffer = AsterixReplicationProtocol.writeFileReplicationRequest(requestBuffer,
+                            requestBuffer = ReplicationProtocol.writeFileReplicationRequest(requestBuffer,
                                     asterixFileProperties, ReplicationRequestType.REPLICATE_FILE);
 
                             Iterator<Map.Entry<String, SocketChannel>> iterator = replicasSockets.entrySet().iterator();
@@ -350,7 +350,7 @@
                     remainingFiles--;
                     asterixFileProperties.initialize(filePath, -1, nodeId, isLSMComponentFile,
                             IMetaDataPageManager.INVALID_LSN_OFFSET, remainingFiles == 0);
-                    AsterixReplicationProtocol.writeFileReplicationRequest(requestBuffer, asterixFileProperties,
+                    ReplicationProtocol.writeFileReplicationRequest(requestBuffer, asterixFileProperties,
                             ReplicationRequestType.DELETE_FILE);
 
                     Iterator<Map.Entry<String, SocketChannel>> iterator = replicasSockets.entrySet().iterator();
@@ -392,13 +392,13 @@
     private static ReplicationRequestType waitForResponse(SocketChannel socketChannel, ByteBuffer responseBuffer)
             throws IOException {
         if (responseBuffer == null) {
-            responseBuffer = ByteBuffer.allocate(AsterixReplicationProtocol.REPLICATION_REQUEST_TYPE_SIZE);
+            responseBuffer = ByteBuffer.allocate(ReplicationProtocol.REPLICATION_REQUEST_TYPE_SIZE);
         } else {
             responseBuffer.clear();
         }
 
         //read response from remote replicas
-        ReplicationRequestType responseFunction = AsterixReplicationProtocol.getRequestType(socketChannel,
+        ReplicationRequestType responseFunction = ReplicationProtocol.getRequestType(socketChannel,
                 responseBuffer);
         return responseFunction;
     }
@@ -519,7 +519,7 @@
         node.setClusterIp(newAddress);
         Replica replica = new Replica(node);
 
-        ByteBuffer buffer = AsterixReplicationProtocol.writeUpdateReplicaRequest(replica);
+        ByteBuffer buffer = ReplicationProtocol.writeUpdateReplicaRequest(replica);
         Map<String, SocketChannel> replicaSockets = getActiveRemoteReplicasSockets();
         sendRequest(replicaSockets, buffer);
         closeReplicaSockets(replicaSockets);
@@ -537,7 +537,7 @@
         node.setClusterIp(NetworkingUtil.getHostAddress(hostIPAddressFirstOctet));
         Replica replica = new Replica(node);
         ReplicaEvent event = new ReplicaEvent(replica, ReplicaEventType.SHUTDOWN);
-        ByteBuffer buffer = AsterixReplicationProtocol.writeReplicaEventRequest(event);
+        ByteBuffer buffer = ReplicationProtocol.writeReplicaEventRequest(event);
         Map<String, SocketChannel> replicaSockets = getActiveRemoteReplicasSockets();
         sendRequest(replicaSockets, buffer);
         closeReplicaSockets(replicaSockets);
@@ -581,7 +581,7 @@
      */
     private void closeReplicaSockets(Map<String, SocketChannel> replicaSockets) {
         //send goodbye
-        ByteBuffer goodbyeBuffer = AsterixReplicationProtocol.getGoodbyeBuffer();
+        ByteBuffer goodbyeBuffer = ReplicationProtocol.getGoodbyeBuffer();
         sendRequest(replicaSockets, goodbyeBuffer);
 
         Iterator<Map.Entry<String, SocketChannel>> iterator = replicaSockets.entrySet().iterator();
@@ -910,7 +910,7 @@
         ByteBuffer requestBuffer = ByteBuffer.allocate(INITIAL_BUFFER_SIZE);
         for (String replicaId : replicaIds) {
             //1. identify replica indexes with LSN less than nonSharpCheckpointTargetLSN.
-            HashMap<Long, String> laggingIndexes = replicaResourcesManager.getLaggingReplicaIndexesId2PathMap(replicaId,
+            Map<Long, String> laggingIndexes = replicaResourcesManager.getLaggingReplicaIndexesId2PathMap(replicaId,
                     nonSharpCheckpointTargetLSN);
 
             if (laggingIndexes.size() > 0) {
@@ -919,7 +919,7 @@
                 try (SocketChannel socketChannel = getReplicaSocket(replicaId)) {
                     ReplicaIndexFlushRequest laggingIndexesRequest = new ReplicaIndexFlushRequest(
                             laggingIndexes.keySet());
-                    requestBuffer = AsterixReplicationProtocol.writeGetReplicaIndexFlushRequest(requestBuffer,
+                    requestBuffer = ReplicationProtocol.writeGetReplicaIndexFlushRequest(requestBuffer,
                             laggingIndexesRequest);
                     NetworkingUtil.transferBufferToChannel(socketChannel, requestBuffer);
 
@@ -927,19 +927,19 @@
                     ReplicationRequestType responseFunction = waitForResponse(socketChannel, requestBuffer);
 
                     if (responseFunction == ReplicationRequestType.FLUSH_INDEX) {
-                        requestBuffer = AsterixReplicationProtocol.readRequest(socketChannel, requestBuffer);
+                        requestBuffer = ReplicationProtocol.readRequest(socketChannel, requestBuffer);
                         //returning the indexes that were not flushed
-                        laggingIndexesResponse = AsterixReplicationProtocol.readReplicaIndexFlushRequest(requestBuffer);
+                        laggingIndexesResponse = ReplicationProtocol.readReplicaIndexFlushRequest(requestBuffer);
                     }
                     //send goodbye
-                    AsterixReplicationProtocol.sendGoodbye(socketChannel);
+                    ReplicationProtocol.sendGoodbye(socketChannel);
                 }
 
                 //4. update the LSN_MAP for indexes that were not flushed to the current append LSN to indicate no operations happend.
                 if (laggingIndexesResponse != null) {
                     for (Long resouceId : laggingIndexesResponse.getLaggingRescouresIds()) {
                         String indexPath = laggingIndexes.get(resouceId);
-                        HashMap<Long, Long> indexLSNMap = replicaResourcesManager.getReplicaIndexLSNMap(indexPath);
+                        Map<Long, Long> indexLSNMap = replicaResourcesManager.getReplicaIndexLSNMap(indexPath);
                         indexLSNMap.put(ReplicaResourcesManager.REPLICA_INDEX_CREATION_LSN, startLSN);
                         replicaResourcesManager.updateReplicaIndexLSNMap(indexPath, indexLSNMap);
                     }
@@ -953,7 +953,7 @@
     public long getMaxRemoteLSN(Set<String> remoteReplicas) throws IOException {
         long maxRemoteLSN = 0;
 
-        AsterixReplicationProtocol.writeGetReplicaMaxLSNRequest(dataBuffer);
+        ReplicationProtocol.writeGetReplicaMaxLSNRequest(dataBuffer);
         Map<String, SocketChannel> replicaSockets = new HashMap<String, SocketChannel>();
         try {
             for (String replicaId : remoteReplicas) {
@@ -988,26 +988,26 @@
     @Override
     public void requestReplicaFiles(String selectedReplicaId, Set<String> replicasDataToRecover) throws IOException {
         ReplicaFilesRequest request = new ReplicaFilesRequest(replicasDataToRecover);
-        AsterixReplicationProtocol.writeGetReplicaFilesRequest(dataBuffer, request);
+        ReplicationProtocol.writeGetReplicaFilesRequest(dataBuffer, request);
 
         try (SocketChannel socketChannel = getReplicaSocket(selectedReplicaId)) {
 
             //transfer request
             NetworkingUtil.transferBufferToChannel(socketChannel, dataBuffer);
 
-            String destFolder;
+            String indexPath;
             String destFilePath;
-
-            ReplicationRequestType responseFunction = AsterixReplicationProtocol.getRequestType(socketChannel,
+            ReplicationRequestType responseFunction = ReplicationProtocol.getRequestType(socketChannel,
                     dataBuffer);
-            AsterixLSMIndexFileProperties fileProperties;
+            LSMIndexFileProperties fileProperties;
             while (responseFunction != ReplicationRequestType.GOODBYE) {
-                dataBuffer = AsterixReplicationProtocol.readRequest(socketChannel, dataBuffer);
+                dataBuffer = ReplicationProtocol.readRequest(socketChannel, dataBuffer);
 
-                fileProperties = AsterixReplicationProtocol.readFileReplicationRequest(dataBuffer);
-                destFolder = replicaResourcesManager.getIndexPath(fileProperties.getNodeId(),
-                        fileProperties.getIoDeviceNum(), fileProperties.getDataverse(), fileProperties.getIdxName());
-                destFilePath = destFolder + File.separator + fileProperties.getFileName();
+                fileProperties = ReplicationProtocol.readFileReplicationRequest(dataBuffer);
+
+                //get index path
+                indexPath = replicaResourcesManager.getIndexPath(fileProperties);
+                destFilePath = indexPath + File.separator + fileProperties.getFileName();
 
                 //create file
                 File destFile = new File(destFilePath);
@@ -1024,14 +1024,14 @@
                 //we need to create LSN map for .metadata files that belong to remote replicas
                 if (!fileProperties.isLSMComponentFile() && !fileProperties.getNodeId().equals(nodeId)) {
                     //replica index
-                    replicaResourcesManager.initializeReplicaIndexLSNMap(destFolder, logManager.getAppendLSN());
+                    replicaResourcesManager.initializeReplicaIndexLSNMap(indexPath, logManager.getAppendLSN());
                 }
 
-                responseFunction = AsterixReplicationProtocol.getRequestType(socketChannel, dataBuffer);
+                responseFunction = ReplicationProtocol.getRequestType(socketChannel, dataBuffer);
             }
 
             //send goodbye
-            AsterixReplicationProtocol.sendGoodbye(socketChannel);
+            ReplicationProtocol.sendGoodbye(socketChannel);
         }
     }
 
@@ -1039,7 +1039,7 @@
     @Override
     public long requestReplicaMinLSN(String selectedReplicaId) throws IOException {
         long minLSN = 0;
-        AsterixReplicationProtocol.writeMinLSNRequest(dataBuffer);
+        ReplicationProtocol.writeMinLSNRequest(dataBuffer);
         try (SocketChannel socketChannel = getReplicaSocket(selectedReplicaId);) {
             //transfer request
             NetworkingUtil.transferBufferToChannel(socketChannel, dataBuffer);
@@ -1049,7 +1049,7 @@
             minLSN = dataBuffer.getLong();
 
             //send goodbye
-            AsterixReplicationProtocol.sendGoodbye(socketChannel);
+            ReplicationProtocol.sendGoodbye(socketChannel);
         }
 
         return minLSN;
@@ -1060,19 +1060,19 @@
     public ArrayList<ILogRecord> requestReplicaLogs(String remoteNode, Set<String> nodeIdsToRecoverFor, long fromLSN)
             throws IOException, ACIDException {
         ReplicaLogsRequest request = new ReplicaLogsRequest(nodeIdsToRecoverFor, fromLSN);
-        dataBuffer = AsterixReplicationProtocol.writeGetReplicaLogsRequest(dataBuffer, request);
+        dataBuffer = ReplicationProtocol.writeGetReplicaLogsRequest(dataBuffer, request);
 
         try (SocketChannel socketChannel = getReplicaSocket(remoteNode)) {
             //transfer request
             NetworkingUtil.transferBufferToChannel(socketChannel, dataBuffer);
 
             //read response type
-            ReplicationRequestType responseType = AsterixReplicationProtocol.getRequestType(socketChannel, dataBuffer);
+            ReplicationRequestType responseType = ReplicationProtocol.getRequestType(socketChannel, dataBuffer);
 
             ArrayList<ILogRecord> recoveryLogs = new ArrayList<ILogRecord>();
             ILogRecord logRecord = new LogRecord();
             while (responseType != ReplicationRequestType.GOODBYE) {
-                dataBuffer = AsterixReplicationProtocol.readRequest(socketChannel, dataBuffer);
+                dataBuffer = ReplicationProtocol.readRequest(socketChannel, dataBuffer);
                 logRecord.readRemoteLog(dataBuffer, true, nodeId);
 
                 if (logRecord.getNodeId().equals(nodeId)) {
@@ -1085,11 +1085,11 @@
                     logManager.log(logRecord);
                 }
 
-                responseType = AsterixReplicationProtocol.getRequestType(socketChannel, dataBuffer);
+                responseType = ReplicationProtocol.getRequestType(socketChannel, dataBuffer);
             }
 
             //send goodbye
-            AsterixReplicationProtocol.sendGoodbye(socketChannel);
+            ReplicationProtocol.sendGoodbye(socketChannel);
             return recoveryLogs;
         }
     }
@@ -1136,11 +1136,7 @@
             updateReplicaState(replicaId, ReplicaState.DEAD, true);
 
             //delete any invalid LSMComponents for this replica
-            try {
-                replicaResourcesManager.cleanInvalidLSMComponents(replicaId);
-            } catch (HyracksDataException e) {
-                e.printStackTrace();
-            }
+            replicaResourcesManager.cleanInvalidLSMComponents(replicaId);
         }
 
         public void handleShutdownEvent(String replicaId) {
@@ -1237,8 +1233,8 @@
                         break;
                     }
                     //read ACK for job commit log
-                    String replicaId = AsterixReplicationProtocol.getNodeIdFromLogAckMessage(responseLine);
-                    int jobId = AsterixReplicationProtocol.getJobIdFromLogAckMessage(responseLine);
+                    String replicaId = ReplicationProtocol.getNodeIdFromLogAckMessage(responseLine);
+                    int jobId = ReplicationProtocol.getJobIdFromLogAckMessage(responseLine);
                     addAckToJob(jobId, replicaId);
                 }
             } catch (AsynchronousCloseException e1) {
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java b/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
index a82b535..ee987f8 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.replication.recovery;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -30,10 +31,12 @@
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.config.AsterixReplicationProperties;
+import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.replication.IRemoteRecoveryManager;
 import org.apache.asterix.common.replication.IReplicationManager;
 import org.apache.asterix.common.transactions.ILogManager;
 import org.apache.asterix.common.transactions.ILogRecord;
+import org.apache.asterix.om.util.AsterixClusterProperties;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 
 public class RemoteRecoveryManager implements IRemoteRecoveryManager {
@@ -55,13 +58,20 @@
 
     @Override
     public void performRemoteRecovery() {
+        //TODO this method needs to be adapted to perform failback when autoFailover is enabled.
+        //Currently we will not allow a node to perform remote recovery since another replica
+        //already tookover its workload and might not resync correctly if there are on on-going
+        //jobs on the replica.
+        if (AsterixClusterProperties.INSTANCE.isAutoFailoverEnabled()) {
+            throw new IllegalStateException("Cannot perform remote recovery when auto failover is enabled.");
+        }
         //The whole remote recovery process should be atomic.
         //Any error happens, we should start the recovery from the start until the recovery is complete or an illegal state is reached (cannot recovery).
         int maxRecoveryAttempts = 10;
         PersistentLocalResourceRepository resourceRepository = (PersistentLocalResourceRepository) runtimeContext
                 .getLocalResourceRepository();
         while (true) {
-            //start recovery recovery steps
+            //start recovery steps
             try {
                 maxRecoveryAttempts--;
 
@@ -76,7 +86,7 @@
                 int activeReplicasCount = replicationManager.getActiveReplicasCount();
 
                 if (activeReplicasCount == 0) {
-                    throw new IllegalStateException("no ACTIVE remote replica(s) exists to performe remote recovery");
+                    throw new IllegalStateException("no ACTIVE remote replica(s) exists to perform remote recovery");
                 }
 
                 //2. clean any memory data that could've existed from previous failed recovery attempt
@@ -90,8 +100,7 @@
                 Map<String, Set<String>> selectedRemoteReplicas = constructRemoteRecoveryPlan();
 
                 //5. get max LSN from selected remote replicas
-                long maxRemoteLSN = 0;
-                maxRemoteLSN = replicationManager.getMaxRemoteLSN(selectedRemoteReplicas.keySet());
+                long maxRemoteLSN = replicationManager.getMaxRemoteLSN(selectedRemoteReplicas.keySet());
 
                 //6. force LogManager to start from a partition > maxLSN in selected remote replicas
                 logManager.renewLogFilesAndStartFromLSN(maxRemoteLSN);
@@ -107,8 +116,7 @@
                     //2. Initialize local resources based on the newly received files (if we are recovering the primary replica on this node)
                     if (replicasDataToRecover.contains(logManager.getNodeId())) {
                         ((PersistentLocalResourceRepository) runtimeContext.getLocalResourceRepository())
-                                .initializeNewUniverse(
-                                        runtimeContext.getReplicaResourcesManager().getLocalStorageFolder());
+                                .initializeNewUniverse(AsterixClusterProperties.INSTANCE.getStorageDirectoryName());
                         //initialize resource id factor to correct max resource id
                         runtimeContext.initializeResourceIdFactory();
                     }
@@ -140,7 +148,6 @@
     }
 
     private Map<String, Set<String>> constructRemoteRecoveryPlan() {
-
         //1. identify which replicas reside in this node
         String localNodeId = logManager.getNodeId();
         Set<String> nodes = replicationProperties.getNodeReplicasIds(localNodeId);
@@ -205,4 +212,14 @@
 
         return recoveryList;
     }
+
+    @Override
+    public void takeoverPartitons(String failedNode, Integer[] partitions) throws IOException, ACIDException {
+        long minLSN = runtimeContext.getReplicaResourcesManager().getPartitionsMinLSN(partitions);
+        //reply logs > minLSN that belong to these partitions
+        //TODO (mhubail) currently we assume the logs for these partitions belong to the failed node
+        //this needs to be updated once log formats are updated to include the partition id
+        runtimeContext.getTransactionSubsystem().getRecoveryManager().replayPartitionsLogs(partitions, minLSN,
+                failedNode);
+    }
 }
\ No newline at end of file
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/AsterixFilesUtil.java b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/AsterixFilesUtil.java
deleted file mode 100644
index 67b39c4..0000000
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/AsterixFilesUtil.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.replication.storage;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.FileVisitResult;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.nio.file.SimpleFileVisitor;
-import java.nio.file.attribute.BasicFileAttributes;
-
-public class AsterixFilesUtil {
-
-    public static void deleteFolder(String folderPath) throws IOException {
-        File folder = new File(folderPath);
-        if (folder.exists()) {
-            //delete files inside the folder
-            while (deleteDirecotryFiles(folderPath) != true) {
-                //if there is a file being written (e.g. LSM Component), wait and try again to delete the file
-                try {
-                    Thread.sleep(500);
-                } catch (InterruptedException e) {
-                    //ignore
-                }
-            }
-
-            //delete the folder itself
-            folder.delete();
-        }
-    }
-
-    private static boolean deleteDirecotryFiles(String dirPath) throws IOException {
-        try {
-            Path directory = Paths.get(dirPath);
-            Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
-                @Override
-                public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
-                    Files.delete(file);
-                    return FileVisitResult.CONTINUE;
-                }
-
-                @Override
-                public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
-                    Files.delete(dir);
-                    return FileVisitResult.CONTINUE;
-                }
-
-                @Override
-                public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException {
-                    return FileVisitResult.CONTINUE;
-                }
-
-            });
-            return true;
-        } catch (Exception e) {
-            e.printStackTrace();
-            return false;
-        }
-    }
-}
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java
index 794a6e1..841a99f 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java
@@ -47,9 +47,10 @@
 
     public LSMComponentProperties(ILSMIndexReplicationJob job, String nodeId) {
         this.nodeId = nodeId;
-        componentId = LSMComponentProperties.getLSMComponentID((String) job.getJobFiles().toArray()[0], nodeId);
+        componentId = LSMComponentProperties.getLSMComponentID((String) job.getJobFiles().toArray()[0]);
         numberOfFiles = new AtomicInteger(job.getJobFiles().size());
-        originalLSN = LSMComponentProperties.getLSMComponentLSN((AbstractLSMIndex) job.getLSMIndex(), job.getLSMIndexOperationContext());
+        originalLSN = LSMComponentProperties.getLSMComponentLSN((AbstractLSMIndex) job.getLSMIndex(),
+                job.getLSMIndexOperationContext());
         opType = job.getLSMOpType();
     }
 
@@ -94,7 +95,7 @@
 
     public String getMaskPath(ReplicaResourcesManager resourceManager) {
         if (maskPath == null) {
-            AsterixLSMIndexFileProperties afp = new AsterixLSMIndexFileProperties(this);
+            LSMIndexFileProperties afp = new LSMIndexFileProperties(this);
             maskPath = getReplicaComponentPath(resourceManager) + File.separator + afp.getFileName()
                     + ReplicaResourcesManager.LSM_COMPONENT_MASK_SUFFIX;
         }
@@ -103,9 +104,8 @@
 
     public String getReplicaComponentPath(ReplicaResourcesManager resourceManager) {
         if (replicaPath == null) {
-            AsterixLSMIndexFileProperties afp = new AsterixLSMIndexFileProperties(this);
-            replicaPath = resourceManager.getIndexPath(afp.getNodeId(), afp.getIoDeviceNum(), afp.getDataverse(),
-                    afp.getIdxName());
+            LSMIndexFileProperties afp = new LSMIndexFileProperties(this);
+            replicaPath = resourceManager.getIndexPath(afp);
         }
         return replicaPath;
     }
@@ -113,27 +113,24 @@
     /***
      * @param filePath
      *            any file of the LSM component
-     * @param nodeId
      * @return a unique id based on the timestamp of the component
      */
-    public static String getLSMComponentID(String filePath, String nodeId) {
+    public static String getLSMComponentID(String filePath) {
         String[] tokens = filePath.split(File.separator);
 
         int arraySize = tokens.length;
         String fileName = tokens[arraySize - 1];
-        String ioDevoceName = tokens[arraySize - 2];
-        String idxName = tokens[arraySize - 3];
-        String dataverse = tokens[arraySize - 4];
+        String idxName = tokens[arraySize - 2];
+        String dataverse = tokens[arraySize - 3];
+        String partitionName = tokens[arraySize - 4];
 
         StringBuilder componentId = new StringBuilder();
-        componentId.append(nodeId);
+        componentId.append(partitionName);
         componentId.append(File.separator);
         componentId.append(dataverse);
         componentId.append(File.separator);
         componentId.append(idxName);
         componentId.append(File.separator);
-        componentId.append(ioDevoceName);
-        componentId.append(File.separator);
         componentId.append(fileName.substring(0, fileName.lastIndexOf(AbstractLSMIndexFileManager.SPLIT_STRING)));
         return componentId.toString();
     }
@@ -142,18 +139,10 @@
         return componentId;
     }
 
-    public void setComponentId(String componentId) {
-        this.componentId = componentId;
-    }
-
     public long getOriginalLSN() {
         return originalLSN;
     }
 
-    public void setOriginalLSN(long lSN) {
-        originalLSN = lSN;
-    }
-
     public String getNodeId() {
         return nodeId;
     }
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/AsterixLSMIndexFileProperties.java b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java
similarity index 69%
rename from asterix-replication/src/main/java/org/apache/asterix/replication/storage/AsterixLSMIndexFileProperties.java
rename to asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java
index 5e0c9b0..890d3a2 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/AsterixLSMIndexFileProperties.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java
@@ -24,31 +24,31 @@
 import java.io.IOException;
 import java.io.OutputStream;
 
+import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.hyracks.storage.am.common.api.IMetaDataPageManager;
-import org.apache.hyracks.storage.am.common.util.IndexFileNameUtil;
 
-public class AsterixLSMIndexFileProperties {
+public class LSMIndexFileProperties {
 
     private String fileName;
     private long fileSize;
     private String nodeId;
     private String dataverse;
-    private int ioDeviceNum;
     private String idxName;
     private boolean lsmComponentFile;
     private String filePath;
     private boolean requiresAck = false;
     private long LSNByteOffset;
+    private int partition;
 
-    public AsterixLSMIndexFileProperties() {
+    public LSMIndexFileProperties() {
     }
 
-    public AsterixLSMIndexFileProperties(String filePath, long fileSize, String nodeId, boolean lsmComponentFile,
+    public LSMIndexFileProperties(String filePath, long fileSize, String nodeId, boolean lsmComponentFile,
             long LSNByteOffset, boolean requiresAck) {
         initialize(filePath, fileSize, nodeId, lsmComponentFile, LSNByteOffset, requiresAck);
     }
 
-    public AsterixLSMIndexFileProperties(LSMComponentProperties lsmComponentProperties) {
+    public LSMIndexFileProperties(LSMComponentProperties lsmComponentProperties) {
         initialize(lsmComponentProperties.getComponentId(), -1, lsmComponentProperties.getNodeId(), false,
                 IMetaDataPageManager.INVALID_LSN_OFFSET, false);
     }
@@ -58,19 +58,22 @@
         this.filePath = filePath;
         this.fileSize = fileSize;
         this.nodeId = nodeId;
-        String[] tokens = filePath.split(File.separator);
-        int arraySize = tokens.length;
-        this.fileName = tokens[arraySize - 1];
-        this.ioDeviceNum = getDeviceIONumFromName(tokens[arraySize - 2]);
-        this.idxName = tokens[arraySize - 3];
-        this.dataverse = tokens[arraySize - 4];
         this.lsmComponentFile = lsmComponentFile;
         this.LSNByteOffset = LSNByteOffset;
         this.requiresAck = requiresAck;
     }
 
-    public static int getDeviceIONumFromName(String name) {
-        return Integer.parseInt(name.substring(IndexFileNameUtil.IO_DEVICE_NAME_PREFIX.length()));
+    public void splitFileName() {
+        String[] tokens = filePath.split(File.separator);
+        int arraySize = tokens.length;
+        this.fileName = tokens[arraySize - 1];
+        this.idxName = tokens[arraySize - 2];
+        this.dataverse = tokens[arraySize - 3];
+        this.partition = getPartitonNumFromName(tokens[arraySize - 4]);
+    }
+
+    private static int getPartitonNumFromName(String name) {
+        return Integer.parseInt(name.substring(StoragePathUtil.PARTITION_DIR_PREFIX.length()));
     }
 
     public void serialize(OutputStream out) throws IOException {
@@ -83,15 +86,15 @@
         dos.writeBoolean(requiresAck);
     }
 
-    public static AsterixLSMIndexFileProperties create(DataInput input) throws IOException {
+    public static LSMIndexFileProperties create(DataInput input) throws IOException {
         String nodeId = input.readUTF();
         String filePath = input.readUTF();
         long fileSize = input.readLong();
         boolean lsmComponentFile = input.readBoolean();
         long LSNByteOffset = input.readLong();
         boolean requiresAck = input.readBoolean();
-        AsterixLSMIndexFileProperties fileProp = new AsterixLSMIndexFileProperties();
-        fileProp.initialize(filePath, fileSize, nodeId, lsmComponentFile, LSNByteOffset, requiresAck);
+        LSMIndexFileProperties fileProp = new LSMIndexFileProperties(filePath, fileSize, nodeId, lsmComponentFile,
+                LSNByteOffset, requiresAck);
         return fileProp;
     }
 
@@ -99,10 +102,6 @@
         return filePath;
     }
 
-    public void setFilePath(String filePath) {
-        this.filePath = filePath;
-    }
-
     public long getFileSize() {
         return fileSize;
     }
@@ -111,10 +110,6 @@
         return fileName;
     }
 
-    public void setFileName(String fileName) {
-        this.fileName = fileName;
-    }
-
     public String getNodeId() {
         return nodeId;
     }
@@ -131,49 +126,25 @@
         this.dataverse = dataverse;
     }
 
-    public void setFileSize(long fileSize) {
-        this.fileSize = fileSize;
-    }
-
-    public int getIoDeviceNum() {
-        return ioDeviceNum;
-    }
-
-    public void setIoDeviceNum(int ioDevoceNum) {
-        this.ioDeviceNum = ioDevoceNum;
-    }
-
     public String getIdxName() {
         return idxName;
     }
 
-    public void setIdxName(String idxName) {
-        this.idxName = idxName;
-    }
-
     public boolean isLSMComponentFile() {
         return lsmComponentFile;
     }
 
-    public void setLsmComponentFile(boolean lsmComponentFile) {
-        this.lsmComponentFile = lsmComponentFile;
-    }
-
     public boolean requiresAck() {
         return requiresAck;
     }
 
-    public void setRequiresAck(boolean requiresAck) {
-        this.requiresAck = requiresAck;
-    }
-
     @Override
     public String toString() {
         StringBuilder sb = new StringBuilder();
         sb.append("File Name: " + fileName + "  ");
         sb.append("File Size: " + fileSize + "  ");
         sb.append("Node ID: " + nodeId + "  ");
-        sb.append("I/O Device: " + ioDeviceNum + "  ");
+        sb.append("Partition: " + partition + "  ");
         sb.append("IDX Name: " + idxName + "  ");
         sb.append("isLSMComponentFile : " + lsmComponentFile + "  ");
         sb.append("Dataverse: " + dataverse);
@@ -185,7 +156,7 @@
         return LSNByteOffset;
     }
 
-    public void setLSNByteOffset(long lSNByteOffset) {
-        LSNByteOffset = lSNByteOffset;
+    public int getPartition() {
+        return partition;
     }
 }
\ No newline at end of file
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
index 3e3043c..b9f7506 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
@@ -38,91 +38,65 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.config.AsterixMetadataProperties;
 import org.apache.asterix.common.replication.IReplicaResourcesManager;
+import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
+import org.apache.asterix.om.util.AsterixClusterProperties;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.io.IODeviceHandle;
-import org.apache.hyracks.storage.am.common.util.IndexFileNameUtil;
+import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
 import org.apache.hyracks.storage.common.file.LocalResource;
 
 public class ReplicaResourcesManager implements IReplicaResourcesManager {
     private static final Logger LOGGER = Logger.getLogger(ReplicaResourcesManager.class.getName());
-    private final String[] mountPoints;
-    private final int numIODevices;
-    private static final String REPLICA_FOLDER_SUFFIX = "_replica";
-    private final String replicationStorageFolder;
-    public final String localStorageFolder;
-    private final String localNodeID;
     public final static String LSM_COMPONENT_MASK_SUFFIX = "_mask";
     private final static String REPLICA_INDEX_LSN_MAP_NAME = ".LSN_MAP";
     public static final long REPLICA_INDEX_CREATION_LSN = -1;
     private final AtomicLong lastMinRemoteLSN;
+    private final PersistentLocalResourceRepository localRepository;
+    private final Map<String, ClusterPartition[]> nodePartitions;
 
-    public ReplicaResourcesManager(List<IODeviceHandle> devices, String localStorageFolder, String localNodeID,
-            String replicationStorageFolder) throws HyracksDataException {
-        numIODevices = devices.size();
-        this.mountPoints = new String[numIODevices];
-        for (int i = 0; i < numIODevices; i++) {
-            String mountPoint = devices.get(i).getPath().getPath();
-            File mountPointDir = new File(mountPoint);
-            if (!mountPointDir.exists()) {
-                throw new HyracksDataException(mountPointDir.getAbsolutePath() + " doesn't exist.");
-            }
-            if (!mountPoint.endsWith(System.getProperty("file.separator"))) {
-                mountPoints[i] = new String(mountPoint + System.getProperty("file.separator"));
-            } else {
-                mountPoints[i] = new String(mountPoint);
-            }
-        }
-        this.localStorageFolder = localStorageFolder;
-        this.localNodeID = localNodeID;
-        this.replicationStorageFolder = replicationStorageFolder;
+    public ReplicaResourcesManager(ILocalResourceRepository localRepository,
+            AsterixMetadataProperties metadataProperties) {
+        this.localRepository = (PersistentLocalResourceRepository) localRepository;
+        nodePartitions = metadataProperties.getNodePartitions();
         lastMinRemoteLSN = new AtomicLong(-1);
     }
 
-    @Override
-    public String getLocalStorageFolder() {
-        return localStorageFolder;
-    }
-
-    private String getReplicaStorageFolder(String replicaId, int IODeviceNum) {
-        if (replicaId.equals(localNodeID)) {
-            return mountPoints[IODeviceNum] + localStorageFolder;
-        } else {
-            return mountPoints[IODeviceNum] + replicationStorageFolder + File.separator + replicaId
-                    + REPLICA_FOLDER_SUFFIX;
-        }
-    }
-
-    public void deleteRemoteFile(AsterixLSMIndexFileProperties afp) throws IOException {
-        String indexPath = getIndexPath(afp.getNodeId(), afp.getIoDeviceNum(), afp.getDataverse(), afp.getIdxName());
+    public void deleteIndexFile(LSMIndexFileProperties afp) {
+        String indexPath = getIndexPath(afp);
         if (indexPath != null) {
             if (afp.isLSMComponentFile()) {
                 String backupFilePath = indexPath + File.separator + afp.getFileName();
 
                 //delete file
                 File destFile = new File(backupFilePath);
-                if (destFile.exists()) {
-                    destFile.delete();
-                }
+                FileUtils.deleteQuietly(destFile);
             } else {
                 //delete index files
                 indexPath = indexPath.substring(0, indexPath.lastIndexOf(File.separator));
-                AsterixFilesUtil.deleteFolder(indexPath);
+                FileUtils.deleteQuietly(new File(indexPath));
             }
         }
     }
 
-    public String getIndexPath(String replicaId, int IODeviceNum, String dataverse, String dataset) {
-        //mounting point/backupNodeId_replica/Dataverse/Dataset/device_id_#/
-        String remoteIndexFolderPath = getReplicaStorageFolder(replicaId, IODeviceNum) + File.separator + dataverse
-                + File.separator + dataset + File.separator + IndexFileNameUtil.IO_DEVICE_NAME_PREFIX + IODeviceNum;
-        Path path = Paths.get(remoteIndexFolderPath);
+    public String getIndexPath(LSMIndexFileProperties fileProperties) {
+        fileProperties.splitFileName();
+        //get partition path in this node
+        String partitionPath = localRepository.getPartitionPath(fileProperties.getPartition());
+        //get index path
+        String indexPath = SplitsAndConstraintsUtil.getIndexPath(partitionPath, fileProperties.getPartition(),
+                fileProperties.getDataverse(), fileProperties.getIdxName());
+
+        Path path = Paths.get(indexPath);
         if (!Files.exists(path)) {
-            File indexFolder = new File(remoteIndexFolderPath);
+            File indexFolder = new File(indexPath);
             indexFolder.mkdirs();
         }
-        return remoteIndexFolderPath;
+        return indexPath;
     }
 
     public void initializeReplicaIndexLSNMap(String indexPath, long currentLSN) throws IOException {
@@ -144,14 +118,10 @@
         //remove mask to mark component as valid
         String maskPath = lsmComponentProperties.getMaskPath(this);
         Path path = Paths.get(maskPath);
-
-        if (Files.exists(path)) {
-            File maskFile = new File(maskPath);
-            maskFile.delete();
-        }
+        Files.deleteIfExists(path);
 
         //add component LSN to the index LSNs map
-        HashMap<Long, Long> lsnMap = getReplicaIndexLSNMap(lsmComponentProperties.getReplicaComponentPath(this));
+        Map<Long, Long> lsnMap = getReplicaIndexLSNMap(lsmComponentProperties.getReplicaComponentPath(this));
         lsnMap.put(lsmComponentProperties.getOriginalLSN(), lsmComponentProperties.getReplicaLSN());
 
         //update map on disk
@@ -159,73 +129,11 @@
 
     }
 
-    public List<String> getResourcesForReplica(String nodeId) throws HyracksDataException {
-        List<String> resourcesList = new ArrayList<String>();
-        String rootFolder;
-        for (int i = 0; i < numIODevices; i++) {
-            rootFolder = getReplicaStorageFolder(nodeId, i);
-            File rootDirFile = new File(rootFolder);
-            if (!rootDirFile.exists()) {
-                continue;
-            }
-
-            File[] dataverseFileList = rootDirFile.listFiles();
-            for (File dataverseFile : dataverseFileList) {
-                if (dataverseFile.isDirectory()) {
-                    File[] indexFileList = dataverseFile.listFiles();
-                    if (indexFileList != null) {
-                        for (File indexFile : indexFileList) {
-                            if (indexFile.isDirectory()) {
-                                File[] ioDevicesList = indexFile.listFiles();
-                                if (ioDevicesList != null) {
-                                    for (File ioDeviceFile : ioDevicesList) {
-                                        if (ioDeviceFile.isDirectory()) {
-                                            File[] metadataFiles = ioDeviceFile.listFiles(LSM_INDEX_FILES_FILTER);
-                                            if (metadataFiles != null) {
-                                                for (File metadataFile : metadataFiles) {
-                                                    resourcesList.add(metadataFile.getAbsolutePath());
-                                                }
-                                            }
-                                        }
-                                    }
-                                }
-                            }
-                        }
-                    }
-                }
-            }
-        }
-        return resourcesList;
-    }
-
-    public Set<File> getReplicaIndexes(String replicaId) throws HyracksDataException {
+    public Set<File> getReplicaIndexes(String replicaId) {
         Set<File> remoteIndexesPaths = new HashSet<File>();
-        for (int i = 0; i < numIODevices; i++) {
-            String rootReplicaFolder = getReplicaStorageFolder(replicaId, i);
-            File rootDirFile = new File(rootReplicaFolder);
-            if (!rootDirFile.exists()) {
-                continue;
-            }
-            File[] dataverseFileList = rootDirFile.listFiles();
-            for (File dataverseFile : dataverseFileList) {
-                if (dataverseFile.isDirectory()) {
-                    File[] indexFileList = dataverseFile.listFiles();
-                    if (indexFileList != null) {
-                        for (File indexFile : indexFileList) {
-                            if (indexFile.isDirectory()) {
-                                File[] ioDevicesList = indexFile.listFiles();
-                                if (ioDevicesList != null) {
-                                    for (File ioDeviceFile : ioDevicesList) {
-                                        if (ioDeviceFile.isDirectory()) {
-                                            remoteIndexesPaths.add(ioDeviceFile);
-                                        }
-                                    }
-                                }
-                            }
-                        }
-                    }
-                }
-            }
+        ClusterPartition[] partitions = nodePartitions.get(replicaId);
+        for (ClusterPartition partition : partitions) {
+            remoteIndexesPaths.addAll(getPartitionIndexes(partition.getPartitionId()));
         }
         return remoteIndexesPaths;
     }
@@ -237,41 +145,60 @@
         }
         long minRemoteLSN = Long.MAX_VALUE;
         for (String replica : replicaIds) {
-            try {
-                //for every index in replica
-                Set<File> remoteIndexes = getReplicaIndexes(replica);
-                for (File indexFolder : remoteIndexes) {
-                    //read LSN map
-                    try {
-                        //get max LSN per index
-                        long remoteIndexMaxLSN = getReplicaIndexMaxLSN(indexFolder);
+            //for every index in replica
+            Set<File> remoteIndexes = getReplicaIndexes(replica);
+            for (File indexFolder : remoteIndexes) {
+                //read LSN map
+                try {
+                    //get max LSN per index
+                    long remoteIndexMaxLSN = getReplicaIndexMaxLSN(indexFolder);
 
-                        //get min of all maximums
-                        minRemoteLSN = Math.min(minRemoteLSN, remoteIndexMaxLSN);
-                    } catch (IOException e) {
-                        LOGGER.log(Level.INFO, indexFolder.getAbsolutePath() + " Couldn't read LSN map for index "
-                                + indexFolder);
-                        continue;
-                    }
+                    //get min of all maximums
+                    minRemoteLSN = Math.min(minRemoteLSN, remoteIndexMaxLSN);
+                } catch (IOException e) {
+                    LOGGER.log(Level.INFO,
+                            indexFolder.getAbsolutePath() + " Couldn't read LSN map for index " + indexFolder);
+                    continue;
                 }
-            } catch (HyracksDataException e) {
-                e.printStackTrace();
             }
         }
         lastMinRemoteLSN.set(minRemoteLSN);
         return minRemoteLSN;
     }
 
-    public HashMap<Long, String> getLaggingReplicaIndexesId2PathMap(String replicaId, long targetLSN)
-            throws IOException {
-        HashMap<Long, String> laggingReplicaIndexes = new HashMap<Long, String>();
+    @Override
+    public long getPartitionsMinLSN(Integer[] partitions) {
+        long minRemoteLSN = Long.MAX_VALUE;
+        for (Integer partition : partitions) {
+            //for every index in replica
+            Set<File> remoteIndexes = getPartitionIndexes(partition);
+            for (File indexFolder : remoteIndexes) {
+                //read LSN map
+                try {
+                    //get max LSN per index
+                    long remoteIndexMaxLSN = getReplicaIndexMaxLSN(indexFolder);
+
+                    //get min of all maximums
+                    minRemoteLSN = Math.min(minRemoteLSN, remoteIndexMaxLSN);
+                } catch (IOException e) {
+                    LOGGER.log(Level.INFO,
+                            indexFolder.getAbsolutePath() + " Couldn't read LSN map for index " + indexFolder);
+                    continue;
+                }
+            }
+        }
+        return minRemoteLSN;
+    }
+
+    public Map<Long, String> getLaggingReplicaIndexesId2PathMap(String replicaId, long targetLSN) throws IOException {
+        Map<Long, String> laggingReplicaIndexes = new HashMap<Long, String>();
         try {
             //for every index in replica
             Set<File> remoteIndexes = getReplicaIndexes(replicaId);
             for (File indexFolder : remoteIndexes) {
                 if (getReplicaIndexMaxLSN(indexFolder) < targetLSN) {
-                    File localResource = new File(indexFolder + File.separator
-                            + PersistentLocalResourceRepository.METADATA_FILE_NAME);
+                    File localResource = new File(
+                            indexFolder + File.separator + PersistentLocalResourceRepository.METADATA_FILE_NAME);
                     LocalResource resource = PersistentLocalResourceRepository.readLocalResource(localResource);
                     laggingReplicaIndexes.put(resource.getResourceId(), indexFolder.getAbsolutePath());
                 }
@@ -286,7 +213,7 @@
     private long getReplicaIndexMaxLSN(File indexFolder) throws IOException {
         long remoteIndexMaxLSN = 0;
         //get max LSN per index
-        HashMap<Long, Long> lsnMap = getReplicaIndexLSNMap(indexFolder.getAbsolutePath());
+        Map<Long, Long> lsnMap = getReplicaIndexLSNMap(indexFolder.getAbsolutePath());
         if (lsnMap != null) {
             for (Long lsn : lsnMap.values()) {
                 remoteIndexMaxLSN = Math.max(remoteIndexMaxLSN, lsn);
@@ -296,7 +223,7 @@
         return remoteIndexMaxLSN;
     }
 
-    public void cleanInvalidLSMComponents(String replicaId) throws HyracksDataException {
+    public void cleanInvalidLSMComponents(String replicaId) {
         //for every index in replica
         Set<File> remoteIndexes = getReplicaIndexes(replicaId);
         for (File remoteIndexFile : remoteIndexes) {
@@ -312,7 +239,7 @@
         }
     }
 
-    private void deleteLSMComponentFilesForMask(File maskFile) {
+    private static void deleteLSMComponentFilesForMask(File maskFile) {
         String lsmComponentTimeStamp = maskFile.getName().substring(0,
                 maskFile.getName().length() - LSM_COMPONENT_MASK_SUFFIX.length());
         File indexFolder = maskFile.getParentFile();
@@ -325,78 +252,92 @@
         }
     }
 
-    @SuppressWarnings("unchecked")
-    public synchronized HashMap<Long, Long> getReplicaIndexLSNMap(String indexPath) throws IOException {
-        FileInputStream fis = null;
-        ObjectInputStream oisFromFis = null;
-        try {
-            fis = new FileInputStream(indexPath + File.separator + REPLICA_INDEX_LSN_MAP_NAME);
-            oisFromFis = new ObjectInputStream(fis);
+    @SuppressWarnings({ "unchecked" })
+    public synchronized Map<Long, Long> getReplicaIndexLSNMap(String indexPath) throws IOException {
+        try (FileInputStream fis = new FileInputStream(indexPath + File.separator + REPLICA_INDEX_LSN_MAP_NAME);
+                ObjectInputStream oisFromFis = new ObjectInputStream(fis)) {
             Map<Long, Long> lsnMap = null;
             try {
                 lsnMap = (Map<Long, Long>) oisFromFis.readObject();
             } catch (ClassNotFoundException e) {
                 e.printStackTrace();
             }
-            return (HashMap<Long, Long>) lsnMap;
-        } finally {
-            if (oisFromFis != null) {
-                oisFromFis.close();
-            }
-            if (oisFromFis == null && fis != null) {
-                fis.close();
-            }
+            return lsnMap;
         }
     }
 
-    public synchronized void updateReplicaIndexLSNMap(String indexPath, HashMap<Long, Long> lsnMap) throws IOException {
-        FileOutputStream fos = null;
-        ObjectOutputStream oosToFos = null;
-        try {
-            fos = new FileOutputStream(indexPath + File.separator + REPLICA_INDEX_LSN_MAP_NAME);
-            oosToFos = new ObjectOutputStream(fos);
+    public synchronized void updateReplicaIndexLSNMap(String indexPath, Map<Long, Long> lsnMap) throws IOException {
+        try (FileOutputStream fos = new FileOutputStream(indexPath + File.separator + REPLICA_INDEX_LSN_MAP_NAME);
+                ObjectOutputStream oosToFos = new ObjectOutputStream(fos)) {
             oosToFos.writeObject(lsnMap);
             oosToFos.flush();
             lastMinRemoteLSN.set(-1);
-        } finally {
-            if (oosToFos != null) {
-                oosToFos.close();
-            }
-            if (oosToFos == null && fos != null) {
-                fos.close();
+        }
+    }
+
+    /**
+     * @param partition
+     * @return Set of file references to each index in the partition
+     */
+    public Set<File> getPartitionIndexes(int partition) {
+        Set<File> partitionIndexes = new HashSet<File>();
+        String storageDirName = AsterixClusterProperties.INSTANCE.getStorageDirectoryName();
+        String partitionStoragePath = localRepository.getPartitionPath(partition)
+                + StoragePathUtil.prepareStoragePartitionPath(storageDirName, partition);
+        File partitionRoot = new File(partitionStoragePath);
+        if (partitionRoot.exists() && partitionRoot.isDirectory()) {
+            File[] dataverseFileList = partitionRoot.listFiles();
+            if (dataverseFileList != null) {
+                for (File dataverseFile : dataverseFileList) {
+                    if (dataverseFile.isDirectory()) {
+                        File[] indexFileList = dataverseFile.listFiles();
+                        if (indexFileList != null) {
+                            for (File indexFile : indexFileList) {
+                                partitionIndexes.add(indexFile);
+                            }
+                        }
+                    }
+                }
             }
         }
+        return partitionIndexes;
+    }
+
+    /**
+     * @param partition
+     * @return Absolute paths to all partition files
+     */
+    public List<String> getPartitionIndexesFiles(int partition) {
+        List<String> partitionFiles = new ArrayList<String>();
+        Set<File> partitionIndexes = getPartitionIndexes(partition);
+        for (File indexDir : partitionIndexes) {
+            if (indexDir.isDirectory()) {
+                File[] indexFiles = indexDir.listFiles(LSM_INDEX_FILES_FILTER);
+                if (indexFiles != null) {
+                    for (File file : indexFiles) {
+                        partitionFiles.add(file.getAbsolutePath());
+                    }
+                }
+            }
+        }
+        return partitionFiles;
     }
 
     private static final FilenameFilter LSM_COMPONENTS_MASKS_FILTER = new FilenameFilter() {
         public boolean accept(File dir, String name) {
-            if (name.endsWith(LSM_COMPONENT_MASK_SUFFIX)) {
-                return true;
-            } else {
-                return false;
-            }
+            return name.endsWith(LSM_COMPONENT_MASK_SUFFIX);
         }
     };
 
     private static final FilenameFilter LSM_COMPONENTS_NON_MASKS_FILTER = new FilenameFilter() {
         public boolean accept(File dir, String name) {
-            if (!name.endsWith(LSM_COMPONENT_MASK_SUFFIX)) {
-                return true;
-            } else {
-                return false;
-            }
+            return !name.endsWith(LSM_COMPONENT_MASK_SUFFIX);
         }
     };
 
     private static final FilenameFilter LSM_INDEX_FILES_FILTER = new FilenameFilter() {
         public boolean accept(File dir, String name) {
-            if (name.equalsIgnoreCase(PersistentLocalResourceRepository.METADATA_FILE_NAME)) {
-                return true;
-            } else if (!name.startsWith(".")) {
-                return true;
-            } else {
-                return false;
-            }
+            return name.equalsIgnoreCase(PersistentLocalResourceRepository.METADATA_FILE_NAME) || !name.startsWith(".");
         }
     };
 }
\ No newline at end of file
diff --git a/asterix-replication/src/test/resources/data/fbu.adm b/asterix-replication/src/test/resources/data/fbu.adm
new file mode 100644
index 0000000..7e99ea4
--- /dev/null
+++ b/asterix-replication/src/test/resources/data/fbu.adm
@@ -0,0 +1,10 @@
+{"id":1,"alias":"Margarita","name":"MargaritaStoddard","user-since":datetime("2012-08-20T10:10:00"),"friend-ids":{{2,3,6,10}},"employment":[{"organization-name":"Codetechno","start-date":date("2006-08-06")}]}
+{"id":2,"alias":"Isbel","name":"IsbelDull","user-since":datetime("2011-01-22T10:10:00"),"friend-ids":{{1,4}},"employment":[{"organization-name":"Hexviafind","start-date":date("2010-04-27")}]}
+{"id":3,"alias":"Emory","name":"EmoryUnk","user-since":datetime("2012-07-10T10:10:00"),"friend-ids":{{1,5,8,9}},"employment":[{"organization-name":"geomedia","start-date":date("2010-06-17"),"end-date":date("2010-01-26")}]}
+{"id":4,"alias":"Nicholas","name":"NicholasStroh","user-since":datetime("2010-12-27T10:10:00"),"friend-ids":{{2}},"employment":[{"organization-name":"Zamcorporation","start-date":date("2010-06-08")}]}
+{"id":5,"alias":"Von","name":"VonKemble","user-since":datetime("2010-01-05T10:10:00"),"friend-ids":{{3,6,10}},"employment":[{"organization-name":"Kongreen","start-date":date("2010-11-27")}]}
+{"id":6,"alias":"Willis","name":"WillisWynne","user-since":datetime("2005-01-17T10:10:00"),"friend-ids":{{1,3,7}},"employment":[{"organization-name":"jaydax","start-date":date("2009-05-15")}]}
+{"id":7,"alias":"Suzanna","name":"SuzannaTillson","user-since":datetime("2012-08-07T10:10:00"),"friend-ids":{{6}},"employment":[{"organization-name":"Labzatron","start-date":date("2011-04-19")}]}
+{"id":8,"alias":"Nila","name":"NilaMilliron","user-since":datetime("2008-01-01T10:10:00"),"friend-ids":{{3}},"employment":[{"organization-name":"Plexlane","start-date":date("2010-02-28")}]}
+{"id":9,"alias":"Woodrow","name":"WoodrowNehling","user-since":datetime("2005-09-20T10:10:00"),"friend-ids":{{3,10}},"employment":[{"organization-name":"Zuncan","start-date":date("2003-04-22"),"end-date":date("2009-12-13")}]}
+{"id":10,"alias":"Bram","name":"BramHatch","user-since":datetime("2010-10-16T10:10:00"),"friend-ids":{{1,5,9}},"employment":[{"organization-name":"physcane","start-date":date("2007-06-05"),"end-date":date("2011-11-05")}]}
diff --git a/asterix-replication/src/test/resources/scripts/delete_storage.sh b/asterix-replication/src/test/resources/scripts/delete_storage.sh
new file mode 100755
index 0000000..129030b
--- /dev/null
+++ b/asterix-replication/src/test/resources/scripts/delete_storage.sh
@@ -0,0 +1,18 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+rm -rf ~/asterix/*
diff --git a/asterix-replication/src/test/resources/scripts/kill_cc_and_nc.sh b/asterix-replication/src/test/resources/scripts/kill_cc_and_nc.sh
new file mode 100755
index 0000000..2582713
--- /dev/null
+++ b/asterix-replication/src/test/resources/scripts/kill_cc_and_nc.sh
@@ -0,0 +1,18 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+jps | awk '{if ($2 == "NCDriver" || $2 == "CCDriver") print $1;}' | xargs -n 1 kill -9
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 52fd806..9f59148 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -32,9 +32,12 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.SortedMap;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.config.AsterixMetadataProperties;
 import org.apache.asterix.common.replication.AsterixReplicationJob;
 import org.apache.asterix.common.replication.IReplicationManager;
 import org.apache.commons.io.FileUtils;
@@ -63,10 +66,13 @@
     private IReplicationManager replicationManager;
     private boolean isReplicationEnabled = false;
     private Set<String> filesToBeReplicated;
+    private final SortedMap<Integer, ClusterPartition> clusterPartitions;
 
-    public PersistentLocalResourceRepository(List<IODeviceHandle> devices, String nodeId) throws HyracksDataException {
+    public PersistentLocalResourceRepository(List<IODeviceHandle> devices, String nodeId,
+            AsterixMetadataProperties metadataProperties) throws HyracksDataException {
         mountPoints = new String[devices.size()];
         this.nodeId = nodeId;
+        this.clusterPartitions = metadataProperties.getClusterPartitions();
         for (int i = 0; i < mountPoints.length; i++) {
             String mountPoint = devices.get(i).getPath().getPath();
             File mountPointDir = new File(mountPoint);
@@ -156,37 +162,18 @@
             resourceCache.put(resource.getResourcePath(), resource);
         }
 
-        FileOutputStream fos = null;
-        ObjectOutputStream oosToFos = null;
-
-        try {
-            fos = new FileOutputStream(resourceFile);
-            oosToFos = new ObjectOutputStream(fos);
+        try (FileOutputStream fos = new FileOutputStream(resourceFile);
+                ObjectOutputStream oosToFos = new ObjectOutputStream(fos)) {
             oosToFos.writeObject(resource);
             oosToFos.flush();
         } catch (IOException e) {
             throw new HyracksDataException(e);
-        } finally {
-            if (oosToFos != null) {
-                try {
-                    oosToFos.close();
-                } catch (IOException e) {
-                    throw new HyracksDataException(e);
-                }
-            }
-            if (oosToFos == null && fos != null) {
-                try {
-                    fos.close();
-                } catch (IOException e) {
-                    throw new HyracksDataException(e);
-                }
-            }
+        }
 
-            //if replication enabled, send resource metadata info to remote nodes
-            if (isReplicationEnabled && resource.getResourceId() != STORAGE_LOCAL_RESOURCE_ID) {
-                String filePath = getFileName(resource.getResourcePath(), resource.getResourceId());
-                createReplicationJob(ReplicationOperation.REPLICATE, filePath);
-            }
+        //if replication enabled, send resource metadata info to remote nodes
+        if (isReplicationEnabled && resource.getResourceId() != STORAGE_LOCAL_RESOURCE_ID) {
+            String filePath = getFileName(resource.getResourcePath(), resource.getResourceId());
+            createReplicationJob(ReplicationOperation.REPLICATE, filePath);
         }
     }
 
@@ -304,31 +291,12 @@
     }
 
     public static LocalResource readLocalResource(File file) throws HyracksDataException {
-        FileInputStream fis = null;
-        ObjectInputStream oisFromFis = null;
-
-        try {
-            fis = new FileInputStream(file);
-            oisFromFis = new ObjectInputStream(fis);
+        try (FileInputStream fis = new FileInputStream(file);
+                ObjectInputStream oisFromFis = new ObjectInputStream(fis)) {
             LocalResource resource = (LocalResource) oisFromFis.readObject();
             return resource;
         } catch (Exception e) {
             throw new HyracksDataException(e);
-        } finally {
-            if (oisFromFis != null) {
-                try {
-                    oisFromFis.close();
-                } catch (IOException e) {
-                    throw new HyracksDataException(e);
-                }
-            }
-            if (oisFromFis == null && fis != null) {
-                try {
-                    fis.close();
-                } catch (IOException e) {
-                    throw new HyracksDataException(e);
-                }
-            }
         }
     }
 
@@ -427,4 +395,13 @@
         }
         return storageRootDir;
     }
+
+    /**
+     * @param partition
+     * @return The partition local path on this NC.
+     */
+    public String getPartitionPath(int partition) {
+        //currently each partition is replicated on the same IO device number on all NCs.
+        return mountPoints[clusterPartitions.get(partition).getIODeviceNum()];
+    }
 }
\ No newline at end of file
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java
index b6bb7dc..e79a0d3 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.transaction.management.resource;
 
+import org.apache.asterix.common.config.AsterixMetadataProperties;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
@@ -26,14 +27,17 @@
 public class PersistentLocalResourceRepositoryFactory implements ILocalResourceRepositoryFactory {
     private final IIOManager ioManager;
     private final String nodeId;
+    private final AsterixMetadataProperties metadataProperties;
 
-    public PersistentLocalResourceRepositoryFactory(IIOManager ioManager, String nodeId) {
+    public PersistentLocalResourceRepositoryFactory(IIOManager ioManager, String nodeId,
+            AsterixMetadataProperties metadataProperties) {
         this.ioManager = ioManager;
         this.nodeId = nodeId;
+        this.metadataProperties = metadataProperties;
     }
 
     @Override
     public ILocalResourceRepository createRepository() throws HyracksDataException {
-        return new PersistentLocalResourceRepository(ioManager.getIODevices(), nodeId);
+        return new PersistentLocalResourceRepository(ioManager.getIODevices(), nodeId, metadataProperties);
     }
 }
\ No newline at end of file
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
index 701e529..8fe75f2 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -44,13 +44,11 @@
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.SortedMap;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.api.ILocalResourceMetadata;
-import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.config.IAsterixPropertiesProvider;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
@@ -72,14 +70,11 @@
 import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem;
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
-import org.apache.hyracks.control.nc.io.IOManager;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.api.IIndex;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
-import org.apache.hyracks.storage.am.common.util.IndexFileNameUtil;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
@@ -300,7 +295,6 @@
             //get datasetLifeCycleManager
             IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
                     .getDatasetLifecycleManager();
-            IIOManager ioManager = appRuntimeContext.getIOManager();
             ILocalResourceRepository localResourceRepository = appRuntimeContext.getLocalResourceRepository();
             Map<Long, LocalResource> resourcesMap = ((PersistentLocalResourceRepository) localResourceRepository)
                     .loadAndGetAllResources();
@@ -507,13 +501,10 @@
 
         //#. get indexLifeCycleManager 
         IAsterixAppRuntimeContextProvider appRuntimeContext = txnSubsystem.getAsterixAppRuntimeContextProvider();
-        IIOManager ioManager = appRuntimeContext.getIOManager();
-        SortedMap<Integer, ClusterPartition> clusterPartitions = ((IAsterixPropertiesProvider) appRuntimeContext
-                .getAppContext()).getMetadataProperties().getClusterPartitions();
         IDatasetLifecycleManager datasetLifecycleManager = appRuntimeContext.getDatasetLifecycleManager();
-        ILocalResourceRepository localResourceRepository = appRuntimeContext.getLocalResourceRepository();
-        Map<Long, LocalResource> resourcesMap = ((PersistentLocalResourceRepository) localResourceRepository)
-                .loadAndGetAllResources();
+        PersistentLocalResourceRepository localResourceRepository = (PersistentLocalResourceRepository) appRuntimeContext
+                .getLocalResourceRepository();
+        Map<Long, LocalResource> resourcesMap = localResourceRepository.loadAndGetAllResources();
         //#. set log reader to the lowWaterMarkLsn again.
         for (int i = 0; i < remoteLogs.size(); i++) {
             logRecord = remoteLogs.get(i);
@@ -561,16 +552,11 @@
                             //get index instance from IndexLifeCycleManager
                             //if index is not registered into IndexLifeCycleManager,
                             //create the index using LocalMetadata stored in LocalResourceRepository
-                            //get the resource path relative to this node
-                            int resourcePartition = localResource.getPartition();
-                            //get partition io device id
-                            //NOTE:
-                            //currently we store all partition in the same IO device in all nodes. If this changes,
-                            //this needs to be updated to find the IO device in which the partition is stored in this local node.
-                            int ioDevice = clusterPartitions.get(resourcePartition).getIODeviceNum();
-                            String resourceAbsolutePath = ioManager
-                                    .getAbsoluteFileRef(ioDevice, localResource.getResourceName()).getFile()
-                                    .getAbsolutePath();
+                            //get partition path in this node
+                            String partitionIODevicePath = localResourceRepository
+                                    .getPartitionPath(localResource.getPartition());
+                            String resourceAbsolutePath = partitionIODevicePath + File.separator
+                                    + localResource.getResourceName();
                             localResource.setResourcePath(resourceAbsolutePath);
                             index = (ILSMIndex) datasetLifecycleManager.getIndex(resourceAbsolutePath);
                             if (index == null) {
@@ -578,8 +564,8 @@
                                 localResourceMetadata = (ILocalResourceMetadata) localResource.getResourceObject();
                                 index = localResourceMetadata.createIndexInstance(appRuntimeContext,
                                         resourceAbsolutePath, localResource.getPartition());
-                                datasetLifecycleManager.register(localResource.getResourceName(), index);
-                                datasetLifecycleManager.open(localResource.getResourceName());
+                                datasetLifecycleManager.register(resourceAbsolutePath, index);
+                                datasetLifecycleManager.open(resourceAbsolutePath);
 
                                 //#. get maxDiskLastLSN
                                 ILSMIndex lsmIndex = index;
@@ -1099,6 +1085,242 @@
         }
     }
 
+    //TODO (mhubail) RecoveryManager has three methods that perform logs REDO based on different parameters.
+    //They need to be refactored to use partitions only once the log format includes partition id.
+    @Override
+    public synchronized void replayPartitionsLogs(Integer[] partitions, long lowWaterMarkLSN, String failedNode)
+            throws IOException, ACIDException {
+        //delete any recovery files from previous failed recovery attempts
+        deleteRecoveryTemporaryFiles();
+
+        int updateLogCount = 0;
+        int entityCommitLogCount = 0;
+        int jobCommitLogCount = 0;
+        int redoCount = 0;
+        int abortLogCount = 0;
+        int jobId = -1;
+
+        state = SystemState.RECOVERING;
+        LOGGER.log(Level.INFO, "[RecoveryMgr] starting recovery ...");
+
+        Set<Integer> winnerJobSet = new HashSet<Integer>();
+        jobId2WinnerEntitiesMap = new HashMap<>();
+
+        TxnId tempKeyTxnId = new TxnId(-1, -1, -1, null, -1, false);
+        JobEntityCommits jobEntityWinners = null;
+        //#. read checkpoint file and set lowWaterMark where anaylsis and redo start
+        long readableSmallestLSN = logMgr.getReadableSmallestLSN();
+        if (lowWaterMarkLSN < readableSmallestLSN) {
+            lowWaterMarkLSN = readableSmallestLSN;
+        }
+        //-------------------------------------------------------------------------
+        //  [ analysis phase ]
+        //  - collect all committed Lsn
+        //-------------------------------------------------------------------------
+        LOGGER.log(Level.INFO, "[RecoveryMgr] in analysis phase");
+        IAsterixAppRuntimeContextProvider appRuntimeContext = txnSubsystem.getAsterixAppRuntimeContextProvider();
+        //get datasetLifeCycleManager
+        IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
+                .getDatasetLifecycleManager();
+        PersistentLocalResourceRepository localResourceRepository = (PersistentLocalResourceRepository) appRuntimeContext
+                .getLocalResourceRepository();
+        Map<Long, LocalResource> resourcesMap = localResourceRepository.loadAndGetAllResources();
+        Map<Long, Long> resourceId2MaxLSNMap = new HashMap<Long, Long>();
+
+        //#. set log reader to the lowWaterMarkLsn
+        ILogReader logReader = logMgr.getLogReader(true);
+        ILogRecord logRecord = null;
+        try {
+            logReader.initializeScan(lowWaterMarkLSN);
+            logRecord = logReader.next();
+            while (logRecord != null) {
+                if (IS_DEBUG_MODE) {
+                    LOGGER.info(logRecord.getLogRecordForDisplay());
+                }
+                //TODO update this partitions once the log format is updated to include partitons
+                if (logRecord.getNodeId().equals(failedNode)) {
+                    switch (logRecord.getLogType()) {
+                        case LogType.UPDATE:
+                            updateLogCount++;
+                            break;
+                        case LogType.JOB_COMMIT:
+                            jobId = logRecord.getJobId();
+                            winnerJobSet.add(jobId);
+                            if (jobId2WinnerEntitiesMap.containsKey(jobId)) {
+                                jobEntityWinners = jobId2WinnerEntitiesMap.get(jobId);
+                                //to delete any spilled files as well
+                                jobEntityWinners.clear();
+                                jobId2WinnerEntitiesMap.remove(jobId);
+                            }
+                            jobCommitLogCount++;
+                            break;
+                        case LogType.ENTITY_COMMIT:
+                            jobId = logRecord.getJobId();
+                            if (!jobId2WinnerEntitiesMap.containsKey(jobId)) {
+                                jobEntityWinners = new JobEntityCommits(jobId);
+                                if (needToFreeMemory()) {
+                                    //if we don't have enough memory for one more job, we will force all jobs to spill their cached entities to disk.
+                                    //This could happen only when we have many jobs with small number of records and none of them have job commit.
+                                    freeJobsCachedEntities(jobId);
+                                }
+                                jobId2WinnerEntitiesMap.put(jobId, jobEntityWinners);
+                            } else {
+                                jobEntityWinners = jobId2WinnerEntitiesMap.get(jobId);
+                            }
+                            jobEntityWinners.add(logRecord);
+                            entityCommitLogCount++;
+                            break;
+                        case LogType.ABORT:
+                            abortLogCount++;
+                            break;
+                        case LogType.FLUSH:
+                            break;
+                        default:
+                            throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
+                    }
+                }
+                logRecord = logReader.next();
+            }
+
+            //prepare winners for search after analysis is done to flush anything remaining in memory to disk.
+            for (JobEntityCommits winners : jobId2WinnerEntitiesMap.values()) {
+                winners.prepareForSearch();
+            }
+            //-------------------------------------------------------------------------
+            //  [ redo phase ]
+            //  - redo if
+            //    1) The TxnId is committed && --> guarantee durability
+            //    2) lsn > maxDiskLastLsn of the index --> guarantee idempotence
+            //-------------------------------------------------------------------------
+            LOGGER.info("[RecoveryMgr] in redo phase");
+
+            long resourceId;
+            long maxDiskLastLsn;
+            long LSN = -1;
+            ILSMIndex index = null;
+            LocalResource localResource = null;
+            ILocalResourceMetadata localResourceMetadata = null;
+            boolean foundWinner = false;
+            //set log reader to the lowWaterMarkLsn again.
+            logReader.initializeScan(lowWaterMarkLSN);
+            logRecord = logReader.next();
+            while (logRecord != null) {
+                if (IS_DEBUG_MODE) {
+                    LOGGER.info(logRecord.getLogRecordForDisplay());
+                }
+                //TODO update this to check for partitions instead of node id once the log format is updated to include partitions
+                if (logRecord.getNodeId().equals(failedNode)) {
+                    LSN = logRecord.getLSN();
+                    jobId = logRecord.getJobId();
+                    foundWinner = false;
+                    switch (logRecord.getLogType()) {
+                        case LogType.UPDATE:
+                            if (winnerJobSet.contains(jobId)) {
+                                foundWinner = true;
+                            } else if (jobId2WinnerEntitiesMap.containsKey(jobId)) {
+                                jobEntityWinners = jobId2WinnerEntitiesMap.get(jobId);
+                                tempKeyTxnId.setTxnId(jobId, logRecord.getDatasetId(), logRecord.getPKHashValue(),
+                                        logRecord.getPKValue(), logRecord.getPKValueSize());
+                                if (jobEntityWinners.containsEntityCommitForTxnId(LSN, tempKeyTxnId)) {
+                                    foundWinner = true;
+                                }
+                            }
+                            if (foundWinner) {
+                                resourceId = logRecord.getResourceId();
+                                localResource = resourcesMap.get(resourceId);
+                                /*******************************************************************
+                                 * [Notice]
+                                 * -> Issue
+                                 * Delete index may cause a problem during redo.
+                                 * The index operation to be redone couldn't be redone because the corresponding index
+                                 * may not exist in NC due to the possible index drop DDL operation.
+                                 * -> Approach
+                                 * Avoid the problem during redo.
+                                 * More specifically, the problem will be detected when the localResource of
+                                 * the corresponding index is retrieved, which will end up with 'null'.
+                                 * If null is returned, then just go and process the next
+                                 * log record.
+                                 *******************************************************************/
+                                if (localResource == null) {
+                                    logRecord = logReader.next();
+                                    continue;
+                                }
+                                /*******************************************************************/
+
+                                //get index instance from IndexLifeCycleManager
+                                //if index is not registered into IndexLifeCycleManager,
+                                //create the index using LocalMetadata stored in LocalResourceRepository
+                                //get partition path in this node
+                                String partitionIODevicePath = localResourceRepository
+                                        .getPartitionPath(localResource.getPartition());
+                                String resourceAbsolutePath = partitionIODevicePath + File.separator
+                                        + localResource.getResourceName();
+                                localResource.setResourcePath(resourceAbsolutePath);
+                                index = (ILSMIndex) datasetLifecycleManager.getIndex(resourceAbsolutePath);
+                                if (index == null) {
+                                    //#. create index instance and register to indexLifeCycleManager
+                                    localResourceMetadata = (ILocalResourceMetadata) localResource.getResourceObject();
+                                    index = localResourceMetadata.createIndexInstance(appRuntimeContext,
+                                            resourceAbsolutePath, localResource.getPartition());
+                                    datasetLifecycleManager.register(resourceAbsolutePath, index);
+                                    datasetLifecycleManager.open(resourceAbsolutePath);
+
+                                    //#. get maxDiskLastLSN
+                                    ILSMIndex lsmIndex = index;
+                                    try {
+                                        maxDiskLastLsn = ((AbstractLSMIOOperationCallback) lsmIndex
+                                                .getIOOperationCallback())
+                                                        .getComponentLSN(lsmIndex.getImmutableComponents());
+                                    } catch (HyracksDataException e) {
+                                        datasetLifecycleManager.close(resourceAbsolutePath);
+                                        throw e;
+                                    }
+
+                                    //#. set resourceId and maxDiskLastLSN to the map
+                                    resourceId2MaxLSNMap.put(resourceId, maxDiskLastLsn);
+                                } else {
+                                    maxDiskLastLsn = resourceId2MaxLSNMap.get(resourceId);
+                                }
+
+                                if (LSN > maxDiskLastLsn) {
+                                    redo(logRecord, datasetLifecycleManager);
+                                    redoCount++;
+                                }
+                            }
+                            break;
+                        case LogType.JOB_COMMIT:
+                        case LogType.ENTITY_COMMIT:
+                        case LogType.ABORT:
+                        case LogType.FLUSH:
+                            //do nothing
+                            break;
+                        default:
+                            throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
+                    }
+                }
+                logRecord = logReader.next();
+            }
+
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("[RecoveryMgr] recovery is completed.");
+                LOGGER.info("[RecoveryMgr's recovery log count] update/entityCommit/jobCommit/abort/redo = "
+                        + updateLogCount + "/" + entityCommitLogCount + "/" + jobCommitLogCount + "/" + abortLogCount
+                        + "/" + redoCount);
+            }
+        } finally {
+            logReader.close();
+
+            //close all indexes
+            Set<Long> resourceIdList = resourceId2MaxLSNMap.keySet();
+            for (long r : resourceIdList) {
+                datasetLifecycleManager.close(resourcesMap.get(r).getResourcePath());
+            }
+
+            //delete any recovery files after completing recovery
+            deleteRecoveryTemporaryFiles();
+        }
+    }
+
     private class JobEntityCommits {
         private static final String PARTITION_FILE_NAME_SEPARATOR = "_";
         private final int jobId;
@@ -1145,7 +1367,7 @@
 
         /**
          * Call this method when no more entity commits will be added to this job.
-         * 
+         *
          * @throws IOException
          */
         public void prepareForSearch() throws IOException {
@@ -1182,7 +1404,6 @@
          */
         public ArrayList<File> getCandidiatePartitions(long logLSN) {
             ArrayList<File> candidiatePartitions = new ArrayList<File>();
-
             for (File partition : jobEntitCommitOnDiskPartitionsFiles) {
                 String partitionName = partition.getName();
                 //entity commit log must come after the update log, therefore, consider only partitions with max LSN > logLSN