[ASTERIXDB-3423][CONF] Make cloud write buffer size configurable

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Make the cloud write buffer size configurable and change its default from 5MB to 8MB.
- Enable the cloud profiler by default by setting its log interval to 5 minutes.
- Log the cloud profiler counters at TRACE level (previously DEBUG).
- In cloud deployments, reserve the write buffers from the job execution memory budget.

Change-Id: I41955440f0b3525a42e13ed03ff0909fc788e238
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18347
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 18e24ab..7da3838 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -697,7 +697,7 @@
         String schedulerName = storageProperties.getIoScheduler();
         int numPartitions = ioManager.getIODevices().size();
 
-        int maxConcurrentFlushes = storageProperties.geMaxConcurrentFlushes(numPartitions);
+        int maxConcurrentFlushes = storageProperties.getMaxConcurrentFlushes(numPartitions);
         int maxScheduledMerges = storageProperties.getMaxScheduledMerges(numPartitions);
         int maxConcurrentMerges = storageProperties.getMaxConcurrentMerges(numPartitions);
 
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index d5659cf..d02dc4f 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -338,7 +338,7 @@
     @Override
     public NodeCapacity getCapacity() {
         StorageProperties storageProperties = runtimeContext.getStorageProperties();
-        final long memorySize = storageProperties.getJobExecutionMemoryBudget();
+        final long memorySize = storageProperties.getJobExecutionMemoryBudget(runtimeContext);
         int allCores = Runtime.getRuntime().availableProcessors();
         return new NodeCapacity(memorySize, allCores);
     }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
index 161fb37..c98e0e8 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
@@ -29,7 +29,7 @@
 import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
 
 public final class S3ClientConfig {
-    static final int WRITE_BUFFER_SIZE = StorageUtil.getIntSizeInBytes(5, StorageUtil.StorageUnit.MEGABYTE);
+
     // The maximum number of file that can be deleted (AWS restriction)
     static final int DELETE_BATCH_SIZE = 1000;
     private final String region;
@@ -37,20 +37,22 @@
     private final String prefix;
     private final boolean anonymousAuth;
     private final long profilerLogInterval;
+    private final int writeBufferSize;
 
     public S3ClientConfig(String region, String endpoint, String prefix, boolean anonymousAuth,
-            long profilerLogInterval) {
+            long profilerLogInterval, int writeBufferSize) {
         this.region = region;
         this.endpoint = endpoint;
         this.prefix = prefix;
         this.anonymousAuth = anonymousAuth;
         this.profilerLogInterval = profilerLogInterval;
+        this.writeBufferSize = writeBufferSize;
     }
 
     public static S3ClientConfig of(CloudProperties cloudProperties) {
         return new S3ClientConfig(cloudProperties.getStorageRegion(), cloudProperties.getStorageEndpoint(),
                 cloudProperties.getStoragePrefix(), cloudProperties.isStorageAnonymousAuth(),
-                cloudProperties.getProfilerLogInterval());
+                cloudProperties.getProfilerLogInterval(), cloudProperties.getWriteBufferSize());
     }
 
     public static S3ClientConfig of(Map<String, String> configuration) {
@@ -63,8 +65,9 @@
         String region = "";
         String prefix = "";
         boolean anonymousAuth = false;
+        int writeBufferSize = StorageUtil.getIntSizeInBytes(5, StorageUtil.StorageUnit.MEGABYTE);
 
-        return new S3ClientConfig(region, endPoint, prefix, anonymousAuth, profilerLogInterval);
+        return new S3ClientConfig(region, endPoint, prefix, anonymousAuth, profilerLogInterval, writeBufferSize);
     }
 
     public String getRegion() {
@@ -92,6 +95,10 @@
         return profilerLogInterval;
     }
 
+    public int getWriteBufferSize() {
+        return writeBufferSize;
+    }
+
     private boolean isS3Mock() {
         return endpoint != null && !endpoint.isEmpty();
     }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
index 97169db..2eae455 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
@@ -79,6 +79,7 @@
     private final S3Client s3Client;
     private final ICloudGuardian guardian;
     private final IRequestProfiler profiler;
