Introducing Data Replication To AsterixDB

This change includes the following:
- Add data replication properties to cluster properties and Managix validate command.
- Introduce Data Replication components.
- Add data replication required fields to LogRecord.
- Specialized LogManager for data replication.
- Fix for invalid cluster state on nodes failure.
- ASTERIXDB-139: Fix for cleaning workspace files on startup/shutdown.
- Fix for temp datasets storage reclamation.
- Allow MetadataNode rebinding with CC.
- Add flag to checkpoint to identify sharp checkpoints.
- ASTERIXDB-1170: Fix shutdown sequence

Change-Id: I729fdd1144dbc9ff039b4bc414494860d7553810
Reviewed-on: https://asterix-gerrit.ics.uci.edu/338
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Young-Seok Kim <kisskys@gmail.com>
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java b/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
index f86ed8a..63851bf 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
@@ -25,6 +25,10 @@
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.feeds.api.IFeedManager;
+import org.apache.asterix.common.replication.IRemoteRecoveryManager;
+import org.apache.asterix.common.replication.IReplicaResourcesManager;
+import org.apache.asterix.common.replication.IReplicationChannel;
+import org.apache.asterix.common.replication.IReplicationManager;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IIOManager;
@@ -76,4 +80,14 @@
     public List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID);
 
     public IFeedManager getFeedManager();
+
+    public IRemoteRecoveryManager getRemoteRecoveryManager();
+
+    public IReplicaResourcesManager getReplicaResourcesManager();
+
+    public IReplicationManager getReplicationManager();
+
+    public IReplicationChannel getReplicationChannel();
+
+    public void initializeResourceIdFactory() throws HyracksDataException;
 }
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index e1e6d96..803e708 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -21,6 +21,7 @@
 import java.util.List;
 
 import org.apache.asterix.common.context.DatasetLifecycleManager.DatasetInfo;
+import org.apache.asterix.common.context.DatasetLifecycleManager.IndexInfo;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.common.api.IIndex;
 import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
@@ -38,12 +39,14 @@
 
     /**
      * Flushes all open datasets synchronously.
+     * 
      * @throws HyracksDataException
      */
     void flushAllDatasets() throws HyracksDataException;
 
     /**
      * Schedules asynchronous flush on datasets that have memory components with first LSN < nonSharpCheckpointTargetLSN.
+     * 
      * @param nonSharpCheckpointTargetLSN
      * @throws HyracksDataException
      */
@@ -51,6 +54,7 @@
 
     /**
      * creates (if necessary) and returns the dataset info.
+     * 
      * @param datasetID
      * @return
      */
@@ -67,6 +71,7 @@
 
     /**
      * creates (if necessary) and returns the primary index operation tracker of a dataset.
+     * 
      * @param datasetID
      * @return
      */
@@ -74,8 +79,19 @@
 
     /**
      * creates (if necessary) and returns the dataset virtual buffer caches.
+     * 
      * @param datasetID
      * @return
      */
     List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID);
+
+    /**
+     * Flushes then closes all open datasets
+     */
+    void closeAllDatasets() throws HyracksDataException;
+
+    /**
+     * @return a list of all indexes that are open at the time of the call.
+     */
+    List<IndexInfo> getOpenIndexesInfo();
 }
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
new file mode 100644
index 0000000..1ef7e3e
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.config;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.replication.Replica;
+import org.apache.asterix.event.schema.cluster.Cluster;
+import org.apache.asterix.event.schema.cluster.Node;
+
+public class AsterixReplicationProperties extends AbstractAsterixProperties {
+
+    private static final Logger LOGGER = Logger.getLogger(AsterixReplicationProperties.class.getName());
+
+    private static int REPLICATION_DATAPORT_DEFAULT = 2000;
+    private static int REPLICATION_FACTOR_DEFAULT = 1;
+    private static int REPLICATION_TIME_OUT_DEFAULT = 15;
+
+    private static final String NODE_IP_ADDRESS_DEFAULT = "127.0.0.1";
+    private static final String REPLICATION_STORE_DEFAULT = "asterix-replication";
+    private final String NODE_NAME_PREFIX;
+    private final Cluster cluster;
+
+    public AsterixReplicationProperties(AsterixPropertiesAccessor accessor, Cluster cluster) {
+        super(accessor);
+        this.cluster = cluster;
+
+        if (cluster != null) {
+            NODE_NAME_PREFIX = cluster.getInstanceName() + "_";
+        } else {
+            NODE_NAME_PREFIX = "";
+        }
+    }
+
+    public boolean isReplicationEnabled() {
+        if (cluster != null && cluster.getDataReplication() != null) {
+            if (getReplicationFactor() == 1) {
+                return false;
+            }
+
+            return cluster.getDataReplication().isEnabled();
+
+        } else {
+            return false;
+        }
+    }
+
+    public String getReplicaIPAddress(String nodeId) {
+        if (cluster != null) {
+
+            for (int i = 0; i < cluster.getNode().size(); i++) {
+                Node node = cluster.getNode().get(i);
+                if (getRealCluserNodeID(node.getId()).equals(nodeId)) {
+                    return node.getClusterIp();
+                }
+            }
+        }
+        return NODE_IP_ADDRESS_DEFAULT;
+    }
+
+    public int getDataReplicationPort(String nodeId) {
+        if (cluster != null) {
+            return cluster.getDataReplication().getReplicationPort().intValue();
+        }
+
+        return REPLICATION_DATAPORT_DEFAULT;
+    }
+
+    public Set<Replica> getRemoteReplicas(String nodeId) {
+        Set<Replica> remoteReplicas = new HashSet<Replica>();;
+
+        int numberOfRemoteReplicas = getReplicationFactor() - 1;
+
+        //Using chained-declustering
+        if (cluster != null) {
+            int nodeIndex = -1;
+            for (int i = 0; i < cluster.getNode().size(); i++) {
+                Node node = cluster.getNode().get(i);
+                if (getRealCluserNodeID(node.getId()).equals(nodeId)) {
+                    nodeIndex = i;
+                    break;
+                }
+            }
+
+            if (nodeIndex == -1) {
+                LOGGER.log(Level.WARNING, "Could not find node " + getRealCluserNodeID(nodeId)
+                        + " in cluster configurations");
+                return null;
+            }
+
+            for (int i = nodeIndex + 1; i < cluster.getNode().size(); i++) {
+                remoteReplicas.add(getReplicaByNodeIndex(i));
+
+                if (remoteReplicas.size() == numberOfRemoteReplicas) {
+                    break;
+                }
+            }
+
+            if (remoteReplicas.size() != numberOfRemoteReplicas) {
+                for (int i = 0; i < cluster.getNode().size(); i++) {
+
+                    remoteReplicas.add(getReplicaByNodeIndex(i));
+
+                    if (remoteReplicas.size() == numberOfRemoteReplicas) {
+                        break;
+                    }
+                }
+            }
+        }
+        return remoteReplicas;
+    }
+
+    private Replica getReplicaByNodeIndex(int nodeIndex) {
+        Node node = cluster.getNode().get(nodeIndex);
+        Node replicaNode = new Node();
+        replicaNode.setId(getRealCluserNodeID(node.getId()));
+        replicaNode.setClusterIp(node.getClusterIp());
+        return new Replica(replicaNode);
+    }
+
+    public Replica getReplicaById(String nodeId) {
+        int nodeIndex = -1;
+        if (cluster != null) {
+            for (int i = 0; i < cluster.getNode().size(); i++) {
+                Node node = cluster.getNode().get(i);
+
+                if (getRealCluserNodeID(node.getId()).equals(nodeId)) {
+                    nodeIndex = i;
+                    break;
+                }
+            }
+        }
+
+        if (nodeIndex < 0) {
+            return null;
+        }
+
+        return getReplicaByNodeIndex(nodeIndex);
+    }
+
+    public Set<String> getRemoteReplicasIds(String nodeId) {
+        Set<String> remoteReplicasIds = new HashSet<String>();
+        Set<Replica> remoteReplicas = getRemoteReplicas(nodeId);
+
+        for (Replica replica : remoteReplicas) {
+            remoteReplicasIds.add(replica.getId());
+        }
+
+        return remoteReplicasIds;
+    }
+
+    public String getRealCluserNodeID(String nodeId) {
+        return NODE_NAME_PREFIX + nodeId;
+    }
+
+    public Set<String> getNodeReplicasIds(String nodeId) {
+        Set<String> replicaIds = new HashSet<String>();
+        replicaIds.add(nodeId);
+        replicaIds.addAll(getRemoteReplicasIds(nodeId));
+        return replicaIds;
+    }
+
+    public String getReplicationStore() {
+        if (cluster != null) {
+            return cluster.getDataReplication().getReplicationStore();
+        }
+        return REPLICATION_STORE_DEFAULT;
+    }
+
+    public int getReplicationFactor() {
+        if (cluster != null) {
+            if (cluster.getDataReplication() == null || cluster.getDataReplication().getReplicationFactor() == null) {
+                return REPLICATION_FACTOR_DEFAULT;
+            }
+            return cluster.getDataReplication().getReplicationFactor().intValue();
+        }
+        return REPLICATION_FACTOR_DEFAULT;
+    }
+
+    public int getReplicationTimeOut() {
+        if (cluster != null) {
+            return cluster.getDataReplication().getReplicationTimeOut().intValue();
+        }
+        return REPLICATION_TIME_OUT_DEFAULT;
+    }
+
+}
\ No newline at end of file
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/config/IAsterixPropertiesProvider.java b/asterix-common/src/main/java/org/apache/asterix/common/config/IAsterixPropertiesProvider.java
index 93c58be..e6f383f 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/config/IAsterixPropertiesProvider.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/config/IAsterixPropertiesProvider.java
@@ -28,8 +28,10 @@
     public AsterixMetadataProperties getMetadataProperties();
 
     public AsterixExternalProperties getExternalProperties();
