Allow Replication to be Enabled on Virtual Cluster

- Allow replication port assignment per NC.
- Allow replication to be enabled on virtual cluster.
- Wait for JOB_ABORT ACK from remote replicas.
- Fix LSM component mask file name.
- Fix index directory deletion on index drop.
- Eliminate multiple partition takeover requests.
- Free LogFlusher thread from sending replication ACKs.
- Fix possible deadlock between LogFlusher and Logs Replication Thread.
- Remove wait for FLUSH_LOG for replicated LSM components:
  This wait is not needed since on node failure, complete remote recovery is done.

Change-Id: I34a38f59c4915a19242adb6a4eaa6ee1c82d2372
Reviewed-on: https://asterix-gerrit.ics.uci.edu/743
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ian Maxon <imaxon@apache.org>
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index a568464..c67eb70 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.api.common;
 
 import java.io.File;
+import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.List;
 
@@ -83,6 +84,7 @@
             ncConfig1.nodeId = ncName;
             ncConfig1.resultTTL = 30000;
             ncConfig1.resultSweepThreshold = 1000;
+            ncConfig1.appArgs = Arrays.asList("-virtual-NC");
             String tempPath = System.getProperty(IO_DIR_KEY);
             if (tempPath.endsWith(File.separator)) {
                 tempPath = tempPath.substring(0, tempPath.length() - 1);
@@ -109,8 +111,19 @@
                 }
             }
             ncConfig1.appNCMainClass = NCApplicationEntryPoint.class.getName();
-            ncs[n] = new NodeControllerService(ncConfig1);
-            ncs[n].start();
+            NodeControllerService nodeControllerService = new NodeControllerService(ncConfig1);
+            ncs[n] = nodeControllerService;
+            Thread ncStartThread = new Thread() {
+                @Override
+                public void run() {
+                    try {
+                        nodeControllerService.start();
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+            };
+            ncStartThread.start();
             ++n;
         }
         hcc = new HyracksConnection(cc.getConfig().clientNetIpAddress, cc.getConfig().clientNetPort);
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index 4922ae6..643bb16 100644
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -64,6 +64,9 @@
     @Option(name = "-initial-run", usage = "A flag indicating if it's the first time the NC is started (default: false)", required = false)
     public boolean initialRun = false;
 
+    @Option(name = "-virtual-NC", usage = "A flag indicating if this NC is running on virtual cluster (default: false)", required = false)
+    public boolean virtualNC = false;
+
     private INCApplicationContext ncApplicationContext = null;
     private IAsterixAppRuntimeContext runtimeContext;
     private String nodeId;
@@ -88,7 +91,6 @@
 
         ncAppCtx.setThreadFactory(new AsterixThreadFactory(ncAppCtx.getLifeCycleComponentManager()));
         ncApplicationContext = ncAppCtx;
-
         nodeId = ncApplicationContext.getNodeId();
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Starting Asterix node controller: " + nodeId);
@@ -120,7 +122,8 @@
                 LOGGER.info("System is in a state: " + systemState);
             }
 
