ASTERIXDB-3493: The storage engine should use the cloud storage prefix whilst writing objects to Google Cloud Storage (GCS)
- user model changes: no
- storage format changes: no
- interface changes: no
Ext-ref: MB-63369
Details:
When the backend object store is Google Cloud Storage, the current storage engine implementation does not use
the cloud storage prefix; as a result, the database objects are written directly under the bucket.
With this change, the database objects are expected to be written under the configured prefix directory instead.
Change-Id: I7b84bf98272581bc96851855d4bd8663780ab611
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18770
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java
index fe5dd4d..e4e471d 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java
@@ -19,6 +19,7 @@
package org.apache.asterix.cloud.clients.google.gcs;
import static org.apache.asterix.external.util.google.gcs.GCSConstants.ENDPOINT_FIELD_NAME;
+import static org.apache.asterix.external.util.google.gcs.GCSConstants.STORAGE_PREFIX;
import java.io.IOException;
import java.util.Map;
@@ -42,10 +43,11 @@
private final int readMaxRequestsPerSeconds;
private final int writeMaxRequestsPerSeconds;
private final int writeBufferSize;
+ private final String prefix;
private GCSClientConfig(String region, String endpoint, boolean anonymousAuth, long profilerLogInterval,
long tokenAcquireTimeout, int writeMaxRequestsPerSeconds, int readMaxRequestsPerSeconds,
- int writeBufferSize) {
+ int writeBufferSize, String prefix) {
this.region = region;
this.endpoint = endpoint;
this.anonymousAuth = anonymousAuth;
@@ -54,18 +56,20 @@
this.writeMaxRequestsPerSeconds = writeMaxRequestsPerSeconds;
this.readMaxRequestsPerSeconds = readMaxRequestsPerSeconds;
this.writeBufferSize = writeBufferSize;
+ this.prefix = prefix;
}
public GCSClientConfig(String region, String endpoint, boolean anonymousAuth, long profilerLogInterval,
- int writeBufferSize) {
- this(region, endpoint, anonymousAuth, profilerLogInterval, 1, 0, 0, writeBufferSize);
+ int writeBufferSize, String prefix) {
+ this(region, endpoint, anonymousAuth, profilerLogInterval, 1, 0, 0, writeBufferSize, prefix);
}
public static GCSClientConfig of(CloudProperties cloudProperties) {
return new GCSClientConfig(cloudProperties.getStorageRegion(), cloudProperties.getStorageEndpoint(),
cloudProperties.isStorageAnonymousAuth(), cloudProperties.getProfilerLogInterval(),
cloudProperties.getTokenAcquireTimeout(), cloudProperties.getWriteMaxRequestsPerSecond(),
- cloudProperties.getReadMaxRequestsPerSecond(), cloudProperties.getWriteBufferSize());
+ cloudProperties.getReadMaxRequestsPerSecond(), cloudProperties.getWriteBufferSize(),
+ cloudProperties.getStoragePrefix());
}
public static GCSClientConfig of(Map<String, String> configuration, int writeBufferSize) {
@@ -73,10 +77,10 @@
long profilerLogInterval = 0;
String region = "";
- String prefix = "";
+ String prefix = configuration.getOrDefault(STORAGE_PREFIX, "");
boolean anonymousAuth = false;
- return new GCSClientConfig(region, endPoint, anonymousAuth, profilerLogInterval, writeBufferSize);
+ return new GCSClientConfig(region, endPoint, anonymousAuth, profilerLogInterval, writeBufferSize, prefix);
}
public String getRegion() {
@@ -118,4 +122,8 @@
public int getWriteBufferSize() {
return writeBufferSize;
}
+
+ public String getPrefix() {
+ return prefix;
+ }
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
index 010a6bb..a74e7d3 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
@@ -100,20 +100,20 @@
@Override
public ICloudWriter createWriter(String bucket, String path, IWriteBufferProvider bufferProvider) {
- return new GCSWriter(bucket, path, gcsClient, profilerLimiter, guardian, writeBufferSize);
+ return new GCSWriter(bucket, config.getPrefix() + path, gcsClient, profilerLimiter, guardian, writeBufferSize);
}
@Override
public Set<CloudFile> listObjects(String bucket, String path, FilenameFilter filter) {
guardian.checkReadAccess(bucket, path);
profilerLimiter.objectsList();
- Page<Blob> blobs =
- gcsClient.list(bucket, BlobListOption.prefix(path), BlobListOption.fields(Storage.BlobField.SIZE));
+ Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(config.getPrefix() + path),
+ BlobListOption.fields(Storage.BlobField.SIZE));
Set<CloudFile> files = new HashSet<>();
for (Blob blob : blobs.iterateAll()) {
if (filter.accept(null, IoUtil.getFileNameFromPath(blob.getName()))) {
- files.add(CloudFile.of(blob.getName(), blob.getSize()));
+ files.add(CloudFile.of(stripCloudPrefix(blob.getName()), blob.getSize()));
}
}
return files;
@@ -123,7 +123,7 @@
public int read(String bucket, String path, long offset, ByteBuffer buffer) throws HyracksDataException {
guardian.checkReadAccess(bucket, path);
profilerLimiter.objectGet();
- BlobId blobId = BlobId.of(bucket, path);
+ BlobId blobId = BlobId.of(bucket, config.getPrefix() + path);
long readTo = offset + buffer.remaining();
int totalRead = 0;
try (ReadChannel from = gcsClient.reader(blobId).limit(readTo)) {
@@ -145,7 +145,7 @@
public byte[] readAllBytes(String bucket, String path) {
guardian.checkReadAccess(bucket, path);
profilerLimiter.objectGet();
- BlobId blobId = BlobId.of(bucket, path);
+ BlobId blobId = BlobId.of(bucket, config.getPrefix() + path);
try {
return gcsClient.readAllBytes(blobId);
} catch (StorageException e) {
@@ -157,7 +157,7 @@
public InputStream getObjectStream(String bucket, String path, long offset, long length) {
guardian.checkReadAccess(bucket, path);
profilerLimiter.objectGet();
- try (ReadChannel reader = gcsClient.reader(bucket, path).limit(offset + length)) {
+ try (ReadChannel reader = gcsClient.reader(bucket, config.getPrefix() + path).limit(offset + length)) {
reader.seek(offset);
return Channels.newInputStream(reader);
} catch (StorageException | IOException e) {
@@ -169,7 +169,7 @@
public void write(String bucket, String path, byte[] data) {
guardian.checkWriteAccess(bucket, path);
profilerLimiter.objectWrite();
- BlobInfo blobInfo = BlobInfo.newBuilder(bucket, path).build();
+ BlobInfo blobInfo = BlobInfo.newBuilder(bucket, config.getPrefix() + path).build();
gcsClient.create(blobInfo, data);
}
@@ -177,7 +177,7 @@
public void copy(String bucket, String srcPath, FileReference destPath) {
guardian.checkReadAccess(bucket, srcPath);
profilerLimiter.objectsList();
- Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(srcPath));
+ Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(config.getPrefix() + srcPath));
for (Blob blob : blobs.iterateAll()) {
profilerLimiter.objectCopy();
BlobId source = blob.getBlobId();
@@ -200,7 +200,7 @@
while (pathIter.hasNext()) {
batchRequest = gcsClient.batch();
for (int i = 0; pathIter.hasNext() && i < DELETE_BATCH_SIZE; i++) {
- BlobId blobId = BlobId.of(bucket, pathIter.next());
+ BlobId blobId = BlobId.of(bucket, config.getPrefix() + pathIter.next());
guardian.checkWriteAccess(bucket, blobId.getName());
batchRequest.delete(blobId);
}
@@ -214,7 +214,8 @@
public long getObjectSize(String bucket, String path) {
guardian.checkReadAccess(bucket, path);
profilerLimiter.objectGet();
- Blob blob = gcsClient.get(bucket, path, Storage.BlobGetOption.fields(Storage.BlobField.SIZE));
+ Blob blob =
+ gcsClient.get(bucket, config.getPrefix() + path, Storage.BlobGetOption.fields(Storage.BlobField.SIZE));
if (blob == null) {
return 0;
}
@@ -225,7 +226,8 @@
public boolean exists(String bucket, String path) {
guardian.checkReadAccess(bucket, path);
profilerLimiter.objectGet();
- Blob blob = gcsClient.get(bucket, path, Storage.BlobGetOption.fields(Storage.BlobField.values()));
+ Blob blob = gcsClient.get(bucket, config.getPrefix() + path,
+ Storage.BlobGetOption.fields(Storage.BlobField.values()));
return blob != null && blob.exists();
}
@@ -233,7 +235,7 @@
public boolean isEmptyPrefix(String bucket, String path) {
guardian.checkReadAccess(bucket, path);
profilerLimiter.objectsList();
- Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(path));
+ Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(config.getPrefix() + path));
return !blobs.hasNextPage();
}
@@ -278,4 +280,8 @@
}
return builder.build().getService();
}
+
+ private String stripCloudPrefix(String objectName) {
+ return objectName.substring(config.getPrefix().length());
+ }
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
index 0994cea..0d30120 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
@@ -56,6 +56,7 @@
private final Storage gcsClient;
private final TransferManager transferManager;
private final IRequestProfilerLimiter profiler;
+ private final GCSClientConfig config;
public GCSParallelDownloader(String bucket, IOManager ioManager, GCSClientConfig config,
IRequestProfilerLimiter profiler) throws HyracksDataException {
@@ -70,18 +71,21 @@
this.gcsClient = builder.build().getService();
this.transferManager =
TransferManagerConfig.newBuilder().setStorageOptions(builder.build()).build().getService();
+ this.config = config;
}
@Override
public void downloadFiles(Collection<FileReference> toDownload) throws HyracksDataException {
- ParallelDownloadConfig.Builder config = ParallelDownloadConfig.newBuilder().setBucketName(bucket);
+ ParallelDownloadConfig.Builder downConfig =
+ ParallelDownloadConfig.newBuilder().setBucketName(bucket).setStripPrefix(this.config.getPrefix());
+
Map<Path, List<BlobInfo>> pathListMap = new HashMap<>();
try {
for (FileReference fileReference : toDownload) {
profiler.objectGet();
FileUtils.createParentDirectories(fileReference.getFile());
- addToMap(pathListMap, fileReference.getDeviceHandle().getMount().toPath(),
- BlobInfo.newBuilder(BlobId.of(bucket, fileReference.getRelativePath())).build());
+ addToMap(pathListMap, fileReference.getDeviceHandle().getMount().toPath(), BlobInfo
+ .newBuilder(BlobId.of(bucket, config.getPrefix() + fileReference.getRelativePath())).build());
}
} catch (IOException e) {
throw HyracksDataException.create(e);
@@ -89,7 +93,7 @@
List<DownloadJob> downloadJobs = new ArrayList<>(pathListMap.size());
for (Map.Entry<Path, List<BlobInfo>> entry : pathListMap.entrySet()) {
downloadJobs.add(transferManager.downloadBlobs(entry.getValue(),
- config.setDownloadDirectory(entry.getKey()).build()));
+ downConfig.setDownloadDirectory(entry.getKey()).build()));
}
downloadJobs.forEach(DownloadJob::getDownloadResults);
}
@@ -98,20 +102,22 @@
public Collection<FileReference> downloadDirectories(Collection<FileReference> toDownload)
throws HyracksDataException {
Set<FileReference> failedFiles = new HashSet<>();
- ParallelDownloadConfig.Builder config = ParallelDownloadConfig.newBuilder().setBucketName(bucket);
+ ParallelDownloadConfig.Builder config =
+ ParallelDownloadConfig.newBuilder().setBucketName(bucket).setStripPrefix(this.config.getPrefix());
Map<Path, List<BlobInfo>> pathListMap = new HashMap<>();
for (FileReference fileReference : toDownload) {
profiler.objectMultipartDownload();
- Page<Blob> blobs = gcsClient.list(bucket, Storage.BlobListOption.prefix(fileReference.getRelativePath()));
+ Page<Blob> blobs = gcsClient.list(bucket,
+ Storage.BlobListOption.prefix(this.config.getPrefix() + fileReference.getRelativePath()));
for (Blob blob : blobs.iterateAll()) {
addToMap(pathListMap, fileReference.getDeviceHandle().getMount().toPath(), blob.asBlobInfo());
}
}
List<DownloadJob> downloadJobs = new ArrayList<>(pathListMap.size());
for (Map.Entry<Path, List<BlobInfo>> entry : pathListMap.entrySet()) {
- downloadJobs.add(transferManager.downloadBlobs(entry.getValue(),
- config.setDownloadDirectory(entry.getKey()).build()));
+ ParallelDownloadConfig parallelDownloadConfig = config.setDownloadDirectory(entry.getKey()).build();
+ downloadJobs.add(transferManager.downloadBlobs(entry.getValue(), parallelDownloadConfig));
}
List<DownloadResult> results;
for (DownloadJob job : downloadJobs) {
diff --git a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
index 09cc3f6..08864ac 100644
--- a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
+++ b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
@@ -51,7 +51,7 @@
LOGGER.info("Client created successfully");
int writeBufferSize = StorageUtil.getIntSizeInBytes(5, StorageUtil.StorageUnit.MEGABYTE);
GCSClientConfig config =
- new GCSClientConfig(MOCK_SERVER_REGION, MOCK_SERVER_HOSTNAME, true, 0, writeBufferSize);
+ new GCSClientConfig(MOCK_SERVER_REGION, MOCK_SERVER_HOSTNAME, true, 0, writeBufferSize, "");
CLOUD_CLIENT = new GCSCloudClient(config, ICloudGuardian.NoOpCloudGuardian.INSTANCE);
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java
index f2dbde7..6314ce8 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java
@@ -26,6 +26,7 @@
public static final String APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME = "applicationDefaultCredentials";
public static final String JSON_CREDENTIALS_FIELD_NAME = "jsonCredentials";
public static final String ENDPOINT_FIELD_NAME = "endpoint";
+ public static final String STORAGE_PREFIX = "prefix";
/*
* Hadoop internal configuration