[ASTERIXDB-3423][CONF] Make cloud write buffer size configurable
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Add the CLOUD_WRITE_BUFFER_SIZE option and raise the default cloud
  write buffer size from 5MB to 8MB.
- Enable the cloud profiler by default by changing the default of
  CLOUD_PROFILER_LOG_INTERVAL from 0 (disabled) to 5 minutes.
- Log the cloud profiler counters at TRACE level instead of DEBUG.
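- In cloud deployments, reserve memory for the write buffers by deducting
  (max concurrent flushes + max concurrent merges) * write buffer size
  from the job execution memory budget.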
Change-Id: I41955440f0b3525a42e13ed03ff0909fc788e238
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18347
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 18e24ab..7da3838 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -697,7 +697,7 @@
String schedulerName = storageProperties.getIoScheduler();
int numPartitions = ioManager.getIODevices().size();
- int maxConcurrentFlushes = storageProperties.geMaxConcurrentFlushes(numPartitions);
+ int maxConcurrentFlushes = storageProperties.getMaxConcurrentFlushes(numPartitions);
int maxScheduledMerges = storageProperties.getMaxScheduledMerges(numPartitions);
int maxConcurrentMerges = storageProperties.getMaxConcurrentMerges(numPartitions);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index d5659cf..d02dc4f 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -338,7 +338,7 @@
@Override
public NodeCapacity getCapacity() {
StorageProperties storageProperties = runtimeContext.getStorageProperties();
- final long memorySize = storageProperties.getJobExecutionMemoryBudget();
+ final long memorySize = storageProperties.getJobExecutionMemoryBudget(runtimeContext);
int allCores = Runtime.getRuntime().availableProcessors();
return new NodeCapacity(memorySize, allCores);
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
index 161fb37..c98e0e8 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
@@ -29,7 +29,7 @@
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
public final class S3ClientConfig {
- static final int WRITE_BUFFER_SIZE = StorageUtil.getIntSizeInBytes(5, StorageUtil.StorageUnit.MEGABYTE);
+
// The maximum number of file that can be deleted (AWS restriction)
static final int DELETE_BATCH_SIZE = 1000;
private final String region;
@@ -37,20 +37,22 @@
private final String prefix;
private final boolean anonymousAuth;
private final long profilerLogInterval;
+ private final int writeBufferSize;
public S3ClientConfig(String region, String endpoint, String prefix, boolean anonymousAuth,
- long profilerLogInterval) {
+ long profilerLogInterval, int writeBufferSize) {
this.region = region;
this.endpoint = endpoint;
this.prefix = prefix;
this.anonymousAuth = anonymousAuth;
this.profilerLogInterval = profilerLogInterval;
+ this.writeBufferSize = writeBufferSize;
}
public static S3ClientConfig of(CloudProperties cloudProperties) {
return new S3ClientConfig(cloudProperties.getStorageRegion(), cloudProperties.getStorageEndpoint(),
cloudProperties.getStoragePrefix(), cloudProperties.isStorageAnonymousAuth(),
- cloudProperties.getProfilerLogInterval());
+ cloudProperties.getProfilerLogInterval(), cloudProperties.getWriteBufferSize());
}
public static S3ClientConfig of(Map<String, String> configuration) {
@@ -63,8 +65,9 @@
String region = "";
String prefix = "";
boolean anonymousAuth = false;
+ int writeBufferSize = StorageUtil.getIntSizeInBytes(5, StorageUtil.StorageUnit.MEGABYTE);
- return new S3ClientConfig(region, endPoint, prefix, anonymousAuth, profilerLogInterval);
+ return new S3ClientConfig(region, endPoint, prefix, anonymousAuth, profilerLogInterval, writeBufferSize);
}
public String getRegion() {
@@ -92,6 +95,10 @@
return profilerLogInterval;
}
+ public int getWriteBufferSize() {
+ return writeBufferSize;
+ }
+
private boolean isS3Mock() {
return endpoint != null && !endpoint.isEmpty();
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
index 97169db..2eae455 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
@@ -79,6 +79,7 @@
private final S3Client s3Client;
private final ICloudGuardian guardian;
private final IRequestProfiler profiler;
+ private final int writeBufferSize;
public S3CloudClient(S3ClientConfig config, ICloudGuardian guardian) {
this(config, buildClient(config), guardian);
@@ -88,6 +89,7 @@
this.config = config;
this.s3Client = s3Client;
this.guardian = guardian;
+ this.writeBufferSize = config.getWriteBufferSize();
long profilerInterval = config.getProfilerLogInterval();
if (profilerInterval > 0) {
profiler = new CountRequestProfiler(profilerInterval);
@@ -99,7 +101,7 @@
@Override
public int getWriteBufferSize() {
- return S3ClientConfig.WRITE_BUFFER_SIZE;
+ return writeBufferSize;
}
@Override
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/CountRequestProfiler.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/CountRequestProfiler.java
index d5fc2db..624395b 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/CountRequestProfiler.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/CountRequestProfiler.java
@@ -20,6 +20,7 @@
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -29,6 +30,7 @@
public class CountRequestProfiler implements IRequestProfiler {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final Logger LOGGER = LogManager.getLogger();
+ private static final Level LOG_LEVEL = Level.TRACE;
private final long logInterval;
private final AtomicLong listObjectsCounter;
private final AtomicLong getObjectCounter;
@@ -94,19 +96,21 @@
}
private void log() {
- long currentTime = System.nanoTime();
- if (currentTime - lastLogTimestamp >= logInterval) {
- // Might log multiple times
- lastLogTimestamp = currentTime;
- ObjectNode countersNode = OBJECT_MAPPER.createObjectNode();
- countersNode.put("listObjectsCounter", listObjectsCounter.get());
- countersNode.put("getObjectCounter", getObjectCounter.get());
- countersNode.put("writeObjectCounter", writeObjectCounter.get());
- countersNode.put("deleteObjectCounter", deleteObjectCounter.get());
- countersNode.put("copyObjectCounter", copyObjectCounter.get());
- countersNode.put("multipartUploadCounter", multipartUploadCounter.get());
- countersNode.put("multipartDownloadCounter", multipartDownloadCounter.get());
- LOGGER.debug("Cloud request counters: {}", countersNode.toString());
+ if (LOGGER.isEnabled(LOG_LEVEL)) {
+ long currentTime = System.nanoTime();
+ if (currentTime - lastLogTimestamp >= logInterval) {
+ // Might log multiple times
+ lastLogTimestamp = currentTime;
+ ObjectNode countersNode = OBJECT_MAPPER.createObjectNode();
+ countersNode.put("listObjectsCounter", listObjectsCounter.get());
+ countersNode.put("getObjectCounter", getObjectCounter.get());
+ countersNode.put("writeObjectCounter", writeObjectCounter.get());
+ countersNode.put("deleteObjectCounter", deleteObjectCounter.get());
+ countersNode.put("copyObjectCounter", copyObjectCounter.get());
+ countersNode.put("multipartUploadCounter", multipartUploadCounter.get());
+ countersNode.put("multipartDownloadCounter", multipartDownloadCounter.get());
+ LOGGER.log(LOG_LEVEL, "Cloud request counters: {}", countersNode.toString());
+ }
}
}
}
diff --git a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java
index 01d3422..fad8081 100644
--- a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java
+++ b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java
@@ -24,6 +24,7 @@
import org.apache.asterix.cloud.clients.ICloudGuardian;
import org.apache.asterix.cloud.clients.aws.s3.S3ClientConfig;
import org.apache.asterix.cloud.clients.aws.s3.S3CloudClient;
+import org.apache.hyracks.util.StorageUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -65,7 +66,9 @@
cleanup();
client.createBucket(CreateBucketRequest.builder().bucket(PLAYGROUND_CONTAINER).build());
LOGGER.info("Client created successfully");
- S3ClientConfig config = new S3ClientConfig(MOCK_SERVER_REGION, MOCK_SERVER_HOSTNAME, "", true, 0);
+ int writeBufferSize = StorageUtil.getIntSizeInBytes(5, StorageUtil.StorageUnit.MEGABYTE);
+ S3ClientConfig config =
+ new S3ClientConfig(MOCK_SERVER_REGION, MOCK_SERVER_HOSTNAME, "", true, 0, writeBufferSize);
CLOUD_CLIENT = new S3CloudClient(config, ICloudGuardian.NoOpCloudGuardian.INSTANCE);
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
index ea12621..5c612dc 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
@@ -56,7 +56,8 @@
CLOUD_STORAGE_INDEX_INACTIVE_DURATION_THRESHOLD(POSITIVE_INTEGER, 360),
CLOUD_STORAGE_DEBUG_MODE_ENABLED(BOOLEAN, false),
CLOUD_STORAGE_DEBUG_SWEEP_THRESHOLD_SIZE(LONG_BYTE_UNIT, StorageUtil.getLongSizeInBytes(1, GIGABYTE)),
- CLOUD_PROFILER_LOG_INTERVAL(NONNEGATIVE_INTEGER, 0);
+ CLOUD_PROFILER_LOG_INTERVAL(NONNEGATIVE_INTEGER, 5),
+ CLOUD_WRITE_BUFFER_SIZE(POSITIVE_INTEGER, StorageUtil.getIntSizeInBytes(8, StorageUtil.StorageUnit.MEGABYTE));
private final IOptionType interpreter;
private final Object defaultValue;
@@ -83,6 +84,7 @@
case CLOUD_STORAGE_DEBUG_SWEEP_THRESHOLD_SIZE:
case CLOUD_STORAGE_DEBUG_MODE_ENABLED:
case CLOUD_PROFILER_LOG_INTERVAL:
+ case CLOUD_WRITE_BUFFER_SIZE:
return Section.COMMON;
default:
return Section.NC;
@@ -139,6 +141,8 @@
return "The waiting time (in minutes) to log cloud request statistics (default: 0, which means"
+ " the profiler is disabled by default). The minimum is 1 minute."
+ " NOTE: Enabling the profiler could perturb the performance of cloud requests";
+ case CLOUD_WRITE_BUFFER_SIZE:
+ return "The write buffer size in bytes. (default: 8MB)";
default:
throw new IllegalStateException("NYI: " + this);
}
@@ -213,4 +217,8 @@
long interval = TimeUnit.MINUTES.toNanos(accessor.getInt(Option.CLOUD_PROFILER_LOG_INTERVAL));
return interval == 0 ? 0 : Math.max(interval, TimeUnit.MINUTES.toNanos(1));
}
+
+ public int getWriteBufferSize() {
+ return accessor.getInt(Option.CLOUD_WRITE_BUFFER_SIZE);
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
index 6fe0eb8..5fbc7c6 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
@@ -32,6 +32,7 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
+import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
import org.apache.asterix.common.utils.PartitioningScheme;
import org.apache.hyracks.api.config.IApplicationConfig;
@@ -240,15 +241,17 @@
return accessor.getInt(Option.STORAGE_MEMORYCOMPONENT_MAX_SCHEDULED_FLUSHES);
}
- public long getJobExecutionMemoryBudget() {
- final long jobExecutionMemory = MAX_HEAP_BYTES - getBufferCacheSize() - getMemoryComponentGlobalBudget();
- if (jobExecutionMemory <= 0) {
- final String msg = String.format(
- "Invalid node memory configuration, more memory budgeted than available in JVM. Runtime max memory:"
- + " (%d), Buffer cache memory (%d), memory component global budget (%d)",
- MAX_HEAP_BYTES, getBufferCacheSize(), getMemoryComponentGlobalBudget());
- throw new IllegalStateException(msg);
+ public long getJobExecutionMemoryBudget(INcApplicationContext runtimeContext) {
+ long jobExecutionMemory = MAX_HEAP_BYTES - getBufferCacheSize() - getMemoryComponentGlobalBudget();
+ if (runtimeContext.isCloudDeployment()) {
+ int numPartitions = runtimeContext.getIoManager().getIODevices().size();
+ int maxConcurrentMerges = getMaxConcurrentMerges(numPartitions);
+ int maxConcurrentFlushes = getMaxConcurrentFlushes(numPartitions);
+ int writeBufferSize = runtimeContext.getCloudProperties().getWriteBufferSize();
+ jobExecutionMemory -= (long) (maxConcurrentFlushes + maxConcurrentMerges) * writeBufferSize;
+
}
+ ensureJobExecutionMemory(jobExecutionMemory, runtimeContext);
return jobExecutionMemory;
}
@@ -260,7 +263,7 @@
return accessor.getString(Option.STORAGE_IO_SCHEDULER);
}
- public int geMaxConcurrentFlushes(int numPartitions) {
+ public int getMaxConcurrentFlushes(int numPartitions) {
int value = accessor.getInt(Option.STORAGE_MAX_CONCURRENT_FLUSHES_PER_PARTITION);
return value != 0 ? value * numPartitions : Integer.MAX_VALUE;
}
@@ -326,4 +329,23 @@
public long getStorageMaxComponentSize() {
return accessor.getLong(Option.STORAGE_MAX_COMPONENT_SIZE);
}
+
+ private void ensureJobExecutionMemory(long jobExecutionMemory, INcApplicationContext runtimeContext) {
+ if (jobExecutionMemory <= 0) {
+ String msg;
+ if (runtimeContext.isCloudDeployment()) {
+ msg = String.format(
+ "Invalid node memory configuration, more memory budgeted than available in JVM. Runtime max memory:"
+ + " (%d), Buffer cache memory (%d), memory component global budget (%d), cloud write buffer size (%d)",
+ MAX_HEAP_BYTES, getBufferCacheSize(), getMemoryComponentGlobalBudget(),
+ runtimeContext.getCloudProperties().getWriteBufferSize());
+ } else {
+ msg = String.format(
+ "Invalid node memory configuration, more memory budgeted than available in JVM. Runtime max memory:"
+ + " (%d), Buffer cache memory (%d), memory component global budget (%d)",
+ MAX_HEAP_BYTES, getBufferCacheSize(), getMemoryComponentGlobalBudget());
+ }
+ throw new IllegalStateException(msg);
+ }
+ }
}