ASTERIXDB-3493: The storage engine should use the cloud storage prefix whilst writing objects to Google Cloud Storage (GCS)

- user model changes: no
- storage format changes: no
- interface changes: no

Ext-ref: MB-63369

Details:

When the object store used at the backend is Google Cloud Storage, the current storage engine implementation does not use
the cloud storage prefix; as a result, the database objects are written directly under the bucket.

After this change, the database objects are expected to be written under the cloud storage prefix directory.

Change-Id: I7b84bf98272581bc96851855d4bd8663780ab611
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18770
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java
index fe5dd4d..e4e471d 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.cloud.clients.google.gcs;
 
 import static org.apache.asterix.external.util.google.gcs.GCSConstants.ENDPOINT_FIELD_NAME;
+import static org.apache.asterix.external.util.google.gcs.GCSConstants.STORAGE_PREFIX;
 
 import java.io.IOException;
 import java.util.Map;
@@ -42,10 +43,11 @@
     private final int readMaxRequestsPerSeconds;
     private final int writeMaxRequestsPerSeconds;
     private final int writeBufferSize;
+    private final String prefix;
 
     private GCSClientConfig(String region, String endpoint, boolean anonymousAuth, long profilerLogInterval,
             long tokenAcquireTimeout, int writeMaxRequestsPerSeconds, int readMaxRequestsPerSeconds,
-            int writeBufferSize) {
+            int writeBufferSize, String prefix) {
         this.region = region;
         this.endpoint = endpoint;
         this.anonymousAuth = anonymousAuth;
@@ -54,18 +56,20 @@
         this.writeMaxRequestsPerSeconds = writeMaxRequestsPerSeconds;
         this.readMaxRequestsPerSeconds = readMaxRequestsPerSeconds;
         this.writeBufferSize = writeBufferSize;
+        this.prefix = prefix;
     }
 
     public GCSClientConfig(String region, String endpoint, boolean anonymousAuth, long profilerLogInterval,
-            int writeBufferSize) {
-        this(region, endpoint, anonymousAuth, profilerLogInterval, 1, 0, 0, writeBufferSize);
+            int writeBufferSize, String prefix) {
+        this(region, endpoint, anonymousAuth, profilerLogInterval, 1, 0, 0, writeBufferSize, prefix);
     }
 
     public static GCSClientConfig of(CloudProperties cloudProperties) {
         return new GCSClientConfig(cloudProperties.getStorageRegion(), cloudProperties.getStorageEndpoint(),
                 cloudProperties.isStorageAnonymousAuth(), cloudProperties.getProfilerLogInterval(),
                 cloudProperties.getTokenAcquireTimeout(), cloudProperties.getWriteMaxRequestsPerSecond(),
-                cloudProperties.getReadMaxRequestsPerSecond(), cloudProperties.getWriteBufferSize());
+                cloudProperties.getReadMaxRequestsPerSecond(), cloudProperties.getWriteBufferSize(),
+                cloudProperties.getStoragePrefix());
     }
 
     public static GCSClientConfig of(Map<String, String> configuration, int writeBufferSize) {
@@ -73,10 +77,10 @@
         long profilerLogInterval = 0;
 
         String region = "";
-        String prefix = "";
+        String prefix = configuration.getOrDefault(STORAGE_PREFIX, "");
         boolean anonymousAuth = false;
 
-        return new GCSClientConfig(region, endPoint, anonymousAuth, profilerLogInterval, writeBufferSize);
+        return new GCSClientConfig(region, endPoint, anonymousAuth, profilerLogInterval, writeBufferSize, prefix);
     }
 
     public String getRegion() {
@@ -118,4 +122,8 @@
     public int getWriteBufferSize() {
         return writeBufferSize;
     }
+
+    public String getPrefix() {
+        return prefix;
+    }
 }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
index 010a6bb..a74e7d3 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
@@ -100,20 +100,20 @@
 
     @Override
     public ICloudWriter createWriter(String bucket, String path, IWriteBufferProvider bufferProvider) {
-        return new GCSWriter(bucket, path, gcsClient, profilerLimiter, guardian, writeBufferSize);
+        return new GCSWriter(bucket, config.getPrefix() + path, gcsClient, profilerLimiter, guardian, writeBufferSize);
     }
 
     @Override
     public Set<CloudFile> listObjects(String bucket, String path, FilenameFilter filter) {
         guardian.checkReadAccess(bucket, path);
         profilerLimiter.objectsList();
-        Page<Blob> blobs =
-                gcsClient.list(bucket, BlobListOption.prefix(path), BlobListOption.fields(Storage.BlobField.SIZE));
+        Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(config.getPrefix() + path),
+                BlobListOption.fields(Storage.BlobField.SIZE));
 
         Set<CloudFile> files = new HashSet<>();
         for (Blob blob : blobs.iterateAll()) {
             if (filter.accept(null, IoUtil.getFileNameFromPath(blob.getName()))) {
-                files.add(CloudFile.of(blob.getName(), blob.getSize()));
+                files.add(CloudFile.of(stripCloudPrefix(blob.getName()), blob.getSize()));
             }
         }
         return files;
