[NO ISSUE][*DB][STO][CLOUD] Respect specified cloud storage prefix

Ext-ref: MB-62581
Change-Id: Iad27033d0afb2b2b2a6c9b4e5fe04debf4bc2e69
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18444
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
Reviewed-by: Hussain Towaileb <hussainht@gmail.com>
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
index 4357b37..37872dd 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.cloud.clients.aws.s3;
 
 import java.util.Map;
+import java.util.Objects;
 
 import org.apache.asterix.common.config.CloudProperties;
 import org.apache.asterix.external.util.aws.s3.S3Constants;
@@ -40,9 +41,9 @@
 
     public S3ClientConfig(String region, String endpoint, String prefix, boolean anonymousAuth,
             long profilerLogInterval, int writeBufferSize) {
-        this.region = region;
-        this.endpoint = endpoint;
-        this.prefix = prefix;
+        this.region = Objects.requireNonNull(region, "region");
+        this.endpoint = Objects.requireNonNull(endpoint, "endpoint");
+        this.prefix = Objects.requireNonNull(prefix, "prefix");
         this.anonymousAuth = anonymousAuth;
         this.profilerLogInterval = profilerLogInterval;
         this.writeBufferSize = writeBufferSize;
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
index 254bd03..a46b61f 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
@@ -111,7 +111,8 @@
 
     @Override
     public ICloudWriter createWriter(String bucket, String path, IWriteBufferProvider bufferProvider) {
-        ICloudBufferedWriter bufferedWriter = new S3BufferedWriter(s3Client, profiler, guardian, bucket, path);
+        ICloudBufferedWriter bufferedWriter =
+                new S3BufferedWriter(s3Client, profiler, guardian, bucket, config.getPrefix() + path);
         return new CloudResettableInputStream(bufferedWriter, bufferProvider);
     }
 
@@ -120,7 +121,7 @@
         guardian.checkReadAccess(bucket, path);
         profiler.objectsList();
         path = config.isLocalS3Provider() ? encodeURI(path) : path;
-        return filterAndGet(listS3Objects(s3Client, bucket, path), filter);
+        return filterAndGet(listS3Objects(s3Client, bucket, config.getPrefix() + path), filter);
     }
 
     @Override
@@ -128,8 +129,8 @@
         guardian.checkReadAccess(bucket, path);
         profiler.objectGet();
         long readTo = offset + buffer.remaining() - 1;
-        GetObjectRequest rangeGetObjectRequest =
-                GetObjectRequest.builder().range("bytes=" + offset + "-" + readTo).bucket(bucket).key(path).build();
+        GetObjectRequest rangeGetObjectRequest = GetObjectRequest.builder().range("bytes=" + offset + "-" + readTo)
+                .bucket(bucket).key(config.getPrefix() + path).build();
 
         int totalRead = 0;
         int read = 0;