+    private final int writeBufferSize;
 
     public S3CloudClient(S3ClientConfig config, ICloudGuardian guardian) {
         this(config, buildClient(config), guardian);
@@ -88,6 +89,7 @@
         this.config = config;
         this.s3Client = s3Client;
         this.guardian = guardian;
+        this.writeBufferSize = config.getWriteBufferSize();
         long profilerInterval = config.getProfilerLogInterval();
         if (profilerInterval > 0) {
             profiler = new CountRequestProfiler(profilerInterval);
@@ -99,7 +101,7 @@
 
     @Override
     public int getWriteBufferSize() {
-        return S3ClientConfig.WRITE_BUFFER_SIZE;
+        return writeBufferSize;
     }
 
     @Override
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/CountRequestProfiler.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/CountRequestProfiler.java
index d5fc2db..624395b 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/CountRequestProfiler.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/CountRequestProfiler.java
@@ -20,6 +20,7 @@
 
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -29,6 +30,7 @@
 public class CountRequestProfiler implements IRequestProfiler {
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
     private static final Logger LOGGER = LogManager.getLogger();
+    private static final Level LOG_LEVEL = Level.TRACE;
     private final long logInterval;
     private final AtomicLong listObjectsCounter;
     private final AtomicLong getObjectCounter;
@@ -94,19 +96,21 @@
     }
 
     private void log() {
-        long currentTime = System.nanoTime();
-        if (currentTime - lastLogTimestamp >= logInterval) {
-            // Might log multiple times
-            lastLogTimestamp = currentTime;
-            ObjectNode countersNode = OBJECT_MAPPER.createObjectNode();
-            countersNode.put("listObjectsCounter", listObjectsCounter.get());
-            countersNode.put("getObjectCounter", getObjectCounter.get());
-            countersNode.put("writeObjectCounter", writeObjectCounter.get());
-            countersNode.put("deleteObjectCounter", deleteObjectCounter.get());
-            countersNode.put("copyObjectCounter", copyObjectCounter.get());
-            countersNode.put("multipartUploadCounter", multipartUploadCounter.get());
-            countersNode.put("multipartDownloadCounter", multipartDownloadCounter.get());
-            LOGGER.debug("Cloud request counters: {}", countersNode.toString());
+        if (LOGGER.isEnabled(LOG_LEVEL)) {
+            long currentTime = System.nanoTime();
+            if (currentTime - lastLogTimestamp >= logInterval) {
+                // No lock is taken here, so concurrent callers might each pass this check and log
+                lastLogTimestamp = currentTime;
+                ObjectNode countersNode = OBJECT_MAPPER.createObjectNode();
+                countersNode.put("listObjectsCounter", listObjectsCounter.get());
+                countersNode.put("getObjectCounter", getObjectCounter.get());
+                countersNode.put("writeObjectCounter", writeObjectCounter.get());
+                countersNode.put("deleteObjectCounter", deleteObjectCounter.get());
+                countersNode.put("copyObjectCounter", copyObjectCounter.get());
+                countersNode.put("multipartUploadCounter", multipartUploadCounter.get());
+                countersNode.put("multipartDownloadCounter", multipartDownloadCounter.get());
+                LOGGER.log(LOG_LEVEL, "Cloud request counters: {}", countersNode.toString());
+            }
         }
     }
 }
diff --git a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java
index 01d3422..fad8081 100644
--- a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java
+++ b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java
@@ -24,6 +24,7 @@
 import org.apache.asterix.cloud.clients.ICloudGuardian;
 import org.apache.asterix.cloud.clients.aws.s3.S3ClientConfig;
 import org.apache.asterix.cloud.clients.aws.s3.S3CloudClient;
+import org.apache.hyracks.util.StorageUtil;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 
@@ -65,7 +66,9 @@
         cleanup();
         client.createBucket(CreateBucketRequest.builder().bucket(PLAYGROUND_CONTAINER).build());
         LOGGER.info("Client created successfully");
-        S3ClientConfig config = new S3ClientConfig(MOCK_SERVER_REGION, MOCK_SERVER_HOSTNAME, "", true, 0);
+        int writeBufferSize = StorageUtil.getIntSizeInBytes(5, StorageUtil.StorageUnit.MEGABYTE);
+        S3ClientConfig config =
+                new S3ClientConfig(MOCK_SERVER_REGION, MOCK_SERVER_HOSTNAME, "", true, 0, writeBufferSize);
         CLOUD_CLIENT = new S3CloudClient(config, ICloudGuardian.NoOpCloudGuardian.INSTANCE);
     }
 
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
index ea12621..5c612dc 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
@@ -56,7 +56,8 @@
         CLOUD_STORAGE_INDEX_INACTIVE_DURATION_THRESHOLD(POSITIVE_INTEGER, 360),
         CLOUD_STORAGE_DEBUG_MODE_ENABLED(BOOLEAN, false),
         CLOUD_STORAGE_DEBUG_SWEEP_THRESHOLD_SIZE(LONG_BYTE_UNIT, StorageUtil.getLongSizeInBytes(1, GIGABYTE)),
-        CLOUD_PROFILER_LOG_INTERVAL(NONNEGATIVE_INTEGER, 0);
+        CLOUD_PROFILER_LOG_INTERVAL(NONNEGATIVE_INTEGER, 5),
+        CLOUD_WRITE_BUFFER_SIZE(POSITIVE_INTEGER, StorageUtil.getIntSizeInBytes(8, StorageUtil.StorageUnit.MEGABYTE));
 
         private final IOptionType interpreter;
         private final Object defaultValue;
@@ -83,6 +84,7 @@
                 case CLOUD_STORAGE_DEBUG_SWEEP_THRESHOLD_SIZE:
                 case CLOUD_STORAGE_DEBUG_MODE_ENABLED:
                 case CLOUD_PROFILER_LOG_INTERVAL:
+                case CLOUD_WRITE_BUFFER_SIZE:
                     return Section.COMMON;
                 default:
                     return Section.NC;
@@ -139,6 +141,8 @@
                     return "The waiting time (in minutes) to log cloud request statistics (default: 0, which means"
                             + " the profiler is disabled by default). The minimum is 1 minute."
                             + " NOTE: Enabling the profiler could perturb the performance of cloud requests";
+                case CLOUD_WRITE_BUFFER_SIZE:
+                    return "The write buffer size in bytes. (default: 8MB)";
                 default:
                     throw new IllegalStateException("NYI: " + this);
             }
