[NO ISSUE][*DB][STO][CLOUD] Respect specified cloud storage prefix
Ext-ref: MB-62581
Change-Id: Iad27033d0afb2b2b2a6c9b4e5fe04debf4bc2e69
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18444
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
Reviewed-by: Hussain Towaileb <hussainht@gmail.com>
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
index 4357b37..37872dd 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
@@ -19,6 +19,7 @@
package org.apache.asterix.cloud.clients.aws.s3;
import java.util.Map;
+import java.util.Objects;
import org.apache.asterix.common.config.CloudProperties;
import org.apache.asterix.external.util.aws.s3.S3Constants;
@@ -40,9 +41,9 @@
public S3ClientConfig(String region, String endpoint, String prefix, boolean anonymousAuth,
long profilerLogInterval, int writeBufferSize) {
- this.region = region;
- this.endpoint = endpoint;
- this.prefix = prefix;
+ this.region = Objects.requireNonNull(region, "region");
+ this.endpoint = Objects.requireNonNull(endpoint, "endpoint");
+ this.prefix = Objects.requireNonNull(prefix, "prefix");
this.anonymousAuth = anonymousAuth;
this.profilerLogInterval = profilerLogInterval;
this.writeBufferSize = writeBufferSize;
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
index 254bd03..a46b61f 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
@@ -111,7 +111,8 @@
@Override
public ICloudWriter createWriter(String bucket, String path, IWriteBufferProvider bufferProvider) {
- ICloudBufferedWriter bufferedWriter = new S3BufferedWriter(s3Client, profiler, guardian, bucket, path);
+ ICloudBufferedWriter bufferedWriter =
+ new S3BufferedWriter(s3Client, profiler, guardian, bucket, config.getPrefix() + path);
return new CloudResettableInputStream(bufferedWriter, bufferProvider);
}
@@ -120,7 +121,7 @@
guardian.checkReadAccess(bucket, path);
profiler.objectsList();
path = config.isLocalS3Provider() ? encodeURI(path) : path;
- return filterAndGet(listS3Objects(s3Client, bucket, path), filter);
+ return filterAndGet(listS3Objects(s3Client, bucket, config.getPrefix() + path), filter);
}
@Override
@@ -128,8 +129,8 @@
guardian.checkReadAccess(bucket, path);
profiler.objectGet();
long readTo = offset + buffer.remaining() - 1;
- GetObjectRequest rangeGetObjectRequest =
- GetObjectRequest.builder().range("bytes=" + offset + "-" + readTo).bucket(bucket).key(path).build();
+ GetObjectRequest rangeGetObjectRequest = GetObjectRequest.builder().range("bytes=" + offset + "-" + readTo)
+ .bucket(bucket).key(config.getPrefix() + path).build();
int totalRead = 0;
int read = 0;
@@ -155,7 +156,7 @@
public byte[] readAllBytes(String bucket, String path) throws HyracksDataException {
guardian.checkReadAccess(bucket, path);
profiler.objectGet();
- GetObjectRequest getReq = GetObjectRequest.builder().bucket(bucket).key(path).build();
+ GetObjectRequest getReq = GetObjectRequest.builder().bucket(bucket).key(config.getPrefix() + path).build();
try (ResponseInputStream<GetObjectResponse> stream = s3Client.getObject(getReq)) {
return stream.readAllBytes();
@@ -171,8 +172,8 @@
guardian.checkReadAccess(bucket, path);
profiler.objectGet();
long readTo = offset + length - 1;
- GetObjectRequest getReq =
- GetObjectRequest.builder().range("bytes=" + offset + "-" + readTo).bucket(bucket).key(path).build();
+ GetObjectRequest getReq = GetObjectRequest.builder().range("bytes=" + offset + "-" + readTo).bucket(bucket)
+ .key(config.getPrefix() + path).build();
try {
return s3Client.getObject(getReq);
} catch (NoSuchKeyException e) {
@@ -185,13 +186,14 @@
public void write(String bucket, String path, byte[] data) {
guardian.checkWriteAccess(bucket, path);
profiler.objectWrite();
- PutObjectRequest putReq = PutObjectRequest.builder().bucket(bucket).key(path).build();
+ PutObjectRequest putReq = PutObjectRequest.builder().bucket(bucket).key(config.getPrefix() + path).build();
s3Client.putObject(putReq, RequestBody.fromBytes(data));
}
@Override
public void copy(String bucket, String srcPath, FileReference destPath) {
guardian.checkReadAccess(bucket, srcPath);
+ srcPath = config.getPrefix() + srcPath;
srcPath = config.isLocalS3Provider() ? encodeURI(srcPath) : srcPath;
List<S3Object> objects = listS3Objects(s3Client, bucket, srcPath);
@@ -202,7 +204,7 @@
String srcKey = object.key();
String destKey = destPath.getChildPath(IoUtil.getFileNameFromPath(srcKey));
CopyObjectRequest copyReq = CopyObjectRequest.builder().sourceBucket(bucket).sourceKey(srcKey)
- .destinationBucket(bucket).destinationKey(destKey).build();
+ .destinationBucket(bucket).destinationKey(config.getPrefix() + destKey).build();
s3Client.copyObject(copyReq);
}
}
@@ -221,7 +223,7 @@
for (int i = 0; pathIter.hasNext() && i < DELETE_BATCH_SIZE; i++) {
String path = pathIter.next();
guardian.checkWriteAccess(bucket, path);
- objectIdentifiers.add(builder.key(path).build());
+ objectIdentifiers.add(builder.key(config.getPrefix() + path).build());
}
Delete delete = Delete.builder().objects(objectIdentifiers).build();
@@ -236,7 +238,9 @@
guardian.checkReadAccess(bucket, path);
profiler.objectGet();
try {
- return s3Client.headObject(HeadObjectRequest.builder().bucket(bucket).key(path).build()).contentLength();
+ return s3Client
+ .headObject(HeadObjectRequest.builder().bucket(bucket).key(config.getPrefix() + path).build())
+ .contentLength();
} catch (NoSuchKeyException ex) {
return 0;
} catch (Exception ex) {
@@ -249,7 +253,7 @@
guardian.checkReadAccess(bucket, path);
profiler.objectGet();
try {
- s3Client.headObject(HeadObjectRequest.builder().bucket(bucket).key(path).build());
+ s3Client.headObject(HeadObjectRequest.builder().bucket(bucket).key(config.getPrefix() + path).build());
return true;
} catch (NoSuchKeyException ex) {
return false;
@@ -261,7 +265,7 @@
@Override
public boolean isEmptyPrefix(String bucket, String path) throws HyracksDataException {
profiler.objectsList();
- return S3ClientUtils.isEmptyPrefix(s3Client, bucket, path);
+ return S3ClientUtils.isEmptyPrefix(s3Client, bucket, config.getPrefix() + path);
}
@Override
@@ -271,13 +275,13 @@
@Override
public JsonNode listAsJson(ObjectMapper objectMapper, String bucket) {
- List<S3Object> objects = listS3Objects(s3Client, bucket, "/");
+ List<S3Object> objects = listS3Objects(s3Client, bucket, config.getPrefix());
ArrayNode objectsInfo = objectMapper.createArrayNode();
objects.sort((x, y) -> String.CASE_INSENSITIVE_ORDER.compare(x.key(), y.key()));
for (S3Object object : objects) {
ObjectNode objectInfo = objectsInfo.addObject();
- objectInfo.put("path", object.key());
+ objectInfo.put("path", object.key().substring(config.getPrefix().length()));
objectInfo.put("size", object.size());
}
return objectsInfo;
@@ -292,7 +296,7 @@
* FOR TESTING ONLY
*/
public ICloudBufferedWriter createBufferedWriter(String bucket, String path) {
- return new S3BufferedWriter(s3Client, profiler, guardian, bucket, path);
+ return new S3BufferedWriter(s3Client, profiler, guardian, bucket, config.getPrefix() + path);
}
private static S3Client buildClient(S3ClientConfig config) {
@@ -316,6 +320,7 @@
for (S3Object s3Object : contents) {
String path = config.isLocalS3Provider() ? S3ClientUtils.decodeURI(s3Object.key()) : s3Object.key();
if (filter.accept(null, IoUtil.getFileNameFromPath(path))) {
+ path = path.substring(config.getPrefix().length());
files.add(CloudFile.of(path, s3Object.size()));
}
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
index c5d9b73..005fa5a 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
@@ -57,11 +57,13 @@
private final IOManager ioManager;
private final S3AsyncClient s3AsyncClient;
private final S3TransferManager transferManager;
+ private final S3ClientConfig config;
private final IRequestProfiler profiler;
S3ParallelDownloader(String bucket, IOManager ioManager, S3ClientConfig config, IRequestProfiler profiler) {
this.bucket = bucket;
this.ioManager = ioManager;
+ this.config = config;
this.profiler = profiler;
s3AsyncClient = createAsyncClient(config);
transferManager = createS3TransferManager(s3AsyncClient);
@@ -110,7 +112,7 @@
// GetObjectRequest
GetObjectRequest.Builder requestBuilder = GetObjectRequest.builder();
requestBuilder.bucket(bucket);
- requestBuilder.key(fileReference.getRelativePath());
+ requestBuilder.key(config.getPrefix() + fileReference.getRelativePath());
// Download object
DownloadFileRequest.Builder builder = DownloadFileRequest.builder();
@@ -138,7 +140,8 @@
DownloadDirectoryRequest.Builder builder = DownloadDirectoryRequest.builder();
builder.bucket(bucket);
builder.destination(fileReference.getFile().toPath());
- builder.listObjectsV2RequestTransformer(l -> l.prefix(fileReference.getRelativePath()));
+ builder.listObjectsV2RequestTransformer(
+ l -> l.prefix(config.getPrefix() + fileReference.getRelativePath()));
DirectoryDownload directoryDownload = transferManager.downloadDirectory(builder.build());
downloads.add(directoryDownload.completionFuture());
}