[NO ISSUE][REPL] Exclude Non-Replicated Datasets From Delta Recovery

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Exclude non-replicated datasets files from
  delta recovery.
- Fix used read buffer for large replication
  requests.

Change-Id: Ic734af7becf26082e79fae52bd2c01ba567c1c99
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2412
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
index 54d3a02..b2b1ad1 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
@@ -26,9 +26,10 @@
 import java.util.stream.Collectors;
 
 import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.replication.api.IReplicationWorker;
+import org.apache.asterix.common.replication.IReplicationStrategy;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.replication.api.IReplicaTask;
+import org.apache.asterix.replication.api.IReplicationWorker;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
@@ -48,8 +49,10 @@
         final PersistentLocalResourceRepository localResourceRepository =
                 (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
         localResourceRepository.cleanup(partition);
-        final List<String> partitionResources = localResourceRepository.getPartitionIndexesFiles(partition).stream()
-                .map(StoragePathUtil::getFileRelativePath).collect(Collectors.toList());
+        final IReplicationStrategy replicationStrategy = appCtx.getReplicationManager().getReplicationStrategy();
+        final List<String> partitionResources =
+                localResourceRepository.getPartitionReplicatedFiles(partition, replicationStrategy).stream()
+                        .map(StoragePathUtil::getFileRelativePath).collect(Collectors.toList());
         final PartitionResourcesListResponse response =
                 new PartitionResourcesListResponse(partition, partitionResources);
         ReplicationProtocol.sendTo(worker.getChannel(), response, worker.getReusableBuffer());
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java
index 280a2d4..41e7d9e 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java
@@ -72,7 +72,7 @@
         final ByteBuffer buf = ensureSize(dataBuffer, requestSize);
         // read request
         NetworkingUtil.readBytes(socketChannel, buf, requestSize);
-        return dataBuffer;
+        return buf;
     }
 
     public static ReplicationRequestType getRequestType(SocketChannel socketChannel, ByteBuffer byteBuffer)
@@ -135,6 +135,7 @@
             requestBuffer.put(outputStream.getByteArray(), 0, outputStream.getLength());
             requestBuffer.flip();
             NetworkingUtil.transferBufferToChannel(channel, requestBuffer);
+            channel.socket().getOutputStream().flush();
         } catch (IOException e) {
             throw new ReplicationException(e);
         }
@@ -148,9 +149,9 @@
     public static IReplicationMessage readMessage(ReplicationRequestType type, SocketChannel socketChannel,
             ByteBuffer buffer) {
         try {
-            ReplicationProtocol.readRequest(socketChannel, buffer);
+            final ByteBuffer requestBuf = ReplicationProtocol.readRequest(socketChannel, buffer);
             final ByteArrayInputStream input =
-                    new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit());
+                    new ByteArrayInputStream(requestBuf.array(), requestBuf.position(), requestBuf.limit());
             try (DataInputStream dis = new DataInputStream(input)) {
                 switch (type) {
                     case PARTITION_RESOURCES_REQUEST:
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
index 5658779..fae6ed6 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
@@ -27,6 +27,7 @@
 import java.util.stream.Collectors;
 
 import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.replication.IReplicationStrategy;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.replication.api.PartitionReplica;
 import org.apache.asterix.replication.messaging.PartitionResourcesListResponse;
@@ -52,8 +53,10 @@
         final Set<String> replicaFiles = getReplicaFiles(partition);
         final PersistentLocalResourceRepository localResourceRepository =
                 (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
-        final Set<String> masterFiles = localResourceRepository.getPartitionIndexesFiles(partition).stream()
-                .map(StoragePathUtil::getFileRelativePath).collect(Collectors.toSet());
+        final IReplicationStrategy replicationStrategy = appCtx.getReplicationManager().getReplicationStrategy();
+        final Set<String> masterFiles =
+                localResourceRepository.getPartitionReplicatedFiles(partition, replicationStrategy).stream()
+                        .map(StoragePathUtil::getFileRelativePath).collect(Collectors.toSet());
         // find files on master and not on replica
         final List<String> replicaMissingFiles =
                 masterFiles.stream().filter(file -> !replicaFiles.contains(file)).collect(Collectors.toList());
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index ca22a84..7206382 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -52,6 +52,7 @@
 import org.apache.asterix.common.dataflow.DatasetLocalResource;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.replication.IReplicationManager;
+import org.apache.asterix.common.replication.IReplicationStrategy;
 import org.apache.asterix.common.replication.ReplicationJob;
 import org.apache.asterix.common.storage.DatasetResourceReference;
 import org.apache.asterix.common.storage.IIndexCheckpointManager;
@@ -342,18 +343,32 @@
         });
     }
 
-    public List<String> getPartitionIndexesFiles(int partition) throws HyracksDataException {
-        List<String> partitionFiles = new ArrayList<>();
-        Set<File> partitionIndexes = getPartitionIndexes(partition);
-        for (File indexDir : partitionIndexes) {
-            if (indexDir.isDirectory()) {
-                File[] indexFiles = indexDir.listFiles(LSM_INDEX_FILES_FILTER);
-                if (indexFiles != null) {
-                    Stream.of(indexFiles).map(File::getAbsolutePath).forEach(partitionFiles::add);
-                }
+    public List<String> getPartitionReplicatedFiles(int partition, IReplicationStrategy strategy)
+            throws HyracksDataException {
+        final List<String> partitionReplicatedFiles = new ArrayList<>();
+        final Set<File> replicatedIndexes = new HashSet<>();
+        final Map<Long, LocalResource> partitionResources = getPartitionResources(partition);
+        for (LocalResource lr : partitionResources.values()) {
+            DatasetLocalResource datasetLocalResource = (DatasetLocalResource) lr.getResource();
+            if (strategy.isMatch(datasetLocalResource.getDatasetId())) {
+                replicatedIndexes.add(ioManager.resolve(lr.getPath()).getFile());
             }
         }
-        return partitionFiles;
+        for (File indexDir : replicatedIndexes) {
+            partitionReplicatedFiles.addAll(getIndexFiles(indexDir));
+        }
+        return partitionReplicatedFiles;
+    }
+
+    private List<String> getIndexFiles(File indexDir) {
+        final List<String> indexFiles = new ArrayList<>();
+        if (indexDir.isDirectory()) {
+            File[] indexFilteredFiles = indexDir.listFiles(LSM_INDEX_FILES_FILTER);
+            if (indexFilteredFiles != null) {
+                Stream.of(indexFilteredFiles).map(File::getAbsolutePath).forEach(indexFiles::add);
+            }
+        }
+        return indexFiles;
     }
 
     private void createStorageRoots() {