@@ -213,4 +217,8 @@
         long interval = TimeUnit.MINUTES.toNanos(accessor.getInt(Option.CLOUD_PROFILER_LOG_INTERVAL));
         return interval == 0 ? 0 : Math.max(interval, TimeUnit.MINUTES.toNanos(1));
     }
+
+    public int getWriteBufferSize() {
+        return accessor.getInt(Option.CLOUD_WRITE_BUFFER_SIZE);
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
index 6fe0eb8..5fbc7c6 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
@@ -32,6 +32,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
+import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
 import org.apache.asterix.common.utils.PartitioningScheme;
 import org.apache.hyracks.api.config.IApplicationConfig;
@@ -240,15 +241,17 @@
         return accessor.getInt(Option.STORAGE_MEMORYCOMPONENT_MAX_SCHEDULED_FLUSHES);
     }
 
-    public long getJobExecutionMemoryBudget() {
-        final long jobExecutionMemory = MAX_HEAP_BYTES - getBufferCacheSize() - getMemoryComponentGlobalBudget();
-        if (jobExecutionMemory <= 0) {
-            final String msg = String.format(
-                    "Invalid node memory configuration, more memory budgeted than available in JVM. Runtime max memory:"
-                            + " (%d), Buffer cache memory (%d), memory component global budget (%d)",
-                    MAX_HEAP_BYTES, getBufferCacheSize(), getMemoryComponentGlobalBudget());
-            throw new IllegalStateException(msg);
+    public long getJobExecutionMemoryBudget(INcApplicationContext runtimeContext) {
+        long jobExecutionMemory = MAX_HEAP_BYTES - getBufferCacheSize() - getMemoryComponentGlobalBudget();
+        if (runtimeContext.isCloudDeployment()) {
+            int numPartitions = runtimeContext.getIoManager().getIODevices().size();
+            int maxConcurrentMerges = getMaxConcurrentMerges(numPartitions);
+            int maxConcurrentFlushes = getMaxConcurrentFlushes(numPartitions);
+            int writeBufferSize = runtimeContext.getCloudProperties().getWriteBufferSize();
+            // Sum as long: the flush limit may be Integer.MAX_VALUE, which would wrap an int sum negative
+            jobExecutionMemory -= ((long) maxConcurrentFlushes + maxConcurrentMerges) * writeBufferSize;
         }
+        ensureJobExecutionMemory(jobExecutionMemory, runtimeContext);
         return jobExecutionMemory;
     }
 
@@ -260,7 +263,7 @@
         return accessor.getString(Option.STORAGE_IO_SCHEDULER);
     }
 
-    public int geMaxConcurrentFlushes(int numPartitions) {
+    public int getMaxConcurrentFlushes(int numPartitions) {
         int value = accessor.getInt(Option.STORAGE_MAX_CONCURRENT_FLUSHES_PER_PARTITION);
         return value != 0 ? value * numPartitions : Integer.MAX_VALUE;
     }
@@ -326,4 +329,23 @@
     public long getStorageMaxComponentSize() {
         return accessor.getLong(Option.STORAGE_MAX_COMPONENT_SIZE);
     }
+
+    /** Fails with an IllegalStateException when the computed job execution budget is not positive. */
+    private void ensureJobExecutionMemory(long jobExecutionMemory, INcApplicationContext runtimeContext) {
+        if (jobExecutionMemory > 0) {
+            return;
+        }
+        // Both variants share this text; a cloud deployment additionally reports its write buffer size
+        String format =
+                "Invalid node memory configuration, more memory budgeted than available in JVM. Runtime max memory:"
+                        + " (%d), Buffer cache memory (%d), memory component global budget (%d)";
+        if (!runtimeContext.isCloudDeployment()) {
+            throw new IllegalStateException(
+                    String.format(format, MAX_HEAP_BYTES, getBufferCacheSize(), getMemoryComponentGlobalBudget()));
+        }
+        throw new IllegalStateException(String.format(format + ", cloud write buffer size (%d)", MAX_HEAP_BYTES,
+                getBufferCacheSize(), getMemoryComponentGlobalBudget(),
+                runtimeContext.getCloudProperties().getWriteBufferSize()));
+    }
+
 }