[ASTERIXDB-3221][REPL] Use IO scheduler for replication ops
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- To allow concurrent replication operations, use the IO scheduler
to schedule the operations using the exact same scheduling logic
as the flush operations.
- Maximum concurrent replication operations = maximum concurrent flush
operations.
- Allow connections to replicas to be recycled and reused by the
replication operations and close them when no more pending operations.
- Do not halt on replicate operations failures since failures are
expected and replicas can be re-synced.
Change-Id: I82aeb60381f90a254ca99274f7e9a38f64bc7a46
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17635
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/HaltCallback.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/HaltCallback.java
index 9802001..bb689ac 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/HaltCallback.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/HaltCallback.java
@@ -41,6 +41,8 @@
@Override
public void operationFailed(ILSMIOOperation operation, Throwable t) {
LOGGER.error("Operation {} has failed", operation, t);
- ExitUtil.halt(ExitUtil.EC_IO_OPERATION_FAILED);
+ if (operation.getIOOpertionType() != ILSMIOOperation.LSMIOOperationType.REPLICATE) {
+ ExitUtil.halt(ExitUtil.EC_IO_OPERATION_FAILED);
+ }
}
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
index d803756..f7a739b 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
@@ -20,7 +20,10 @@
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.ArrayDeque;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
@@ -29,6 +32,7 @@
import org.apache.asterix.common.exceptions.ReplicationException;
import org.apache.asterix.common.replication.IPartitionReplica;
import org.apache.asterix.common.replication.IReplicationDestination;
+import org.apache.asterix.common.storage.ReplicaIdentifier;
import org.apache.asterix.replication.management.NetworkingUtil;
import org.apache.asterix.replication.messaging.ReplicationProtocol;
import org.apache.hyracks.api.network.ISocketChannel;
@@ -41,6 +45,7 @@
private static final Logger LOGGER = LogManager.getLogger();
private final Set<IPartitionReplica> replicas = new HashSet<>();
private final InetSocketAddress inputLocation;
+ private final Map<ReplicaIdentifier, ArrayDeque<PartitionReplica>> replicasConnPool = new HashMap<>();
private InetSocketAddress resolvedLocation;
private ISocketChannel logRepChannel;
@@ -64,6 +69,11 @@
@Override
public synchronized void remove(IPartitionReplica replica) {
replicas.remove(replica);
+ ArrayDeque<PartitionReplica> partitionConnections = replicasConnPool.remove(replica.getIdentifier());
+ if (partitionConnections != null) {
+ partitionConnections.forEach(PartitionReplica::close);
+ partitionConnections.clear();
+ }
}
@Override
@@ -138,4 +148,26 @@
public int hashCode() {
return Objects.hash(inputLocation);
}
+
+ public synchronized PartitionReplica getPartitionReplicaConnection(ReplicaIdentifier identifier,
+ INcApplicationContext appCtx) {
+ ArrayDeque<PartitionReplica> partitionReplicas =
+ replicasConnPool.computeIfAbsent(identifier, k -> new ArrayDeque<>());
+ if (!partitionReplicas.isEmpty()) {
+ return partitionReplicas.remove();
+ }
+ return new PartitionReplica(identifier, appCtx);
+ }
+
+ public synchronized void recycleConnection(PartitionReplica partitionReplica) {
+ ArrayDeque<PartitionReplica> partitionReplicas = replicasConnPool.get(partitionReplica.getIdentifier());
+ if (partitionReplicas != null) {
+ partitionReplicas.add(partitionReplica);
+ }
+ }
+
+ public synchronized void closeConnections() {
+ replicasConnPool
+ .forEach(((identifier, partitionReplicas) -> partitionReplicas.forEach(PartitionReplica::close)));
+ }
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java
index 063709a..8c514bd 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java
@@ -21,22 +21,21 @@
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.exceptions.ReplicationException;
-import org.apache.asterix.common.replication.IPartitionReplica;
import org.apache.asterix.common.replication.IReplicationDestination;
import org.apache.asterix.common.replication.IReplicationManager;
import org.apache.asterix.common.replication.IReplicationStrategy;
import org.apache.asterix.common.storage.DatasetResourceReference;
-import org.apache.asterix.common.storage.ResourceReference;
-import org.apache.asterix.replication.api.PartitionReplica;
import org.apache.asterix.replication.api.ReplicationDestination;
-import org.apache.asterix.replication.sync.IndexSynchronizer;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.replication.IReplicationJob;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexReplicationJob;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -45,13 +44,15 @@
private static final Logger LOGGER = LogManager.getLogger();
private final IReplicationManager replicationManager;
- private final Set<ReplicationDestination> destinations = new HashSet<>();
+ private final Set<ReplicationDestination> destinations = ConcurrentHashMap.newKeySet();
private final LinkedBlockingQueue<IReplicationJob> replicationJobsQ = new LinkedBlockingQueue<>();
private final IReplicationStrategy replicationStrategy;
private final PersistentLocalResourceRepository resourceRepository;
private final INcApplicationContext appCtx;
+ private final ILSMIOOperationScheduler ioScheduler;
private final Object transferLock = new Object();
private final Set<ReplicationDestination> failedDest = new HashSet<>();
+ private final AtomicInteger pendingRepOpsCount = new AtomicInteger();
public IndexReplicationManager(INcApplicationContext appCtx, IReplicationManager replicationManager) {
this.appCtx = appCtx;
@@ -59,6 +60,8 @@
this.resourceRepository = (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
replicationStrategy = replicationManager.getReplicationStrategy();
appCtx.getThreadExecutor().execute(new ReplicationJobsProcessor());
+ ioScheduler = appCtx.getStorageComponentProvider().getIoOperationSchedulerProvider()
+ .getIoScheduler(appCtx.getServiceContext());
}
public void register(ReplicationDestination dest) {
@@ -72,12 +75,18 @@
public void unregister(IReplicationDestination dest) {
synchronized (transferLock) {
LOGGER.info(() -> "unregister " + dest);
+ for (ReplicationDestination existingDest : destinations) {
+ if (existingDest.equals(dest)) {
+ existingDest.closeConnections();
+ break;
+ }
+ }
destinations.remove(dest);
failedDest.remove(dest);
}
}
- private void handleFailure(ReplicationDestination dest, Exception e) {
+ public void handleFailure(ReplicationDestination dest, Exception e) {
synchronized (transferLock) {
if (failedDest.contains(dest)) {
return;
@@ -87,6 +96,7 @@
LOGGER.error("replica at {} failed", dest);
failedDest.add(dest);
}
+ dest.closeConnections();
replicationManager.notifyFailure(dest, e);
}
}
@@ -99,71 +109,62 @@
process(job);
}
- private void process(IReplicationJob job) {
- try {
- if (skip(job)) {
- return;
- }
- synchronized (transferLock) {
- if (destinations.isEmpty()) {
- return;
- }
- final IndexSynchronizer synchronizer = new IndexSynchronizer(job, appCtx);
- final int indexPartition = getJobPartition(job);
- for (ReplicationDestination dest : destinations) {
- try {
- Optional<IPartitionReplica> partitionReplica = dest.getPartitionReplica(indexPartition);
- if (!partitionReplica.isPresent()) {
- continue;
- }
- PartitionReplica replica = (PartitionReplica) partitionReplica.get();
- synchronizer.sync(replica);
- } catch (Exception e) {
- handleFailure(dest, e);
- }
- }
- closeChannels();
- }
- } finally {
- afterReplication(job);
+ public Set<ReplicationDestination> getDestinations() {
+ synchronized (transferLock) {
+ return destinations;
}
}
- private boolean skip(IReplicationJob job) {
- try {
- final String fileToReplicate = job.getAnyFile();
- final Optional<DatasetResourceReference> indexFileRefOpt =
- resourceRepository.getLocalResourceReference(fileToReplicate);
- if (!indexFileRefOpt.isPresent()) {
- LOGGER.warn("skipping replication of {} due to missing dataset resource reference", fileToReplicate);
- return true;
+ private void process(IReplicationJob job) {
+ pendingRepOpsCount.incrementAndGet();
+ Optional<DatasetResourceReference> jobIndexRefOpt = getJobIndexRef(job);
+ if (jobIndexRefOpt.isEmpty()) {
+ LOGGER.warn("skipping replication of {} due to missing dataset resource reference", job.getAnyFile());
+ afterReplication(job);
+ return;
+ }
+ ReplicationOperation rp = new ReplicationOperation(appCtx, jobIndexRefOpt.get(), job, this);
+ if (job.getExecutionType() == IReplicationJob.ReplicationExecutionType.SYNC) {
+ rp.call();
+ } else {
+ try {
+ ioScheduler.scheduleOperation(rp);
+ } catch (HyracksDataException e) {
+ throw new ReplicationException(e);
}
- return !replicationStrategy.isMatch(indexFileRefOpt.get().getDatasetId());
+ }
+ }
+
+ public boolean skip(DatasetResourceReference indexRef) {
+ return !replicationStrategy.isMatch(indexRef.getDatasetId());
+ }
+
+ public Optional<DatasetResourceReference> getJobIndexRef(IReplicationJob job) {
+ final String fileToReplicate = job.getAnyFile();
+ try {
+ return resourceRepository.getLocalResourceReference(fileToReplicate);
} catch (HyracksDataException e) {
throw new IllegalStateException("Couldn't find resource for " + job.getAnyFile(), e);
}
}
- private int getJobPartition(IReplicationJob job) {
- return ResourceReference.of(job.getAnyFile()).getPartitionNum();
- }
-
private void closeChannels() {
- if (!replicationJobsQ.isEmpty()) {
- return;
- }
LOGGER.trace("no pending replication jobs; closing connections to replicas");
for (ReplicationDestination dest : destinations) {
- dest.getReplicas().stream().map(PartitionReplica.class::cast).forEach(PartitionReplica::close);
+ dest.closeConnections();
}
}
- private static void afterReplication(IReplicationJob job) {
+ public void afterReplication(IReplicationJob job) {
try {
+ int pendingOps = pendingRepOpsCount.decrementAndGet();
if (job.getOperation() == IReplicationJob.ReplicationOperation.REPLICATE
&& job instanceof ILSMIndexReplicationJob) {
((ILSMIndexReplicationJob) job).endReplication();
}
+ if (pendingOps == 0 && replicationJobsQ.isEmpty()) {
+ closeChannels();
+ }
} catch (HyracksDataException e) {
throw new ReplicationException(e);
}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationOperation.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationOperation.java
new file mode 100644
index 0000000..258f24a
--- /dev/null
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationOperation.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.management;
+
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.replication.IPartitionReplica;
+import org.apache.asterix.common.storage.DatasetResourceReference;
+import org.apache.asterix.replication.api.PartitionReplica;
+import org.apache.asterix.replication.api.ReplicationDestination;
+import org.apache.asterix.replication.sync.IndexSynchronizer;
+import org.apache.hyracks.api.replication.IReplicationJob;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractIoOperation;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
+import org.apache.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallbackFactory;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class ReplicationOperation extends AbstractIoOperation {
+
+ private static final Logger LOGGER = LogManager.getLogger();
+
+ private static final ILSMIOOperationCallback INSTANCE =
+ NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(null);
+ private final INcApplicationContext appCtx;
+ private final DatasetResourceReference indexRef;
+ private final IReplicationJob job;
+ private final IndexReplicationManager indexReplicationManager;
+
+ public ReplicationOperation(INcApplicationContext appCtx, DatasetResourceReference indexRef, IReplicationJob job,
+ IndexReplicationManager indexReplicationManager) {
+ super(null, null, INSTANCE, indexRef.getRelativePath().toString());
+ this.appCtx = appCtx;
+ this.indexRef = indexRef;
+ this.job = job;
+ this.indexReplicationManager = indexReplicationManager;
+ }
+
+ @Override
+ public LSMIOOperationType getIOOpertionType() {
+ return LSMIOOperationType.REPLICATE;
+ }
+
+ @Override
+ public LSMIOOperationStatus call() {
+ try {
+ Set<ReplicationDestination> destinations = indexReplicationManager.getDestinations();
+ if (destinations.isEmpty() || indexReplicationManager.skip(indexRef)) {
+ return LSMIOOperationStatus.SUCCESS;
+ }
+ LOGGER.debug("started replicate operation on index {}", indexRef);
+ final IndexSynchronizer synchronizer = new IndexSynchronizer(job, appCtx);
+ final int indexPartition = indexRef.getPartitionId();
+ for (ReplicationDestination dest : destinations) {
+ Optional<IPartitionReplica> partitionReplica = dest.getPartitionReplica(indexPartition);
+ if (partitionReplica.isEmpty()) {
+ continue;
+ }
+ PartitionReplica destReplica = null;
+ try {
+ destReplica = dest.getPartitionReplicaConnection(partitionReplica.get().getIdentifier(), appCtx);
+ synchronizer.sync(destReplica);
+ dest.recycleConnection(destReplica);
+ } catch (Exception e) {
+ if (destReplica != null) {
+ destReplica.close();
+ }
+ indexReplicationManager.handleFailure(dest, e);
+ }
+ }
+ LOGGER.debug("completed replicate operation on index {}", indexRef);
+ return LSMIOOperationStatus.SUCCESS;
+ } finally {
+ indexReplicationManager.afterReplication(job);
+ }
+ }
+
+ @Override
+ protected LSMComponentFileReferences getComponentFiles() {
+ return null;
+ }
+
+ @Override
+ public long getRemainingPages() {
+ return 0;
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractAsynchronousScheduler.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractAsynchronousScheduler.java
index e266a6f..049da38 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractAsynchronousScheduler.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractAsynchronousScheduler.java
@@ -38,9 +38,10 @@
private final int maxNumFlushes;
protected final Map<String, ILSMIOOperation> runningFlushOperations = new HashMap<>();
+ protected final Map<String, ILSMIOOperation> runningReplicateOperations = new HashMap<>();
protected final Deque<ILSMIOOperation> waitingFlushOperations = new ArrayDeque<>();
protected final Deque<ILSMIOOperation> waitingMergeOperations = new ArrayDeque<>();
-
+ protected final Deque<ILSMIOOperation> waitingReplicateOperations = new ArrayDeque<>();
protected final Map<String, Throwable> failedGroups = new HashMap<>();
public AbstractAsynchronousScheduler(ThreadFactory threadFactory, final IIoOperationFailedCallback callback,
@@ -58,8 +59,11 @@
case MERGE:
scheduleMerge(operation);
break;
+ case REPLICATE:
+ scheduleReplicate(operation);
+ break;
case NOOP:
- return;
+ break;
default:
// this should never happen
// just guard here to avoid silent failures in case of future extensions
@@ -75,6 +79,10 @@
break;
case MERGE:
completeMerge(operation);
+ break;
+ case REPLICATE:
+ completeReplicate(operation);
+ break;
case NOOP:
return;
default:
@@ -149,6 +157,46 @@
}
}
+ private void scheduleReplicate(ILSMIOOperation operation) {
+ String id = operation.getIndexIdentifier();
+ synchronized (executor) {
+ if (runningReplicateOperations.size() >= maxNumFlushes || runningReplicateOperations.containsKey(id)) {
+ waitingReplicateOperations.add(operation);
+ } else {
+ runningReplicateOperations.put(id, operation);
+ executor.submit(operation);
+ }
+ }
+ }
+
+ private void completeReplicate(ILSMIOOperation operation) {
+ String id = operation.getIndexIdentifier();
+ synchronized (executor) {
+ runningReplicateOperations.remove(id);
+ // Schedule replicate in FIFO order. Must make sure that there is at most one scheduled replicate for each index.
+ for (ILSMIOOperation replicateOp : waitingReplicateOperations) {
+ String replicateOpId = replicateOp.getIndexIdentifier();
+ if (runningReplicateOperations.size() < maxNumFlushes) {
+ if (!runningReplicateOperations.containsKey(replicateOpId) && !replicateOp.isCompleted()) {
+ runningReplicateOperations.put(replicateOpId, replicateOp);
+ executor.submit(replicateOp);
+ }
+ } else {
+ break;
+ }
+ }
+ // cleanup scheduled replicate
+ while (!waitingReplicateOperations.isEmpty()) {
+ ILSMIOOperation top = waitingReplicateOperations.peek();
+ if (top.isCompleted() || runningReplicateOperations.get(top.getIndexIdentifier()) == top) {
+ waitingReplicateOperations.poll();
+ } else {
+ break;
+ }
+ }
+ }
+ }
+
@Override
public void close() throws IOException {
executor.shutdown();