@@ -155,7 +156,7 @@
     public byte[] readAllBytes(String bucket, String path) throws HyracksDataException {
         guardian.checkReadAccess(bucket, path);
         profiler.objectGet();
-        GetObjectRequest getReq = GetObjectRequest.builder().bucket(bucket).key(path).build();
+        GetObjectRequest getReq = GetObjectRequest.builder().bucket(bucket).key(config.getPrefix() + path).build();
 
         try (ResponseInputStream<GetObjectResponse> stream = s3Client.getObject(getReq)) {
             return stream.readAllBytes();
@@ -171,8 +172,8 @@
         guardian.checkReadAccess(bucket, path);
         profiler.objectGet();
         long readTo = offset + length - 1;
-        GetObjectRequest getReq =
-                GetObjectRequest.builder().range("bytes=" + offset + "-" + readTo).bucket(bucket).key(path).build();
+        GetObjectRequest getReq = GetObjectRequest.builder().range("bytes=" + offset + "-" + readTo).bucket(bucket)
+                .key(config.getPrefix() + path).build();
         try {
             return s3Client.getObject(getReq);
         } catch (NoSuchKeyException e) {
@@ -185,13 +186,14 @@
     public void write(String bucket, String path, byte[] data) {
         guardian.checkWriteAccess(bucket, path);
         profiler.objectWrite();
-        PutObjectRequest putReq = PutObjectRequest.builder().bucket(bucket).key(path).build();
+        PutObjectRequest putReq = PutObjectRequest.builder().bucket(bucket).key(config.getPrefix() + path).build();
         s3Client.putObject(putReq, RequestBody.fromBytes(data));
     }
 
     @Override
     public void copy(String bucket, String srcPath, FileReference destPath) {
         guardian.checkReadAccess(bucket, srcPath);
+        srcPath = config.getPrefix() + srcPath;
         srcPath = config.isLocalS3Provider() ? encodeURI(srcPath) : srcPath;
         List<S3Object> objects = listS3Objects(s3Client, bucket, srcPath);
 
@@ -202,7 +204,7 @@
             String srcKey = object.key();
             String destKey = destPath.getChildPath(IoUtil.getFileNameFromPath(srcKey));
             CopyObjectRequest copyReq = CopyObjectRequest.builder().sourceBucket(bucket).sourceKey(srcKey)
-                    .destinationBucket(bucket).destinationKey(destKey).build();
+                    .destinationBucket(bucket).destinationKey(config.getPrefix() + destKey).build();
             s3Client.copyObject(copyReq);
         }
     }
@@ -221,7 +223,7 @@
             for (int i = 0; pathIter.hasNext() && i < DELETE_BATCH_SIZE; i++) {
                 String path = pathIter.next();
                 guardian.checkWriteAccess(bucket, path);
-                objectIdentifiers.add(builder.key(path).build());
+                objectIdentifiers.add(builder.key(config.getPrefix() + path).build());
             }
 
             Delete delete = Delete.builder().objects(objectIdentifiers).build();
@@ -236,7 +238,9 @@
         guardian.checkReadAccess(bucket, path);
         profiler.objectGet();
         try {
-            return s3Client.headObject(HeadObjectRequest.builder().bucket(bucket).key(path).build()).contentLength();
+            return s3Client
+                    .headObject(HeadObjectRequest.builder().bucket(bucket).key(config.getPrefix() + path).build())
+                    .contentLength();
         } catch (NoSuchKeyException ex) {
             return 0;
         } catch (Exception ex) {
@@ -249,7 +253,7 @@
         guardian.checkReadAccess(bucket, path);
         profiler.objectGet();
         try {
-            s3Client.headObject(HeadObjectRequest.builder().bucket(bucket).key(path).build());
+            s3Client.headObject(HeadObjectRequest.builder().bucket(bucket).key(config.getPrefix() + path).build());
             return true;
         } catch (NoSuchKeyException ex) {
             return false;
@@ -261,7 +265,7 @@
     @Override
     public boolean isEmptyPrefix(String bucket, String path) throws HyracksDataException {
         profiler.objectsList();
-        return S3ClientUtils.isEmptyPrefix(s3Client, bucket, path);
+        return S3ClientUtils.isEmptyPrefix(s3Client, bucket, config.getPrefix() + path);
     }
 
     @Override
@@ -271,13 +275,13 @@
 
     @Override
     public JsonNode listAsJson(ObjectMapper objectMapper, String bucket) {
-        List<S3Object> objects = listS3Objects(s3Client, bucket, "/");
+        List<S3Object> objects = listS3Objects(s3Client, bucket, config.getPrefix());
         ArrayNode objectsInfo = objectMapper.createArrayNode();
 
         objects.sort((x, y) -> String.CASE_INSENSITIVE_ORDER.compare(x.key(), y.key()));
         for (S3Object object : objects) {
             ObjectNode objectInfo = objectsInfo.addObject();
-            objectInfo.put("path", object.key());
+            objectInfo.put("path", object.key().substring(config.getPrefix().length()));
             objectInfo.put("size", object.size());
         }
         return objectsInfo;
@@ -292,7 +296,7 @@
      * FOR TESTING ONLY
      */
     public ICloudBufferedWriter createBufferedWriter(String bucket, String path) {
-        return new S3BufferedWriter(s3Client, profiler, guardian, bucket, path);
+        return new S3BufferedWriter(s3Client, profiler, guardian, bucket, config.getPrefix() + path);
     }
 
     private static S3Client buildClient(S3ClientConfig config) {
@@ -316,6 +320,7 @@
         for (S3Object s3Object : contents) {
             String path = config.isLocalS3Provider() ? S3ClientUtils.decodeURI(s3Object.key()) : s3Object.key();
             if (filter.accept(null, IoUtil.getFileNameFromPath(path))) {
+                path = path.substring(config.getPrefix().length());
                 files.add(CloudFile.of(path, s3Object.size()));
             }
         }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
index c5d9b73..005fa5a 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
@@ -57,11 +57,13 @@
     private final IOManager ioManager;
     private final S3AsyncClient s3AsyncClient;
     private final S3TransferManager transferManager;
+    private final S3ClientConfig config;
     private final IRequestProfiler profiler;
 
     S3ParallelDownloader(String bucket, IOManager ioManager, S3ClientConfig config, IRequestProfiler profiler) {
         this.bucket = bucket;
         this.ioManager = ioManager;
+        this.config = config;
         this.profiler = profiler;
         s3AsyncClient = createAsyncClient(config);
         transferManager = createS3TransferManager(s3AsyncClient);
@@ -110,7 +112,7 @@
             // GetObjectRequest
             GetObjectRequest.Builder requestBuilder = GetObjectRequest.builder();
             requestBuilder.bucket(bucket);
-            requestBuilder.key(fileReference.getRelativePath());
+            requestBuilder.key(config.getPrefix() + fileReference.getRelativePath());
 
             // Download object
             DownloadFileRequest.Builder builder = DownloadFileRequest.builder();
@@ -138,7 +140,8 @@
             DownloadDirectoryRequest.Builder builder = DownloadDirectoryRequest.builder();
             builder.bucket(bucket);
             builder.destination(fileReference.getFile().toPath());
-            builder.listObjectsV2RequestTransformer(l -> l.prefix(fileReference.getRelativePath()));
+            builder.listObjectsV2RequestTransformer(
+                    l -> l.prefix(config.getPrefix() + fileReference.getRelativePath()));
             DirectoryDownload directoryDownload = transferManager.downloadDirectory(builder.build());
             downloads.add(directoryDownload.completionFuture());
         }