-            if (replicationEnabled) {
+            //do not attempt to perform remote recovery if this is a virtual NC
+            if (replicationEnabled && !virtualNC) {
                 if (systemState == SystemState.NEW_UNIVERSE || systemState == SystemState.CORRUPTED) {
                     //Try to perform remote recovery
                     IRemoteRecoveryManager remoteRecoveryMgr = runtimeContext.getRemoteRecoveryManager();
diff --git a/asterix-app/src/main/resources/asterix-build-configuration.xml b/asterix-app/src/main/resources/asterix-build-configuration.xml
index 731113b..ff03ab6 100644
--- a/asterix-app/src/main/resources/asterix-build-configuration.xml
+++ b/asterix-app/src/main/resources/asterix-build-configuration.xml
@@ -98,4 +98,10 @@
 		<description>Enabling plot of Algebricks plan to tmp folder. (Default = false)
 		</description>
 	</property>
+    <property>
+        <name>log.level</name>
+        <value>WARNING</value>
+        <description>The minimum log level to be displayed. (Default = INFO)
+        </description>
+    </property>
 </asterixConfiguration>
diff --git a/asterix-app/src/main/resources/cluster.xml b/asterix-app/src/main/resources/cluster.xml
new file mode 100644
index 0000000..8f0b694
--- /dev/null
+++ b/asterix-app/src/main/resources/cluster.xml
@@ -0,0 +1,49 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements.  See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership.  The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License.  You may obtain a copy of the License at
+ !
+ !   http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied.  See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<cluster xmlns="cluster">
+  <instance_name>asterix</instance_name>
+  <store>storage</store>
+
+  <data_replication>
+    <enabled>false</enabled>
+    <replication_port>2016</replication_port>
+    <replication_factor>2</replication_factor>
+    <auto_failover>false</auto_failover>
+    <replication_time_out>30</replication_time_out>
+  </data_replication>
+
+  <master_node>
+    <id>master</id>
+    <client_ip>127.0.0.1</client_ip>
+    <cluster_ip>127.0.0.1</cluster_ip>
+    <client_port>1098</client_port>
+    <cluster_port>1099</cluster_port>
+    <http_port>8888</http_port>
+  </master_node>
+  <node>
+    <id>nc1</id>
+    <cluster_ip>127.0.0.1</cluster_ip>
+    <replication_port>2016</replication_port>
+  </node>
+  <node>
+    <id>nc2</id>
+    <cluster_ip>127.0.0.1</cluster_ip>
+    <replication_port>2017</replication_port>
+  </node>
+</cluster>
\ No newline at end of file
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
index cb7bcab..5d31d9a 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
@@ -77,10 +77,15 @@
     }
 
     public int getDataReplicationPort(String nodeId) {
-        if (cluster != null) {
-            return cluster.getDataReplication().getReplicationPort().intValue();
+        if (cluster != null && cluster.getDataReplication() != null) {
+            for (int i = 0; i < cluster.getNode().size(); i++) {
+                Node node = cluster.getNode().get(i);
+                if (getRealCluserNodeID(node.getId()).equals(nodeId)) {
+                    return node.getReplicationPort() != null ? node.getReplicationPort().intValue()
+                            : cluster.getDataReplication().getReplicationPort().intValue();
+                }
+            }
         }
-
         return REPLICATION_DATAPORT_DEFAULT;
     }
 
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationThread.java b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationThread.java
index 9226a66..a88b82a 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationThread.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationThread.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.common.replication;
 
+import java.nio.channels.SocketChannel;
+
 import org.apache.asterix.common.transactions.LogRecord;
 
 public interface IReplicationThread extends Runnable {
@@ -29,4 +31,9 @@
      *            The log that has been flushed.
      */
     public void notifyLogReplicationRequester(LogRecord logRecord);
+
+    /**
+     * @return The replication client socket channel.
+     */
+    public SocketChannel getReplicationClientSocket();
 }
diff --git a/asterix-common/src/main/resources/schema/cluster.xsd b/asterix-common/src/main/resources/schema/cluster.xsd
index 935d33f..ae09b16 100644
--- a/asterix-common/src/main/resources/schema/cluster.xsd
+++ b/asterix-common/src/main/resources/schema/cluster.xsd
@@ -123,6 +123,7 @@
 				<xs:element ref="cl:txn_log_dir" minOccurs="0" />
 				<xs:element ref="cl:iodevices" minOccurs="0" />
 				<xs:element ref="cl:debug_port" minOccurs="0" />
+				<xs:element ref="cl:replication_port" minOccurs="0" />
 			</xs:sequence>
 		</xs:complexType>
 	</xs:element>
diff --git a/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java b/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
index 29765fd..c92262c 100644
--- a/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
+++ b/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
@@ -41,7 +41,7 @@
 public class EventDriver {
 
     public static final String CLIENT_NODE_ID = "client_node";
-    public static final Node CLIENT_NODE = new Node(CLIENT_NODE_ID, "127.0.0.1", null, null, null, null, null);
+    public static final Node CLIENT_NODE = new Node(CLIENT_NODE_ID, "127.0.0.1", null, null, null, null, null, null);
 
     private static String eventsDir;
     private static Events events;
diff --git a/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java b/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
index b83faa2..57648b8 100644
--- a/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
+++ b/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
@@ -20,6 +20,7 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.math.BigInteger;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
@@ -191,7 +192,7 @@
             String javaHome = cluster.getMasterNode().getJavaHome() == null ? cluster.getJavaHome() : cluster
                     .getMasterNode().getJavaHome();
             return new Node(cluster.getMasterNode().getId(), cluster.getMasterNode().getClusterIp(), javaHome, logDir,
-                    null, null, cluster.getMasterNode().getDebugPort());
+                    null, null, cluster.getMasterNode().getDebugPort(), null);
         }
 
         List<Node> nodeList = cluster.getNode();
diff --git a/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java b/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
index 9559394..4037eaf 100644
--- a/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
+++ b/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
@@ -105,7 +105,7 @@
 
             MasterNode masterNode = cluster.getMasterNode();
             Node master = new Node(masterNode.getId(), masterNode.getClusterIp(), masterNode.getJavaHome(),
-                    masterNode.getLogDir(), null, null, null);
+                    masterNode.getLogDir(), null, null, null, null);
             ipAddresses.add(masterNode.getClusterIp());
 
             valid = valid & validateNodeConfiguration(master, cluster);
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManager.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManager.java
index 3e37694..e7bf3bf 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManager.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManager.java
@@ -62,9 +62,10 @@
 
     private ClusterManager() {
         Cluster asterixCluster = AsterixClusterProperties.INSTANCE.getCluster();
-        String eventHome = asterixCluster == null ? null : asterixCluster.getWorkingDir().getDir();
+        String eventHome = asterixCluster == null ? null
+                : asterixCluster.getWorkingDir() == null ? null : asterixCluster.getWorkingDir().getDir();
 
-        if (asterixCluster != null) {
+        if (eventHome != null) {
             String asterixDir = System.getProperty("user.dir") + File.separator + "asterix";
             File configFile = new File(System.getProperty("user.dir") + File.separator + "configuration.xml");
             Configuration configuration = null;
@@ -74,8 +75,8 @@
                 Unmarshaller unmarshaller = configCtx.createUnmarshaller();
                 configuration = (Configuration) unmarshaller.unmarshal(configFile);
                 AsterixEventService.initialize(configuration, asterixDir, eventHome);
-                client = AsterixEventService.getAsterixEventServiceClient(AsterixClusterProperties.INSTANCE
-                        .getCluster());
+                client = AsterixEventService
+                        .getAsterixEventServiceClient(AsterixClusterProperties.INSTANCE.getCluster());
 
                 lookupService = ServiceProvider.INSTANCE.getLookupService();
                 if (!lookupService.isRunning(configuration)) {
diff --git a/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java b/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
index 3173525..abfcf67 100644
--- a/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
+++ b/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
@@ -371,6 +371,7 @@
                             partitionRecoveryPlan.get(replica).add(partition.getPartitionId());
                         }
                     }
+                    break;
                 }
             }
 
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java b/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java
index d2380c1..1ff6cc4 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java
@@ -37,7 +37,7 @@
     /**
      * All replication messages start with ReplicationFunctions (4 bytes), then the length of the request in bytes
      */
-    public static final String JOB_COMMIT_ACK = "$";
+    public static final String JOB_REPLICATION_ACK = "$";
 
     public final static int REPLICATION_REQUEST_TYPE_SIZE = Integer.BYTES;
     public final static int REPLICATION_REQUEST_HEADER_SIZE = REPLICATION_REQUEST_TYPE_SIZE + Integer.BYTES;
@@ -297,11 +297,11 @@
     }
 
     public static int getJobIdFromLogAckMessage(String msg) {
-        return Integer.parseInt(msg.substring((msg.indexOf(JOB_COMMIT_ACK) + 1)));
+        return Integer.parseInt(msg.substring((msg.indexOf(JOB_REPLICATION_ACK) + 1)));
     }
 
     public static String getNodeIdFromLogAckMessage(String msg) {
-        return msg.substring(0, msg.indexOf(JOB_COMMIT_ACK));
+        return msg.substring(0, msg.indexOf(JOB_REPLICATION_ACK));
     }
 
     /**
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogMapping.java b/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogMapping.java
index f61fbc6..3b2aff7 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogMapping.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogMapping.java
@@ -24,18 +24,9 @@
 
     private String remoteNodeID;
     private long remoteLSN;
-    private boolean isFlushed = false;
     private long localLSN;
     public AtomicInteger numOfFlushedIndexes = new AtomicInteger();
 
-    public boolean isFlushed() {
-        return isFlushed;
-    }
-
-    public void setFlushed(boolean isFlushed) {
-        this.isFlushed = isFlushed;
-    }
-
     public String getRemoteNodeID() {
         return remoteNodeID;
     }
@@ -66,7 +57,10 @@
         sb.append("Remote Node: " + remoteNodeID);
         sb.append(" Remote LSN: " + remoteLSN);
         sb.append(" Local LSN: " + localLSN);
-        sb.append(" isFlushed : " + isFlushed);
         return sb.toString();
     }
+
+    public String getNodeUniqueLSN() {
+        return TxnLogUtil.getNodeUniqueLSN(remoteNodeID, remoteLSN);
+    }
 }
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnLogUtil.java b/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnLogUtil.java
new file mode 100644
index 0000000..f51a64d
--- /dev/null
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnLogUtil.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.logging;
+
+public class TxnLogUtil {
+
+    private TxnLogUtil() {
+        //prevent util class construction
+    }
+
+    /**
+     * @param nodeId
+     * @param LSN
+     * @return Concatenation of nodeId and LSN
+     */
+    public static String getNodeUniqueLSN(String nodeId, long LSN) {
+        return nodeId + LSN;
+    }
+}
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
index b9447af..331116b 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
@@ -95,10 +95,13 @@
     private final IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider;
     private final static int INTIAL_BUFFER_SIZE = 4000; //4KB
     private final LinkedBlockingQueue<LSMComponentLSNSyncTask> lsmComponentRemoteLSN2LocalLSNMappingTaskQ;