-    
+
     public AsterixFeedProperties getFeedProperties();
 
     AsterixBuildProperties getBuildProperties();
+
+    public AsterixReplicationProperties getReplicationProperties();
 }
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java b/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
index 776549a..21500b7 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
@@ -39,7 +39,8 @@
     @Override
     public void beforeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
             IModificationOperationCallback modificationCallback) throws HyracksDataException {
-        if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE) {
+        if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE
+                || opType == LSMOperationType.REPLICATE) {
             dsInfo.declareActiveIOOperation();
         }
     }
@@ -47,7 +48,8 @@
     @Override
     public void afterOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
             IModificationOperationCallback modificationCallback) throws HyracksDataException {
-        if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE) {
+        if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE
+                || opType == LSMOperationType.REPLICATE) {
             dsInfo.undeclareActiveIOOperation();
         }
     }
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 20b07fa..adf1152 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -74,6 +74,7 @@
         capacity = storageProperties.getMemoryComponentGlobalBudget();
         used = 0;
         logRecord = new LogRecord();
+        logRecord.setNodeId(logManager.getNodeId());
     }
 
     @Override
@@ -112,10 +113,10 @@
         if (dsInfo.indexes.containsKey(resourceID)) {
             throw new HyracksDataException("Index with resource ID " + resourceID + " already exists.");
         }
-        dsInfo.indexes.put(resourceID, new IndexInfo((ILSMIndex) index));
+        dsInfo.indexes.put(resourceID, new IndexInfo((ILSMIndex) index, dsInfo.datasetID, resourceID));
     }
 
-    private int getDIDfromResourceName(String resourceName) throws HyracksDataException {
+    public int getDIDfromResourceName(String resourceName) throws HyracksDataException {
         LocalResource lr = resourceRepository.getResourceByName(resourceName);
         if (lr == null) {
             return -1;
@@ -123,7 +124,7 @@
         return ((ILocalResourceMetadata) lr.getResourceObject()).getDatasetID();
     }
 
-    private long getResourceIDfromResourceName(String resourceName) throws HyracksDataException {
+    public long getResourceIDfromResourceName(String resourceName) throws HyracksDataException {
         LocalResource lr = resourceRepository.getResourceByName(resourceName);
         if (lr == null) {
             return -1;
@@ -279,15 +280,25 @@
 
     @Override
     public synchronized List<IIndex> getOpenIndexes() {
+        List<IndexInfo> openIndexesInfo = getOpenIndexesInfo();
         List<IIndex> openIndexes = new ArrayList<IIndex>();
+        for (IndexInfo iInfo : openIndexesInfo) {
+            openIndexes.add(iInfo.index);
+        }
+        return openIndexes;
+    }
+
+    @Override
+    public synchronized List<IndexInfo> getOpenIndexesInfo() {
+        List<IndexInfo> openIndexesInfo = new ArrayList<IndexInfo>();
         for (DatasetInfo dsInfo : datasetInfos.values()) {
             for (IndexInfo iInfo : dsInfo.indexes.values()) {
                 if (iInfo.isOpen) {
-                    openIndexes.add(iInfo.index);
+                    openIndexesInfo.add(iInfo);
                 }
             }
         }
-        return openIndexes;
+        return openIndexesInfo;
     }
 
     public List<IVirtualBufferCache> getVirtualBufferCaches(int datasetID) {
@@ -358,11 +369,27 @@
         }
     }
 
-    private class IndexInfo extends Info {
+    public class IndexInfo extends Info {
         private final ILSMIndex index;
+        private final long resourceId;
+        private final int datasetId;
 
-        public IndexInfo(ILSMIndex index) {
+        public IndexInfo(ILSMIndex index, int datasetId, long resourceId) {
             this.index = index;
+            this.datasetId = datasetId;
+            this.resourceId = resourceId;
+        }
+
+        public ILSMIndex getIndex() {
+            return index;
+        }
+
+        public long getResourceId() {
+            return resourceId;
+        }
+
+        public int getDatasetId() {
+            return datasetId;
         }
     }
 
@@ -456,14 +483,6 @@
                     + ", lastAccess: " + lastAccess + ", isRegistered: " + isRegistered + ", memoryAllocated: "
                     + memoryAllocated;
         }
-
-        public boolean isMemoryAllocated() {
-            return memoryAllocated;
-        }
-
-        public int getDatasetID() {
-            return datasetID;
-        }
     }
 
     @Override
@@ -520,7 +539,7 @@
     private void flushDatasetOpenIndexes(DatasetInfo dsInfo, boolean asyncFlush) throws HyracksDataException {
         if (!dsInfo.isExternal) {
             synchronized (logRecord) {
-                logRecord.formFlushLogRecord(dsInfo.datasetID, null);
+                logRecord.formFlushLogRecord(dsInfo.datasetID, null, dsInfo.indexes.size());
                 try {
                     logManager.log(logRecord);
                 } catch (ACIDException e) {
@@ -588,15 +607,19 @@
         removeDatasetFromCache(dsInfo.datasetID);
     }
 
+    public void closeAllDatasets() throws HyracksDataException {
+        for (DatasetInfo dsInfo : datasetInfos.values()) {
+            closeDataset(dsInfo);
+        }
+    }
+
     @Override
     public synchronized void stop(boolean dumpState, OutputStream outputStream) throws IOException {
         if (dumpState) {
             dumpState(outputStream);
         }
 
-        for (DatasetInfo dsInfo : datasetInfos.values()) {
-            closeDataset(dsInfo);
-        }
+        closeAllDatasets();
 
         datasetVirtualBufferCaches.clear();
         datasetOpTrackers.clear();
@@ -686,4 +709,4 @@
         int did = Integer.parseInt(resourceName);
         allocateDatasetMemory(did);
     }
-}
\ No newline at end of file
+}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index b4a3ac9..437fac4 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -59,7 +59,8 @@
             IModificationOperationCallback modificationCallback) throws HyracksDataException {
         if (opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION) {
             incrementNumActiveOperations(modificationCallback);
-        } else if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE) {
+        } else if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE
+                || opType == LSMOperationType.REPLICATE) {
             dsInfo.declareActiveIOOperation();
         }
     }
@@ -68,7 +69,8 @@
     public void afterOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
             IModificationOperationCallback modificationCallback) throws HyracksDataException {
         // Searches are immediately considered complete, because they should not prevent the execution of flushes.
-        if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE) {
+        if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE
+                || opType == LSMOperationType.REPLICATE) {
             completeOperation(index, opType, searchCallback, modificationCallback);
         }
     }
