[NO ISSUE][REPL] Exclude Non-Replicated Datasets From Delta Recovery
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Exclude the files of non-replicated datasets from
delta recovery.
- Fix the read buffer used for large replication
requests: readRequest now returns the buffer that the
request was actually read into.
Change-Id: Ic734af7becf26082e79fae52bd2c01ba567c1c99
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2412
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
index 54d3a02..b2b1ad1 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
@@ -26,9 +26,10 @@
import java.util.stream.Collectors;
import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.replication.api.IReplicationWorker;
+import org.apache.asterix.common.replication.IReplicationStrategy;
import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.asterix.replication.api.IReplicaTask;
+import org.apache.asterix.replication.api.IReplicationWorker;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -48,8 +49,10 @@
final PersistentLocalResourceRepository localResourceRepository =
(PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
localResourceRepository.cleanup(partition);
- final List<String> partitionResources = localResourceRepository.getPartitionIndexesFiles(partition).stream()
- .map(StoragePathUtil::getFileRelativePath).collect(Collectors.toList());
+ final IReplicationStrategy replicationStrategy = appCtx.getReplicationManager().getReplicationStrategy();
+ final List<String> partitionResources =
+ localResourceRepository.getPartitionReplicatedFiles(partition, replicationStrategy).stream()
+ .map(StoragePathUtil::getFileRelativePath).collect(Collectors.toList());
final PartitionResourcesListResponse response =
new PartitionResourcesListResponse(partition, partitionResources);
ReplicationProtocol.sendTo(worker.getChannel(), response, worker.getReusableBuffer());
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java
index 280a2d4..41e7d9e 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java
@@ -72,7 +72,7 @@
final ByteBuffer buf = ensureSize(dataBuffer, requestSize);
// read request
NetworkingUtil.readBytes(socketChannel, buf, requestSize);
- return dataBuffer;
+ return buf;
}
public static ReplicationRequestType getRequestType(SocketChannel socketChannel, ByteBuffer byteBuffer)
@@ -135,6 +135,7 @@
requestBuffer.put(outputStream.getByteArray(), 0, outputStream.getLength());
requestBuffer.flip();
NetworkingUtil.transferBufferToChannel(channel, requestBuffer);
+ channel.socket().getOutputStream().flush();
} catch (IOException e) {
throw new ReplicationException(e);
}
@@ -148,9 +149,9 @@
public static IReplicationMessage readMessage(ReplicationRequestType type, SocketChannel socketChannel,
ByteBuffer buffer) {
try {
- ReplicationProtocol.readRequest(socketChannel, buffer);
+ final ByteBuffer requestBuf = ReplicationProtocol.readRequest(socketChannel, buffer);
final ByteArrayInputStream input =
- new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit());
+ new ByteArrayInputStream(requestBuf.array(), requestBuf.position(), requestBuf.limit());
try (DataInputStream dis = new DataInputStream(input)) {
switch (type) {
case PARTITION_RESOURCES_REQUEST:
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
index 5658779..fae6ed6 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
@@ -27,6 +27,7 @@
import java.util.stream.Collectors;
import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.replication.IReplicationStrategy;
import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.asterix.replication.api.PartitionReplica;
import org.apache.asterix.replication.messaging.PartitionResourcesListResponse;
@@ -52,8 +53,10 @@
final Set<String> replicaFiles = getReplicaFiles(partition);
final PersistentLocalResourceRepository localResourceRepository =
(PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
- final Set<String> masterFiles = localResourceRepository.getPartitionIndexesFiles(partition).stream()
- .map(StoragePathUtil::getFileRelativePath).collect(Collectors.toSet());
+ final IReplicationStrategy replicationStrategy = appCtx.getReplicationManager().getReplicationStrategy();
+ final Set<String> masterFiles =
+ localResourceRepository.getPartitionReplicatedFiles(partition, replicationStrategy).stream()
+ .map(StoragePathUtil::getFileRelativePath).collect(Collectors.toSet());
// find files on master and not on replica
final List<String> replicaMissingFiles =
masterFiles.stream().filter(file -> !replicaFiles.contains(file)).collect(Collectors.toList());
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index ca22a84..7206382 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -52,6 +52,7 @@
import org.apache.asterix.common.dataflow.DatasetLocalResource;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.replication.IReplicationManager;
+import org.apache.asterix.common.replication.IReplicationStrategy;
import org.apache.asterix.common.replication.ReplicationJob;
import org.apache.asterix.common.storage.DatasetResourceReference;
import org.apache.asterix.common.storage.IIndexCheckpointManager;
@@ -342,18 +343,32 @@
});
}
- public List<String> getPartitionIndexesFiles(int partition) throws HyracksDataException {
- List<String> partitionFiles = new ArrayList<>();
- Set<File> partitionIndexes = getPartitionIndexes(partition);
- for (File indexDir : partitionIndexes) {
- if (indexDir.isDirectory()) {
- File[] indexFiles = indexDir.listFiles(LSM_INDEX_FILES_FILTER);
- if (indexFiles != null) {
- Stream.of(indexFiles).map(File::getAbsolutePath).forEach(partitionFiles::add);
- }
+ public List<String> getPartitionReplicatedFiles(int partition, IReplicationStrategy strategy)
+ throws HyracksDataException {
+ final List<String> partitionReplicatedFiles = new ArrayList<>();
+ final Set<File> replicatedIndexes = new HashSet<>();
+ final Map<Long, LocalResource> partitionResources = getPartitionResources(partition);
+ for (LocalResource lr : partitionResources.values()) {
+ DatasetLocalResource datasetLocalResource = (DatasetLocalResource) lr.getResource();
+ if (strategy.isMatch(datasetLocalResource.getDatasetId())) {
+ replicatedIndexes.add(ioManager.resolve(lr.getPath()).getFile());
}
}
- return partitionFiles;
+ for (File indexDir : replicatedIndexes) {
+ partitionReplicatedFiles.addAll(getIndexFiles(indexDir));
+ }
+ return partitionReplicatedFiles;
+ }
+
+ private List<String> getIndexFiles(File indexDir) {
+ final List<String> indexFiles = new ArrayList<>();
+ if (indexDir.isDirectory()) {
+ File[] indexFilteredFiles = indexDir.listFiles(LSM_INDEX_FILES_FILTER);
+ if (indexFilteredFiles != null) {
+ Stream.of(indexFilteredFiles).map(File::getAbsolutePath).forEach(indexFiles::add);
+ }
+ }
+ return indexFiles;
}
private void createStorageRoots() {