+    private final LinkedBlockingQueue<LogRecord> pendingNotificationRemoteLogsQ;
     private final Map<String, LSMComponentProperties> lsmComponentId2PropertiesMap;
-    private final Map<Long, RemoteLogMapping> localLSN2RemoteLSNMap;
+    private final Map<String, RemoteLogMapping> replicaUniqueLSN2RemoteMapping;
     private final LSMComponentsSyncService lsmComponentLSNMappingService;
     private final Set<Integer> nodeHostedPartitions;
+    private final ReplicationNotifier replicationNotifier;
+    private final Object flushLogslock = new Object();
 
     public ReplicationChannel(String nodeId, AsterixReplicationProperties replicationProperties, ILogManager logManager,
             IReplicaResourcesManager replicaResoucesManager, IReplicationManager replicationManager,
@@ -110,9 +113,11 @@
         this.replicationProperties = replicationProperties;
         this.asterixAppRuntimeContextProvider = asterixAppRuntimeContextProvider;
         lsmComponentRemoteLSN2LocalLSNMappingTaskQ = new LinkedBlockingQueue<LSMComponentLSNSyncTask>();
+        pendingNotificationRemoteLogsQ = new LinkedBlockingQueue<>();
         lsmComponentId2PropertiesMap = new ConcurrentHashMap<String, LSMComponentProperties>();
-        localLSN2RemoteLSNMap = new ConcurrentHashMap<Long, RemoteLogMapping>();
+        replicaUniqueLSN2RemoteMapping = new ConcurrentHashMap<>();
         lsmComponentLSNMappingService = new LSMComponentsSyncService();
+        replicationNotifier = new ReplicationNotifier();
         replicationThreads = Executors.newCachedThreadPool(appContext.getThreadFactory());
         Map<String, ClusterPartition[]> nodePartitions = ((IAsterixPropertiesProvider) asterixAppRuntimeContextProvider
                 .getAppContext()).getMetadataProperties().getNodePartitions();
@@ -140,7 +145,7 @@
                     dataPort);
             serverSocketChannel.socket().bind(replicationChannelAddress);
             lsmComponentLSNMappingService.start();
