[ASTERIXDB-3221][REPL] Use IO scheduler for replication ops

- user model changes: no
- storage format changes: no
- interface changes: no

Details:

- To allow concurrent replication operations, use the IO scheduler
  to schedule the operations using the exact same scheduling logic
  as the flush operations.
- The maximum number of concurrent replication operations is equal to
  the maximum number of concurrent flush operations.
- Allow connections to replicas to be recycled and reused by the
  replication operations, and close them when there are no more
  pending operations.
- Do not halt on replicate operation failures, since such failures
  are expected and replicas can be re-synced.

Change-Id: I82aeb60381f90a254ca99274f7e9a38f64bc7a46
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17635
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/HaltCallback.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/HaltCallback.java
index 9802001..bb689ac 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/HaltCallback.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/HaltCallback.java
@@ -41,6 +41,8 @@
     @Override
     public void operationFailed(ILSMIOOperation operation, Throwable t) {
         LOGGER.error("Operation {} has failed", operation, t);
-        ExitUtil.halt(ExitUtil.EC_IO_OPERATION_FAILED);
+        if (operation.getIOOpertionType() != ILSMIOOperation.LSMIOOperationType.REPLICATE) {
+            ExitUtil.halt(ExitUtil.EC_IO_OPERATION_FAILED);
+        }
     }
 }
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
index d803756..f7a739b 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
@@ -20,7 +20,10 @@
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.ArrayDeque;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
@@ -29,6 +32,7 @@
 import org.apache.asterix.common.exceptions.ReplicationException;
 import org.apache.asterix.common.replication.IPartitionReplica;
 import org.apache.asterix.common.replication.IReplicationDestination;
+import org.apache.asterix.common.storage.ReplicaIdentifier;
 import org.apache.asterix.replication.management.NetworkingUtil;
 import org.apache.asterix.replication.messaging.ReplicationProtocol;
 import org.apache.hyracks.api.network.ISocketChannel;
@@ -41,6 +45,7 @@
     private static final Logger LOGGER = LogManager.getLogger();
     private final Set<IPartitionReplica> replicas = new HashSet<>();
     private final InetSocketAddress inputLocation;
+    private final Map<ReplicaIdentifier, ArrayDeque<PartitionReplica>> replicasConnPool = new HashMap<>();
     private InetSocketAddress resolvedLocation;
     private ISocketChannel logRepChannel;
 
@@ -64,6 +69,11 @@
     @Override
     public synchronized void remove(IPartitionReplica replica) {
         replicas.remove(replica);
+        ArrayDeque<PartitionReplica> partitionConnections = replicasConnPool.remove(replica.getIdentifier());
+        if (partitionConnections != null) {
+            partitionConnections.forEach(PartitionReplica::close);
+            partitionConnections.clear();
+        }
     }
 
     @Override
@@ -138,4 +148,26 @@
     public int hashCode() {
         return Objects.hash(inputLocation);
     }
+
+    public synchronized PartitionReplica getPartitionReplicaConnection(ReplicaIdentifier identifier,
+            INcApplicationContext appCtx) {
+        ArrayDeque<PartitionReplica> partitionReplicas =
+                replicasConnPool.computeIfAbsent(identifier, k -> new ArrayDeque<>());
+        if (!partitionReplicas.isEmpty()) {
+            return partitionReplicas.remove();
+        }
+        return new PartitionReplica(identifier, appCtx);
+    }
+
+    public synchronized void recycleConnection(PartitionReplica partitionReplica) {
+        ArrayDeque<PartitionReplica> partitionReplicas = replicasConnPool.get(partitionReplica.getIdentifier());
+        if (partitionReplicas != null) {
+            partitionReplicas.add(partitionReplica);
+        }
+    }
+
+    public synchronized void closeConnections() {
+        replicasConnPool
+                .forEach(((identifier, partitionReplicas) -> partitionReplicas.forEach(PartitionReplica::close)));
+    }
 }
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java
index 063709a..8c514bd 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java
@@ -21,22 +21,21 @@
 import java.util.HashSet;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.exceptions.ReplicationException;
-import org.apache.asterix.common.replication.IPartitionReplica;
 import org.apache.asterix.common.replication.IReplicationDestination;
 import org.apache.asterix.common.replication.IReplicationManager;
 import org.apache.asterix.common.replication.IReplicationStrategy;
 import org.apache.asterix.common.storage.DatasetResourceReference;