@@ -84,7 +86,8 @@
             } else if (numActiveOperations.get() < 0) {
                 throw new HyracksDataException("The number of active operations cannot be negative!");
             }
-        } else if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE) {
+        } else if (opType == LSMOperationType.FLUSH || opType == LSMOperationType.MERGE
+                || opType == LSMOperationType.REPLICATE) {
             dsInfo.undeclareActiveIOOperation();
         }
     }
@@ -119,7 +122,7 @@
             }
 
             LogRecord logRecord = new LogRecord();
-            logRecord.formFlushLogRecord(datasetID, this);
+            logRecord.formFlushLogRecord(datasetID, this, logManager.getNodeId(), dsInfo.getDatasetIndexes().size());
 
             try {
                 logManager.log(logRecord);
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMIndexUtil.java b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMIndexUtil.java
index 2cd0554..4259a10 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMIndexUtil.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMIndexUtil.java
@@ -41,4 +41,10 @@
             }
         }
     }
+
+    public static boolean lsmComponentFileHasLSN(AbstractLSMIndex lsmIndex, String componentFilePath) {
+        AbstractLSMIOOperationCallback ioOpCallback = (AbstractLSMIOOperationCallback) lsmIndex
+                .getIOOperationCallback();
+        return ioOpCallback.componentFileHasLSN(componentFilePath);
+    }
 }
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java b/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
index abf7ba9..76a11d1 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
@@ -151,4 +151,7 @@
         }
         return false;
     }
+    
+    public abstract boolean componentFileHasLSN(String componentFilePath);
+
 }
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java b/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
index 12680fd..8b4fa01 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
@@ -24,6 +24,7 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.btree.impls.BTree;
 import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeDiskComponent;
+import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeFileManager;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
 
@@ -60,4 +61,13 @@
         }
         return maxLSN;
     }
+
+    @Override
+    public boolean componentFileHasLSN(String componentFilePath) {
+        if (componentFilePath.endsWith(LSMBTreeFileManager.BTREE_STRING)) {
+            return true;
+        }
+
+        return false;
+    }
 }
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java b/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java
index b5ce879..229ccd6 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java
@@ -22,6 +22,7 @@
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeWithBuddyDiskComponent;
+import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeWithBuddyFileManager;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
 
@@ -55,4 +56,14 @@
         return maxLSN;
     }
 
+    @Override
+    public boolean componentFileHasLSN(String componentFilePath) {
+        if (componentFilePath.endsWith(LSMBTreeWithBuddyFileManager.BTREE_STRING)
+                || componentFilePath.endsWith(LSMBTreeWithBuddyFileManager.BUDDY_BTREE_STRING)) {
+            return true;
+        }
+
+        return false;
+    }
+
 }
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java b/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
index fd3cf12..3e4ff04 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
@@ -25,6 +25,7 @@
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
 import org.apache.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndexDiskComponent;
+import org.apache.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndexFileManager;
 
 public class LSMInvertedIndexIOOperationCallback extends AbstractLSMIOOperationCallback {
 
@@ -58,4 +59,13 @@
         }
         return maxLSN;
     }
+
+    @Override
+    public boolean componentFileHasLSN(String componentFilePath) {
+        if (componentFilePath.endsWith(LSMInvertedIndexFileManager.DELETED_KEYS_BTREE_SUFFIX)) {
+            return true;
+        }
+
+        return false;
+    }
 }
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java b/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
index 5d243a3..7c483f3 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
@@ -25,6 +25,7 @@
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
 import org.apache.hyracks.storage.am.lsm.rtree.impls.LSMRTreeDiskComponent;
+import org.apache.hyracks.storage.am.lsm.rtree.impls.LSMRTreeFileManager;
 
 public class LSMRTreeIOOperationCallback extends AbstractLSMIOOperationCallback {
 
@@ -59,4 +60,14 @@
         }
         return maxLSN;
     }
