[ASTERIXDB-2198][REPL] Introduce Dynamic Replica Placement
- user model changes: no
- storage format changes: no
- interface changes: yes
- Add IReplicationMessage and IReplicaTask.
- Add notifyMetadataNodeChange to IFaultToleranceStrategy.
- Add register to IReplicationManager to allow registering
replicas at runtime.
Details:
- Add cluster APIs for:
- changing partition master node.
- changing metadata node.
- Add NC storage management API for promoting a partition replica
to master replica.
- Implement changing metadata node at runtime in
MetadataNodeFaultToleranceStrategy.
- Allow MetadataNodeFaultToleranceStrategy to have zero replicas
at initialization.
- Add a flag to LangExecutionUtil to skip storage distribution
check at the end of each test.
- Add test case for metadata node failover as follows:
1- start with nc1 as metadata node.
2- add replica for metadata partition on nc2 at runtime.
3- perform metadata transactions on nc1.
4- promote metadata partition on nc2.
5- failover metadata node to nc2.
6- ensure the effects of the metadata transactions in (3) exist.
7- perform more metadata transactions on nc2.
8- ensure the effects of the metadata transactions in (7) exist.
Change-Id: I11f82efcad29d2c37324fe9d3c11d872b0348f49
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2215
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java
index 6826b26..18e837a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java
@@ -30,6 +30,7 @@
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.hyracks.api.config.IOption;
import org.apache.hyracks.api.config.Section;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.control.common.config.ConfigUtils;
import org.apache.hyracks.control.common.controllers.ControllerConfig;
import org.apache.hyracks.http.api.IServletRequest;
@@ -90,6 +91,21 @@
responseWriter.flush();
}
+ @Override
+ protected void post(IServletRequest request, IServletResponse response) throws Exception {
+ switch (localPath(request)) {
+ case "/partition/master":
+ processPartitionMaster(request, response);
+ break;
+ case "/metadataNode":
+ processMetadataNode(request, response);
+ break;
+ default:
+ sendError(response, HttpResponseStatus.NOT_FOUND);
+ break;
+ }
+ }
+
protected ObjectNode getClusterStateSummaryJSON() {
return appCtx.getClusterStateManager().getClusterStateSummary();
}
@@ -143,4 +159,16 @@
&& option != ControllerConfig.Option.CONFIG_FILE_URL;
}
+ private void processPartitionMaster(IServletRequest request, IServletResponse response) {
+ final String partition = request.getParameter("partition");
+ final String node = request.getParameter("node");
+ appCtx.getClusterStateManager().updateClusterPartition(Integer.valueOf(partition), node, true);
+ response.setStatus(HttpResponseStatus.OK);
+ }
+
+ private void processMetadataNode(IServletRequest request, IServletResponse response) throws HyracksDataException {
+ final String node = request.getParameter("node");
+ appCtx.getFaultToleranceStrategy().notifyMetadataNodeChange(node);
+ response.setStatus(HttpResponseStatus.OK);
+ }
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
index 8e73405..c2cda4e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
@@ -33,6 +33,7 @@
import org.apache.asterix.common.replication.IPartitionReplica;
import org.apache.asterix.common.storage.IReplicaManager;
import org.apache.asterix.common.storage.ReplicaIdentifier;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.http.api.IServletRequest;
import org.apache.hyracks.http.api.IServletResponse;
import org.apache.hyracks.http.server.AbstractServlet;
@@ -91,6 +92,9 @@
case "/removeReplica":
processRemoveReplica(request, response);
break;
+ case "/promote":
+ processPromote(request, response);
+ break;
default:
sendError(response, HttpResponseStatus.NOT_FOUND);
break;
@@ -160,4 +164,14 @@
final InetSocketAddress replicaAddress = new InetSocketAddress(host, Integer.valueOf(port));
return ReplicaIdentifier.of(Integer.valueOf(partition), replicaAddress);
}
+
+ private void processPromote(IServletRequest request, IServletResponse response) throws HyracksDataException {
+ final String partition = request.getParameter("partition");
+ if (partition == null) {
+ response.setStatus(HttpResponseStatus.BAD_REQUEST);
+ return;
+ }
+ appCtx.getReplicaManager().promote(Integer.valueOf(partition));
+ response.setStatus(HttpResponseStatus.OK);
+ }
}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 75159af..81b232a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -213,7 +213,7 @@
final ClusterPartition[] nodePartitions = metadataProperties.getNodePartitions().get(nodeId);
final Set<Integer> nodePartitionsIds = Arrays.stream(nodePartitions).map(ClusterPartition::getPartitionId)
.collect(Collectors.toSet());
- replicaManager = new ReplicaManager(nodePartitionsIds);
+ replicaManager = new ReplicaManager(this, nodePartitionsIds);
isShuttingdown = false;
activeManager = new ActiveManager(threadExecutor, getServiceContext().getNodeId(),
activeProperties.getMemoryComponentGlobalBudget(), compilerProperties.getFrameSize(),
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index f0ed5e9..22fa459 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -375,8 +375,8 @@
} else {
maxDiskLastLsn = resourceId2MaxLSNMap.get(resourceId);
}
-
- if (lsn > maxDiskLastLsn) {
+ // lsn @ maxDiskLastLsn is either a flush log or a master replica log
+ if (lsn >= maxDiskLastLsn) {
redo(logRecord, datasetLifecycleManager);
redoCount++;
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
index 0c84a6e..bf17a5b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
@@ -25,14 +25,19 @@
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.replication.IPartitionReplica;
+import org.apache.asterix.common.replication.IRemoteRecoveryManager;
import org.apache.asterix.common.storage.IReplicaManager;
import org.apache.asterix.common.storage.ReplicaIdentifier;
import org.apache.asterix.replication.storage.PartitionReplica;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
public class ReplicaManager implements IReplicaManager {
+ private final INcApplicationContext appCtx;
/**
* the partitions to which the current node is master
*/
@@ -42,7 +47,8 @@
*/
private final Map<ReplicaIdentifier, PartitionReplica> replicas = new HashMap<>();
- public ReplicaManager(Set<Integer> partitions) {
+ public ReplicaManager(INcApplicationContext appCtx, Set<Integer> partitions) {
+ this.appCtx = appCtx;
this.partitions.addAll(partitions);
}
@@ -52,7 +58,7 @@
throw new IllegalStateException(
"This node is not the current master of partition(" + id.getPartition() + ")");
}
- replicas.computeIfAbsent(id, key -> new PartitionReplica(key));
+ replicas.computeIfAbsent(id, k -> new PartitionReplica(k, appCtx));
replicas.get(id).sync();
}
@@ -74,4 +80,11 @@
public Set<Integer> getPartitions() {
return Collections.unmodifiableSet(partitions);
}
+
+ @Override
+ public synchronized void promote(int partition) throws HyracksDataException {
+ final IRemoteRecoveryManager remoteRecoveryManager = appCtx.getRemoteRecoveryManager();
+ remoteRecoveryManager.replayReplicaPartitionLogs(Stream.of(partition).collect(Collectors.toSet()), true);
+ partitions.add(partition);
+ }
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
index 23f225e..e4a6f4b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
@@ -45,8 +45,8 @@
import org.apache.asterix.app.replication.message.PreparePartitionsFailbackResponseMessage;
import org.apache.asterix.app.replication.message.RegistrationTasksRequestMessage;
import org.apache.asterix.app.replication.message.RegistrationTasksResponseMessage;
-import org.apache.asterix.app.replication.message.TakeoverMetadataNodeRequestMessage;
-import org.apache.asterix.app.replication.message.TakeoverMetadataNodeResponseMessage;
+import org.apache.asterix.app.replication.message.MetadataNodeRequestMessage;
+import org.apache.asterix.app.replication.message.MetadataNodeResponseMessage;
import org.apache.asterix.app.replication.message.TakeoverPartitionsRequestMessage;
import org.apache.asterix.app.replication.message.TakeoverPartitionsResponseMessage;
import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
@@ -337,7 +337,7 @@
validateClusterState();
}
- public synchronized void process(TakeoverMetadataNodeResponseMessage response) throws HyracksDataException {
+ public synchronized void process(MetadataNodeResponseMessage response) throws HyracksDataException {
currentMetadataNode = response.getNodeId();
metadataNodeActive = true;
clusterManager.updateMetadataNode(currentMetadataNode, metadataNodeActive);
@@ -403,7 +403,7 @@
ICcApplicationContext appCtx = (ICcApplicationContext) serviceCtx.getApplicationContext();
ClusterPartition metadataPartiton = appCtx.getMetadataProperties().getMetadataPartition();
//request the metadataPartition node to register itself as the metadata node
- TakeoverMetadataNodeRequestMessage takeoverRequest = new TakeoverMetadataNodeRequestMessage();
+ MetadataNodeRequestMessage takeoverRequest = new MetadataNodeRequestMessage(true);
try {
messageBroker.sendApplicationMessageToNC(takeoverRequest, metadataPartiton.getActiveNodeId());
// Since the metadata node will be changed, we need to rebind the proxy object
@@ -440,8 +440,8 @@
case TAKEOVER_PARTITION_RESPONSE:
process((TakeoverPartitionsResponseMessage) message);
break;
- case TAKEOVER_METADATA_NODE_RESPONSE:
- process((TakeoverMetadataNodeResponseMessage) message);
+ case METADATA_NODE_RESPONSE:
+ process((MetadataNodeResponseMessage) message);
break;
case PREPARE_FAILBACK_RESPONSE:
process((PreparePartitionsFailbackResponseMessage) message);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java
index 3341813..3470d7b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java
@@ -38,6 +38,8 @@
import org.apache.asterix.app.nc.task.ReportLocalCountersTask;
import org.apache.asterix.app.nc.task.StartLifecycleComponentsTask;
import org.apache.asterix.app.nc.task.StartReplicationServiceTask;
+import org.apache.asterix.app.replication.message.MetadataNodeRequestMessage;
+import org.apache.asterix.app.replication.message.MetadataNodeResponseMessage;
import org.apache.asterix.app.replication.message.NCLifecycleTaskReportMessage;
import org.apache.asterix.app.replication.message.RegistrationTasksRequestMessage;
import org.apache.asterix.app.replication.message.ReplayPartitionLogsRequestMessage;
@@ -132,6 +134,9 @@
case REPLAY_LOGS_RESPONSE:
process((ReplayPartitionLogsResponseMessage) message);
break;
+ case METADATA_NODE_RESPONSE:
+ process((MetadataNodeResponseMessage) message);
+ break;
default:
throw new RuntimeDataException(ErrorCode.UNSUPPORTED_MESSAGE_TYPE, message.getType().name());
}
@@ -143,6 +148,26 @@
this.metadataNodeId = clusterManager.getCurrentMetadataNodeId();
}
+ @Override
+ public void notifyMetadataNodeChange(String node) throws HyracksDataException {
+ if (metadataNodeId.equals(node)) {
+ return;
+ }
+ // if current metadata node is active, we need to unbind its metadata proxy object
+ if (clusterManager.isMetadataNodeActive()) {
+ MetadataNodeRequestMessage msg = new MetadataNodeRequestMessage(false);
+ try {
+ messageBroker.sendApplicationMessageToNC(msg, metadataNodeId);
+ // when the current node responses, we will bind to the new one
+ metadataNodeId = node;
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ }
+ } else {
+ requestMetadataNodeTakeover(node);
+ }
+ }
+
private synchronized void process(ReplayPartitionLogsResponseMessage msg) {
hotStandbyMetadataReplica.add(msg.getNodeId());
if (LOGGER.isLoggable(Level.INFO)) {
@@ -256,4 +281,20 @@
recoveryPlan.put(hotStandbyMetadataReplica.iterator().next(), metadataPartition);
return new RemoteRecoveryTask(recoveryPlan);
}
+
+ private void process(MetadataNodeResponseMessage response) throws HyracksDataException {
+ clusterManager.updateMetadataNode(response.getNodeId(), response.isExported());
+ if (!response.isExported()) {
+ requestMetadataNodeTakeover(metadataNodeId);
+ }
+ }
+
+ private void requestMetadataNodeTakeover(String node) throws HyracksDataException {
+ MetadataNodeRequestMessage msg = new MetadataNodeRequestMessage(true);
+ try {
+ messageBroker.sendApplicationMessageToNC(msg, node);
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ }
+ }
}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverMetadataNodeRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/MetadataNodeRequestMessage.java
similarity index 73%
rename from asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverMetadataNodeRequestMessage.java
rename to asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/MetadataNodeRequestMessage.java
index 2137924..bebd133 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverMetadataNodeRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/MetadataNodeRequestMessage.java
@@ -27,24 +27,33 @@
import org.apache.asterix.common.replication.INCLifecycleMessage;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-public class TakeoverMetadataNodeRequestMessage implements INCLifecycleMessage, INcAddressedMessage {
+public class MetadataNodeRequestMessage implements INCLifecycleMessage, INcAddressedMessage {
private static final long serialVersionUID = 1L;
- private static final Logger LOGGER = Logger.getLogger(TakeoverMetadataNodeRequestMessage.class.getName());
+ private static final Logger LOGGER = Logger.getLogger(MetadataNodeRequestMessage.class.getName());
+ private final boolean export;
+
+ public MetadataNodeRequestMessage(boolean export) {
+ this.export = export;
+ }
@Override
public void handle(INcApplicationContext appContext) throws HyracksDataException, InterruptedException {
INCMessageBroker broker = (INCMessageBroker) appContext.getServiceContext().getMessageBroker();
HyracksDataException hde = null;
try {
- appContext.initializeMetadata(false);
- appContext.exportMetadataNodeStub();
+ if (export) {
+ appContext.initializeMetadata(false);
+ appContext.exportMetadataNodeStub();
+ } else {
+ appContext.unexportMetadataNodeStub();
+ }
} catch (Exception e) {
LOGGER.log(Level.SEVERE, "Failed taking over metadata", e);
hde = HyracksDataException.create(e);
} finally {
- TakeoverMetadataNodeResponseMessage reponse =
- new TakeoverMetadataNodeResponseMessage(appContext.getTransactionSubsystem().getId());
+ MetadataNodeResponseMessage reponse =
+ new MetadataNodeResponseMessage(appContext.getTransactionSubsystem().getId(), export);
try {
broker.sendMessageToCC(reponse);
} catch (Exception e) {
@@ -59,11 +68,11 @@
@Override
public String toString() {
- return TakeoverMetadataNodeRequestMessage.class.getSimpleName();
+ return MetadataNodeRequestMessage.class.getSimpleName();
}
@Override
public MessageType getType() {
- return MessageType.TAKEOVER_METADATA_NODE_REQUEST;
+ return MessageType.METADATA_NODE_REQUEST;
}
}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverMetadataNodeResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/MetadataNodeResponseMessage.java
similarity index 80%
rename from asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverMetadataNodeResponseMessage.java
rename to asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/MetadataNodeResponseMessage.java
index ff1b2d2..ebde9b9 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverMetadataNodeResponseMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/MetadataNodeResponseMessage.java
@@ -24,13 +24,15 @@
import org.apache.asterix.runtime.utils.CcApplicationContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-public class TakeoverMetadataNodeResponseMessage implements INCLifecycleMessage, ICcAddressedMessage {
+public class MetadataNodeResponseMessage implements INCLifecycleMessage, ICcAddressedMessage {
private static final long serialVersionUID = 1L;
private final String nodeId;
+ private final boolean exported;
- public TakeoverMetadataNodeResponseMessage(String nodeId) {
+ public MetadataNodeResponseMessage(String nodeId, boolean exported) {
this.nodeId = nodeId;
+ this.exported = exported;
}
public String getNodeId() {
@@ -44,11 +46,15 @@
@Override
public String toString() {
- return TakeoverMetadataNodeResponseMessage.class.getSimpleName();
+ return MetadataNodeResponseMessage.class.getSimpleName();
}
@Override
public MessageType getType() {
- return MessageType.TAKEOVER_METADATA_NODE_RESPONSE;
+ return MessageType.METADATA_NODE_RESPONSE;
+ }
+
+ public boolean isExported() {
+ return exported;
}
}
diff --git a/asterixdb/asterix-app/src/main/resources/cluster.xml b/asterixdb/asterix-app/src/main/resources/cluster.xml
index 7b7d52a..41be696 100644
--- a/asterixdb/asterix-app/src/main/resources/cluster.xml
+++ b/asterixdb/asterix-app/src/main/resources/cluster.xml
@@ -30,9 +30,6 @@
</data_replication>
<fault_tolerance>
<strategy>metadata_node</strategy>
- <replica>
- <node_id>nc2</node_id>
- </replica>
</fault_tolerance>
</high_availability>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
index 9d73407..09761a0 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
@@ -32,7 +32,6 @@
import java.util.List;
import org.apache.asterix.app.external.ExternalUDFLibrarian;
-import org.apache.asterix.common.config.ClusterProperties;
import org.apache.asterix.common.library.ILibraryManager;
import org.apache.asterix.common.utils.StorageConstants;
import org.apache.asterix.test.common.TestExecutor;
@@ -40,8 +39,8 @@
import org.apache.commons.lang.SystemUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hyracks.api.io.IODeviceHandle;
-import org.apache.hyracks.util.ThreadDumpUtil;
import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.util.ThreadDumpUtil;
/**
* Utils for running SQL++ or AQL runtime tests.
@@ -59,6 +58,7 @@
private static ExternalUDFLibrarian librarian;
private static final int repeat = Integer.getInteger("test.repeat", 1);
+ private static boolean checkStorageDistribution = true;
public static void setUp(String configFile, TestExecutor executor) throws Exception {
testExecutor = executor;
@@ -126,7 +126,9 @@
testExecutor.executeTest(PATH_ACTUAL, tcCtx, null, false, ExecutionTestUtil.FailedGroup);
try {
- checkStorageFiles();
+ if (checkStorageDistribution) {
+ checkStorageFiles();
+ }
} finally {
testExecutor.cleanup(tcCtx.toString(), badTestCases);
}
@@ -223,6 +225,10 @@
}
}
+ public static void setCheckStorageDistribution(boolean checkStorageDistribution) {
+ LangExecutionUtil.checkStorageDistribution = checkStorageDistribution;
+ }
+
private static void outputLeakedOpenFiles(String processId) throws IOException {
Process process =
Runtime.getRuntime().exec(new String[] { "bash", "-c", "lsof -p " + processId + "|grep waf" });
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ReplicationExecutionTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ReplicationExecutionTest.java
index 56c7bc0..32b756b 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ReplicationExecutionTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ReplicationExecutionTest.java
@@ -25,10 +25,12 @@
import java.util.Map;
import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.config.ClusterProperties;
import org.apache.asterix.test.common.TestExecutor;
import org.apache.asterix.testframework.context.TestCaseContext;
import org.apache.hyracks.control.nc.NodeControllerService;
-import org.junit.AfterClass;
+import org.junit.After;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -39,28 +41,38 @@
public class ReplicationExecutionTest {
protected static final String TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml";
private static final TestExecutor testExecutor = new TestExecutor();
+ private static boolean configured = false;
@BeforeClass
- public static void setUp() throws Exception {
- LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, testExecutor);
- final NodeControllerService[] ncs = ExecutionTestUtil.integrationUtil.ncs;
- Map<String, InetSocketAddress> ncEndPoints = new HashMap<>();
- Map<String, InetSocketAddress> replicationAddress = new HashMap<>();
- final String ip = InetAddress.getLoopbackAddress().getHostAddress();
- for (NodeControllerService nc : ncs) {
- final String nodeId = nc.getId();
- final INcApplicationContext appCtx = (INcApplicationContext) nc.getApplicationContext();
- int apiPort = appCtx.getExternalProperties().getNcApiPort();
- int replicationPort = appCtx.getReplicationProperties().getDataReplicationPort(nodeId);
- ncEndPoints.put(nodeId, InetSocketAddress.createUnresolved(ip, apiPort));
- replicationAddress.put(nodeId, InetSocketAddress.createUnresolved(ip, replicationPort));
- }
- testExecutor.setNcEndPoints(ncEndPoints);
- testExecutor.setNcReplicationAddress(replicationAddress);
+ public static void setUp() {
+ ClusterProperties.INSTANCE.getCluster().getHighAvailability().setEnabled(String.valueOf(true));
+ LangExecutionUtil.setCheckStorageDistribution(false);
}
- @AfterClass
- public static void tearDown() throws Exception {
+ @Before
+ public void before() throws Exception {
+ LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, testExecutor);
+ if (!configured) {
+ final NodeControllerService[] ncs = ExecutionTestUtil.integrationUtil.ncs;
+ Map<String, InetSocketAddress> ncEndPoints = new HashMap<>();
+ Map<String, InetSocketAddress> replicationAddress = new HashMap<>();
+ final String ip = InetAddress.getLoopbackAddress().getHostAddress();
+ for (NodeControllerService nc : ncs) {
+ final String nodeId = nc.getId();
+ final INcApplicationContext appCtx = (INcApplicationContext) nc.getApplicationContext();
+ int apiPort = appCtx.getExternalProperties().getNcApiPort();
+ int replicationPort = appCtx.getReplicationProperties().getDataReplicationPort(nodeId);
+ ncEndPoints.put(nodeId, InetSocketAddress.createUnresolved(ip, apiPort));
+ replicationAddress.put(nodeId, InetSocketAddress.createUnresolved(ip, replicationPort));
+ }
+ testExecutor.setNcEndPoints(ncEndPoints);
+ testExecutor.setNcReplicationAddress(replicationAddress);
+ configured = true;
+ }
+ }
+
+ @After
+ public void after() throws Exception {
LangExecutionUtil.tearDown();
}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.pollget.http
similarity index 97%
rename from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
rename to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.pollget.http
index d287fad..6867a5d 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.pollget.http
@@ -16,4 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/
-nc:asterix_nc1 /admin/storage/partition/0
+//polltimeoutsecs=30
+
+nc:asterix_nc1 /admin/storage/partition/0
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.1.sto.cmd
similarity index 94%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.1.sto.cmd
index d287fad..7ddaa20 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.1.sto.cmd
@@ -16,4 +16,4 @@
* specific language governing permissions and limitations
* under the License.
*/
-nc:asterix_nc1 /admin/storage/partition/0
+nc:asterix_nc1 /addReplica 0 asterix_nc2
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.10.ddl.sqlpp
similarity index 94%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.10.ddl.sqlpp
index d287fad..f96d5a8 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.10.ddl.sqlpp
@@ -16,4 +16,4 @@
* specific language governing permissions and limitations
* under the License.
*/
-nc:asterix_nc1 /admin/storage/partition/0
+CREATE DATASET ds_2(MyType) PRIMARY KEY id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.11.query.sqlpp
similarity index 91%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.11.query.sqlpp
index d287fad..555954d 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.11.query.sqlpp
@@ -16,4 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/
-nc:asterix_nc1 /admin/storage/partition/0
+SELECT value count(*)
+FROM Metadata.`Dataset`
+WHERE DatasetName = 'ds_2';
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.2.pollget.http
similarity index 96%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.2.pollget.http
index d287fad..6867a5d 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.2.pollget.http
@@ -16,4 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/
-nc:asterix_nc1 /admin/storage/partition/0
+//polltimeoutsecs=30
+
+nc:asterix_nc1 /admin/storage/partition/0
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.3.ddl.sqlpp
similarity index 90%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.3.ddl.sqlpp
index d287fad..15bc3c5 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.3.ddl.sqlpp
@@ -16,4 +16,8 @@
* specific language governing permissions and limitations
* under the License.
*/
-nc:asterix_nc1 /admin/storage/partition/0
+CREATE TYPE MyType AS {
+ id : int
+};
+
+CREATE DATASET ds_1(MyType) PRIMARY KEY id;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.4.sto.cmd
similarity index 94%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.4.sto.cmd
index d287fad..a5753f0 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.4.sto.cmd
@@ -16,4 +16,4 @@
* specific language governing permissions and limitations
* under the License.
*/
-nc:asterix_nc1 /admin/storage/partition/0
+nc:asterix_nc2 /promote 0 asterix_nc2
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.5.get.http
similarity index 94%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.5.get.http
index d287fad..4a53aed 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.5.get.http
@@ -16,4 +16,4 @@
* specific language governing permissions and limitations
* under the License.
*/
-nc:asterix_nc1 /admin/storage/partition/0
+nc:asterix_nc2 /admin/storage
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.6.post.http
similarity index 92%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.6.post.http
index d287fad..2e8fc63 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.6.post.http
@@ -16,4 +16,4 @@
* specific language governing permissions and limitations
* under the License.
*/
-nc:asterix_nc1 /admin/storage/partition/0
+/admin/cluster/partition/master?partition=0&node=asterix_nc2
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.7.post.http
similarity index 94%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.7.post.http
index d287fad..e8dca0b 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.7.post.http
@@ -16,4 +16,4 @@
* specific language governing permissions and limitations
* under the License.
*/
-nc:asterix_nc1 /admin/storage/partition/0
+/admin/cluster/metadataNode?node=asterix_nc2
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.8.pollget.http
similarity index 94%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.8.pollget.http
index d287fad..32e2f78 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.8.pollget.http
@@ -16,4 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/
-nc:asterix_nc1 /admin/storage/partition/0
+//polltimeoutsecs=30
+
+/admin/cluster/summary
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.9.query.sqlpp
similarity index 91%
copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.9.query.sqlpp
index d287fad..a612cbb 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/metadata_failover/metadata_failover.9.query.sqlpp
@@ -16,4 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/
-nc:asterix_nc1 /admin/storage/partition/0
+SELECT value count(*)
+FROM Metadata.`Dataset`
+WHERE DatasetName = 'ds_1';
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/replication.xml b/asterixdb/asterix-app/src/test/resources/runtimets/replication.xml
index a635676..c3dfb3c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/replication.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/replication.xml
@@ -24,5 +24,10 @@
<output-dir compare="Text">add_replica</output-dir>
</compilation-unit>
</test-case>
+ <test-case FilePath="replication">
+ <compilation-unit name="metadata_failover">
+ <output-dir compare="Text">metadata_failover</output-dir>
+ </compilation-unit>
+ </test-case>
</test-group>
</test-suite>
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/add_replica/add_replica.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/add_replica/add_replica.2.adm
index 3553d9c..6836f71 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/add_replica/add_replica.2.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/add_replica/add_replica.2.adm
@@ -2,6 +2,6 @@
"partition" : 0,
"replicas" : [ {
"location" : "127.0.0.1:2017",
- "status" : "DISCONNECTED"
+ "status" : "IN_SYNC"
} ]
} ]
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/metadata_failover/metadata_failover.11.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/metadata_failover/metadata_failover.11.adm
new file mode 100644
index 0000000..56a6051
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/metadata_failover/metadata_failover.11.adm
@@ -0,0 +1 @@
+1
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/metadata_failover/metadata_failover.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/metadata_failover/metadata_failover.2.adm
new file mode 100644
index 0000000..6836f71
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/metadata_failover/metadata_failover.2.adm
@@ -0,0 +1,7 @@
+[ {
+ "partition" : 0,
+ "replicas" : [ {
+ "location" : "127.0.0.1:2017",
+ "status" : "IN_SYNC"
+ } ]
+} ]
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/metadata_failover/metadata_failover.5.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/metadata_failover/metadata_failover.5.adm
new file mode 100644
index 0000000..3d3204d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/metadata_failover/metadata_failover.5.adm
@@ -0,0 +1,10 @@
+[ {
+ "partition" : 0,
+ "replicas" : [ ]
+}, {
+ "partition" : 2,
+ "replicas" : [ ]
+}, {
+ "partition" : 3,
+ "replicas" : [ ]
+} ]
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/metadata_failover/metadata_failover.6.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/metadata_failover/metadata_failover.6.adm
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/metadata_failover/metadata_failover.6.adm
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/metadata_failover/metadata_failover.7.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/metadata_failover/metadata_failover.7.adm
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/metadata_failover/metadata_failover.7.adm
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/metadata_failover/metadata_failover.8.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/metadata_failover/metadata_failover.8.adm
new file mode 100644
index 0000000..fa5cfb4
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/metadata_failover/metadata_failover.8.adm
@@ -0,0 +1,38 @@
+{
+ "metadata_node" : "asterix_nc2",
+ "partitions" : {
+ "0" : {
+ "active" : true,
+ "activeNodeId" : "asterix_nc2",
+ "iodeviceNum" : 0,
+ "nodeId" : "asterix_nc1",
+ "partitionId" : 0,
+ "pendingActivation" : false
+ },
+ "1" : {
+ "active" : true,
+ "activeNodeId" : "asterix_nc1",
+ "iodeviceNum" : 1,
+ "nodeId" : "asterix_nc1",
+ "partitionId" : 1,
+ "pendingActivation" : false
+ },
+ "2" : {
+ "active" : true,
+ "activeNodeId" : "asterix_nc2",
+ "iodeviceNum" : 0,
+ "nodeId" : "asterix_nc2",
+ "partitionId" : 2,
+ "pendingActivation" : false
+ },
+ "3" : {
+ "active" : true,
+ "activeNodeId" : "asterix_nc2",
+ "iodeviceNum" : 1,
+ "nodeId" : "asterix_nc2",
+ "partitionId" : 3,
+ "pendingActivation" : false
+ }
+ },
+ "state" : "ACTIVE"
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/metadata_failover/metadata_failover.9.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/metadata_failover/metadata_failover.9.adm
new file mode 100644
index 0000000..56a6051
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/metadata_failover/metadata_failover.9.adm
@@ -0,0 +1 @@
+1
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IFaultToleranceStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IFaultToleranceStrategy.java
index 5c286cc..e871374 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IFaultToleranceStrategy.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IFaultToleranceStrategy.java
@@ -64,4 +64,12 @@
*/
IFaultToleranceStrategy from(ICCServiceContext serviceCtx, IReplicationStrategy replicationStrategy);
+ /**
+ * Performs the required steps to change the metadata node to {@code node}
+ *
+ * @param node the node that becomes the new metadata node
+ */
+ default void notifyMetadataNodeChange(String node) throws HyracksDataException {
+ throw new UnsupportedOperationException(getClass() + " does not support metadata node change");
+ }
}
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INCLifecycleMessage.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INCLifecycleMessage.java
index cb9fa8f..372a88a 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INCLifecycleMessage.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/INCLifecycleMessage.java
@@ -34,8 +34,8 @@
REGISTRATION_TASKS_RESULT,
TAKEOVER_PARTITION_REQUEST,
TAKEOVER_PARTITION_RESPONSE,
- TAKEOVER_METADATA_NODE_REQUEST,
- TAKEOVER_METADATA_NODE_RESPONSE
+ METADATA_NODE_REQUEST,
+ METADATA_NODE_RESPONSE
}
/**
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java
index 1c3f030..72a7f9d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.common.replication;
+import java.util.List;
import java.util.Set;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -30,4 +31,6 @@
* @throws HyracksDataException
*/
long getPartitionsMinLSN(Set<Integer> partitions) throws HyracksDataException;
+
+ List<String> getPartitionIndexesFiles(int partition, boolean relativePath) throws HyracksDataException;
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
index b969bef..04c4437 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
@@ -127,4 +127,11 @@
* @param buffer
*/
public void replicateTxnLogBatch(ByteBuffer buffer);
+
+ /**
+ * Registers {@code replica}. After registration, the replica will be included in all replication events
+ *
+ * @param replica the partition replica to register
+ */
+ void register(IPartitionReplica replica);
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationThread.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationThread.java
index a88b82a..5b9d4fa 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationThread.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationThread.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.common.replication;
+import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import org.apache.asterix.common.transactions.LogRecord;
@@ -27,13 +28,19 @@
/**
* Sends a notification to this thread that logRecord has been flushed.
*
- * @param logRecord
- * The log that has been flushed.
+ * @param logRecord The log that has been flushed.
*/
- public void notifyLogReplicationRequester(LogRecord logRecord);
+ void notifyLogReplicationRequester(LogRecord logRecord);
/**
- * @return The replication client socket channel.
+ * @return The replication socket channel.
*/
- public SocketChannel getReplicationClientSocket();
+ SocketChannel getChannel();
+
+ /**
+ * Gets a reusable buffer that can be used to send data
+ *
+ * @return the reusable buffer
+ */
+ ByteBuffer getReusableBuffer();
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
index bd4b32f..f0bba41 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/MetadataOnlyReplicationStrategy.java
@@ -69,21 +69,16 @@
if (metadataNode == null) {
throw new IllegalStateException("Invalid metadata node specified");
}
-
- if (cluster.getHighAvailability().getFaultTolerance().getReplica() == null
- || cluster.getHighAvailability().getFaultTolerance().getReplica().getNodeId() == null
- || cluster.getHighAvailability().getFaultTolerance().getReplica().getNodeId().isEmpty()) {
- throw new RuntimeDataException(ErrorCode.INVALID_CONFIGURATION,
- "One or more replicas must be specified for metadata node.");
- }
-
final Set<Replica> replicas = new HashSet<>();
- for (String nodeId : cluster.getHighAvailability().getFaultTolerance().getReplica().getNodeId()) {
- Node node = ClusterProperties.INSTANCE.getNodeById(nodeId);
- if (node == null) {
- throw new RuntimeDataException(ErrorCode.INVALID_CONFIGURATION, "Invalid replica specified: " + nodeId);
+ if (cluster.getHighAvailability().getFaultTolerance().getReplica() != null) {
+ for (String nodeId : cluster.getHighAvailability().getFaultTolerance().getReplica().getNodeId()) {
+ Node node = ClusterProperties.INSTANCE.getNodeById(nodeId);
+ if (node == null) {
+ throw new RuntimeDataException(ErrorCode.INVALID_CONFIGURATION,
+ "Invalid replica specified: " + nodeId);
+ }
+ replicas.add(new Replica(node));
}
- replicas.add(new Replica(node));
}
MetadataOnlyReplicationStrategy st = new MetadataOnlyReplicationStrategy();
st.metadataNodeId = cluster.getMetadataNode();
@@ -91,4 +86,9 @@
st.metadataNodeReplicas = replicas;
return st;
}
+
+ @Override
+ public boolean isParticipant(String nodeId) {
+ return true;
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
index a3b2b50..19eee8f 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
@@ -22,6 +22,7 @@
import java.util.Set;
import org.apache.asterix.common.replication.IPartitionReplica;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
public interface IReplicaManager {
@@ -54,4 +55,11 @@
* @return The list of partition
*/
Set<Integer> getPartitions();
+
+ /**
+ * Promotes a partition by making this node its master replica
+ *
+ * @param partition the partition to promote on this node
+ */
+ void promote(int partition) throws HyracksDataException;
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/PartitionReplica.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/PartitionReplica.java
deleted file mode 100644
index 18733ce..0000000
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/PartitionReplica.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.storage;
-
-import static org.apache.asterix.common.storage.PartitionReplica.PartitionReplicaStatus.CATCHING_UP;
-import static org.apache.asterix.common.storage.PartitionReplica.PartitionReplicaStatus.DISCONNECTED;
-import static org.apache.asterix.common.storage.PartitionReplica.PartitionReplicaStatus.IN_SYNC;
-
-import org.apache.hyracks.util.JSONUtil;
-import org.apache.hyracks.util.annotations.ThreadSafe;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
-@ThreadSafe
-public class PartitionReplica {
-
- public enum PartitionReplicaStatus {
- /* replica is in-sync with master */
- IN_SYNC,
- /* replica is still catching up with master */
- CATCHING_UP,
- /* replica is not connected with master */
- DISCONNECTED
- }
-
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
- private final ReplicaIdentifier id;
- private PartitionReplicaStatus status = DISCONNECTED;
-
- public PartitionReplica(ReplicaIdentifier id) {
- this.id = id;
- }
-
- public synchronized PartitionReplicaStatus getStatus() {
- return status;
- }
-
- public ReplicaIdentifier getIdentifier() {
- return id;
- }
-
- public synchronized void sync() {
- if (status == IN_SYNC || status == CATCHING_UP) {
- return;
- }
- //TODO complete implementation
- }
-
- public JsonNode asJson() {
- ObjectNode json = OBJECT_MAPPER.createObjectNode();
- json.put("id", id.toString());
- json.put("state", status.name());
- return json;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- PartitionReplica that = (PartitionReplica) o;
- return id.equals(that.id);
- }
-
- @Override
- public int hashCode() {
- return id.hashCode();
- }
-
- @Override
- public String toString() {
- try {
- return JSONUtil.convertNode(asJson());
- } catch (JsonProcessingException e) {
- throw new IllegalStateException(e);
- }
- }
-}
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
index 4aa6982..8cc11a8 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
@@ -71,6 +71,10 @@
return Paths.get(root, partition, dataverse, dataset, rebalance, index);
}
+ public Path getFileRelativePath() {
+ return Paths.get(root, partition, dataverse, dataset, rebalance, index, name);
+ }
+
protected static void parse(ResourceReference ref, String path) {
// format: root/partition/dataverse/dataset/rebalanceCount/index/fileName
final String[] tokens = path.split(File.separator);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
index 220b089..f59914d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
@@ -35,6 +35,7 @@
*/
public static final String INDEX_CHECKPOINT_FILE_PREFIX = ".idx_checkpoint_";
public static final String METADATA_FILE_NAME = ".metadata";
+ public static final String MASK_FILE_PREFIX = ".mask_";
public static final String LEGACY_DATASET_INDEX_NAME_SEPARATOR = "_idx_";
/**
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
index b93ccb5..d2c5ad7 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@ -90,13 +90,21 @@
/**
* @param fileAbsolutePath
- * @return the file relative path starting from the partition directory
+ * @return the relative path of the file's index, starting from the storage directory
*/
public static String getIndexFileRelativePath(String fileAbsolutePath) {
return ResourceReference.of(fileAbsolutePath).getRelativePath().toString();
}
/**
+ * @param fileAbsolutePath the absolute path of the file
+ * @return the file's relative path starting from the storage directory
+ */
+ public static String getFileRelativePath(String fileAbsolutePath) {
+ return ResourceReference.of(fileAbsolutePath).getFileRelativePath().toString();
+ }
+
+ /**
* Create a file
* Note: this method is not thread safe. It is the responsibility of the caller to ensure no path conflict when
* creating files simultaneously
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicaTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicaTask.java
new file mode 100644
index 0000000..001d41f
--- /dev/null
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicaTask.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.api;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.replication.IReplicationThread;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface IReplicaTask extends IReplicationMessage {
+
+ /**
+ * Performs the task on the replica
+ *
+ * @param appCtx
+ * @param worker
+ * @throws HyracksDataException
+ */
+ void perform(INcApplicationContext appCtx, IReplicationThread worker) throws HyracksDataException;
+}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationMessage.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationMessage.java
new file mode 100644
index 0000000..2e1cb8a
--- /dev/null
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationMessage.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.api;
+
+import java.io.OutputStream;
+
+import org.apache.asterix.replication.functions.ReplicationProtocol;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface IReplicationMessage {
+
+ /**
+ * @return the message type
+ */
+ ReplicationProtocol.ReplicationRequestType getMessageType();
+
+ /**
+ * Serializes {@link IReplicationMessage} to {@code out}
+ *
+ * @param out the stream to which this message is serialized
+ * @throws HyracksDataException
+ */
+ void serialize(OutputStream out) throws HyracksDataException;
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java
index 8a52529..8094548 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java
@@ -25,10 +25,18 @@
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
+import org.apache.asterix.common.exceptions.ReplicationException;
import org.apache.asterix.common.replication.ReplicaEvent;
+import org.apache.asterix.replication.api.IReplicationMessage;
import org.apache.asterix.replication.management.NetworkingUtil;
+import org.apache.asterix.replication.messaging.CheckpointPartitionIndexesTask;
+import org.apache.asterix.replication.messaging.DeleteFileTask;
+import org.apache.asterix.replication.messaging.PartitionResourcesListResponse;
+import org.apache.asterix.replication.messaging.PartitionResourcesListTask;
+import org.apache.asterix.replication.messaging.ReplicateFileTask;
import org.apache.asterix.replication.storage.LSMComponentProperties;
import org.apache.asterix.replication.storage.LSMIndexFileProperties;
+import org.apache.asterix.replication.storage.PartitionReplica;
import org.apache.hyracks.data.std.util.ExtendedByteArrayOutputStream;
public class ReplicationProtocol {
@@ -38,8 +46,8 @@
*/
public static final String JOB_REPLICATION_ACK = "$";
- public final static int REPLICATION_REQUEST_TYPE_SIZE = Integer.BYTES;
- public final static int REPLICATION_REQUEST_HEADER_SIZE = REPLICATION_REQUEST_TYPE_SIZE + Integer.BYTES;
+ public static final int REPLICATION_REQUEST_TYPE_SIZE = Integer.BYTES;
+ private static final int REPLICATION_REQUEST_HEADER_SIZE = REPLICATION_REQUEST_TYPE_SIZE + Integer.BYTES;
/*
* ReplicationRequestType:
@@ -64,7 +72,12 @@
REPLICA_EVENT,
LSM_COMPONENT_PROPERTIES,
ACK,
- FLUSH_INDEX
+ FLUSH_INDEX,
+ PARTITION_RESOURCES_REQUEST,
+ PARTITION_RESOURCES_RESPONSE,
+ REPLICATE_RESOURCE_FILE,
+ DELETE_RESOURCE_FILE,
+ CHECKPOINT_PARTITION
}
public static ByteBuffer readRequest(SocketChannel socketChannel, ByteBuffer dataBuffer) throws IOException {
@@ -256,4 +269,86 @@
ByteBuffer ackBuffer = ReplicationProtocol.getAckBuffer();
NetworkingUtil.transferBufferToChannel(socketChannel, ackBuffer);
}
-}
+
+    public static void sendAck(SocketChannel socketChannel, ByteBuffer buf) {
+        // reset the caller's buffer so that it carries nothing but the ACK type ordinal
+        buf.clear();
+        buf.putInt(ReplicationRequestType.ACK.ordinal()).flip();
+        try {
+            NetworkingUtil.transferBufferToChannel(socketChannel, buf);
+        } catch (IOException ioe) {
+            throw new ReplicationException(ioe);
+        }
+    }
+
+    public static void waitForAck(PartitionReplica replica) throws IOException {
+        // anything other than an ACK reply is treated as a protocol violation
+        final ReplicationRequestType reply = getRequestType(replica.getChannel(), replica.gerReusableBuffer());
+        if (reply == ReplicationRequestType.ACK) {
+            return;
+        }
+        throw new IllegalStateException("Unexpected response while waiting for ack.");
+    }
+
+    public static void sendTo(PartitionReplica replica, IReplicationMessage task) {
+        // the replica supplies both the channel to write to and the buffer used to stage the message
+        final SocketChannel replicaChannel = replica.getChannel();
+        sendTo(replicaChannel, task, replica.gerReusableBuffer());
+    }
+
+    public static void sendTo(SocketChannel channel, IReplicationMessage task, ByteBuffer buf) {
+        final ExtendedByteArrayOutputStream payload = new ExtendedByteArrayOutputStream();
+        try (DataOutputStream payloadOut = new DataOutputStream(payload)) {
+            task.serialize(payloadOut);
+            final int payloadSize = payloadOut.size();
+            // wire layout: message type ordinal (int), payload size (int), then the serialized payload
+            final ByteBuffer request = ensureSize(buf, REPLICATION_REQUEST_HEADER_SIZE + payloadSize);
+            request.putInt(task.getMessageType().ordinal()).putInt(payloadSize);
+            request.put(payload.getByteArray(), 0, payload.getLength());
+            request.flip();
+            NetworkingUtil.transferBufferToChannel(channel, request);
+        } catch (IOException ioe) {
+            throw new ReplicationException(ioe);
+        }
+    }
+
+    public static IReplicationMessage read(SocketChannel socketChannel, ByteBuffer buffer) throws IOException {
+        // the message type is read first; it selects how the rest of the message is decoded
+        return readMessage(getRequestType(socketChannel, buffer), socketChannel, buffer);
+    }
+
+    public static IReplicationMessage readMessage(ReplicationRequestType type, SocketChannel socketChannel,
+            ByteBuffer buffer) {
+        try {
+            ReplicationProtocol.readRequest(socketChannel, buffer);
+            // expose only the unread window of the buffer: the stream's third argument is a length, not a limit
+            try (DataInputStream dis = new DataInputStream(
+                    new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.remaining()))) {
+                switch (type) {
+                    case PARTITION_RESOURCES_REQUEST:
+                        return PartitionResourcesListTask.create(dis);
+                    case PARTITION_RESOURCES_RESPONSE:
+                        return PartitionResourcesListResponse.create(dis);
+                    case REPLICATE_RESOURCE_FILE:
+                        return ReplicateFileTask.create(dis);
+                    case DELETE_RESOURCE_FILE:
+                        return DeleteFileTask.create(dis);
+                    case CHECKPOINT_PARTITION:
+                        return CheckpointPartitionIndexesTask.create(dis);
+                    default:
+                        throw new IllegalStateException("Unrecognized replication message");
+                }
+            }
+        } catch (IOException e) {
+            throw new ReplicationException(e);
+        }
+    }
+
+    private static ByteBuffer ensureSize(ByteBuffer buffer, int size) {
+        // Reuse the caller's buffer (cleared) when its capacity suffices; otherwise allocate a new
+        // buffer of exactly the requested size and leave the caller's buffer untouched.
+        final boolean tooSmall = buffer.capacity() < size;
+        // the cast keeps this compiling on Java 8, where clear() is declared to return Buffer
+        return tooSmall ? ByteBuffer.allocate(size) : (ByteBuffer) buffer.clear();
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
index dfc23d3..ba24e07 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
@@ -24,10 +24,10 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.FileChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -71,6 +71,10 @@
import org.apache.asterix.replication.functions.ReplicationProtocol;
import org.apache.asterix.replication.functions.ReplicationProtocol.ReplicationRequestType;
import org.apache.asterix.replication.logging.RemoteLogMapping;
+import org.apache.asterix.replication.messaging.CheckpointPartitionIndexesTask;
+import org.apache.asterix.replication.messaging.DeleteFileTask;
+import org.apache.asterix.replication.messaging.PartitionResourcesListTask;
+import org.apache.asterix.replication.messaging.ReplicateFileTask;
import org.apache.asterix.replication.storage.LSMComponentLSNSyncTask;
import org.apache.asterix.replication.storage.LSMComponentProperties;
import org.apache.asterix.replication.storage.LSMIndexFileProperties;
@@ -108,12 +112,12 @@
private final Map<Long, RemoteLogMapping> localLsn2RemoteMapping;
private final Map<String, RemoteLogMapping> replicaUniqueLSN2RemoteMapping;
private final LSMComponentsSyncService lsmComponentLSNMappingService;
- private final Set<Integer> nodeHostedPartitions;
private final ReplicationNotifier replicationNotifier;
private final Object flushLogslock = new Object();
private final IDatasetLifecycleManager dsLifecycleManager;
private final PersistentLocalResourceRepository localResourceRep;
private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
+ private final INcApplicationContext appCtx;
public ReplicationChannel(String nodeId, ReplicationProperties replicationProperties, ILogManager logManager,
IReplicaResourcesManager replicaResoucesManager, IReplicationManager replicationManager,
@@ -135,19 +139,8 @@
lsmComponentLSNMappingService = new LSMComponentsSyncService();
replicationNotifier = new ReplicationNotifier();
replicationThreads = Executors.newCachedThreadPool(ncServiceContext.getThreadFactory());
- Map<String, ClusterPartition[]> nodePartitions =
- asterixAppRuntimeContextProvider.getAppContext().getMetadataProperties().getNodePartitions();
- Set<String> nodeReplicationClients = replicationProperties.getRemotePrimaryReplicasIds(nodeId);
- List<Integer> clientsPartitions = new ArrayList<>();
- for (String clientId : nodeReplicationClients) {
- for (ClusterPartition clusterPartition : nodePartitions.get(clientId)) {
- clientsPartitions.add(clusterPartition.getPartitionId());
- }
- }
- nodeHostedPartitions = new HashSet<>(clientsPartitions.size());
- nodeHostedPartitions.addAll(clientsPartitions);
- this.indexCheckpointManagerProvider =
- ((INcApplicationContext) ncServiceContext.getApplicationContext()).getIndexCheckpointManagerProvider();
+ this.appCtx = (INcApplicationContext) ncServiceContext.getApplicationContext();
+ this.indexCheckpointManagerProvider = appCtx.getIndexCheckpointManagerProvider();
}
@Override
@@ -167,12 +160,14 @@
LOGGER.log(Level.INFO, "opened Replication Channel @ IP Address: " + nodeIP + ":" + dataPort);
//start accepting replication requests
- while (true) {
+ while (serverSocketChannel.isOpen()) {
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(true);
//start a new thread to handle the request
replicationThreads.execute(new ReplicationThread(socketChannel));
}
+ } catch (AsynchronousCloseException e) {
+ LOGGER.log(Level.WARNING, "Replication channel closed", e);
} catch (IOException e) {
throw new IllegalStateException(
"Could not open replication channel @ IP Address: " + nodeIP + ":" + dataPort, e);
@@ -209,10 +204,8 @@
@Override
public void close() throws IOException {
- if (!serverSocketChannel.isOpen()) {
- serverSocketChannel.close();
- LOGGER.log(Level.INFO, "Replication channel closed.");
- }
+ serverSocketChannel.close();
+ LOGGER.log(Level.INFO, "Replication channel closed.");
}
/**
@@ -263,6 +256,18 @@
case FLUSH_INDEX:
handleFlushIndex();
break;
+ case PARTITION_RESOURCES_REQUEST:
+ handleGetPartitionResources();
+ break;
+ case REPLICATE_RESOURCE_FILE:
+ handleReplicateResourceFile();
+ break;
+ case DELETE_RESOURCE_FILE:
+ handleDeleteResourceFile();
+ break;
+ case CHECKPOINT_PARTITION:
+ handleCheckpointPartition();
+ break;
default:
throw new IllegalStateException("Unknown replication request");
}
@@ -476,10 +481,7 @@
switch (remoteLog.getLogType()) {
case LogType.UPDATE:
case LogType.ENTITY_COMMIT:
- //if the log partition belongs to a partitions hosted on this node, replicated it
- if (nodeHostedPartitions.contains(remoteLog.getResourcePartition())) {
- logManager.log(remoteLog);
- }
+ logManager.log(remoteLog);
break;
case LogType.JOB_COMMIT:
case LogType.ABORT:
@@ -542,10 +544,15 @@
}
@Override
- public SocketChannel getReplicationClientSocket() {
+ public SocketChannel getChannel() {
return socketChannel;
}
+ @Override
+ public ByteBuffer getReusableBuffer() {
+ return outBuffer;
+ }
+
private void checkpointReplicaIndexes(RemoteLogMapping remoteLogMapping, int datasetId) {
try {
Predicate<LocalResource> replicaIndexesPredicate = lr -> {
@@ -568,6 +575,30 @@
LOGGER.log(Level.SEVERE, "Failed to checkpoint replica indexes", e);
}
}
+
+        private void handleGetPartitionResources() throws IOException {
+            final ReplicationRequestType requestType = ReplicationRequestType.PARTITION_RESOURCES_REQUEST;
+            ((PartitionResourcesListTask) ReplicationProtocol.readMessage(requestType, socketChannel, inBuffer))
+                    .perform(appCtx, this); // this thread is handed to the task as its worker
+        }
+
+        private void handleReplicateResourceFile() throws HyracksDataException {
+            final ReplicationRequestType requestType = ReplicationRequestType.REPLICATE_RESOURCE_FILE;
+            ((ReplicateFileTask) ReplicationProtocol.readMessage(requestType, socketChannel, inBuffer))
+                    .perform(appCtx, this); // this thread is handed to the task as its worker
+        }
+
+        private void handleDeleteResourceFile() throws HyracksDataException {
+            final ReplicationRequestType requestType = ReplicationRequestType.DELETE_RESOURCE_FILE;
+            ((DeleteFileTask) ReplicationProtocol.readMessage(requestType, socketChannel, inBuffer))
+                    .perform(appCtx, this); // this thread is handed to the task as its worker
+        }
+
+        private void handleCheckpointPartition() throws HyracksDataException {
+            final ReplicationRequestType requestType = ReplicationRequestType.CHECKPOINT_PARTITION;
+            ((CheckpointPartitionIndexesTask) ReplicationProtocol.readMessage(requestType, socketChannel, inBuffer))
+                    .perform(appCtx, this); // this thread is handed to the task as its worker
+        }
}
/**
@@ -581,7 +612,7 @@
try {
LogRecord logRecord = pendingNotificationRemoteLogsQ.take();
//send ACK to requester
- logRecord.getReplicationThread().getReplicationClientSocket().socket().getOutputStream()
+ logRecord.getReplicationThread().getChannel().socket().getOutputStream()
.write((localNodeID + ReplicationProtocol.JOB_REPLICATION_ACK + logRecord.getTxnId()
+ System.lineSeparator()).getBytes());
} catch (InterruptedException e) {
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
index 5cf7eab..b933db8 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
@@ -40,6 +40,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
@@ -53,7 +54,9 @@
import java.util.stream.Collectors;
import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.config.ClusterProperties;
import org.apache.asterix.common.config.ReplicationProperties;
+import org.apache.asterix.common.replication.IPartitionReplica;
import org.apache.asterix.common.replication.IReplicaResourcesManager;
import org.apache.asterix.common.replication.IReplicationManager;
import org.apache.asterix.common.replication.IReplicationStrategy;
@@ -1177,6 +1180,25 @@
buffer.position(buffer.limit());
}
+    @Override
+    public void register(IPartitionReplica replica) {
+        // the replica is matched to a cluster node by comparing the node's ip and replication port
+        // against the host and port of the replica's location
+        final Optional<Node> match = ClusterProperties.INSTANCE.getCluster().getNode().stream()
+                .filter(nc -> nc.getClusterIp().equals(replica.getIdentifier().getLocation().getHostString())
+                        && nc.getReplicationPort().intValue() == replica.getIdentifier().getLocation().getPort())
+                .findAny();
+        final Replica replicaRef = new Replica(match.orElseThrow(
+                () -> new IllegalStateException("Couldn't find node for replica: " + replica)));
+        final String replicaId = replicaRef.getId();
+        replicas.putIfAbsent(replicaId, replicaRef);
+        // add the partition to the replica's partition set, creating the set on first registration
+        replica2PartitionsMap.computeIfAbsent(replicaId, id -> new HashSet<>())
+                .add(replica.getIdentifier().getPartition());
+        updateReplicaInfo(replicaRef);
+        checkReplicaState(replicaId, false, true);
+    }
+
//supporting classes
/**
* This class is responsible for processing replica events.
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
new file mode 100644
index 0000000..2c1937b
--- /dev/null
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.messaging;
+
+import java.io.DataInput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Collection;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.replication.IReplicationThread;
+import org.apache.asterix.common.storage.DatasetResourceReference;
+import org.apache.asterix.common.storage.IIndexCheckpointManager;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
+import org.apache.asterix.replication.api.IReplicaTask;
+import org.apache.asterix.replication.functions.ReplicationProtocol;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.common.LocalResource;
+
+/**
+ * A task that re-initializes the checkpoint of every resource in a partition, using the append LSN
+ * of the replica's log manager at the time the task runs, and then acknowledges the request.
+ */
+public class CheckpointPartitionIndexesTask implements IReplicaTask {
+
+    private final int partition;
+
+    public CheckpointPartitionIndexesTask(int partition) {
+        this.partition = partition;
+    }
+
+    @Override
+    public void perform(INcApplicationContext appCtx, IReplicationThread worker) throws HyracksDataException {
+        final IIndexCheckpointManagerProvider checkpointProvider = appCtx.getIndexCheckpointManagerProvider();
+        final PersistentLocalResourceRepository localRepo =
+                (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
+        final Collection<LocalResource> resources = localRepo.getPartitionResources(partition).values();
+        // the LSN is read once, before the loop, and reused for every resource of the partition
+        final long lsn = appCtx.getTransactionSubsystem().getLogManager().getAppendLSN();
+        for (LocalResource resource : resources) {
+            final IIndexCheckpointManager checkpointManager =
+                    checkpointProvider.get(DatasetResourceReference.of(resource));
+            checkpointManager.delete();
+            checkpointManager.init(lsn);
+        }
+        // the ack is sent only after every resource has been processed
+        ReplicationProtocol.sendAck(worker.getChannel(), worker.getReusableBuffer());
+    }
+
+    @Override
+    public ReplicationProtocol.ReplicationRequestType getMessageType() {
+        return ReplicationProtocol.ReplicationRequestType.CHECKPOINT_PARTITION;
+    }
+
+    @Override
+    public void serialize(OutputStream out) throws HyracksDataException {
+        try {
+            new DataOutputStream(out).writeInt(partition);
+        } catch (IOException ioe) {
+            throw HyracksDataException.create(ioe);
+        }
+    }
+
+    public static CheckpointPartitionIndexesTask create(DataInput input) throws HyracksDataException {
+        try {
+            return new CheckpointPartitionIndexesTask(input.readInt());
+        } catch (IOException ioe) {
+            throw HyracksDataException.create(ioe);
+        }
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java
new file mode 100644
index 0000000..ea43ee9
--- /dev/null
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.messaging;
+
+import java.io.DataInput;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.exceptions.ReplicationException;
+import org.apache.asterix.common.replication.IReplicationThread;
+import org.apache.asterix.replication.api.IReplicaTask;
+import org.apache.asterix.replication.functions.ReplicationProtocol;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IIOManager;
+
+/**
+ * A task that deletes a file on a replica if it exists; the request is acknowledged either way
+ */
+public class DeleteFileTask implements IReplicaTask {
+
+    private static final Logger LOGGER = Logger.getLogger(DeleteFileTask.class.getName());
+    private final String file;
+
+    public DeleteFileTask(String file) {
+        this.file = file;
+    }
+
+    @Override
+    public void perform(INcApplicationContext appCtx, IReplicationThread worker) {
+        try {
+            final IIOManager ioManager = appCtx.getIoManager();
+            final File localFile = ioManager.resolve(file).getFile();
+            // deleteIfExists folds the existence check into the delete itself, so a file that vanishes
+            // between the two steps is reported as missing instead of failing the whole task
+            if (Files.deleteIfExists(localFile.toPath())) {
+                LOGGER.info(() -> "Deleted file: " + localFile.getAbsolutePath());
+            } else {
+                LOGGER.warning(() -> "Requested to delete a non-existing file: " + localFile.getAbsolutePath());
+            }
+            ReplicationProtocol.sendAck(worker.getChannel(), worker.getReusableBuffer());
+        } catch (IOException e) {
+            throw new ReplicationException(e);
+        }
+    }
+
+    @Override
+    public ReplicationProtocol.ReplicationRequestType getMessageType() {
+        return ReplicationProtocol.ReplicationRequestType.DELETE_RESOURCE_FILE;
+    }
+
+    @Override
+    public void serialize(OutputStream out) throws HyracksDataException {
+        try {
+            new DataOutputStream(out).writeUTF(file);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    public static DeleteFileTask create(DataInput input) throws IOException {
+        return new DeleteFileTask(input.readUTF());
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListResponse.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListResponse.java
new file mode 100644
index 0000000..85b7bb9
--- /dev/null
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListResponse.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.messaging;
+
+import java.io.DataInput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.replication.api.IReplicationMessage;
+import org.apache.asterix.replication.functions.ReplicationProtocol;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/** Reply listing a partition's resources; serialized as partition, resource count, then each resource. */
+public class PartitionResourcesListResponse implements IReplicationMessage {
+
+    private final int partition;
+    private final List<String> resources;
+
+    public PartitionResourcesListResponse(int partition, List<String> resources) {
+        this.partition = partition;
+        this.resources = resources;
+    }
+
+    @Override
+    public ReplicationProtocol.ReplicationRequestType getMessageType() {
+        return ReplicationProtocol.ReplicationRequestType.PARTITION_RESOURCES_RESPONSE;
+    }
+
+    @Override
+    public void serialize(OutputStream out) throws HyracksDataException {
+        final DataOutputStream sink = new DataOutputStream(out);
+        try {
+            sink.writeInt(partition);
+            sink.writeInt(resources.size());
+            for (String resource : resources) {
+                sink.writeUTF(resource);
+            }
+        } catch (IOException ioe) {
+            throw HyracksDataException.create(ioe);
+        }
+    }
+
+    public List<String> getResources() {
+        return resources;
+    }
+
+    public static PartitionResourcesListResponse create(DataInput input) throws IOException {
+        final int partitionId = input.readInt();
+        final List<String> received = new ArrayList<>();
+        for (int remaining = input.readInt(); remaining > 0; remaining--) {
+            received.add(input.readUTF());
+        }
+        return new PartitionResourcesListResponse(partitionId, received);
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
new file mode 100644
index 0000000..b2b8ac6
--- /dev/null
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.messaging;
+
+import java.io.DataInput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.replication.IReplicationThread;
+import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.asterix.replication.api.IReplicaTask;
+import org.apache.asterix.replication.functions.ReplicationProtocol;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * A task that lists the files of a partition on a replica and sends the list back to the requester
+ */
+public class PartitionResourcesListTask implements IReplicaTask {
+
+    private final int partition;
+
+    public PartitionResourcesListTask(int partition) {
+        this.partition = partition;
+    }
+
+    @Override
+    public void perform(INcApplicationContext appCtx, IReplicationThread worker) throws HyracksDataException {
+        //TODO delete any invalid files with masks
+        // every file reported by the resources manager is mapped through getFileRelativePath
+        final List<String> relativePaths = appCtx.getReplicaResourcesManager()
+                .getPartitionIndexesFiles(partition, false).stream()
+                .map(StoragePathUtil::getFileRelativePath)
+                .collect(Collectors.toList());
+        // the listing goes back on the worker's channel as a response message rather than a bare ack
+        final PartitionResourcesListResponse reply = new PartitionResourcesListResponse(partition, relativePaths);
+        ReplicationProtocol.sendTo(worker.getChannel(), reply, worker.getReusableBuffer());
+    }
+
+    @Override
+    public ReplicationProtocol.ReplicationRequestType getMessageType() {
+        return ReplicationProtocol.ReplicationRequestType.PARTITION_RESOURCES_REQUEST;
+    }
+
+    @Override
+    public void serialize(OutputStream out) throws HyracksDataException {
+        try {
+            new DataOutputStream(out).writeInt(partition);
+        } catch (IOException ioe) {
+            throw HyracksDataException.create(ioe);
+        }
+    }
+
+    public static PartitionResourcesListTask create(DataInput input) throws HyracksDataException {
+        try {
+            return new PartitionResourcesListTask(input.readInt());
+        } catch (IOException ioe) {
+            throw HyracksDataException.create(ioe);
+        }
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
new file mode 100644
index 0000000..14e9180
--- /dev/null
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.messaging;
+
+import java.io.DataInput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.exceptions.ReplicationException;
+import org.apache.asterix.common.replication.IReplicationThread;
+import org.apache.asterix.common.utils.StorageConstants;
+import org.apache.asterix.replication.api.IReplicaTask;
+import org.apache.asterix.replication.functions.ReplicationProtocol;
+import org.apache.asterix.replication.management.NetworkingUtil;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+
+/**
+ * A task that receives a file from a master replica and stores it locally, then acknowledges it
+ */
+public class ReplicateFileTask implements IReplicaTask {
+
+    private static final Logger LOGGER = Logger.getLogger(ReplicateFileTask.class.getName());
+    private final String file;
+    private final long size;
+
+    public ReplicateFileTask(String file, long size) {
+        this.file = file;
+        this.size = size;
+    }
+
+    @Override
+    public void perform(INcApplicationContext appCtx, IReplicationThread worker) throws HyracksDataException {
+        try {
+            final IIOManager ioManager = appCtx.getIoManager();
+            // resolve the target path and make sure its parent directory exists
+            final FileReference localPath = ioManager.resolve(file);
+            final Path resourceDir = Files.createDirectories(localPath.getFile().getParentFile().toPath());
+            // create the mask first; it is removed only after the file content has been forced to disk
+            final Path maskPath = Paths.get(resourceDir.toString(),
+                    StorageConstants.MASK_FILE_PREFIX + localPath.getFile().getName());
+            Files.createFile(maskPath);
+
+            // receive the actual file into a pre-sized file next to the mask
+            final Path filePath = Paths.get(resourceDir.toString(), localPath.getFile().getName());
+            Files.createFile(filePath);
+            try (RandomAccessFile fileOutputStream = new RandomAccessFile(filePath.toFile(), "rw");
+                    FileChannel fileChannel = fileOutputStream.getChannel()) {
+                fileOutputStream.setLength(size);
+                NetworkingUtil.downloadFile(fileChannel, worker.getChannel());
+                fileChannel.force(true);
+            }
+            // the file is complete: delete the mask, then acknowledge
+            Files.delete(maskPath);
+            LOGGER.info(() -> "Replicated file: " + localPath);
+            ReplicationProtocol.sendAck(worker.getChannel(), worker.getReusableBuffer());
+        } catch (IOException e) {
+            throw new ReplicationException(e);
+        }
+    }
+
+    @Override
+    public ReplicationProtocol.ReplicationRequestType getMessageType() {
+        return ReplicationProtocol.ReplicationRequestType.REPLICATE_RESOURCE_FILE;
+    }
+
+    @Override
+    public void serialize(OutputStream out) throws HyracksDataException {
+        try {
+            DataOutputStream dos = new DataOutputStream(out);
+            dos.writeUTF(file);
+            dos.writeLong(size);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    public static ReplicateFileTask create(DataInput input) throws IOException {
+        final String fileName = input.readUTF();
+        final long fileSize = input.readLong();
+        return new ReplicateFileTask(fileName, fileSize);
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/FileSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/FileSynchronizer.java
new file mode 100644
index 0000000..8aa4487
--- /dev/null
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/FileSynchronizer.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.recovery;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.nio.channels.SocketChannel;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.exceptions.ReplicationException;
+import org.apache.asterix.replication.functions.ReplicationProtocol;
+import org.apache.asterix.replication.management.NetworkingUtil;
+import org.apache.asterix.replication.messaging.DeleteFileTask;
+import org.apache.asterix.replication.messaging.ReplicateFileTask;
+import org.apache.asterix.replication.storage.PartitionReplica;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+
+/**
+ * Sends single-file requests to one partition replica: copying a file to it or asking it to delete one.
+ */
+public class FileSynchronizer {
+
+    private final INcApplicationContext appCtx;
+    private final PartitionReplica replica;
+
+    public FileSynchronizer(INcApplicationContext appCtx, PartitionReplica replica) {
+        this.appCtx = appCtx;
+        this.replica = replica;
+    }
+
+    /**
+     * Copies {@code file} to the replica. A {@link ReplicateFileTask} carrying the file's current length is
+     * sent first, the file content follows on the replica's channel, and finally
+     * {@code ReplicationProtocol.waitForAck} is called for the replica.
+     *
+     * @param file path that the IO manager resolves to the file to send
+     * @throws ReplicationException wrapping any {@link IOException} raised along the way
+     */
+    public void replicate(String file) {
+        try {
+            final IIOManager io = appCtx.getIoManager();
+            final SocketChannel replicaChannel = replica.getChannel();
+            final FileReference source = io.resolve(file);
+            ReplicationProtocol.sendTo(replica, new ReplicateFileTask(file, source.getFile().length()));
+            streamContent(source, replicaChannel);
+            ReplicationProtocol.waitForAck(replica);
+        } catch (IOException ioe) {
+            throw new ReplicationException(ioe);
+        }
+    }
+
+    /**
+     * Sends a {@link DeleteFileTask} for {@code file} to the replica, then calls
+     * {@code ReplicationProtocol.waitForAck} for the replica.
+     *
+     * @param file path handed to the delete task unchanged
+     * @throws ReplicationException wrapping any {@link IOException} raised along the way
+     */
+    public void delete(String file) {
+        try {
+            ReplicationProtocol.sendTo(replica, new DeleteFileTask(file));
+            ReplicationProtocol.waitForAck(replica);
+        } catch (IOException ioe) {
+            throw new ReplicationException(ioe);
+        }
+    }
+
+    /** Opens {@code source} read-only and hands its channel to {@code NetworkingUtil.sendFile}. */
+    private static void streamContent(FileReference source, SocketChannel target) throws IOException {
+        try (RandomAccessFile reader = new RandomAccessFile(source.getFile(), "r");
+                FileChannel content = reader.getChannel()) {
+            NetworkingUtil.sendFile(content, target);
+        }
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/ReplicaFilesSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/ReplicaFilesSynchronizer.java
new file mode 100644
index 0000000..2021cee
--- /dev/null
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/ReplicaFilesSynchronizer.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.recovery;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.asterix.replication.functions.ReplicationProtocol;
+import org.apache.asterix.replication.messaging.PartitionResourcesListResponse;
+import org.apache.asterix.replication.messaging.PartitionResourcesListTask;
+import org.apache.asterix.replication.storage.PartitionReplica;
+
+/**
+ * Makes the files a replica holds for a partition match the files the master holds for it.
+ */
+public class ReplicaFilesSynchronizer {
+
+    private final PartitionReplica replica;
+    private final INcApplicationContext appCtx;
+
+    public ReplicaFilesSynchronizer(INcApplicationContext appCtx, PartitionReplica replica) {
+        this.appCtx = appCtx;
+        this.replica = replica;
+    }
+
+    /**
+     * Lists the partition's files on both sides and reconciles them by path string: files only the master
+     * has are replicated first, then files only the replica has are deleted from it.
+     *
+     * @throws IOException if either file listing fails
+     */
+    public void sync() throws IOException {
+        final int partition = replica.getIdentifier().getPartition();
+        final Set<String> onReplica = getReplicaFiles(partition);
+        final Set<String> onMaster = getMasterFiles(partition);
+        final List<String> missingOnReplica = without(onMaster, onReplica);
+        final FileSynchronizer copier = new FileSynchronizer(appCtx, replica);
+        for (String file : missingOnReplica) {
+            copier.replicate(file);
+        }
+        final List<String> unknownToMaster = without(onReplica, onMaster);
+        final FileSynchronizer remover = new FileSynchronizer(appCtx, replica);
+        for (String file : unknownToMaster) {
+            remover.delete(file);
+        }
+    }
+
+    /** Asks the replica for its resource list of {@code partition} and returns the entries as a set. */
+    private Set<String> getReplicaFiles(int partition) throws IOException {
+        final PartitionResourcesListTask request = new PartitionResourcesListTask(partition);
+        final SocketChannel replicaChannel = replica.getChannel();
+        final ByteBuffer buffer = replica.gerReusableBuffer();
+        ReplicationProtocol.sendTo(replica, request);
+        final PartitionResourcesListResponse reply =
+                (PartitionResourcesListResponse) ReplicationProtocol.read(replicaChannel, buffer);
+        return new HashSet<>(reply.getResources());
+    }
+
+    /** Collects the master's index files of {@code partition}, each mapped to its relative path. */
+    private Set<String> getMasterFiles(int partition) throws IOException {
+        return appCtx.getReplicaResourcesManager().getPartitionIndexesFiles(partition, false).stream()
+                .map(StoragePathUtil::getFileRelativePath).collect(Collectors.toSet());
+    }
+
+    /** Returns the elements of {@code files} that {@code other} lacks, in {@code files}' iteration order. */
+    private static List<String> without(Set<String> files, Set<String> other) {
+        return files.stream().filter(file -> !other.contains(file)).collect(Collectors.toList());
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/ReplicaSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/ReplicaSynchronizer.java
new file mode 100644
index 0000000..5c88460
--- /dev/null
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/ReplicaSynchronizer.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.recovery;
+
+import java.io.IOException;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.config.ReplicationProperties;
+import org.apache.asterix.common.replication.IReplicationStrategy;
+import org.apache.asterix.replication.functions.ReplicationProtocol;
+import org.apache.asterix.replication.messaging.CheckpointPartitionIndexesTask;
+import org.apache.asterix.replication.storage.PartitionReplica;
+
+/**
+ * Runs the steps that bring a newly added replica in sync with the master and then registers it with the
+ * replication manager.
+ */
+public class ReplicaSynchronizer {
+
+    private final INcApplicationContext appCtx;
+    private final PartitionReplica replica;
+
+    public ReplicaSynchronizer(INcApplicationContext appCtx, PartitionReplica replica) {
+        this.appCtx = appCtx;
+        this.replica = replica;
+    }
+
+    /**
+     * Synchronizes the replica's files, sends the index checkpoint request for its partition, and only then
+     * registers the replica with the replication manager.
+     *
+     * @throws IOException if one of the steps fails; the steps after it are not run
+     */
+    public void sync() throws IOException {
+        copyFilesAroundFlush();
+        requestIndexesCheckpoint();
+        appCtx.getReplicationManager().register(replica);
+    }
+
+    /**
+     * Copies files in two passes, one before and one after flushing the replicated datasets, so that files
+     * generated by the flush are copied as well.
+     */
+    private void copyFilesAroundFlush() throws IOException {
+        final ReplicaFilesSynchronizer files = new ReplicaFilesSynchronizer(appCtx, replica);
+        files.sync();
+        // the flush generates disk components for any remaining in-memory components, i.e. new files
+        final ReplicationProperties properties = appCtx.getReplicationProperties();
+        final IReplicationStrategy strategy = properties.getReplicationStrategy();
+        appCtx.getDatasetLifecycleManager().flushDataset(strategy);
+        files.sync();
+    }
+
+    /** Sends a {@link CheckpointPartitionIndexesTask} for the replica's partition and waits for the ack. */
+    private void requestIndexesCheckpoint() throws IOException {
+        final int partition = replica.getIdentifier().getPartition();
+        ReplicationProtocol.sendTo(replica, new CheckpointPartitionIndexesTask(partition));
+        ReplicationProtocol.waitForAck(replica);
+    }
+}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/PartitionReplica.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/PartitionReplica.java
index c6d1b60..d9ce75e 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/PartitionReplica.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/PartitionReplica.java
@@ -22,12 +22,20 @@
import static org.apache.asterix.common.replication.IPartitionReplica.PartitionReplicaStatus.DISCONNECTED;
import static org.apache.asterix.common.replication.IPartitionReplica.PartitionReplicaStatus.IN_SYNC;
+import java.io.IOException;
+import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.exceptions.ReplicationException;
import org.apache.asterix.common.replication.IPartitionReplica;
import org.apache.asterix.common.storage.ReplicaIdentifier;
+import org.apache.asterix.replication.functions.ReplicationProtocol;
+import org.apache.asterix.replication.recovery.ReplicaSynchronizer;
import org.apache.hyracks.util.JSONUtil;
+import org.apache.hyracks.util.StorageUtil;
import org.apache.hyracks.util.annotations.ThreadSafe;
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -38,12 +46,18 @@
 @ThreadSafe
 public class PartitionReplica implements IPartitionReplica {
+    private static final Logger LOGGER = Logger.getLogger(PartitionReplica.class.getName());
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final int INITIAL_BUFFER_SIZE = StorageUtil.getIntSizeInBytes(4, StorageUtil.StorageUnit.KILOBYTE);
+    private final INcApplicationContext appCtx;
     private final ReplicaIdentifier id;
+    private ByteBuffer reusableBuf;
     private PartitionReplicaStatus status = DISCONNECTED;
+    private SocketChannel sc;
-    public PartitionReplica(ReplicaIdentifier id) {
+    public PartitionReplica(ReplicaIdentifier id, INcApplicationContext appCtx) {
         this.id = id;
+        this.appCtx = appCtx;
     }
     @Override
@@ -60,9 +74,92 @@
         if (status == IN_SYNC || status == CATCHING_UP) {
             return;
         }
+        setStatus(CATCHING_UP);
+        appCtx.getThreadExecutor().execute(() -> {
+            try {
+                new ReplicaSynchronizer(appCtx, this).sync();
+                setStatus(IN_SYNC);
+            } catch (Exception e) {
+                LOGGER.log(Level.SEVERE, e, () -> "Failed to sync replica " + this);
+                setStatus(DISCONNECTED);
+            } finally {
+                close();
+            }
+        });
     }
-    public JsonNode asJson() {
+    /**
+     * Returns the channel to this replica, opening and connecting a new one when there is no usable one.
+     *
+     * @return the channel connected to the replica's location
+     * @throws ReplicationException if the channel cannot be opened or connected
+     */
+    public synchronized SocketChannel getChannel() {
+        try {
+            if (sc == null || !sc.isOpen() || !sc.isConnected()) {
+                final SocketChannel stale = sc;
+                sc = null;
+                if (stale != null) {
+                    // release the unusable channel instead of only dropping the reference to it
+                    stale.close();
+                }
+                sc = openConnectedChannel();
+            }
+            return sc;
+        } catch (IOException e) {
+            throw new ReplicationException(e);
+        }
+    }
+
+    /** Opens a blocking channel to the replica's location; the channel is closed again if connecting fails. */
+    private SocketChannel openConnectedChannel() throws IOException {
+        final SocketChannel channel = SocketChannel.open();
+        try {
+            channel.configureBlocking(true);
+            channel.connect(id.getLocation());
+            return channel;
+        } catch (IOException | RuntimeException e) {
+            try {
+                channel.close();
+            } catch (IOException closeFailure) {
+                e.addSuppressed(closeFailure);
+            }
+            throw e;
+        }
+    }
+
+    /**
+     * Sends a goodbye to the replica and closes the channel, if one is open. The channel is closed even when
+     * the goodbye cannot be sent.
+     *
+     * @throws ReplicationException if sending the goodbye or closing the channel fails
+     */
+    public synchronized void close() {
+        final SocketChannel channel = sc;
+        if (channel == null || !channel.isOpen()) {
+            return;
+        }
+        sc = null;
+        // try-with-resources guarantees the close even when the goodbye throws
+        try (SocketChannel closing = channel) {
+            ReplicationProtocol.sendGoodbye(closing);
+        } catch (IOException e) {
+            throw new ReplicationException(e);
+        }
+    }
+
+    /**
+     * Returns this replica's reusable buffer, allocating {@code INITIAL_BUFFER_SIZE} bytes on first use.
+     * NOTE(review): "ger" is a typo for "get"; the name is kept because existing callers use it.
+     */
+    public synchronized ByteBuffer gerReusableBuffer() {
+        if (reusableBuf == null) {
+            reusableBuf = ByteBuffer.allocate(INITIAL_BUFFER_SIZE);
+        }
+        return reusableBuf;
+    }
+
+    private JsonNode asJson() {
         ObjectNode json = OBJECT_MAPPER.createObjectNode();
         json.put("id", id.toString());
         json.put("state", status.name());
@@ -94,4 +191,10 @@
             throw new ReplicationException(e);
         }
     }
+
+    /** Logs the status transition, then stores the new status. */
+    private synchronized void setStatus(PartitionReplicaStatus status) {
+        LOGGER.info(() -> "Replica " + this + " status changing: " + this.status + " -> " + status);
+        this.status = status;
+    }
 }
\ No newline at end of file
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
index 2ff74a8..398f97d 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
@@ -182,6 +182,7 @@
* @param partition
* @return Absolute paths to all partition files
*/
+ @Override
public List<String> getPartitionIndexesFiles(int partition, boolean relativePath) throws HyracksDataException {
List<String> partitionFiles = new ArrayList<String>();
Set<File> partitionIndexes = localRepository.getPartitionIndexes(partition);