-import org.apache.asterix.common.storage.ResourceReference;
-import org.apache.asterix.replication.api.PartitionReplica;
 import org.apache.asterix.replication.api.ReplicationDestination;
-import org.apache.asterix.replication.sync.IndexSynchronizer;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.replication.IReplicationJob;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexReplicationJob;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -45,13 +44,15 @@
 
     private static final Logger LOGGER = LogManager.getLogger();
     private final IReplicationManager replicationManager;
-    private final Set<ReplicationDestination> destinations = new HashSet<>();
+    private final Set<ReplicationDestination> destinations = ConcurrentHashMap.newKeySet();
     private final LinkedBlockingQueue<IReplicationJob> replicationJobsQ = new LinkedBlockingQueue<>();
     private final IReplicationStrategy replicationStrategy;
     private final PersistentLocalResourceRepository resourceRepository;
     private final INcApplicationContext appCtx;
+    private final ILSMIOOperationScheduler ioScheduler;
     private final Object transferLock = new Object();
     private final Set<ReplicationDestination> failedDest = new HashSet<>();
+    private final AtomicInteger pendingRepOpsCount = new AtomicInteger();
 
     public IndexReplicationManager(INcApplicationContext appCtx, IReplicationManager replicationManager) {
         this.appCtx = appCtx;
@@ -59,6 +60,8 @@
         this.resourceRepository = (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
         replicationStrategy = replicationManager.getReplicationStrategy();
         appCtx.getThreadExecutor().execute(new ReplicationJobsProcessor());
+        ioScheduler = appCtx.getStorageComponentProvider().getIoOperationSchedulerProvider()
+                .getIoScheduler(appCtx.getServiceContext());
     }
 
     public void register(ReplicationDestination dest) {
@@ -72,12 +75,18 @@
     public void unregister(IReplicationDestination dest) {
         synchronized (transferLock) {
             LOGGER.info(() -> "unregister " + dest);
+            for (ReplicationDestination existingDest : destinations) {
+                if (existingDest.equals(dest)) {
+                    existingDest.closeConnections();
+                    break;
+                }
+            }
             destinations.remove(dest);
             failedDest.remove(dest);
         }
     }
 
-    private void handleFailure(ReplicationDestination dest, Exception e) {
+    public void handleFailure(ReplicationDestination dest, Exception e) {
         synchronized (transferLock) {
             if (failedDest.contains(dest)) {
                 return;
@@ -87,6 +96,7 @@
                 LOGGER.error("replica at {} failed", dest);
                 failedDest.add(dest);
             }
+            dest.closeConnections();
             replicationManager.notifyFailure(dest, e);
         }
     }
@@ -99,71 +109,62 @@
         process(job);
     }
 
-    private void process(IReplicationJob job) {
-        try {
-            if (skip(job)) {
-                return;
-            }
-            synchronized (transferLock) {
-                if (destinations.isEmpty()) {
-                    return;
-                }
-                final IndexSynchronizer synchronizer = new IndexSynchronizer(job, appCtx);
-                final int indexPartition = getJobPartition(job);
-                for (ReplicationDestination dest : destinations) {
-                    try {
-                        Optional<IPartitionReplica> partitionReplica = dest.getPartitionReplica(indexPartition);
-                        if (!partitionReplica.isPresent()) {
-                            continue;
-                        }
-                        PartitionReplica replica = (PartitionReplica) partitionReplica.get();
-                        synchronizer.sync(replica);
-                    } catch (Exception e) {
-                        handleFailure(dest, e);
-                    }
-                }
-                closeChannels();
-            }
-        } finally {
-            afterReplication(job);
+    public Set<ReplicationDestination> getDestinations() {
+        synchronized (transferLock) {
+            return destinations;
         }
     }
 
-    private boolean skip(IReplicationJob job) {
-        try {
-            final String fileToReplicate = job.getAnyFile();
-            final Optional<DatasetResourceReference> indexFileRefOpt =
-                    resourceRepository.getLocalResourceReference(fileToReplicate);
-            if (!indexFileRefOpt.isPresent()) {
-                LOGGER.warn("skipping replication of {} due to missing dataset resource reference", fileToReplicate);
-                return true;
+    private void process(IReplicationJob job) {
+        pendingRepOpsCount.incrementAndGet();
+        Optional<DatasetResourceReference> jobIndexRefOpt = getJobIndexRef(job);
+        if (jobIndexRefOpt.isEmpty()) {
+            LOGGER.warn("skipping replication of {} due to missing dataset resource reference", job.getAnyFile());
+            afterReplication(job);
+            return;
+        }
+        ReplicationOperation rp = new ReplicationOperation(appCtx, jobIndexRefOpt.get(), job, this);
+        if (job.getExecutionType() == IReplicationJob.ReplicationExecutionType.SYNC) {
+            rp.call();
+        } else {
+            try {
+                ioScheduler.scheduleOperation(rp);
+            } catch (HyracksDataException e) {
+                throw new ReplicationException(e);
             }
-            return !replicationStrategy.isMatch(indexFileRefOpt.get().getDatasetId());
+        }
+    }
+
+    public boolean skip(DatasetResourceReference indexRef) {
+        return !replicationStrategy.isMatch(indexRef.getDatasetId());
+    }
+
+    public Optional<DatasetResourceReference> getJobIndexRef(IReplicationJob job) {
+        final String fileToReplicate = job.getAnyFile();
+        try {
+            return resourceRepository.getLocalResourceReference(fileToReplicate);
         } catch (HyracksDataException e) {
             throw new IllegalStateException("Couldn't find resource for " + job.getAnyFile(), e);
         }
     }
 
-    private int getJobPartition(IReplicationJob job) {
-        return ResourceReference.of(job.getAnyFile()).getPartitionNum();
-    }
-
     private void closeChannels() {
-        if (!replicationJobsQ.isEmpty()) {
-            return;
-        }
         LOGGER.trace("no pending replication jobs; closing connections to replicas");
         for (ReplicationDestination dest : destinations) {
-            dest.getReplicas().stream().map(PartitionReplica.class::cast).forEach(PartitionReplica::close);
+            dest.closeConnections();
         }
     }
 
-    private static void afterReplication(IReplicationJob job) {
+    public void afterReplication(IReplicationJob job) {
         try {
+            int pendingOps = pendingRepOpsCount.decrementAndGet();
             if (job.getOperation() == IReplicationJob.ReplicationOperation.REPLICATE
                     && job instanceof ILSMIndexReplicationJob) {
                 ((ILSMIndexReplicationJob) job).endReplication();
             }
+            if (pendingOps == 0 && replicationJobsQ.isEmpty()) {
+                closeChannels();
+            }
         } catch (HyracksDataException e) {
             throw new ReplicationException(e);
         }
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationOperation.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationOperation.java
new file mode 100644
index 0000000..258f24a
--- /dev/null
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationOperation.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.management;
+
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.replication.IPartitionReplica;
+import org.apache.asterix.common.storage.DatasetResourceReference;
+import org.apache.asterix.replication.api.PartitionReplica;
+import org.apache.asterix.replication.api.ReplicationDestination;
+import org.apache.asterix.replication.sync.IndexSynchronizer;
+import org.apache.hyracks.api.replication.IReplicationJob;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractIoOperation;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
+import org.apache.hyracks.storage.am.lsm.common.impls.NoOpIOOperationCallbackFactory;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class ReplicationOperation extends AbstractIoOperation {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    private static final ILSMIOOperationCallback INSTANCE =
+            NoOpIOOperationCallbackFactory.INSTANCE.createIoOpCallback(null);
+    private final INcApplicationContext appCtx;
+    private final DatasetResourceReference indexRef;
+    private final IReplicationJob job;
+    private final IndexReplicationManager indexReplicationManager;
+
+    public ReplicationOperation(INcApplicationContext appCtx, DatasetResourceReference indexRef, IReplicationJob job,
+            IndexReplicationManager indexReplicationManager) {
+        super(null, null, INSTANCE, indexRef.getRelativePath().toString());
+        this.appCtx = appCtx;
+        this.indexRef = indexRef;
+        this.job = job;
+        this.indexReplicationManager = indexReplicationManager;
+    }
+
+    @Override
+    public LSMIOOperationType getIOOpertionType() {
+        return LSMIOOperationType.REPLICATE;
+    }
+
+    @Override
+    public LSMIOOperationStatus call() {
+        try {
+            Set<ReplicationDestination> destinations = indexReplicationManager.getDestinations();
+            if (destinations.isEmpty() || indexReplicationManager.skip(indexRef)) {
+                return LSMIOOperationStatus.SUCCESS;
+            }
+            LOGGER.debug("started replicate operation on index {}", indexRef);
+            final IndexSynchronizer synchronizer = new IndexSynchronizer(job, appCtx);
+            final int indexPartition = indexRef.getPartitionId();
+            for (ReplicationDestination dest : destinations) {
+                Optional<IPartitionReplica> partitionReplica = dest.getPartitionReplica(indexPartition);
+                if (partitionReplica.isEmpty()) {
+                    continue;
+                }
+                PartitionReplica destReplica = null;
+                try {
+                    destReplica = dest.getPartitionReplicaConnection(partitionReplica.get().getIdentifier(), appCtx);
+                    synchronizer.sync(destReplica);
+                    dest.recycleConnection(destReplica);
+                } catch (Exception e) {
+                    if (destReplica != null) {
+                        destReplica.close();
+                    }
+                    indexReplicationManager.handleFailure(dest, e);
+                }
+            }
+            LOGGER.debug("completed replicate operation on index {}", indexRef);
+            return LSMIOOperationStatus.SUCCESS;
+        } finally {
+            indexReplicationManager.afterReplication(job);
+        }
+    }
+
+    @Override
+    protected LSMComponentFileReferences getComponentFiles() {
+        return null;
+    }
+
+    @Override
+    public long getRemainingPages() {
+        return 0;
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractAsynchronousScheduler.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractAsynchronousScheduler.java
index e266a6f..049da38 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractAsynchronousScheduler.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractAsynchronousScheduler.java
@@ -38,9 +38,10 @@
 
     private final int maxNumFlushes;
     protected final Map<String, ILSMIOOperation> runningFlushOperations = new HashMap<>();
+    protected final Map<String, ILSMIOOperation> runningReplicateOperations = new HashMap<>();
     protected final Deque<ILSMIOOperation> waitingFlushOperations = new ArrayDeque<>();
     protected final Deque<ILSMIOOperation> waitingMergeOperations = new ArrayDeque<>();
-
+    protected final Deque<ILSMIOOperation> waitingReplicateOperations = new ArrayDeque<>();
     protected final Map<String, Throwable> failedGroups = new HashMap<>();
 
     public AbstractAsynchronousScheduler(ThreadFactory threadFactory, final IIoOperationFailedCallback callback,
@@ -58,8 +59,11 @@
             case MERGE:
                 scheduleMerge(operation);
                 break;
+            case REPLICATE:
+                scheduleReplicate(operation);
+                break;
             case NOOP:
-                return;
+                break;
             default:
                 // this should never happen
                 // just guard here to avoid silent failures in case of future extensions
@@ -75,6 +79,10 @@
                 break;
             case MERGE:
                 completeMerge(operation);
+                break;
+            case REPLICATE:
+                completeReplicate(operation);
+                break;
             case NOOP:
                 return;
             default:
@@ -149,6 +157,46 @@
         }
     }
 
+    private void scheduleReplicate(ILSMIOOperation operation) {
+        String id = operation.getIndexIdentifier();
+        synchronized (executor) {
+            if (runningReplicateOperations.size() >= maxNumFlushes || runningReplicateOperations.containsKey(id)) {
+                waitingReplicateOperations.add(operation);
+            } else {
+                runningReplicateOperations.put(id, operation);
+                executor.submit(operation);
+            }
+        }
+    }
+
+    private void completeReplicate(ILSMIOOperation operation) {
+        String id = operation.getIndexIdentifier();
+        synchronized (executor) {
+            runningReplicateOperations.remove(id);
+            // Schedule replicate in FIFO order. Must make sure that there is at most one scheduled replicate for each index.
+            for (ILSMIOOperation replicateOp : waitingReplicateOperations) {
+                String replicateOpId = replicateOp.getIndexIdentifier();
+                if (runningReplicateOperations.size() < maxNumFlushes) {
+                    if (!runningReplicateOperations.containsKey(replicateOpId) && !replicateOp.isCompleted()) {
+                        runningReplicateOperations.put(replicateOpId, replicateOp);
+                        executor.submit(replicateOp);
+                    }
+                } else {
+                    break;
+                }
+            }
+            // cleanup scheduled replicate
+            while (!waitingReplicateOperations.isEmpty()) {
+                ILSMIOOperation top = waitingReplicateOperations.peek();
+                if (top.isCompleted() || runningReplicateOperations.get(top.getIndexIdentifier()) == top) {
+                    waitingReplicateOperations.poll();
+                } else {
+                    break;
+                }
+            }
+        }
+    }
+
     @Override
     public void close() throws IOException {
         executor.shutdown();