Allow Replication to be Enabled on Virtual Cluster
- Allow replication port assignment per NC.
- Allow replication to be enabled on virtual cluster.
- Wait for JOB_ABORT ACK from remote replicas.
- Fix LSM component mask file name.
- Fix index directory deletion on index drop.
- Eliminate multiple partition takeover requests.
- Free LogFlusher thread from sending replication ACKs.
- Fix possible deadlock between LogFlusher and Logs Replication Thread.
- Remove wait for FLUSH_LOG for replicated LSM components:
This wait is not needed because a complete remote recovery is performed on node failure.
Change-Id: I34a38f59c4915a19242adb6a4eaa6ee1c82d2372
Reviewed-on: https://asterix-gerrit.ics.uci.edu/743
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ian Maxon <imaxon@apache.org>
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
index cb7bcab..5d31d9a 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
@@ -77,10 +77,15 @@
}
public int getDataReplicationPort(String nodeId) {
- if (cluster != null) {
- return cluster.getDataReplication().getReplicationPort().intValue();
+ if (cluster != null && cluster.getDataReplication() != null) {
+ for (int i = 0; i < cluster.getNode().size(); i++) {
+ Node node = cluster.getNode().get(i);
+ if (getRealCluserNodeID(node.getId()).equals(nodeId)) {
+ return node.getReplicationPort() != null ? node.getReplicationPort().intValue()
+ : cluster.getDataReplication().getReplicationPort().intValue();
+ }
+ }
}
-
return REPLICATION_DATAPORT_DEFAULT;
}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationThread.java b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationThread.java
index 9226a66..a88b82a 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationThread.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationThread.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.common.replication;
+import java.nio.channels.SocketChannel;
+
import org.apache.asterix.common.transactions.LogRecord;
public interface IReplicationThread extends Runnable {
@@ -29,4 +31,9 @@
* The log that has been flushed.
*/
public void notifyLogReplicationRequester(LogRecord logRecord);
+
+ /**
+ * @return The replication client socket channel.
+ */
+ public SocketChannel getReplicationClientSocket();
}
diff --git a/asterix-common/src/main/resources/schema/cluster.xsd b/asterix-common/src/main/resources/schema/cluster.xsd
index 935d33f..ae09b16 100644
--- a/asterix-common/src/main/resources/schema/cluster.xsd
+++ b/asterix-common/src/main/resources/schema/cluster.xsd
@@ -123,6 +123,7 @@
<xs:element ref="cl:txn_log_dir" minOccurs="0" />
<xs:element ref="cl:iodevices" minOccurs="0" />
<xs:element ref="cl:debug_port" minOccurs="0" />
+ <xs:element ref="cl:replication_port" minOccurs="0" />
</xs:sequence>
</xs:complexType>
</xs:element>