-
+            replicationNotifier.start();
             LOGGER.log(Level.INFO, "opened Replication Channel @ IP Address: " + nodeIP + ":" + dataPort);
 
             //start accepting replication requests
@@ -152,7 +157,7 @@
             }
         } catch (IOException e) {
             throw new IllegalStateException(
-                    "Could not opened replication channel @ IP Address: " + nodeIP + ":" + dataPort, e);
+                    "Could not open replication channel @ IP Address: " + nodeIP + ":" + dataPort, e);
         }
     }
 
@@ -164,13 +169,13 @@
         if (remainingFile == 0) {
             if (lsmCompProp.getOpType() == LSMOperationType.FLUSH && lsmCompProp.getReplicaLSN() != null) {
                 //if this LSN wont be used for any other index, remove it
-                if (localLSN2RemoteLSNMap.containsKey(lsmCompProp.getReplicaLSN())) {
-                    int remainingIndexes = localLSN2RemoteLSNMap.get(lsmCompProp.getReplicaLSN()).numOfFlushedIndexes
-                            .decrementAndGet();
+                if (replicaUniqueLSN2RemoteMapping.containsKey(lsmCompProp.getNodeUniqueLSN())) {
+                    int remainingIndexes = replicaUniqueLSN2RemoteMapping
+                            .get(lsmCompProp.getNodeUniqueLSN()).numOfFlushedIndexes.decrementAndGet();
                     if (remainingIndexes == 0) {
                         //Note: there is a chance that this is never deleted because some index in the dataset was not flushed because it is empty.
                         //This could be solved by passing only the number of successfully flushed indexes
-                        localLSN2RemoteLSNMap.remove(lsmCompProp.getReplicaLSN());
+                        replicaUniqueLSN2RemoteMapping.remove(lsmCompProp.getNodeUniqueLSN());
                     }
                 }
             }
@@ -182,22 +187,6 @@
         }
     }
 