@@ -123,7 +123,7 @@
     public int read(String bucket, String path, long offset, ByteBuffer buffer) throws HyracksDataException {
         guardian.checkReadAccess(bucket, path);
         profilerLimiter.objectGet();
-        BlobId blobId = BlobId.of(bucket, path);
+        BlobId blobId = BlobId.of(bucket, config.getPrefix() + path);
         long readTo = offset + buffer.remaining();
         int totalRead = 0;
         try (ReadChannel from = gcsClient.reader(blobId).limit(readTo)) {
@@ -145,7 +145,7 @@
     public byte[] readAllBytes(String bucket, String path) {
         guardian.checkReadAccess(bucket, path);
         profilerLimiter.objectGet();
-        BlobId blobId = BlobId.of(bucket, path);
+        BlobId blobId = BlobId.of(bucket, config.getPrefix() + path);
         try {
             return gcsClient.readAllBytes(blobId);
         } catch (StorageException e) {
@@ -157,7 +157,7 @@
     public InputStream getObjectStream(String bucket, String path, long offset, long length) {
         guardian.checkReadAccess(bucket, path);
         profilerLimiter.objectGet();
-        try (ReadChannel reader = gcsClient.reader(bucket, path).limit(offset + length)) {
+        try (ReadChannel reader = gcsClient.reader(bucket, config.getPrefix() + path).limit(offset + length)) {
             reader.seek(offset);
             return Channels.newInputStream(reader);
         } catch (StorageException | IOException e) {
@@ -169,7 +169,7 @@
     public void write(String bucket, String path, byte[] data) {
         guardian.checkWriteAccess(bucket, path);
         profilerLimiter.objectWrite();
-        BlobInfo blobInfo = BlobInfo.newBuilder(bucket, path).build();
+        BlobInfo blobInfo = BlobInfo.newBuilder(bucket, config.getPrefix() + path).build();
         gcsClient.create(blobInfo, data);
     }
 
@@ -177,7 +177,7 @@
     public void copy(String bucket, String srcPath, FileReference destPath) {
         guardian.checkReadAccess(bucket, srcPath);
         profilerLimiter.objectsList();
-        Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(srcPath));
+        Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(config.getPrefix() + srcPath));
         for (Blob blob : blobs.iterateAll()) {
             profilerLimiter.objectCopy();
             BlobId source = blob.getBlobId();
@@ -200,7 +200,7 @@
         while (pathIter.hasNext()) {
             batchRequest = gcsClient.batch();
             for (int i = 0; pathIter.hasNext() && i < DELETE_BATCH_SIZE; i++) {
-                BlobId blobId = BlobId.of(bucket, pathIter.next());
+                BlobId blobId = BlobId.of(bucket, config.getPrefix() + pathIter.next());
                 guardian.checkWriteAccess(bucket, blobId.getName());
                 batchRequest.delete(blobId);
             }
@@ -214,7 +214,8 @@
     public long getObjectSize(String bucket, String path) {
         guardian.checkReadAccess(bucket, path);
         profilerLimiter.objectGet();
-        Blob blob = gcsClient.get(bucket, path, Storage.BlobGetOption.fields(Storage.BlobField.SIZE));
+        Blob blob =
+                gcsClient.get(bucket, config.getPrefix() + path, Storage.BlobGetOption.fields(Storage.BlobField.SIZE));
         if (blob == null) {
             return 0;
         }
@@ -225,7 +226,8 @@
     public boolean exists(String bucket, String path) {
         guardian.checkReadAccess(bucket, path);
         profilerLimiter.objectGet();
-        Blob blob = gcsClient.get(bucket, path, Storage.BlobGetOption.fields(Storage.BlobField.values()));
+        Blob blob = gcsClient.get(bucket, config.getPrefix() + path,
+                Storage.BlobGetOption.fields(Storage.BlobField.values()));
         return blob != null && blob.exists();
     }
 
@@ -233,7 +235,7 @@
     public boolean isEmptyPrefix(String bucket, String path) {
         guardian.checkReadAccess(bucket, path);
         profilerLimiter.objectsList();
-        Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(path));
+        Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(config.getPrefix() + path));
         return !blobs.hasNextPage();
     }
 
@@ -278,4 +280,8 @@
         }
         return builder.build().getService();
     }
+
+    private String stripCloudPrefix(String objectName) {
+        return objectName.substring(config.getPrefix().length());
+    }
 }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
index 0994cea..0d30120 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
@@ -56,6 +56,7 @@
     private final Storage gcsClient;
     private final TransferManager transferManager;
     private final IRequestProfilerLimiter profiler;
+    private final GCSClientConfig config;
 
     public GCSParallelDownloader(String bucket, IOManager ioManager, GCSClientConfig config,
             IRequestProfilerLimiter profiler) throws HyracksDataException {
@@ -70,18 +71,21 @@
         this.gcsClient = builder.build().getService();
         this.transferManager =
                 TransferManagerConfig.newBuilder().setStorageOptions(builder.build()).build().getService();
+        this.config = config;
     }
 
     @Override
     public void downloadFiles(Collection<FileReference> toDownload) throws HyracksDataException {
-        ParallelDownloadConfig.Builder config = ParallelDownloadConfig.newBuilder().setBucketName(bucket);
+        ParallelDownloadConfig.Builder downConfig =
+                ParallelDownloadConfig.newBuilder().setBucketName(bucket).setStripPrefix(this.config.getPrefix());
+
         Map<Path, List<BlobInfo>> pathListMap = new HashMap<>();
         try {
             for (FileReference fileReference : toDownload) {
                 profiler.objectGet();
                 FileUtils.createParentDirectories(fileReference.getFile());
-                addToMap(pathListMap, fileReference.getDeviceHandle().getMount().toPath(),
-                        BlobInfo.newBuilder(BlobId.of(bucket, fileReference.getRelativePath())).build());
+                addToMap(pathListMap, fileReference.getDeviceHandle().getMount().toPath(), BlobInfo
+                        .newBuilder(BlobId.of(bucket, config.getPrefix() + fileReference.getRelativePath())).build());
             }
         } catch (IOException e) {
             throw HyracksDataException.create(e);
@@ -89,7 +93,7 @@
         List<DownloadJob> downloadJobs = new ArrayList<>(pathListMap.size());
         for (Map.Entry<Path, List<BlobInfo>> entry : pathListMap.entrySet()) {
             downloadJobs.add(transferManager.downloadBlobs(entry.getValue(),
-                    config.setDownloadDirectory(entry.getKey()).build()));
+                    downConfig.setDownloadDirectory(entry.getKey()).build()));
         }
         downloadJobs.forEach(DownloadJob::getDownloadResults);
     }
@@ -98,20 +102,22 @@
     public Collection<FileReference> downloadDirectories(Collection<FileReference> toDownload)
             throws HyracksDataException {
         Set<FileReference> failedFiles = new HashSet<>();
-        ParallelDownloadConfig.Builder config = ParallelDownloadConfig.newBuilder().setBucketName(bucket);
+        ParallelDownloadConfig.Builder config =
+                ParallelDownloadConfig.newBuilder().setBucketName(bucket).setStripPrefix(this.config.getPrefix());
 
         Map<Path, List<BlobInfo>> pathListMap = new HashMap<>();
         for (FileReference fileReference : toDownload) {
             profiler.objectMultipartDownload();
-            Page<Blob> blobs = gcsClient.list(bucket, Storage.BlobListOption.prefix(fileReference.getRelativePath()));
+            Page<Blob> blobs = gcsClient.list(bucket,
+                    Storage.BlobListOption.prefix(this.config.getPrefix() + fileReference.getRelativePath()));
             for (Blob blob : blobs.iterateAll()) {
                 addToMap(pathListMap, fileReference.getDeviceHandle().getMount().toPath(), blob.asBlobInfo());
             }
         }
         List<DownloadJob> downloadJobs = new ArrayList<>(pathListMap.size());
         for (Map.Entry<Path, List<BlobInfo>> entry : pathListMap.entrySet()) {
-            downloadJobs.add(transferManager.downloadBlobs(entry.getValue(),
-                    config.setDownloadDirectory(entry.getKey()).build()));
+            ParallelDownloadConfig parallelDownloadConfig = config.setDownloadDirectory(entry.getKey()).build();
+            downloadJobs.add(transferManager.downloadBlobs(entry.getValue(), parallelDownloadConfig));
         }
         List<DownloadResult> results;
         for (DownloadJob job : downloadJobs) {
diff --git a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
index 09cc3f6..08864ac 100644
--- a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
+++ b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
@@ -51,7 +51,7 @@
         LOGGER.info("Client created successfully");
         int writeBufferSize = StorageUtil.getIntSizeInBytes(5, StorageUtil.StorageUnit.MEGABYTE);
         GCSClientConfig config =
-                new GCSClientConfig(MOCK_SERVER_REGION, MOCK_SERVER_HOSTNAME, true, 0, writeBufferSize);
+                new GCSClientConfig(MOCK_SERVER_REGION, MOCK_SERVER_HOSTNAME, true, 0, writeBufferSize, "");
         CLOUD_CLIENT = new GCSCloudClient(config, ICloudGuardian.NoOpCloudGuardian.INSTANCE);
     }
 
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java
index f2dbde7..6314ce8 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java
@@ -26,6 +26,7 @@
     public static final String APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME = "applicationDefaultCredentials";
     public static final String JSON_CREDENTIALS_FIELD_NAME = "jsonCredentials";
     public static final String ENDPOINT_FIELD_NAME = "endpoint";
+    public static final String STORAGE_PREFIX = "prefix";
 
     /*
      * Hadoop internal configuration