[ASTERIXDB-2870][EXT]: Close client after it is used
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Close the S3 client after it is used to release
the connections.
Change-Id: I8611b5a05fcbd8a4a9a4556c290281fd5cbd56a4
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/10885
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
index f558820..8bd7a51 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
@@ -30,6 +30,7 @@
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.external.input.stream.AbstractMultipleInputStream;
import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.util.CleanupUtils;
import org.apache.hyracks.util.LogRedactionUtil;
@@ -46,7 +47,7 @@
private static final Logger LOGGER = LogManager.getLogger();
// Configuration
- private final Map<String, String> configuration;
+ private final String bucket;
private final int bufferSize;
private final S3Client s3Client;
@@ -56,10 +57,10 @@
private int nextFileIndex = 0;
public AwsS3InputStream(Map<String, String> configuration, List<String> filePaths) throws HyracksDataException {
- this.configuration = configuration;
this.filePaths = filePaths;
this.s3Client = buildAwsS3Client(configuration);
this.bufferSize = ExternalDataUtils.getOrDefaultBufferSize(configuration);
+ this.bucket = configuration.get(AwsS3.CONTAINER_NAME_FIELD_NAME);
}
@Override
@@ -71,9 +72,6 @@
// Finished reading all the files
if (nextFileIndex >= filePaths.size()) {
- if (in != null) {
- CleanupUtils.close(in, null);
- }
return false;
}
@@ -82,9 +80,9 @@
CleanupUtils.close(in, null);
}
- String bucket = configuration.get(AwsS3.CONTAINER_NAME_FIELD_NAME);
+ String fileName = filePaths.get(nextFileIndex);
GetObjectRequest.Builder getObjectBuilder = GetObjectRequest.builder();
- GetObjectRequest getObjectRequest = getObjectBuilder.bucket(bucket).key(filePaths.get(nextFileIndex)).build();
+ GetObjectRequest getObjectRequest = getObjectBuilder.bucket(bucket).key(fileName).build();
// Have a reference to the S3 stream to ensure that if GZipInputStream causes an IOException because of reading
// the header, then the S3 stream gets closed in the close method
@@ -100,9 +98,8 @@
}
// Use gzip stream if needed
- String filename = filePaths.get(nextFileIndex).toLowerCase();
- if (filename.endsWith(".gz") || filename.endsWith(".gzip")) {
- in = new GZIPInputStream(s3Client.getObject(getObjectRequest), bufferSize);
+ if (StringUtils.endsWithIgnoreCase(fileName, ".gz") || StringUtils.endsWithIgnoreCase(fileName, ".gzip")) {
+ in = new GZIPInputStream(in, bufferSize);
}
// Current file ready, point to the next file
@@ -136,6 +133,9 @@
if (in != null) {
CleanupUtils.close(in, null);
}
+ if (s3Client != null) {
+ CleanupUtils.close(s3Client, null);
+ }
}
@Override