Merge branch 'gerrit/neo' into 'gerrit/trinity'
Change-Id: I511c46368cb0dbf06103d4bd9cf4d2e9c2f558d9
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
index bbcf9cd..0f7f71a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
@@ -37,6 +37,7 @@
import org.apache.hyracks.api.util.CleanupUtils;
import org.apache.hyracks.util.LogRedactionUtil;
+import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
@@ -48,6 +49,7 @@
// Configuration
private final String bucket;
private final S3Client s3Client;
+ private ResponseInputStream<?> s3InStream;
private static final int MAX_RETRIES = 5; // We will retry 5 times in case of internal error from AWS S3 service
public AwsS3InputStream(Map<String, String> configuration, List<String> filePaths) throws HyracksDataException {
@@ -82,7 +84,8 @@
int retries = 0;
while (retries < MAX_RETRIES) {
try {
- in = s3Client.getObject(request);
+ s3InStream = s3Client.getObject(request);
+ in = s3InStream;
break;
} catch (NoSuchKeyException ex) {
LOGGER.debug(() -> "Key " + LogRedactionUtil.userData(request.key()) + " was not found in bucket "
@@ -114,6 +117,9 @@
@Override
public void close() throws IOException {
if (in != null) {
+ if (s3InStream != null) {
+ s3InStream.abort();
+ }
CleanupUtils.close(in, null);
}
if (s3Client != null) {