[ASTERIXDB-3442][STO] Use PUT for single part upload

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
Use PUT instead of multipart upload when uploading
single-part files.

Change-Id: I30a8e035b3ee9a851ff5fb89b372fc836cdf1ca9
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18394
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Hussain Towaileb <hussainht@gmail.com>
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
index 885d612..9e35020 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
@@ -31,9 +31,8 @@
 public class CloudResettableInputStream extends InputStream implements ICloudWriter {
     private static final Logger LOGGER = LogManager.getLogger();
     private final IWriteBufferProvider bufferProvider;
-    private ByteBuffer writeBuffer;
-
     private final ICloudBufferedWriter bufferedWriter;
+    private ByteBuffer writeBuffer;
 
     public CloudResettableInputStream(ICloudBufferedWriter bufferedWriter, IWriteBufferProvider bufferProvider) {
         this.bufferedWriter = bufferedWriter;
@@ -140,7 +139,13 @@
                  * OR
                  * (2) nothing was written to the file at all to ensure writing empty file
                  */
-                uploadAndWait();
+                writeBuffer.flip();
+                try {
+                    bufferedWriter.uploadLast(this, writeBuffer);
+                } catch (Exception e) {
+                    LOGGER.error(e);
+                    throw HyracksDataException.create(e);
+                }
             }
             bufferedWriter.finish();
         } finally {
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudBufferedWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudBufferedWriter.java
index 6b51964..35047ce 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudBufferedWriter.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudBufferedWriter.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.cloud.clients;
 
 import java.io.InputStream;
+import java.nio.ByteBuffer;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
@@ -29,9 +30,16 @@
      *
      * @param stream stream
      * @param length length
-     * @return amount uploaded
      */
-    int upload(InputStream stream, int length) throws HyracksDataException;
+    void upload(InputStream stream, int length) throws HyracksDataException;
+
+    /**
+     * Upload the last content of the stream or buffer depending on whether a previous part was uploaded
+     *
+     * @param stream stream
+     * @param buffer buffer (should be used instead of stream if no previous bytes were written)
+     */
+    void uploadLast(InputStream stream, ByteBuffer buffer) throws HyracksDataException;
 
     /**
      * Checks whether the writer has not written anything
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java
index 93be80c..d0dda2a 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.cloud.clients.aws.s3;
 
 import java.io.InputStream;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -38,9 +39,11 @@
 import software.amazon.awssdk.services.s3.model.CompletedPart;
 import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
 import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
 import software.amazon.awssdk.services.s3.model.UploadPartRequest;
 
 public class S3BufferedWriter implements ICloudBufferedWriter {
+    private static final String PUT_UPLOAD_ID = "putUploadId";
     private static final int MAX_RETRIES = 3;
 
     private static final Logger LOGGER = LogManager.getLogger();
@@ -65,7 +68,7 @@
     }
 
     @Override
-    public int upload(InputStream stream, int length) {
+    public void upload(InputStream stream, int length) {
         guardian.checkIsolatedWriteAccess(bucket, path);
         profiler.objectMultipartUpload();
         setUploadId();
@@ -73,8 +76,21 @@
                 UploadPartRequest.builder().uploadId(uploadId).partNumber(partNumber).bucket(bucket).key(path).build();
         String etag = s3Client.uploadPart(upReq, RequestBody.fromInputStream(stream, length)).eTag();
         partQueue.add(CompletedPart.builder().partNumber(partNumber).eTag(etag).build());
+        partNumber++;
+    }
 
-        return partNumber++;
+    @Override
+    public void uploadLast(InputStream stream, ByteBuffer buffer) {
+        if (uploadId == null) {
+            profiler.objectWrite();
+            PutObjectRequest request = PutObjectRequest.builder().bucket(bucket).key(path).build();
+            // TODO make retryable
+            s3Client.putObject(request, RequestBody.fromByteBuffer(buffer));
+            // Only set the uploadId if the putObject succeeds
+            uploadId = PUT_UPLOAD_ID;
+        } else {
+            upload(stream, buffer.limit());
+        }
     }
 
     @Override
@@ -86,9 +102,12 @@
     public void finish() throws HyracksDataException {
         if (uploadId == null) {
             throw new IllegalStateException("Cannot finish without writing any bytes");
+        } else if (PUT_UPLOAD_ID.equals(uploadId)) {
+            LOGGER.debug("FINISHED multipart upload as PUT for {}", path);
+            return;
         }
 
-        // A non-empty files, proceed with completing the multipart upload
+        // Finishing a multipart file. Proceed with completing the multipart upload
         CompletedMultipartUpload completedMultipartUpload = CompletedMultipartUpload.builder().parts(partQueue).build();
         CompleteMultipartUploadRequest completeMultipartUploadRequest = CompleteMultipartUploadRequest.builder()
                 .bucket(bucket).key(path).uploadId(uploadId).multipartUpload(completedMultipartUpload).build();
@@ -119,7 +138,7 @@
 
     @Override
     public void abort() throws HyracksDataException {
-        if (uploadId == null) {
+        if (uploadId == null || PUT_UPLOAD_ID.equals(uploadId)) {
             return;
         }
         s3Client.abortMultipartUpload(