+
+    @Override
+    public boolean componentFileHasLSN(String componentFilePath) {
+        if (componentFilePath.endsWith(LSMRTreeFileManager.RTREE_STRING)
+                || componentFilePath.endsWith(LSMRTreeFileManager.BTREE_STRING)) {
+            return true;
+        }
+
+        return false;
+    }
 }
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/replication/AsterixReplicationJob.java b/asterix-common/src/main/java/org/apache/asterix/common/replication/AsterixReplicationJob.java
new file mode 100644
index 0000000..fc4f1ab
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/replication/AsterixReplicationJob.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import java.util.Set;
+
+import org.apache.hyracks.api.replication.impl.AbstractReplicationJob;
+
+/**
+ * LSMIndexReplicationJob is used for LSM Components only in Hyracks level.
+ * AsterixReplicationJob is used for everything else.
+ * Currently it is used to transfer indexes metadata files.
+ */
+public class AsterixReplicationJob extends AbstractReplicationJob {
+
+    public AsterixReplicationJob(ReplicationJobType jobType, ReplicationOperation operation,
+            ReplicationExecutionType executionType, Set<String> filesToReplicate) {
+        super(jobType, operation, executionType, filesToReplicate);
+    }
+
+}
\ No newline at end of file
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java b/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
new file mode 100644
index 0000000..63d29a0
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+public interface IRemoteRecoveryManager {
+
+    public void performRemoteRecovery();
+
+}
\ No newline at end of file
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java
new file mode 100644
index 0000000..f9481a0
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import java.io.IOException;
+import java.util.Set;
+
+public interface IReplicaResourcesManager {
+
+    public String getIndexPath(String backupNodeId, int IODeviceNum, String dataverse, String dataset);
+
+    public String getLocalStorageFolder();
+
+    public long getMinRemoteLSN(Set<String> remoteNodes);
+
+    public void deleteAsterixStorageData() throws IOException;
+
+}
\ No newline at end of file
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationChannel.java b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationChannel.java
new file mode 100644
index 0000000..56ae20f
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationChannel.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import java.io.IOException;
+
+public interface IReplicationChannel {
+
+    /**
+     * Opens the replication channel and starts accepting replication requests.
+     */
+    public void start();
+
+    /**
+     * Closes the replication channel.
+     * 
+     * @throws IOException
+     */
+    public void close() throws IOException;
+
+}
\ No newline at end of file
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
new file mode 100644
index 0000000..276d498
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Set;
+
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.ILogRecord;
+import org.apache.hyracks.api.replication.IIOReplicationManager;
+
+public interface IReplicationManager extends IIOReplicationManager {
+
+    /**
+     * Asynchronously sends a serialized version of the record to remote replicas.
+     * 
+     * @param logRecord
+     *            The log record to be replicated,
+     */
+    public void replicateLog(ILogRecord logRecord);
+
+    /**
+     * Checks whether a log record has been replicated
+     * 
+     * @param logRecord
+     *            the log to check for.
+     * @return true, if all ACKs were received from remote replicas.
+     */
+    public boolean hasBeenReplicated(ILogRecord logRecord);
+
+    /**
+     * Requests txns logs from a remote replica.
+     * 
+     * @param remoteReplicaId
+     *            The replica id to send the request to.
+     * @param replicasDataToRecover
+     *            Get logs that belong to those replicas.
+     * @param fromLSN
+     *            Low water mark for logs to be requested.
+     * @return The logs received that belong to the local node.
+     * @throws IOException
+     * @throws ACIDException
+     */
+    public ArrayList<ILogRecord> requestReplicaLogs(String remoteReplicaId, Set<String> replicasDataToRecover,
+            long fromLSN) throws IOException, ACIDException;
+
+    /**
+     * Requests LSM components files from a remote replica.
+     * 
+     * @param remoteReplicaId
+     *            The replica id to send the request to.
+     * @param replicasDataToRecover
+     *            Get files that belong to those replicas.
+     * @throws IOException
+     */
+    public void requestReplicaFiles(String remoteReplicaId, Set<String> replicasDataToRecover) throws IOException;
+
+    /**
+     * Requests current maximum LSN from remote replicas.
+     * 
+     * @param remoteReplicaIds
+     *            remote replicas to send the request to.
+     * @return The maximum of the received maximum LSNs.
+     * @throws IOException
+     */
+    public long getMaxRemoteLSN(Set<String> remoteReplicaIds) throws IOException;
+
+    /**
+     * Sends the IP address of the local replica to all remote replicas.
+     * 
+     * @throws IOException
+     */
+    public void broadcastNewIPAddress() throws IOException;
+
+    /**
+     * @return The number of remote replicas that are in ACTIVE state.
+     */
+    public int getActiveReplicasCount();
+
+    /**
+     * @return The IDs of the remote replicas that are in DEAD state.
+     */
+    public Set<String> getDeadReplicasIds();
+
+    /**
+     * Starts processing of ASYNC replication jobs as well as Txn logs.
+     */
+    public void startReplicationThreads();
+
+    /**
+     * Checks and sets each remote replica state.
+     */
+    public void initializeReplicasState();
+
+    /**
+     * Updates remote replica (in-memory) information.
+     * 
+     * @param replica
+     *            the replica to update.
+     */
+    public void updateReplicaInfo(Replica replica);
+
+    /**
+     * @return The IDs of the remote replicas that are in ACTIVE state.
+     */
+    public Set<String> getActiveReplicasIds();
+
+    /**
+     * Submits a ReplicaEvent to ReplicationEventsMonitor thread.
+     * 
+     * @param event
+     */
+    public void reportReplicaEvent(ReplicaEvent event);
+
+    /**
+     * Requests the current minimum LSN of a remote replica.
+     * 
+     * @param replicaId
+     *            The replica to send the request to.
+     * @return The returned minimum LSN from the remote replica.
+     * @throws IOException
+     */
+    public long requestReplicaMinLSN(String replicaId) throws IOException;
+
+    /**
+     * Sends a request to remote replicas to flush indexes that have LSN less than nonSharpCheckpointTargetLSN
+     * 
+     * @param nonSharpCheckpointTargetLSN
+     * @throws IOException
+     */
+    public void requestFlushLaggingReplicaIndexes(long nonSharpCheckpointTargetLSN) throws IOException;
+
+}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationThread.java b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationThread.java
new file mode 100644
index 0000000..3e2569d
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationThread.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import org.apache.asterix.common.transactions.LogRecord;
+
+public interface IReplicationThread extends Runnable {
+
+    /**
+     * Sends a notification to this thread that logRecord has been flushed.
+     * 
+     * @param logRecord
+     *            The log that has been flushed.
+     */
+    public void notifyLogReplicationRequester(LogRecord logRecord);
+}
\ No newline at end of file
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java b/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java
new file mode 100644
index 0000000..4c3f728
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+
+import org.apache.asterix.common.config.AsterixReplicationProperties;
+import org.apache.asterix.event.schema.cluster.Node;
+
+public class Replica {
+
+    public enum ReplicaState {
+        ACTIVE,
+        DEAD,
+        UNKNOWN
+    }
+
+    final Node node;
+    private ReplicaState state = ReplicaState.UNKNOWN;
+
+    public Replica(Node node) {
+        this.node = node;
+    }
+
+    public ReplicaState getState() {
+        return state;
+    }
+
+    public void setState(ReplicaState state) {
+        this.state = state;
+    }
+
+    public Node getNode() {
+        return node;
+    }
+
+    public String getId() {
+        return node.getId();
+    }
+
+    public InetSocketAddress getAddress(AsterixReplicationProperties asterixReplicationProperties) {
+        String replicaIPAddress = node.getClusterIp();
+        int replicationPort = asterixReplicationProperties.getDataReplicationPort(node.getId());
+        InetSocketAddress replicaAddress = InetSocketAddress.createUnresolved(replicaIPAddress, replicationPort);
+        return replicaAddress;
+    }
+
+    public static Replica create(DataInput input) throws IOException {
+        Node node = new Node();
+        Replica replica = new Replica(node);
+        replica.readFields(input);
+        return replica;
+    }
+
+    public void writeFields(DataOutput output) throws IOException {
+        output.writeUTF(node.getId());
+        output.writeUTF(node.getClusterIp());
+        output.writeInt(state.ordinal());
+    }
+
+    public void readFields(DataInput input) throws IOException {
+        this.node.setId(input.readUTF());
+        this.node.setClusterIp(input.readUTF());
+        this.state = ReplicaState.values()[input.readInt()];
+    }
+
+    public void serialize(OutputStream out) throws IOException {
+        DataOutputStream dos = new DataOutputStream(out);
+        writeFields(dos);
+    }
+}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/replication/ReplicaEvent.java b/asterix-common/src/main/java/org/apache/asterix/common/replication/ReplicaEvent.java
new file mode 100644
index 0000000..0797a02
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/replication/ReplicaEvent.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import java.io.DataInput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+public class ReplicaEvent {
+
+    /*
+     * FAIL: remote replica failed.
+     * JOIN: remote replica rejoined the cluster.
+     * SHUTDOWN: remote replica is shutting down normally
+     * */
+    public enum ReplicaEventType {
+        FAIL,
+        JOIN,
+        SHUTDOWN
+    }
+
+    Replica replica;
+    ReplicaEventType eventType;
+
+    public ReplicaEvent(Replica replica, ReplicaEventType eventType) {
+        this.replica = replica;
+        this.eventType = eventType;
+    }
+
+    public Replica getReplica() {
+        return replica;
+    }
+
+    public void setReplica(Replica replica) {
+        this.replica = replica;
+    }
+
+    public ReplicaEventType getEventType() {
+        return eventType;
+    }
+
+    public void setEventType(ReplicaEventType eventType) {
+        this.eventType = eventType;
+    }
+
+    public void serialize(OutputStream out) throws IOException {
+        DataOutputStream dos = new DataOutputStream(out);
+        replica.writeFields(dos);
+        dos.writeInt(eventType.ordinal());
+    }
+
+    public static ReplicaEvent create(DataInput input) throws IOException {
+        Replica replica = Replica.create(input);
+        ReplicaEventType eventType = ReplicaEventType.values()[input.readInt()];
+        ReplicaEvent event = new ReplicaEvent(replica, eventType);
+        return event;
+    }
+
+}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBuffer.java b/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBuffer.java
index 9e28cda..02d69e5 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBuffer.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogBuffer.java
@@ -24,4 +24,6 @@
 
     public void flush();
 
