[NO ISSUE]: Add Guardian to GCSWriter, Request Limits for GCS
Ext-ref: MB-63055
Change-Id: Id639afcadb1d88b4630e12dde40dbaae94e15f23
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18645
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageGCSTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageGCSTest.java
index 3a03445..89a4781 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageGCSTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageGCSTest.java
@@ -36,7 +36,6 @@
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.FixMethodOrder;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.MethodSorters;
@@ -55,7 +54,6 @@
*/
@RunWith(Parameterized.class)
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
-@Ignore
public class CloudStorageGCSTest {
private static final Logger LOGGER = LogManager.getLogger();
diff --git a/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-gcs.conf b/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-gcs.conf
index 3c883a8..0046644 100644
--- a/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-gcs.conf
+++ b/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-gcs.conf
@@ -70,3 +70,6 @@
cloud.storage.endpoint=http://127.0.0.1:4443
cloud.storage.anonymous.auth=true
cloud.storage.cache.policy=selective
+cloud.max.write.requests.per.second=1000
+cloud.max.read.requests.per.second=5000
+cloud.write.buffer.size=5
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java
index 4edb7a7..fe5dd4d 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSClientConfig.java
@@ -25,38 +25,50 @@
import org.apache.asterix.common.config.CloudProperties;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.util.StorageUtil;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.auth.oauth2.OAuth2Credentials;
import com.google.cloud.NoCredentials;
public class GCSClientConfig {
- public static final int WRITE_BUFFER_SIZE = StorageUtil.getIntSizeInBytes(1, StorageUtil.StorageUnit.MEGABYTE);
+
// The maximum number of files that can be deleted (GCS restriction): https://cloud.google.com/storage/quotas#json-requests
static final int DELETE_BATCH_SIZE = 100;
private final String region;
private final String endpoint;
- private final String prefix;
private final boolean anonymousAuth;
private final long profilerLogInterval;
+ private final long tokenAcquireTimeout;
+ private final int readMaxRequestsPerSeconds;
+ private final int writeMaxRequestsPerSeconds;
+ private final int writeBufferSize;
- public GCSClientConfig(String region, String endpoint, String prefix, boolean anonymousAuth,
- long profilerLogInterval) {
+ private GCSClientConfig(String region, String endpoint, boolean anonymousAuth, long profilerLogInterval,
+ long tokenAcquireTimeout, int writeMaxRequestsPerSeconds, int readMaxRequestsPerSeconds,
+ int writeBufferSize) {
this.region = region;
this.endpoint = endpoint;
- this.prefix = prefix;
this.anonymousAuth = anonymousAuth;
this.profilerLogInterval = profilerLogInterval;
+ this.tokenAcquireTimeout = tokenAcquireTimeout;
+ this.writeMaxRequestsPerSeconds = writeMaxRequestsPerSeconds;
+ this.readMaxRequestsPerSeconds = readMaxRequestsPerSeconds;
+ this.writeBufferSize = writeBufferSize;
+ }
+
+ public GCSClientConfig(String region, String endpoint, boolean anonymousAuth, long profilerLogInterval,
+ int writeBufferSize) {
+ this(region, endpoint, anonymousAuth, profilerLogInterval, 1, 0, 0, writeBufferSize);
}
public static GCSClientConfig of(CloudProperties cloudProperties) {
return new GCSClientConfig(cloudProperties.getStorageRegion(), cloudProperties.getStorageEndpoint(),
- cloudProperties.getStoragePrefix(), cloudProperties.isStorageAnonymousAuth(),
- cloudProperties.getProfilerLogInterval());
+ cloudProperties.isStorageAnonymousAuth(), cloudProperties.getProfilerLogInterval(),
+ cloudProperties.getTokenAcquireTimeout(), cloudProperties.getWriteMaxRequestsPerSecond(),
+ cloudProperties.getReadMaxRequestsPerSecond(), cloudProperties.getWriteBufferSize());
}
- public static GCSClientConfig of(Map<String, String> configuration) {
+ public static GCSClientConfig of(Map<String, String> configuration, int writeBufferSize) {
String endPoint = configuration.getOrDefault(ENDPOINT_FIELD_NAME, "");
long profilerLogInterval = 0;
@@ -64,7 +76,7 @@
String prefix = "";
boolean anonymousAuth = false;
- return new GCSClientConfig(region, endPoint, prefix, anonymousAuth, profilerLogInterval);
+ return new GCSClientConfig(region, endPoint, anonymousAuth, profilerLogInterval, writeBufferSize);
}
public String getRegion() {
@@ -75,10 +87,6 @@
return endpoint;
}
- public String getPrefix() {
- return prefix;
- }
-
public long getProfilerLogInterval() {
return profilerLogInterval;
}
@@ -94,4 +102,20 @@
throw HyracksDataException.create(e);
}
}
+
+ public long getTokenAcquireTimeout() {
+ return tokenAcquireTimeout;
+ }
+
+ public int getWriteMaxRequestsPerSeconds() {
+ return writeMaxRequestsPerSeconds;
+ }
+
+ public int getReadMaxRequestsPerSeconds() {
+ return readMaxRequestsPerSeconds;
+ }
+
+ public int getWriteBufferSize() {
+ return writeBufferSize;
+ }
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
index de242bd..010a6bb 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
@@ -40,8 +40,7 @@
import org.apache.asterix.cloud.clients.IParallelDownloader;
import org.apache.asterix.cloud.clients.profiler.CountRequestProfilerLimiter;
import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter;
-import org.apache.asterix.cloud.clients.profiler.NoOpRequestProfilerLimiter;
-import org.apache.asterix.cloud.clients.profiler.limiter.NoOpRequestLimiter;
+import org.apache.asterix.cloud.clients.profiler.RequestLimiterNoOpProfiler;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.util.IoUtil;
@@ -68,16 +67,19 @@
private final GCSClientConfig config;
private final ICloudGuardian guardian;
private final IRequestProfilerLimiter profilerLimiter;
+ private final int writeBufferSize;
public GCSCloudClient(GCSClientConfig config, Storage gcsClient, ICloudGuardian guardian) {
this.gcsClient = gcsClient;
this.config = config;
this.guardian = guardian;
+ this.writeBufferSize = config.getWriteBufferSize();
long profilerInterval = config.getProfilerLogInterval();
+ GCSRequestRateLimiter limiter = new GCSRequestRateLimiter(config);
if (profilerInterval > 0) {
- profilerLimiter = new CountRequestProfilerLimiter(profilerInterval, NoOpRequestLimiter.INSTANCE);
+ profilerLimiter = new CountRequestProfilerLimiter(profilerInterval, limiter);
} else {
- profilerLimiter = NoOpRequestProfilerLimiter.INSTANCE;
+ profilerLimiter = new RequestLimiterNoOpProfiler(limiter);
}
guardian.setCloudClient(this);
}
@@ -88,7 +90,7 @@
@Override
public int getWriteBufferSize() {
- return GCSClientConfig.WRITE_BUFFER_SIZE;
+ return writeBufferSize;
}
@Override
@@ -98,7 +100,7 @@
@Override
public ICloudWriter createWriter(String bucket, String path, IWriteBufferProvider bufferProvider) {
- return new GCSWriter(bucket, path, gcsClient, profilerLimiter);
+ return new GCSWriter(bucket, path, gcsClient, profilerLimiter, guardian, writeBufferSize);
}
@Override
@@ -119,6 +121,7 @@
@Override
public int read(String bucket, String path, long offset, ByteBuffer buffer) throws HyracksDataException {
+ guardian.checkReadAccess(bucket, path);
profilerLimiter.objectGet();
BlobId blobId = BlobId.of(bucket, path);
long readTo = offset + buffer.remaining();
@@ -140,6 +143,7 @@
@Override
public byte[] readAllBytes(String bucket, String path) {
+ guardian.checkReadAccess(bucket, path);
profilerLimiter.objectGet();
BlobId blobId = BlobId.of(bucket, path);
try {
@@ -151,6 +155,7 @@
@Override
public InputStream getObjectStream(String bucket, String path, long offset, long length) {
+ guardian.checkReadAccess(bucket, path);
profilerLimiter.objectGet();
try (ReadChannel reader = gcsClient.reader(bucket, path).limit(offset + length)) {
reader.seek(offset);
@@ -170,8 +175,9 @@
@Override
public void copy(String bucket, String srcPath, FileReference destPath) {
- Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(srcPath));
+ guardian.checkReadAccess(bucket, srcPath);
profilerLimiter.objectsList();
+ Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(srcPath));
for (Blob blob : blobs.iterateAll()) {
profilerLimiter.objectCopy();
BlobId source = blob.getBlobId();
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSRequestRateLimiter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSRequestRateLimiter.java
new file mode 100644
index 0000000..71f6b8c
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSRequestRateLimiter.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.clients.google.gcs;
+
+import org.apache.asterix.cloud.clients.profiler.limiter.IRateLimiter;
+import org.apache.asterix.cloud.clients.profiler.limiter.IRequestRateLimiter;
+import org.apache.asterix.cloud.clients.profiler.limiter.NoOpRateLimiter;
+import org.apache.asterix.cloud.clients.profiler.limiter.TokenBasedRateLimiter;
+
+public class GCSRequestRateLimiter implements IRequestRateLimiter {
+ private final IRateLimiter writeLimiter;
+ private final IRateLimiter readLimiter;
+
+ public GCSRequestRateLimiter(GCSClientConfig config) {
+ long tokenAcquireTimeout = config.getTokenAcquireTimeout();
+ this.writeLimiter = createLimiter(config.getWriteMaxRequestsPerSeconds(), tokenAcquireTimeout);
+ this.readLimiter = createLimiter(config.getReadMaxRequestsPerSeconds(), tokenAcquireTimeout);
+ }
+
+ @Override
+ public void writeRequest() {
+ writeLimiter.acquire();
+ }
+
+ @Override
+ public void readRequest() {
+ readLimiter.acquire();
+ }
+
+ @Override
+ public void listRequest() {
+ readLimiter.acquire();
+ }
+
+ private static IRateLimiter createLimiter(int maxRequestsPerSecond, long tokeAcquireTimeout) {
+ if (maxRequestsPerSecond > 0) {
+ return new TokenBasedRateLimiter(maxRequestsPerSecond, tokeAcquireTimeout);
+ }
+ return NoOpRateLimiter.INSTANCE;
+ }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
index 41d1a71..8d68f01 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
@@ -18,11 +18,10 @@
*/
package org.apache.asterix.cloud.clients.google.gcs;
-import static org.apache.asterix.cloud.clients.google.gcs.GCSClientConfig.WRITE_BUFFER_SIZE;
-
import java.io.IOException;
import java.nio.ByteBuffer;
+import org.apache.asterix.cloud.clients.ICloudGuardian;
import org.apache.asterix.cloud.clients.ICloudWriter;
import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -40,14 +39,20 @@
private final String path;
private final IRequestProfilerLimiter profiler;
private final Storage gcsClient;
+ private final ICloudGuardian guardian;
+ private final int writeBufferSize;
+
private WriteChannel writer = null;
private long writtenBytes;
- public GCSWriter(String bucket, String path, Storage gcsClient, IRequestProfilerLimiter profiler) {
+ public GCSWriter(String bucket, String path, Storage gcsClient, IRequestProfilerLimiter profiler,
+ ICloudGuardian guardian, int writeBufferSize) {
this.bucket = bucket;
this.path = path;
this.profiler = profiler;
this.gcsClient = gcsClient;
+ this.guardian = guardian;
+ this.writeBufferSize = writeBufferSize;
writtenBytes = 0;
}
@@ -58,6 +63,7 @@
@Override
public int write(ByteBuffer page) throws HyracksDataException {
+ guardian.checkIsolatedWriteAccess(bucket, path);
profiler.objectMultipartUpload();
setUploadId();
int written = 0;
@@ -93,6 +99,7 @@
@Override
public void finish() throws HyracksDataException {
+ guardian.checkWriteAccess(bucket, path);
setUploadId();
profiler.objectMultipartUpload();
try {
@@ -115,7 +122,7 @@
private void setUploadId() {
if (writer == null) {
writer = gcsClient.writer(BlobInfo.newBuilder(BlobId.of(bucket, path)).build());
- writer.setChunkSize(WRITE_BUFFER_SIZE);
+ writer.setChunkSize(writeBufferSize);
writtenBytes = 0;
log("STARTED");
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
index 9e9c003..886f20d 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
@@ -62,7 +62,7 @@
@Override
ICloudClient createCloudClient() throws CompilationException {
- GCSClientConfig config = GCSClientConfig.of(configuration);
+ GCSClientConfig config = GCSClientConfig.of(configuration, writeBufferSize);
return new GCSCloudClient(config, GCSUtils.buildClient(configuration),
ICloudGuardian.NoOpCloudGuardian.INSTANCE);
}
diff --git a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
index 3c62cce..09cc3f6 100644
--- a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
+++ b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
@@ -22,6 +22,7 @@
import org.apache.asterix.cloud.clients.ICloudGuardian;
import org.apache.asterix.cloud.clients.google.gcs.GCSClientConfig;
import org.apache.asterix.cloud.clients.google.gcs.GCSCloudClient;
+import org.apache.hyracks.util.StorageUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -48,7 +49,9 @@
client.create(BucketInfo.newBuilder(PLAYGROUND_CONTAINER).setStorageClass(StorageClass.STANDARD)
.setLocation(MOCK_SERVER_REGION).build());
LOGGER.info("Client created successfully");
- GCSClientConfig config = new GCSClientConfig(MOCK_SERVER_REGION, MOCK_SERVER_HOSTNAME, "", true, 0);
+ int writeBufferSize = StorageUtil.getIntSizeInBytes(5, StorageUtil.StorageUnit.MEGABYTE);
+ GCSClientConfig config =
+ new GCSClientConfig(MOCK_SERVER_REGION, MOCK_SERVER_HOSTNAME, true, 0, writeBufferSize);
CLOUD_CLIENT = new GCSCloudClient(config, ICloudGuardian.NoOpCloudGuardian.INSTANCE);
}