-    /**
-     * @param replicaId
-     *            the remote replica id this log belongs to.
-     * @param remoteLSN
-     *            the remote LSN received from the remote replica.
-     * @return The local log mapping if found. Otherwise null.
-     */
-    private RemoteLogMapping getRemoteLogMapping(String replicaId, long remoteLSN) {
-        for (RemoteLogMapping mapping : localLSN2RemoteLSNMap.values()) {
-            if (mapping.getRemoteLSN() == remoteLSN && mapping.getRemoteNodeID().equals(replicaId)) {
-                return mapping;
-            }
-        }
-        return null;
-    }
-
     @Override
     public void close() throws IOException {
         if (!serverSocketChannel.isOpen()) {
@@ -538,56 +527,65 @@
                     }
                     break;
                 case LogType.JOB_COMMIT:
-                    LogRecord jobCommitLog = new LogRecord();
-                    TransactionUtil.formJobTerminateLogRecord(jobCommitLog, remoteLog.getJobId(), true);
-                    jobCommitLog.setReplicationThread(this);
-                    jobCommitLog.setLogSource(LogSource.REMOTE);
-                    logManager.log(jobCommitLog);
+                case LogType.ABORT:
+                    LogRecord jobTerminationLog = new LogRecord();
+                    TransactionUtil.formJobTerminateLogRecord(jobTerminationLog, remoteLog.getJobId(),
+                            remoteLog.getLogType() == LogType.JOB_COMMIT);
+                    jobTerminationLog.setReplicationThread(this);
+                    jobTerminationLog.setLogSource(LogSource.REMOTE);
+                    logManager.log(jobTerminationLog);
                     break;
                 case LogType.FLUSH:
-                    LogRecord flushLog = new LogRecord();
-                    TransactionUtil.formFlushLogRecord(flushLog, remoteLog.getDatasetId(), null, remoteLog.getNodeId(),
-                            remoteLog.getNumOfFlushedIndexes());
-                    flushLog.setReplicationThread(this);
-                    flushLog.setLogSource(LogSource.REMOTE);
-                    synchronized (localLSN2RemoteLSNMap) {
-                        logManager.log(flushLog);
-                        //store mapping information for flush logs to use them in incoming LSM components.
-                        RemoteLogMapping flushLogMap = new RemoteLogMapping();
-                        flushLogMap.setRemoteLSN(remoteLog.getLSN());
-                        flushLogMap.setRemoteNodeID(remoteLog.getNodeId());
-                        flushLogMap.setLocalLSN(flushLog.getLSN());
-                        flushLogMap.numOfFlushedIndexes.set(remoteLog.getNumOfFlushedIndexes());
-                        localLSN2RemoteLSNMap.put(flushLog.getLSN(), flushLogMap);
-                        localLSN2RemoteLSNMap.notifyAll();
+                    //store mapping information for flush logs to use them in incoming LSM components.
+                    RemoteLogMapping flushLogMap = new RemoteLogMapping();
+                    flushLogMap.setRemoteNodeID(remoteLog.getNodeId());
+                    flushLogMap.setRemoteLSN(remoteLog.getLSN());
+                    logManager.log(remoteLog);
+                    //the log LSN value is updated by logManager.log(.) to a local value
+                    flushLogMap.setLocalLSN(remoteLog.getLSN());
+                    flushLogMap.numOfFlushedIndexes.set(remoteLog.getNumOfFlushedIndexes());
+                    replicaUniqueLSN2RemoteMapping.put(flushLogMap.getNodeUniqueLSN(), flushLogMap);
+                    synchronized (flushLogslock) {
+                        flushLogslock.notify();
                     }
                     break;
                 default:
-                    throw new ACIDException("Unsupported LogType: " + remoteLog.getLogType());
+                    LOGGER.severe("Unsupported LogType: " + remoteLog.getLogType());
             }
         }
 