+    public void appendWithReplication(ILogRecord logRecord, long appendLSN);
+
 }
\ No newline at end of file
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java b/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java
index fc0b407..82a0cca 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogManager.java
@@ -18,40 +18,71 @@
  */
 package org.apache.asterix.common.transactions;
 
+import java.io.IOException;
+
 import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.replication.IReplicationManager;
 
 public interface ILogManager {
 
     /**
      * Submits a logRecord to log Manager which appends it to the log tail
+     * 
      * @param logRecord
      * @throws ACIDException
      */
     public void log(ILogRecord logRecord) throws ACIDException;
 
     /**
-     * 
      * @param isRecoveryMode
      * @returnLogReader instance which enables reading the log files
      */
     public ILogReader getLogReader(boolean isRecoveryMode);
-    
+
     /**
-     * 
      * @return the last LSN the log manager used
      */
-    public long getAppendLSN(); 
-    
+    public long getAppendLSN();
+
     /**
      * Deletes all log partitions which have a maximum LSN less than checkpointLSN
+     * 
      * @param checkpointLSN
      */
     public void deleteOldLogFiles(long checkpointLSN);
-    
+
     /**
-     * 
      * @return the smallest readable LSN on the current log partitions
      */
     public long getReadableSmallestLSN();
 
+    /**
+     * @return The local NC ID
+     */
+    public String getNodeId();
+
+    /**
+     * Delete all log files and start new log partition > LSNtoStartFrom
+     * 
+     * @param LSNtoStartFrom
+     * @throws IOException
+     */
+    public void renewLogFilesAndStartFromLSN(long LSNtoStartFrom) throws IOException;
+
+    /**
+     * @return the log page size in bytes
+     */
+    public int getLogPageSize();
+
+    /**
+     * @param replicationManager
+     *            the replication manager to be used to replicate logs
+     */
+    public void setReplicationManager(IReplicationManager replicationManager);
+
+    /**
+     * @return the number of log pages
+     */
+    public int getNumLogPages();
+
 }
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java b/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
index 16c51fe..9694949 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
@@ -20,22 +20,22 @@
 
 import java.nio.ByteBuffer;
 
+import org.apache.asterix.common.replication.IReplicationThread;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 
 public interface ILogRecord {
 
-    public static final int JOB_TERMINATE_LOG_SIZE = 13; //JOB_COMMIT or ABORT log type
-    public static final int ENTITY_COMMIT_LOG_BASE_SIZE = 25;
-    public static final int UPDATE_LOG_BASE_SIZE = 54;
-    public static final int FLUSH_LOG_SIZE = 17;
-
-
     public enum RECORD_STATUS{
         TRUNCATED,
         BAD_CHKSUM,
         OK
     }
 
+    public static final int JOB_TERMINATE_LOG_SIZE = 18; //JOB_COMMIT or ABORT log type
+    public static final int ENTITY_COMMIT_LOG_BASE_SIZE = 30;
+    public static final int UPDATE_LOG_BASE_SIZE = 59;
+    public static final int FLUSH_LOG_SIZE = 22;
+
     public LogRecord.RECORD_STATUS readLogRecord(ByteBuffer buffer);
 
     public void writeLogRecord(ByteBuffer buffer);
@@ -115,4 +115,26 @@
 
     public void setPKValue(ITupleReference PKValue);
 
+    public String getNodeId();
+
+    public void setNodeId(String nodeId);
+
+    public int serialize(ByteBuffer buffer);
+
+    public void deserialize(ByteBuffer buffer, boolean remoteRecoveryLog, String localNodeId);
+
+    public void setReplicationThread(IReplicationThread replicationThread);
+
+    public void setLogSource(byte logSource);
+
+    public byte getLogSource();
+
+    public int getSerializedLogSize();
+
+    public void writeLogRecord(ByteBuffer buffer, long appendLSN);
+
+    public ByteBuffer getSerializedLog();
+
+    public void formJobTerminateLogRecord(int jobId, boolean isCommit, String nodeId);
+
 }
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java b/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
index e8d408b..9ea9957 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.common.transactions;
 
 import java.io.IOException;
+import java.util.ArrayList;
 
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -89,5 +90,25 @@
      */
     public long checkpoint(boolean isSharpCheckpoint, long nonSharpCheckpointTargetLSN) throws ACIDException, HyracksDataException;
     
+    /**
+     * Performs recovery based on the passed logs
+     * @param remoteLogs the remote logs to be replayed
+     * @throws HyracksDataException
+     * @throws ACIDException
+     */
+    public void replayRemoteLogs(ArrayList<ILogRecord> remoteLogs) throws HyracksDataException, ACIDException;
+
+    /**
+     * 
+     * @return min first LSN of the open indexes (including remote indexes if replication is enabled)
+     * @throws HyracksDataException
+     */
+    public long getMinFirstLSN() throws HyracksDataException;
     
+    /**
+     * 
+     * @return min first LSN of the open indexes
+     * @throws HyracksDataException
+     */
+    public long getLocalMinFirstLSN() throws HyracksDataException;
 }
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java b/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
index a510e51..bbbe59e 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
@@ -20,10 +20,13 @@
 
 import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.zip.CRC32;
 
 import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
+import org.apache.asterix.common.replication.IReplicationThread;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.tuples.SimpleTupleReference;
 import org.apache.hyracks.storage.am.common.tuples.SimpleTupleWriter;
