[ASTERIXDB-3623][OTH] Wait for submitted cloud requests on exceeding max pending HTTP connections
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
When downloading files in parallel, download up to max allowed pending,
then wait for them to (acquire a connection and) finish.
Continue with the rest.
Ext-ref: MB-67243
Change-Id: Ifd1a30c6eed5f316a2d7a17b685f537c24e6c0d2
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19954
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
index 4d27c5a..2acc12e 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
@@ -77,8 +77,7 @@
@Override
public void downloadFiles(Collection<FileReference> toDownload) throws HyracksDataException {
try {
- List<CompletableFuture<CompletedFileDownload>> downloads = startDownloadingFiles(toDownload);
- waitForFileDownloads(downloads);
+ downloadFilesAndWait(toDownload);
} catch (IOException | ExecutionException | InterruptedException e) {
throw HyracksDataException.create(e);
}
@@ -104,9 +103,10 @@
s3AsyncClient.close();
}
- private List<CompletableFuture<CompletedFileDownload>> startDownloadingFiles(Collection<FileReference> toDownload)
- throws IOException {
+ private void downloadFilesAndWait(Collection<FileReference> toDownload)
+ throws IOException, ExecutionException, InterruptedException {
List<CompletableFuture<CompletedFileDownload>> downloads = new ArrayList<>();
+ int maxPending = config.getRequestsMaxPendingHttpConnections();
for (FileReference fileReference : toDownload) {
// multipart download
profiler.objectGet();
@@ -126,13 +126,18 @@
FileDownload fileDownload = transferManager.downloadFile(builder.build());
downloads.add(fileDownload.completionFuture());
+ if (maxPending > 0 && downloads.size() >= maxPending) {
+ waitForFileDownloads(downloads);
+ downloads.clear();
+ }
}
- return downloads;
+ if (!downloads.isEmpty()) {
+ waitForFileDownloads(downloads);
+ }
}
private void waitForFileDownloads(List<CompletableFuture<CompletedFileDownload>> downloads)
throws ExecutionException, InterruptedException {
-
for (CompletableFuture<CompletedFileDownload> download : downloads) {
download.get();
}