-        //this method is called sequentially by LogPage (notifyReplicationTerminator) for JOB_COMMIT and FLUSH log types
+        //this method is called sequentially by LogPage (notifyReplicationTerminator) for JOB_COMMIT and JOB_ABORT log types.
         @Override
         public void notifyLogReplicationRequester(LogRecord logRecord) {
-            //Note: this could be optimized by moving this to a different thread and freeing the LogPage thread faster
-            if (logRecord.getLogType() == LogType.JOB_COMMIT) {
-                //send ACK to requester
+            pendingNotificationRemoteLogsQ.offer(logRecord);
+        }
+
+        @Override
+        public SocketChannel getReplicationClientSocket() {
+            return socketChannel;
+        }
+    }
+
+    /**
+     * This thread is responsible for sending JOB_COMMIT/ABORT ACKs to replication clients.
+     */
+    private class ReplicationNotifier extends Thread {
+        @Override
+        public void run() {
+            Thread.currentThread().setName("ReplicationNotifier Thread");
+            while (true) {
                 try {
-                    socketChannel.socket().getOutputStream()
-                            .write((localNodeID + ReplicationProtocol.JOB_COMMIT_ACK + logRecord.getJobId() + "\n")
-                                    .getBytes());
-                    socketChannel.socket().getOutputStream().flush();
-                } catch (IOException e) {
-                    e.printStackTrace();
-                }
-            } else if (logRecord.getLogType() == LogType.FLUSH) {
-                synchronized (localLSN2RemoteLSNMap) {
-                    RemoteLogMapping remoteLogMap = localLSN2RemoteLSNMap.get(logRecord.getLSN());
-                    synchronized (remoteLogMap) {
-                        remoteLogMap.setFlushed(true);
-                        remoteLogMap.notifyAll();
+                    LogRecord logRecord = pendingNotificationRemoteLogsQ.take();
+                    //send ACK to requester
+                    try {
+                        logRecord.getReplicationThread().getReplicationClientSocket().socket().getOutputStream()
+                                .write((localNodeID + ReplicationProtocol.JOB_REPLICATION_ACK + logRecord.getJobId()
+                                        + System.lineSeparator()).getBytes());
+                    } catch (IOException e) {
+                        LOGGER.severe("Failed to send job replication ACK " + logRecord.getLogRecordForDisplay());
                     }
+                } catch (InterruptedException e1) {
+                    LOGGER.severe("ReplicationNotifier Thread interrupted.");
                 }
             }
         }
@@ -629,26 +627,23 @@
                 return;
             }
 
+            //path to the LSM component file
+            Path path = Paths.get(syncTask.getComponentFilePath());
             if (lsmCompProp.getReplicaLSN() == null) {
                 if (lsmCompProp.getOpType() == LSMOperationType.FLUSH) {
                     //need to look up LSN mapping from memory
-                    RemoteLogMapping remoteLogMap = getRemoteLogMapping(lsmCompProp.getNodeId(), remoteLSN);
-
-                    //wait until flush log arrives
-                    while (remoteLogMap == null) {
-                        synchronized (localLSN2RemoteLSNMap) {
-                            localLSN2RemoteLSNMap.wait();
-                        }
-                        remoteLogMap = getRemoteLogMapping(lsmCompProp.getNodeId(), remoteLSN);
-                    }
-
-                    //wait until the log is flushed locally before updating the disk component LSN
-                    synchronized (remoteLogMap) {
-                        while (!remoteLogMap.isFlushed()) {
-                            remoteLogMap.wait();
+                    RemoteLogMapping remoteLogMap = replicaUniqueLSN2RemoteMapping.get(lsmCompProp.getNodeUniqueLSN());
+                    if (remoteLogMap == null) {
+                        synchronized (flushLogslock) {
+                            remoteLogMap = replicaUniqueLSN2RemoteMapping.get(lsmCompProp.getNodeUniqueLSN());
+                            //wait until flush log arrives, and verify the LSM component file still exists
+                            //The component file could be deleted if its NC fails.
+                            while (remoteLogMap == null && Files.exists(path)) {
+                                flushLogslock.wait();
+                                remoteLogMap = replicaUniqueLSN2RemoteMapping.get(lsmCompProp.getNodeUniqueLSN());
+                            }
                         }
                     }
-
                     lsmCompProp.setReplicaLSN(remoteLogMap.getLocalLSN());
                 } else if (lsmCompProp.getOpType() == LSMOperationType.MERGE) {
                     //need to load the LSN mapping from disk
@@ -665,13 +660,11 @@
                          *
                          */
                         mappingLSN = logManager.getAppendLSN();
-                    } else {
-                        lsmCompProp.setReplicaLSN(mappingLSN);
                     }
+                    lsmCompProp.setReplicaLSN(mappingLSN);
                 }
             }
 
-            Path path = Paths.get(syncTask.getComponentFilePath());
             if (Files.notExists(path)) {
                 /*
                  * This could happen when a merged component arrives and deletes the flushed
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
index 7243629..93d1085 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
@@ -213,7 +213,7 @@
 
     @Override
     public void replicateLog(ILogRecord logRecord) {
-        if (logRecord.getLogType() == LogType.JOB_COMMIT) {
+        if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) {
             //if replication is suspended, wait until it is resumed.
             while (replicationSuspended.get()) {
                 synchronized (replicationSuspended) {
@@ -734,11 +734,7 @@
 
                 return true;
             } else {
-                if (!replicationJobsPendingAcks.containsKey(logRecord.getJobId())) {
-                    synchronized (replicationJobsPendingAcks) {
-                        replicationJobsPendingAcks.put(logRecord.getJobId(), logRecord);
-                    }
-                }
+                replicationJobsPendingAcks.putIfAbsent(logRecord.getJobId(), logRecord);
                 return false;
             }
         }
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java
index 841a99f..9749c7a 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java
@@ -26,6 +26,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
+import org.apache.asterix.replication.logging.TxnLogUtil;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexReplicationJob;
@@ -96,6 +97,8 @@
     public String getMaskPath(ReplicaResourcesManager resourceManager) {
         if (maskPath == null) {
             LSMIndexFileProperties afp = new LSMIndexFileProperties(this);
+            //split the index file path to get the LSM component file name
+            afp.splitFileName();
             maskPath = getReplicaComponentPath(resourceManager) + File.separator + afp.getFileName()
                     + ReplicaResourcesManager.LSM_COMPONENT_MASK_SUFFIX;
         }
@@ -147,10 +150,6 @@
         return nodeId;
     }
 
-    public void setNodeId(String nodeId) {
-        this.nodeId = nodeId;
-    }
-
     public int getNumberOfFiles() {
         return numberOfFiles.get();
     }
@@ -178,4 +177,8 @@
     public void setOpType(LSMOperationType opType) {
         this.opType = opType;
     }
+
+    public String getNodeUniqueLSN() {
+        return TxnLogUtil.getNodeUniqueLSN(nodeId, originalLSN);
+    }
 }
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
index 413fd7ac..41fc0b8 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
@@ -67,14 +67,12 @@
         String indexPath = getIndexPath(afp);
         if (indexPath != null) {
             if (afp.isLSMComponentFile()) {
-                String backupFilePath = indexPath + File.separator + afp.getFileName();
-
-                //delete file
-                File destFile = new File(backupFilePath);
+                //delete index file
+                String indexFilePath = indexPath + File.separator + afp.getFileName();
+                File destFile = new File(indexFilePath);
                 FileUtils.deleteQuietly(destFile);
             } else {
-                //delete index files
-                indexPath = indexPath.substring(0, indexPath.lastIndexOf(File.separator));
+                //delete index directory
                 FileUtils.deleteQuietly(new File(indexPath));
             }
         }
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
index 6060dd7..502e9c7 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
@@ -134,8 +134,7 @@
                     flushQ.offer(logRecord);
                 }
             } else if (logRecord.getLogSource() == LogSource.REMOTE) {
-
-                if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.FLUSH) {
+                if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) {
                     remoteJobsQ.offer(logRecord);
                 }
             }
@@ -276,7 +275,7 @@
                         notifyFlushTerminator();
                     }
                 } else if (logRecord.getLogSource() == LogSource.REMOTE) {
-                    if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.FLUSH) {
+                    if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) {
                         notifyReplicationTerminator();
                     }
                 }
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
index 8363ff1..efd66a8 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
@@ -72,13 +72,8 @@
                             //ignore
                         }
                     }
-                }
-            }
 
-            //wait for job ACK from replicas
-            //TODO should JOB_ABORT be added as well?
-            if ((logRecord.getLogType() == LogType.JOB_COMMIT) && !replicationManager.hasBeenReplicated(logRecord)) {
-                synchronized (logRecord) {
+                    //wait for job Commit/Abort ACK from replicas
                     while (!replicationManager.hasBeenReplicated(logRecord)) {
                         try {
                             logRecord.wait();