@@ -31,9 +34,12 @@
 /*
  * == LogRecordFormat ==
  * ---------------------------
- * [Header1] (5 bytes) : for all log types
+ * [Header1] (10 bytes + NodeId Length) : for all log types
+ * LogSource(1)
  * LogType(1)
  * JobId(4)
+ * NodeIdLength(4)
+ * NodeId(?)
  * ---------------------------
  * [Header2] (12 bytes + PKValueSize) : for entity_commit and update log types 
  * DatasetId(4) //stored in dataset_dataset in Metadata Node
@@ -60,12 +66,15 @@
  * 2) ENTITY_COMMIT: 25 + PKSize (5 + 12 + PKSize + 8)
  *    --> ENTITY_COMMIT_LOG_BASE_SIZE = 25
  * 3) UPDATE: 54 + PKValueSize + NewValueSize (5 + 12 + PKValueSize + 20 + 9 + NewValueSize + 8)
- * 4) FLUSH: 5 + 8 + DatasetId(4)
- *    --> UPDATE_LOG_BASE_SIZE = 54
+ * 4) FLUSH: 5 + 8 + DatasetId(4) (In case of serialize: + (8 bytes for LSN) + (4 bytes for number of flushed indexes)
  */
+
 public class LogRecord implements ILogRecord {
 
     //------------- fields in a log record (begin) ------------//
+    private byte logSource;
+    private String nodeId;
+    private int nodeIdLength;
     private byte logType;
     private int jobId;
     private int datasetId;
@@ -92,6 +101,11 @@
     private final CRC32 checksumGen;
     private int[] PKFields;
     private PrimaryIndexOperationTracker opTracker;
+    private IReplicationThread replicationThread;
+    private ByteBuffer serializedLog;
+    private final Map<String, byte[]> nodeIdsMap;
+    //this field is used for serialized flush logs only to indicate how many indexes were flushed using its LSN.
+    private int numOfFlushedIndexes;
 
     public LogRecord() {
         isFlushed = new AtomicBoolean(false);
@@ -99,29 +113,43 @@
         readPKValue = new PrimaryKeyTupleReference();
         readNewValue = (SimpleTupleReference) tupleWriter.createTupleReference();
         checksumGen = new CRC32();
+        this.nodeIdsMap = new HashMap<String, byte[]>();
+        logSource = LogSource.LOCAL;
     }
 
-    private final static int TYPE_LEN = Byte.SIZE / Byte.SIZE;
-    public final static int PKHASH_LEN = Integer.SIZE / Byte.SIZE;
-    public final static int PKSZ_LEN = Integer.SIZE / Byte.SIZE;
-    private final static int PRVLSN_LEN = Long.SIZE / Byte.SIZE;
-    private final static int RSID_LEN = Long.SIZE / Byte.SIZE;
-    private final static int LOGRCD_SZ_LEN = Integer.SIZE / Byte.SIZE;
-    private final static int FLDCNT_LEN = Integer.SIZE / Byte.SIZE;
-    private final static int NEWOP_LEN = Byte.SIZE / Byte.SIZE;
-    private final static int NEWVALSZ_LEN = Integer.SIZE / Byte.SIZE;
-    private final static int CHKSUM_LEN = Long.SIZE / Byte.SIZE;
+    private final static int LOG_SOURCE_LEN = Byte.BYTES;
+    private final static int NODE_ID_STRING_LENGTH = Integer.BYTES;
+    private final static int TYPE_LEN = Byte.BYTES;
+    public final static int PKHASH_LEN = Integer.BYTES;
+    public final static int PKSZ_LEN = Integer.BYTES;
+    private final static int PRVLSN_LEN = Long.BYTES;
+    private final static int RSID_LEN = Long.BYTES;
+    private final static int LOGRCD_SZ_LEN = Integer.BYTES;
+    private final static int FLDCNT_LEN = Integer.BYTES;
+    private final static int NEWOP_LEN = Byte.BYTES;
+    private final static int NEWVALSZ_LEN = Integer.BYTES;
+    private final static int CHKSUM_LEN = Long.BYTES;
 
-    private final static int ALL_RECORD_HEADER_LEN = TYPE_LEN + JobId.BYTES;
+    private final static int ALL_RECORD_HEADER_LEN = LOG_SOURCE_LEN + TYPE_LEN + JobId.BYTES + NODE_ID_STRING_LENGTH;
     private final static int ENTITYCOMMIT_UPDATE_HEADER_LEN = DatasetId.BYTES + PKHASH_LEN + PKSZ_LEN;
     private final static int UPDATE_LSN_HEADER = PRVLSN_LEN + RSID_LEN + LOGRCD_SZ_LEN;
     private final static int UPDATE_BODY_HEADER = FLDCNT_LEN + NEWOP_LEN + NEWVALSZ_LEN;
 
-    @Override
-    public void writeLogRecord(ByteBuffer buffer) {
-        int beginOffset = buffer.position();
+    private void writeLogRecordCommonFields(ByteBuffer buffer) {
+        buffer.put(logSource);
         buffer.put(logType);
         buffer.putInt(jobId);
+        if (nodeIdsMap.containsKey(nodeId)) {
+            buffer.put(nodeIdsMap.get(nodeId));
+        } else {
+            //byte array for node id length and string
+            byte[] bytes = new byte[(Integer.SIZE / 8) + nodeId.length()];
+            buffer.putInt(nodeId.length());
+            buffer.put(nodeId.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+            buffer.position(buffer.position() - bytes.length);
+            buffer.get(bytes, 0, bytes.length);
+            nodeIdsMap.put(nodeId, bytes);
+        }
         if (logType == LogType.UPDATE || logType == LogType.ENTITY_COMMIT) {
             buffer.putInt(datasetId);
             buffer.putInt(PKHashValue);
@@ -144,15 +172,59 @@
         if (logType == LogType.FLUSH) {
             buffer.putInt(datasetId);
         }
+    }
+
+    @Override
+    public void writeLogRecord(ByteBuffer buffer) {
+        int beginOffset = buffer.position();
+        writeLogRecordCommonFields(buffer);
+        checksum = generateChecksum(buffer, beginOffset, logSize - CHKSUM_LEN);
+        buffer.putLong(checksum);
+    }
+
+    //this method is used when replication is enabled to include the log record LSN in the serialized version
+    @Override
+    public void writeLogRecord(ByteBuffer buffer, long appendLSN) {
+        int beginOffset = buffer.position();
+        writeLogRecordCommonFields(buffer);
+
+        if (logSource == LogSource.LOCAL) {
+            //copy the serialized log to send it to replicas
+            int serializedLogSize = getSerializedLogSize(logType, logSize);
+
+            if (serializedLog == null || serializedLog.capacity() < serializedLogSize) {
+                serializedLog = ByteBuffer.allocate(serializedLogSize);
+            } else {
+                serializedLog.clear();
+            }
+
+            int currentPosition = buffer.position();
+            int currentLogSize = (currentPosition - beginOffset);
+
+            buffer.position(beginOffset);
+            buffer.get(serializedLog.array(), 0, currentLogSize);
+            serializedLog.position(currentLogSize);
+            if (logType == LogType.FLUSH) {
+                serializedLog.putLong(appendLSN);
+                serializedLog.putInt(numOfFlushedIndexes);
+            }
+            serializedLog.flip();
+            buffer.position(currentPosition);
+        }
 
         checksum = generateChecksum(buffer, beginOffset, logSize - CHKSUM_LEN);
         buffer.putLong(checksum);
     }
 
     private void writePKValue(ByteBuffer buffer) {
-        int i;
-        for (i = 0; i < PKFieldCnt; i++) {
-            buffer.put(PKValue.getFieldData(0), PKValue.getFieldStart(PKFields[i]), PKValue.getFieldLength(PKFields[i]));
+        if (logSource == LogSource.LOCAL) {
+            for (int i = 0; i < PKFieldCnt; i++) {
+                buffer.put(PKValue.getFieldData(0), PKValue.getFieldStart(PKFields[i]),
+                        PKValue.getFieldLength(PKFields[i]));
+            }
+        } else {
+            //since PKValue is already serialized in remote logs, just put it into buffer
+            buffer.put(PKValue.getFieldData(0), 0, PKValueSize);
         }
     }
 
@@ -171,13 +243,57 @@
     @Override
     public RECORD_STATUS readLogRecord(ByteBuffer buffer) {
         int beginOffset = buffer.position();
-        //first we need the logtype and Job ID, if the buffer isn't that big, then no dice.
-        if (buffer.remaining() < ALL_RECORD_HEADER_LEN) {
+
+        //read header
+        RECORD_STATUS status = readLogHeader(buffer);
+        if (status != RECORD_STATUS.OK) {
+            buffer.position(beginOffset);
+            return status;
+        }
+
+        //read body
+        status = readLogBody(buffer, false);
+        if (status != RECORD_STATUS.OK) {
+            buffer.position(beginOffset);
+            return status;
+        }
+
+        //attempt to read checksum
+        if (buffer.remaining() < CHKSUM_LEN) {
             buffer.position(beginOffset);
             return RECORD_STATUS.TRUNCATED;
         }
+        checksum = buffer.getLong();
+        if (checksum != generateChecksum(buffer, beginOffset, logSize - CHKSUM_LEN)) {
+            return RECORD_STATUS.BAD_CHKSUM;
+        }
+
+        return RECORD_STATUS.OK;
+    }
+
+    private RECORD_STATUS readLogHeader(ByteBuffer buffer) {
+        //first we need the logtype and Job ID, if the buffer isn't that big, then no dice.
+        if (buffer.remaining() < ALL_RECORD_HEADER_LEN) {
+            return RECORD_STATUS.TRUNCATED;
+        }
+        logSource = buffer.get();
         logType = buffer.get();
         jobId = buffer.getInt();
+        nodeIdLength = buffer.getInt();
+        //attempt to read node id
+        if (buffer.remaining() < nodeIdLength) {
+            return RECORD_STATUS.TRUNCATED;
+        }
+        //read node id string
+        nodeId = new String(buffer.array(), buffer.position() + buffer.arrayOffset(), nodeIdLength,
+                java.nio.charset.StandardCharsets.UTF_8);
+        //skip node id string bytes
+        buffer.position(buffer.position() + nodeIdLength);
+
+        return RECORD_STATUS.OK;
+    }
+
+    private RECORD_STATUS readLogBody(ByteBuffer buffer, boolean allocateTupleBuffer) {
         if (logType != LogType.FLUSH) {
             if (logType == LogType.JOB_COMMIT || logType == LogType.ABORT) {
                 datasetId = -1;
@@ -185,7 +301,6 @@
             } else {
                 //attempt to read in the dsid, PK hash and PK length
                 if (buffer.remaining() < ENTITYCOMMIT_UPDATE_HEADER_LEN) {
-                    buffer.position(beginOffset);
                     return RECORD_STATUS.TRUNCATED;
                 }
                 datasetId = buffer.getInt();
@@ -193,7 +308,6 @@
                 PKValueSize = buffer.getInt();
                 //attempt to read in the PK
                 if (buffer.remaining() < PKValueSize) {
-                    buffer.position(beginOffset);
                     return RECORD_STATUS.TRUNCATED;
                 }
                 if (PKValueSize <= 0) {
@@ -201,10 +315,10 @@
                 }
                 PKValue = readPKValue(buffer);
             }
+
             if (logType == LogType.UPDATE) {
                 //attempt to read in the previous LSN, log size, new value size, and new record type
                 if (buffer.remaining() < UPDATE_LSN_HEADER + UPDATE_BODY_HEADER) {
-                    buffer.position(beginOffset);
                     return RECORD_STATUS.TRUNCATED;
                 }
                 prevLSN = buffer.getLong();
@@ -214,34 +328,50 @@
                 newOp = buffer.get();
                 newValueSize = buffer.getInt();
                 if (buffer.remaining() < newValueSize) {
-                    buffer.position(beginOffset);
                     return RECORD_STATUS.TRUNCATED;
                 }
-                newValue = readTuple(buffer, readNewValue, fieldCnt, newValueSize);
+                if (!allocateTupleBuffer) {
+                    newValue = readTuple(buffer, readNewValue, fieldCnt, newValueSize);
+                } else {
+                    ByteBuffer tupleBuffer = ByteBuffer.allocate(newValueSize);
+                    tupleBuffer.put(buffer.array(), buffer.position(), newValueSize);
+                    tupleBuffer.flip();
+                    newValue = readTuple(tupleBuffer, readNewValue, fieldCnt, newValueSize);
+                    //skip tuple bytes
+                    buffer.position(buffer.position() + newValueSize);
+                }
             } else {
                 computeAndSetLogSize();
             }
         } else {
             computeAndSetLogSize();
             if (buffer.remaining() < DatasetId.BYTES) {
-                buffer.position(beginOffset);
                 return RECORD_STATUS.TRUNCATED;
             }
             datasetId = buffer.getInt();
             resourceId = 0l;
-        }
-        //atempt to read checksum
-        if (buffer.remaining() < CHKSUM_LEN) {
-            buffer.position(beginOffset);
-            return RECORD_STATUS.TRUNCATED;
-        }
-        checksum = buffer.getLong();
-        if (checksum != generateChecksum(buffer, beginOffset, logSize - CHKSUM_LEN)) {
-            return RECORD_STATUS.BAD_CHKSUM;
+            computeAndSetLogSize();
         }
         return RECORD_STATUS.OK;
     }
 
+    @Override
+    public void deserialize(ByteBuffer buffer, boolean remoteRecoveryLog, String localNodeId) {
+        readLogHeader(buffer);
+        if (!remoteRecoveryLog || !nodeId.equals(localNodeId)) {
+            readLogBody(buffer, false);
+        } else {
+            //need to allocate buffer for tuple since the logs will be kept in memory to use during remote recovery
+            //TODO when this is redesigned to spill remote recovery logs to disk, this will not be needed
+            readLogBody(buffer, true);
+        }
+
+        if (logType == LogType.FLUSH) {
+            LSN = buffer.getLong();
+            numOfFlushedIndexes = buffer.getInt();
+        }
+    }
+
     private ITupleReference readPKValue(ByteBuffer buffer) {
         if (buffer.position() + PKValueSize > buffer.limit()) {
             throw new BufferUnderflowException();
@@ -251,7 +381,8 @@
         return readPKValue;
     }
 
-    private ITupleReference readTuple(ByteBuffer srcBuffer, SimpleTupleReference destTuple, int fieldCnt, int size) {
+    private static ITupleReference readTuple(ByteBuffer srcBuffer, SimpleTupleReference destTuple, int fieldCnt,
+            int size) {
         if (srcBuffer.position() + size > srcBuffer.limit()) {
             throw new BufferUnderflowException();
         }
@@ -264,18 +395,33 @@
     @Override
     public void formJobTerminateLogRecord(ITransactionContext txnCtx, boolean isCommit) {
         this.txnCtx = txnCtx;
+        formJobTerminateLogRecord(txnCtx.getJobId().getId(), isCommit, nodeId);
+    }
+
+    @Override
+    public void formJobTerminateLogRecord(int jobId, boolean isCommit, String nodeId) {
         this.logType = isCommit ? LogType.JOB_COMMIT : LogType.ABORT;
-        this.jobId = txnCtx.getJobId().getId();
+        this.jobId = jobId;
         this.datasetId = -1;
         this.PKHashValue = -1;
+        setNodeId(nodeId);
         computeAndSetLogSize();
     }
 
-    public void formFlushLogRecord(int datasetId, PrimaryIndexOperationTracker opTracker) {
+    public void formFlushLogRecord(int datasetId, PrimaryIndexOperationTracker opTracker, int numOfFlushedIndexes) {
+        formFlushLogRecord(datasetId, opTracker, null, numOfFlushedIndexes);
+    }
+
+    public void formFlushLogRecord(int datasetId, PrimaryIndexOperationTracker opTracker, String nodeId,
+            int numberOfIndexes) {
         this.logType = LogType.FLUSH;
         this.jobId = -1;
         this.datasetId = datasetId;
         this.opTracker = opTracker;
+        this.numOfFlushedIndexes = numberOfIndexes;
+        if (nodeId != null) {
+            setNodeId(nodeId);
+        }
         computeAndSetLogSize();
     }
 
@@ -326,11 +472,15 @@
             default:
                 throw new IllegalStateException("Unsupported Log Type");
         }
+
+        logSize += nodeIdLength;
     }
 
     @Override
     public String getLogRecordForDisplay() {
         StringBuilder builder = new StringBuilder();
+        builder.append(" Source : ").append(LogSource.toString(logSource));
+        builder.append(" NodeID : ").append(nodeId);
         builder.append(" LSN : ").append(LSN);
         builder.append(" LogType : ").append(LogType.toString(logType));
         builder.append(" LogSize : ").append(logSize);
@@ -348,6 +498,20 @@
         return builder.toString();
     }
 
+    @Override
+    public int serialize(ByteBuffer buffer) {
+        int bufferBegin = buffer.position();
+        writeLogRecordCommonFields(buffer);
+
+        if (logType == LogType.FLUSH) {
+            buffer.putLong(LSN);
+            buffer.putInt(numOfFlushedIndexes);
+        }
+
+        buffer.putLong(LSN);
+        return buffer.position() - bufferBegin;
+    }
+
     ////////////////////////////////////////////
     // getter and setter methods
     ////////////////////////////////////////////
@@ -438,6 +602,25 @@
     }
 
     @Override
+    public int getSerializedLogSize() {
+        return getSerializedLogSize(logType, logSize);
+    }
+
+    private static int getSerializedLogSize(Byte logType, int logSize) {
+        if (logType == LogType.FLUSH) {
+            //LSN
+            logSize += (Long.SIZE / 8);
+            //num of indexes 
+            logSize += (Integer.SIZE / 8);
+        }
+
+        //checksum not included in serialized version
+        logSize -= CHKSUM_LEN;
+
+        return logSize;
+    }
+
+    @Override
     public void setLogSize(int logSize) {
         this.logSize = logSize;
     }
@@ -517,4 +700,52 @@
     public PrimaryIndexOperationTracker getOpTracker() {
         return opTracker;
     }
+
+    @Override
+    public ByteBuffer getSerializedLog() {
+        return serializedLog;
+    }
+
+    public void setSerializedLog(ByteBuffer serializedLog) {
+        this.serializedLog = serializedLog;
+    }
+
+    @Override
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    @Override
+    public void setNodeId(String nodeId) {
+        this.nodeId = nodeId;
+        this.nodeIdLength = nodeId.length();
+    }
+
+    public IReplicationThread getReplicationThread() {
+        return replicationThread;
+    }
+
+    @Override
+    public void setReplicationThread(IReplicationThread replicationThread) {
+        this.replicationThread = replicationThread;
+    }
+
+    @Override
+    public void setLogSource(byte logSource) {
+        this.logSource = logSource;
+    }
+
+    @Override
+    public byte getLogSource() {
+        return logSource;
+    }
+
+    public int getNumOfFlushedIndexes() {
+        return numOfFlushedIndexes;
+    }
+
+    public void setNumOfFlushedIndexes(int numOfFlushedIndexes) {
+        this.numOfFlushedIndexes = numOfFlushedIndexes;
+    }
+
 }
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogSource.java b/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogSource.java
new file mode 100644
index 0000000..34e81db
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogSource.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.transactions;
+
+public class LogSource {
+
+    public static final byte LOCAL = 0;
+    public static final byte REMOTE = 1;
+    public static final byte REMOTE_RECOVERY = 2;
+
+    private static final String STRING_LOCAL = "LOCAL";
+    private static final String STRING_REMOTE = "REMOTE";
+    private static final String STRING_REMOTE_RECOVERY = "REMOTE_RECOVERY";
+
+    private static final String STRING_INVALID_LOG_SOURCE = "INVALID_LOG_SOURCE";
+
+    public static String toString(byte LogSource) {
+        switch (LogSource) {
+            case LOCAL:
+                return STRING_LOCAL;
+            case REMOTE:
+                return STRING_REMOTE;
+            case REMOTE_RECOVERY:
+                return STRING_REMOTE_RECOVERY;
+            default:
+                return STRING_INVALID_LOG_SOURCE;
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/asterix-common/src/main/resources/schema/cluster.xsd b/asterix-common/src/main/resources/schema/cluster.xsd
index d9b62c2..d3203d5 100644
--- a/asterix-common/src/main/resources/schema/cluster.xsd
+++ b/asterix-common/src/main/resources/schema/cluster.xsd
@@ -44,7 +44,11 @@
 	<xs:element name="http_port" type="xs:integer" />
 	<xs:element name="debug_port" type="xs:integer" />
 	<xs:element name="metadata_node" type="xs:string" />
-	
+	<xs:element name="enabled" type="xs:boolean" />
+	<xs:element name="replication_port" type="xs:integer" />
+	<xs:element name="replication_factor" type="xs:integer" />
+	<xs:element name="replication_store" type="xs:string" />
+	<xs:element name="replication_time_out" type="xs:integer" />
 
 	<!-- definition of complex elements -->
 	<xs:element name="working_dir">
@@ -72,6 +76,18 @@
 		</xs:complexType>
 	</xs:element>
 
+	<xs:element name="data_replication">
+		<xs:complexType>
+			<xs:sequence>
+				<xs:element ref="cl:enabled" />
+				<xs:element ref="cl:replication_port" />
+				<xs:element ref="cl:replication_factor" />
+				<xs:element ref="cl:replication_store" />
+				<xs:element ref="cl:replication_time_out" />
+			</xs:sequence>
+		</xs:complexType>
+	</xs:element>
+
 	<xs:element name="property">
 		<xs:complexType>
 			<xs:sequence>
@@ -126,6 +142,7 @@
 				<xs:element ref="cl:iodevices" minOccurs="0" />
 				<xs:element ref="cl:working_dir" />
 				<xs:element ref="cl:metadata_node" />
+				<xs:element ref="cl:data_replication" minOccurs="0" />
 				<xs:element ref="cl:master_node" />
 				<xs:element ref="cl:node" maxOccurs="unbounded" />
 				<xs:element ref="cl:substitute_nodes" />