[NO ISSUE]: Fix rate limiting
- user model changes: no
- storage format changes: no
- interface changes: no
details:
- Fix write limits in GCS
Ext-ref: MB-64820
Change-Id: Ibce236873adbbb1a4f231dd30176c1ed247905a9
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19329
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
index 8d68f01..a95a9d9 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
@@ -64,14 +64,21 @@
@Override
public int write(ByteBuffer page) throws HyracksDataException {
guardian.checkIsolatedWriteAccess(bucket, path);
- profiler.objectMultipartUpload();
+ // The GCS library triggers a new upload when its internal buffer is full, not on each call to writer.write().
+ // uploadsToBeTriggered estimates upload count, and we acquire matching tokens from the limiter.
+ int uploadsToBeTriggered =
+ (int) ((writtenBytes + page.remaining()) / writeBufferSize) - (int) (writtenBytes / writeBufferSize);
+ while (uploadsToBeTriggered-- > 0) {
+ profiler.objectMultipartUpload();
+ }
setUploadId();
+
int written = 0;
try {
while (page.hasRemaining()) {
written += writer.write(page);
}
- } catch (IOException e) {
+ } catch (IOException | RuntimeException e) {
throw HyracksDataException.create(e);
}
@@ -81,9 +88,7 @@
@Override
public int write(byte[] b, int off, int len) throws HyracksDataException {
- int written = write(ByteBuffer.wrap(b, off, len));
- writtenBytes += written;
- return written;
+ return write(ByteBuffer.wrap(b, off, len));
}
@Override
@@ -94,7 +99,6 @@
@Override
public void write(int b) throws HyracksDataException {
write(ByteBuffer.wrap(new byte[] { (byte) b }));
- writtenBytes += 1;
}
@Override
@@ -105,7 +109,7 @@
try {
writer.close();
writer = null;
- } catch (IOException e) {
+ } catch (IOException | RuntimeException e) {
throw HyracksDataException.create(e);
}
log("FINISHED");