[ASTERIXDB-3442][STO] Use PUT for single part upload
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
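Use a single PUT (PutObject) request instead of a multipart upload when
no earlier part has been uploaded, i.e. the file consists of a single part.
Interface change: ICloudBufferedWriter.upload() no longer returns a value,
and a new uploadLast() method uploads the final (or only) part.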
Change-Id: I30a8e035b3ee9a851ff5fb89b372fc836cdf1ca9
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18394
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Hussain Towaileb <hussainht@gmail.com>
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
index 885d612..9e35020 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
@@ -31,9 +31,8 @@
public class CloudResettableInputStream extends InputStream implements ICloudWriter {
private static final Logger LOGGER = LogManager.getLogger();
private final IWriteBufferProvider bufferProvider;
- private ByteBuffer writeBuffer;
-
private final ICloudBufferedWriter bufferedWriter;
+ private ByteBuffer writeBuffer;
public CloudResettableInputStream(ICloudBufferedWriter bufferedWriter, IWriteBufferProvider bufferProvider) {
this.bufferedWriter = bufferedWriter;
@@ -140,7 +139,13 @@
* OR
* (2) nothing was written to the file at all to ensure writing empty file
*/
- uploadAndWait();
+ writeBuffer.flip();
+ try {
+ bufferedWriter.uploadLast(this, writeBuffer);
+ } catch (Exception e) {
+ LOGGER.error(e);
+ throw HyracksDataException.create(e);
+ }
}
bufferedWriter.finish();
} finally {
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudBufferedWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudBufferedWriter.java
index 6b51964..35047ce 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudBufferedWriter.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudBufferedWriter.java
@@ -19,6 +19,7 @@
package org.apache.asterix.cloud.clients;
import java.io.InputStream;
+import java.nio.ByteBuffer;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -29,9 +30,16 @@
*
* @param stream stream
* @param length length
- * @return amount uploaded
*/
- int upload(InputStream stream, int length) throws HyracksDataException;
+ void upload(InputStream stream, int length) throws HyracksDataException;
+
+ /**
+ * Upload the last content of the stream or buffer depending on whether a previous part was uploaded
+ *
+ * @param stream stream
+ * @param buffer buffer (should be used instead of stream if no previous bytes were written)
+ */
+ void uploadLast(InputStream stream, ByteBuffer buffer) throws HyracksDataException;
/**
* Checks whether the writer has not written anything
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java
index 93be80c..d0dda2a 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java
@@ -19,6 +19,7 @@
package org.apache.asterix.cloud.clients.aws.s3;
import java.io.InputStream;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -38,9 +39,11 @@
import software.amazon.awssdk.services.s3.model.CompletedPart;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
public class S3BufferedWriter implements ICloudBufferedWriter {
+ private static final String PUT_UPLOAD_ID = "putUploadId";
private static final int MAX_RETRIES = 3;
private static final Logger LOGGER = LogManager.getLogger();
@@ -65,7 +68,7 @@
}
@Override
- public int upload(InputStream stream, int length) {
+ public void upload(InputStream stream, int length) {
guardian.checkIsolatedWriteAccess(bucket, path);
profiler.objectMultipartUpload();
setUploadId();
@@ -73,8 +76,21 @@
UploadPartRequest.builder().uploadId(uploadId).partNumber(partNumber).bucket(bucket).key(path).build();
String etag = s3Client.uploadPart(upReq, RequestBody.fromInputStream(stream, length)).eTag();
partQueue.add(CompletedPart.builder().partNumber(partNumber).eTag(etag).build());
+ partNumber++;
+ }
- return partNumber++;
+ @Override
+ public void uploadLast(InputStream stream, ByteBuffer buffer) {
+ if (uploadId == null) {
+ profiler.objectWrite();
+ PutObjectRequest request = PutObjectRequest.builder().bucket(bucket).key(path).build();
+ // TODO make retryable
+ s3Client.putObject(request, RequestBody.fromByteBuffer(buffer));
+ // Only set the uploadId if the putObject succeeds
+ uploadId = PUT_UPLOAD_ID;
+ } else {
+ upload(stream, buffer.limit());
+ }
}
@Override
@@ -86,9 +102,12 @@
public void finish() throws HyracksDataException {
if (uploadId == null) {
throw new IllegalStateException("Cannot finish without writing any bytes");
+ } else if (PUT_UPLOAD_ID.equals(uploadId)) {
+ LOGGER.debug("FINISHED multipart upload as PUT for {}", path);
+ return;
}
- // A non-empty files, proceed with completing the multipart upload
+ // Finishing a multipart file. Proceed with completing the multipart upload
CompletedMultipartUpload completedMultipartUpload = CompletedMultipartUpload.builder().parts(partQueue).build();
CompleteMultipartUploadRequest completeMultipartUploadRequest = CompleteMultipartUploadRequest.builder()
.bucket(bucket).key(path).uploadId(uploadId).multipartUpload(completedMultipartUpload).build();
@@ -119,7 +138,7 @@
@Override
public void abort() throws HyracksDataException {
- if (uploadId == null) {
+ if (uploadId == null || PUT_UPLOAD_ID.equals(uploadId)) {
return;
}
s3Client.abortMultipartUpload(