[NO ISSUE][REP] Extensible Replication Strategy Factory
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- Allow specifying replication strategy factory at
NC application level.
- Add replica getter to replica manager.
Change-Id: I7c71ae2d19c81050c4e338efac155ae39f4b202e
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/12825
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index eb8a92f..f532352 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -53,6 +53,7 @@
import org.apache.asterix.common.library.ILibraryManager;
import org.apache.asterix.common.replication.IReplicationChannel;
import org.apache.asterix.common.replication.IReplicationManager;
+import org.apache.asterix.common.replication.IReplicationStrategyFactory;
import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.asterix.common.storage.IReplicaManager;
import org.apache.asterix.common.transactions.IRecoveryManager;
@@ -176,7 +177,8 @@
@Override
public void initialize(IRecoveryManagerFactory recoveryManagerFactory, IReceptionistFactory receptionistFactory,
- IConfigValidatorFactory configValidatorFactory, boolean initialRun) throws IOException {
+ IConfigValidatorFactory configValidatorFactory, IReplicationStrategyFactory replicationStrategyFactory,
+ boolean initialRun) throws IOException {
ioManager = getServiceContext().getIoManager();
int ioQueueLen = getServiceContext().getAppConfig().getInt(NCConfig.Option.IO_QUEUE_SIZE);
threadExecutor =
@@ -231,7 +233,7 @@
receptionist = receptionistFactory.create();
if (replicationProperties.isReplicationEnabled()) {
- replicationManager = new ReplicationManager(this, replicationProperties);
+ replicationManager = new ReplicationManager(this, replicationStrategyFactory, replicationProperties);
//pass replication manager to replication required object
//LogManager to replicate logs
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
index 655f9da..954ff8d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
@@ -101,6 +101,11 @@
}
@Override
+ public synchronized IPartitionReplica getReplica(ReplicaIdentifier id) {
+ return replicas.get(id);
+ }
+
+ @Override
public synchronized Set<Integer> getPartitions() {
return Collections.unmodifiableSet(partitions);
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index b29843c..18e8e65 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -69,6 +69,8 @@
import org.apache.asterix.common.config.PropertiesFactory;
import org.apache.asterix.common.config.StorageProperties;
import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.replication.IReplicationStrategyFactory;
+import org.apache.asterix.common.replication.ReplicationStrategyFactory;
import org.apache.asterix.common.transactions.Checkpoint;
import org.apache.asterix.common.transactions.IRecoveryManager;
import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
@@ -163,7 +165,7 @@
updateOnNodeJoin();
}
runtimeContext.initialize(getRecoveryManagerFactory(), getReceptionistFactory(), getConfigValidatorFactory(),
- runtimeContext.getNodeProperties().isInitialRun());
+ getReplicationStrategyFactory(), runtimeContext.getNodeProperties().isInitialRun());
MessagingProperties messagingProperties = runtimeContext.getMessagingProperties();
NCMessageBroker messageBroker = new NCMessageBroker(controllerService, messagingProperties);
this.ncServiceCtx.setMessageBroker(messageBroker);
@@ -194,6 +196,10 @@
return Receptionist::new;
}
+ protected IReplicationStrategyFactory getReplicationStrategyFactory() {
+ return new ReplicationStrategyFactory();
+ }
+
protected IConfigValidatorFactory getConfigValidatorFactory() {
return ConfigValidator::new;
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
index 8c82979..5475b97 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
@@ -26,6 +26,7 @@
import org.apache.asterix.common.library.ILibraryManager;
import org.apache.asterix.common.replication.IReplicationChannel;
import org.apache.asterix.common.replication.IReplicationManager;
+import org.apache.asterix.common.replication.IReplicationStrategyFactory;
import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.asterix.common.storage.IReplicaManager;
import org.apache.asterix.common.transactions.IRecoveryManagerFactory;
@@ -71,7 +72,8 @@
IResourceIdFactory getResourceIdFactory();
void initialize(IRecoveryManagerFactory recoveryManagerFactory, IReceptionistFactory receptionistFactory,
- IConfigValidatorFactory configValidatorFactory, boolean initialRun) throws IOException, AlgebricksException;
+ IConfigValidatorFactory configValidatorFactory, IReplicationStrategyFactory replicationStrategyFactory,
+ boolean initialRun) throws IOException, AlgebricksException;
void setShuttingdown(boolean b);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategyFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategyFactory.java
new file mode 100644
index 0000000..384cd65
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationStrategyFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+public interface IReplicationStrategyFactory {
+
+ /**
+ * Creates a replication strategy based on the provided {@code name}
+ *
+ * @param name
+ * @return the replication strategy
+ */
+ IReplicationStrategy create(String name);
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ReplicationStrategyFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ReplicationStrategyFactory.java
index 9c129d8..935b08f 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ReplicationStrategyFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/ReplicationStrategyFactory.java
@@ -21,7 +21,7 @@
import java.util.HashMap;
import java.util.Map;
-public class ReplicationStrategyFactory {
+public class ReplicationStrategyFactory implements IReplicationStrategyFactory {
private static final Map<String, Class<? extends IReplicationStrategy>> BUILT_IN_REPLICATION_STRATEGY =
new HashMap<>();
@@ -32,11 +32,8 @@
BUILT_IN_REPLICATION_STRATEGY.put("metadata", MetadataOnlyReplicationStrategy.class);
}
- private ReplicationStrategyFactory() {
- throw new AssertionError();
- }
-
- public static IReplicationStrategy create(String name) {
+ @Override
+ public IReplicationStrategy create(String name) {
String strategyName = name.toLowerCase();
if (!BUILT_IN_REPLICATION_STRATEGY.containsKey(strategyName)) {
throw new IllegalStateException("Couldn't find strategy with name: " + name);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
index 1b8ec53..b081c65 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
@@ -79,4 +79,11 @@
* @return the synchronization lock
*/
Object getReplicaSyncLock();
+
+ /**
+ * Gets the partition replicas matching {@code id}
+ * @param id
+ * @return The partition replica if found. Otherwise, null.
+ */
+ IPartitionReplica getReplica(ReplicaIdentifier id);
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
index 7ed674e..5cd26de 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
@@ -30,7 +30,7 @@
import org.apache.asterix.common.replication.IReplicationDestination;
import org.apache.asterix.common.replication.IReplicationManager;
import org.apache.asterix.common.replication.IReplicationStrategy;
-import org.apache.asterix.common.replication.ReplicationStrategyFactory;
+import org.apache.asterix.common.replication.IReplicationStrategyFactory;
import org.apache.asterix.common.transactions.ILogRecord;
import org.apache.asterix.replication.api.ReplicationDestination;
import org.apache.hyracks.api.replication.IReplicationJob;
@@ -50,10 +50,11 @@
private final LogReplicationManager logReplicationManager;
private final IndexReplicationManager lsnIndexReplicationManager;
- public ReplicationManager(INcApplicationContext appCtx, ReplicationProperties replicationProperties) {
+ public ReplicationManager(INcApplicationContext appCtx, IReplicationStrategyFactory replicationStrategyFactory,
+ ReplicationProperties replicationProperties) {
this.replicationProperties = replicationProperties;
this.appCtx = appCtx;
- strategy = ReplicationStrategyFactory.create(replicationProperties.getReplicationStrategy());
+ strategy = replicationStrategyFactory.create(replicationProperties.getReplicationStrategy());
logReplicationManager = new LogReplicationManager(appCtx, this);
lsnIndexReplicationManager = new IndexReplicationManager(appCtx, this);
}