Allow Replication to be Enabled on Virtual Cluster

- Allow replication port assignment per NC.
- Allow replication to be enabled on virtual cluster.
- Wait for JOB_ABORT ACK from remote replicas.
- Fix LSM component mask file name.
- Fix index directory deletion on index drop.
- Eliminate multiple partition takeover requests.
- Free LogFlusher thread from sending replication ACKs.
- Fix possible deadlock between LogFlusher and Logs Replication Thread.
- Remove wait for FLUSH_LOG for replicated LSM components:
  This wait is not needed since on node failure, complete remote recovery is done.

Change-Id: I34a38f59c4915a19242adb6a4eaa6ee1c82d2372
Reviewed-on: https://asterix-gerrit.ics.uci.edu/743
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ian Maxon <imaxon@apache.org>
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
index cb7bcab..5d31d9a 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
@@ -77,10 +77,15 @@
     }
 
     public int getDataReplicationPort(String nodeId) {
-        if (cluster != null) {
-            return cluster.getDataReplication().getReplicationPort().intValue();
+        if (cluster != null && cluster.getDataReplication() != null) {
+            for (int i = 0; i < cluster.getNode().size(); i++) {
+                Node node = cluster.getNode().get(i);
+                if (getRealCluserNodeID(node.getId()).equals(nodeId)) {
+                    return node.getReplicationPort() != null ? node.getReplicationPort().intValue()
+                            : cluster.getDataReplication().getReplicationPort().intValue();
+                }
+            }
         }
-
         return REPLICATION_DATAPORT_DEFAULT;
     }
 
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationThread.java b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationThread.java
index 9226a66..a88b82a 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationThread.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationThread.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.common.replication;
 
+import java.nio.channels.SocketChannel;
+
 import org.apache.asterix.common.transactions.LogRecord;
 
 public interface IReplicationThread extends Runnable {
@@ -29,4 +31,9 @@
      *            The log that has been flushed.
      */
     public void notifyLogReplicationRequester(LogRecord logRecord);
+
+    /**
+     * @return The replication client socket channel.
+     */
+    public SocketChannel getReplicationClientSocket();
 }
diff --git a/asterix-common/src/main/resources/schema/cluster.xsd b/asterix-common/src/main/resources/schema/cluster.xsd
index 935d33f..ae09b16 100644
--- a/asterix-common/src/main/resources/schema/cluster.xsd
+++ b/asterix-common/src/main/resources/schema/cluster.xsd
@@ -123,6 +123,7 @@
 				<xs:element ref="cl:txn_log_dir" minOccurs="0" />
 				<xs:element ref="cl:iodevices" minOccurs="0" />
 				<xs:element ref="cl:debug_port" minOccurs="0" />
+				<xs:element ref="cl:replication_port" minOccurs="0" />
 			</xs:sequence>
 		</xs:complexType>
 	</xs:element>