[ASTERIXDB-3218][*DB] Introduce cloud client profiler

- user model changes: yes
- storage format changes: no
- interface changes: yes

Details:
As part of optimizing/reducing cloud requests,
this patch introduces a profiler, which counts
the number of requests for each request's type.
This may help  to identify which type of requests
may need further optimization.

Change-Id: Ic95b766b033f90394ba0081c4a1a6fbd39fb528e
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17631
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
index a6ab2e0..a67d326 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
@@ -10,6 +10,7 @@
     "active\.suspend\.timeout" : 3600,
     "azure.request.timeout" : 120,
     "cloud.deployment" : false,
+    "cloud.profiler.log.interval" : 0,
     "cloud.storage.anonymous.auth" : false,
     "cloud.storage.bucket" : "",
     "cloud.storage.cache.policy" : "lazy",
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
index 395c96c..0e90266 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
@@ -10,6 +10,7 @@
     "active\.suspend\.timeout" : 3600,
     "azure.request.timeout" : 120,
     "cloud.deployment" : false,
+    "cloud.profiler.log.interval" : 0,
     "cloud.storage.anonymous.auth" : false,
     "cloud.storage.bucket" : "",
     "cloud.storage.cache.policy" : "lazy",
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
index 8396311..931fd06 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
@@ -10,6 +10,7 @@
     "active\.suspend\.timeout" : 3600,
     "azure.request.timeout" : 120,
     "cloud.deployment" : false,
+    "cloud.profiler.log.interval" : 0,
     "cloud.storage.anonymous.auth" : false,
     "cloud.storage.bucket" : "",
     "cloud.storage.cache.policy" : "lazy",
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java
index 6c02bbc..71246f1 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java
@@ -24,6 +24,7 @@
 import java.util.concurrent.TimeUnit;
 
 import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
+import org.apache.asterix.cloud.clients.profiler.IRequestProfiler;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -39,18 +40,21 @@
 import software.amazon.awssdk.services.s3.model.UploadPartRequest;
 
 public class S3BufferedWriter implements ICloudBufferedWriter {
-    private final List<CompletedPart> partQueue;
-    private final String path;
-    private final S3Client s3Client;
-    private final String bucket;
-    private String uploadId;
-    private int partNumber;
     private static final int MAX_RETRIES = 3;
 
     private static final Logger LOGGER = LogManager.getLogger();
+    private final S3Client s3Client;
+    private final IRequestProfiler profiler;
+    private final String bucket;
+    private final String path;
+    private final List<CompletedPart> partQueue;
 
-    public S3BufferedWriter(S3Client s3client, String bucket, String path) {
+    private String uploadId;
+    private int partNumber;
+
+    public S3BufferedWriter(S3Client s3client, IRequestProfiler profiler, String bucket, String path) {
         this.s3Client = s3client;
+        this.profiler = profiler;
         this.bucket = bucket;
         this.path = path;
         partQueue = new ArrayList<>();
@@ -58,6 +62,7 @@
 
     @Override
     public int upload(InputStream stream, int length) {
+        profiler.objectMultipartUpload();
         setUploadId();
         UploadPartRequest upReq =
                 UploadPartRequest.builder().uploadId(uploadId).partNumber(partNumber).bucket(bucket).key(path).build();
@@ -108,6 +113,7 @@
     }
 
     private void completeMultipartUpload(CompleteMultipartUploadRequest request) throws HyracksDataException {
+        profiler.objectMultipartUpload();
         try {
             s3Client.completeMultipartUpload(request);
         } catch (Exception e) {
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
index bbe8586..fe1f4af 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
@@ -30,17 +30,21 @@
     private final String endpoint;
     private final String prefix;
     private final boolean anonymousAuth;
+    private final long profilerLogInterval;
 
-    public S3ClientConfig(String region, String endpoint, String prefix, boolean anonymousAuth) {
+    public S3ClientConfig(String region, String endpoint, String prefix, boolean anonymousAuth,
+            long profilerLogInterval) {
         this.region = region;
         this.endpoint = endpoint;
         this.prefix = prefix;
         this.anonymousAuth = anonymousAuth;
+        this.profilerLogInterval = profilerLogInterval;
     }
 
     public static S3ClientConfig of(CloudProperties cloudProperties) {
         return new S3ClientConfig(cloudProperties.getStorageRegion(), cloudProperties.getStorageEndpoint(),
-                cloudProperties.getStoragePrefix(), cloudProperties.isStorageAnonymousAuth());
+                cloudProperties.getStoragePrefix(), cloudProperties.isStorageAnonymousAuth(),
+                cloudProperties.getProfilerLogInterval());
     }
 
     public String getRegion() {
@@ -64,6 +68,10 @@
         return anonymousAuth ? AnonymousCredentialsProvider.create() : DefaultCredentialsProvider.create();
     }
 
+    public long getProfilerLogInterval() {
+        return profilerLogInterval;
+    }
+
     private boolean isS3Mock() {
         return endpoint != null && !endpoint.isEmpty();
     }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
index 0fb9c08..c51f84a 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
@@ -41,6 +41,9 @@
 
 import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
 import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.asterix.cloud.clients.profiler.CountRequestProfiler;
+import org.apache.asterix.cloud.clients.profiler.IRequestProfiler;
+import org.apache.asterix.cloud.clients.profiler.NoOpRequestProfiler;
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
@@ -73,17 +76,24 @@
 public class S3CloudClient implements ICloudClient {
 
     private static final Logger LOGGER = LogManager.getLogger();
+    // TODO(htowaileb): Temporary variables, can we get this from the used instance?
+    private static final double MAX_HOST_BANDWIDTH = 10.0; // in Gbps
 
     private final S3ClientConfig config;
     private final S3Client s3Client;
+    private final IRequestProfiler profiler;
     private S3TransferManager s3TransferManager;
 
-    // TODO(htowaileb): Temporary variables, can we get this from the used instance?
-    private static final double MAX_HOST_BANDWIDTH = 10.0; // in Gbps
-
     public S3CloudClient(S3ClientConfig config) {
         this.config = config;
         s3Client = buildClient();
+        long profilerInterval = config.getProfilerLogInterval();
+        if (profilerInterval > 0) {
+            profiler = new CountRequestProfiler(profilerInterval);
+        } else {
+            profiler = NoOpRequestProfiler.INSTANCE;
+        }
+
     }
 
     private S3Client buildClient() {
@@ -104,17 +114,19 @@
 
     @Override
     public ICloudBufferedWriter createBufferedWriter(String bucket, String path) {
-        return new S3BufferedWriter(s3Client, bucket, path);
+        return new S3BufferedWriter(s3Client, profiler, bucket, path);
     }
 
     @Override
     public Set<String> listObjects(String bucket, String path, FilenameFilter filter) {
+        profiler.objectsList();
         path = config.isEncodeKeys() ? encodeURI(path) : path;
         return filterAndGet(listS3Objects(s3Client, bucket, path), filter);
     }
 
     @Override
     public int read(String bucket, String path, long offset, ByteBuffer buffer) throws HyracksDataException {
+        profiler.objectGet();
         long readTo = offset + buffer.remaining();
         GetObjectRequest rangeGetObjectRequest =
                 GetObjectRequest.builder().range("bytes=" + offset + "-" + readTo).bucket(bucket).key(path).build();
@@ -141,6 +153,7 @@
 
     @Override
     public byte[] readAllBytes(String bucket, String path) throws HyracksDataException {
+        profiler.objectGet();
         GetObjectRequest getReq = GetObjectRequest.builder().bucket(bucket).key(path).build();
 
         try (ResponseInputStream<GetObjectResponse> stream = s3Client.getObject(getReq)) {
@@ -154,6 +167,7 @@
 
     @Override
     public InputStream getObjectStream(String bucket, String path) {
+        profiler.objectGet();
         GetObjectRequest getReq = GetObjectRequest.builder().bucket(bucket).key(path).build();
         try {
             return s3Client.getObject(getReq);
@@ -165,6 +179,7 @@
 
     @Override
     public void write(String bucket, String path, byte[] data) {
+        profiler.objectWrite();
         PutObjectRequest putReq = PutObjectRequest.builder().bucket(bucket).key(path).build();
 
         // TODO(htowaileb): add retry logic here
@@ -175,7 +190,10 @@
     public void copy(String bucket, String srcPath, FileReference destPath) {
         srcPath = config.isEncodeKeys() ? encodeURI(srcPath) : srcPath;
         List<S3Object> objects = listS3Objects(s3Client, bucket, srcPath);
+
+        profiler.objectsList();
         for (S3Object object : objects) {
+            profiler.objectCopy();
             String srcKey = object.key();
             String destKey = destPath.getChildPath(IoUtil.getFileNameFromPath(srcKey));
             CopyObjectRequest copyReq = CopyObjectRequest.builder().sourceBucket(bucket).sourceKey(srcKey)
@@ -191,6 +209,7 @@
             return;
         }
 
+        profiler.objectDelete();
         List<ObjectIdentifier> objectIdentifiers = new ArrayList<>();
         for (String file : fileList) {
             objectIdentifiers.add(ObjectIdentifier.builder().key(file).build());
@@ -202,6 +221,7 @@
 
     @Override
     public long getObjectSize(String bucket, String path) throws HyracksDataException {
+        profiler.objectGet();
         try {
             return s3Client.headObject(HeadObjectRequest.builder().bucket(bucket).key(path).build()).contentLength();
         } catch (NoSuchKeyException ex) {
@@ -213,6 +233,7 @@
 
     @Override
     public boolean exists(String bucket, String path) throws HyracksDataException {
+        profiler.objectGet();
         try {
             s3Client.headObject(HeadObjectRequest.builder().bucket(bucket).key(path).build());
             return true;
@@ -255,6 +276,8 @@
 
         try {
             for (CompletableFuture<CompletedDirectoryDownload> download : downloads) {
+                // multipart download
+                profiler.objectMultipartDownload();
                 download.join();
                 CompletedDirectoryDownload completedDirectoryDownload = download.get();
 
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/CountRequestProfiler.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/CountRequestProfiler.java
new file mode 100644
index 0000000..d5fc2db
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/CountRequestProfiler.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.clients.profiler;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+public class CountRequestProfiler implements IRequestProfiler {
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final long logInterval;
+    private final AtomicLong listObjectsCounter;
+    private final AtomicLong getObjectCounter;
+    private final AtomicLong writeObjectCounter;
+    private final AtomicLong deleteObjectCounter;
+    private final AtomicLong copyObjectCounter;
+    private final AtomicLong multipartUploadCounter;
+    private final AtomicLong multipartDownloadCounter;
+    private long lastLogTimestamp;
+
+    public CountRequestProfiler(long logIntervalNanoSec) {
+        this.logInterval = logIntervalNanoSec;
+        listObjectsCounter = new AtomicLong();
+        getObjectCounter = new AtomicLong();
+        writeObjectCounter = new AtomicLong();
+        deleteObjectCounter = new AtomicLong();
+        copyObjectCounter = new AtomicLong();
+        multipartUploadCounter = new AtomicLong();
+        multipartDownloadCounter = new AtomicLong();
+        lastLogTimestamp = System.nanoTime();
+    }
+
+    @Override
+    public void objectsList() {
+        listObjectsCounter.incrementAndGet();
+        log();
+    }
+
+    @Override
+    public void objectGet() {
+        getObjectCounter.incrementAndGet();
+        log();
+    }
+
+    @Override
+    public void objectWrite() {
+        writeObjectCounter.incrementAndGet();
+        log();
+    }
+
+    @Override
+    public void objectDelete() {
+        deleteObjectCounter.incrementAndGet();
+        log();
+    }
+
+    @Override
+    public void objectCopy() {
+        copyObjectCounter.incrementAndGet();
+        log();
+    }
+
+    @Override
+    public void objectMultipartUpload() {
+        multipartUploadCounter.incrementAndGet();
+        log();
+    }
+
+    @Override
+    public void objectMultipartDownload() {
+        multipartDownloadCounter.incrementAndGet();
+        log();
+    }
+
+    private void log() {
+        long currentTime = System.nanoTime();
+        if (currentTime - lastLogTimestamp >= logInterval) {
+            // Might log multiple times
+            lastLogTimestamp = currentTime;
+            ObjectNode countersNode = OBJECT_MAPPER.createObjectNode();
+            countersNode.put("listObjectsCounter", listObjectsCounter.get());
+            countersNode.put("getObjectCounter", getObjectCounter.get());
+            countersNode.put("writeObjectCounter", writeObjectCounter.get());
+            countersNode.put("deleteObjectCounter", deleteObjectCounter.get());
+            countersNode.put("copyObjectCounter", copyObjectCounter.get());
+            countersNode.put("multipartUploadCounter", multipartUploadCounter.get());
+            countersNode.put("multipartDownloadCounter", multipartDownloadCounter.get());
+            LOGGER.debug("Cloud request counters: {}", countersNode.toString());
+        }
+    }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/IRequestProfiler.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/IRequestProfiler.java
new file mode 100644
index 0000000..9d1d86e
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/IRequestProfiler.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.clients.profiler;
+
+public interface IRequestProfiler {
+    void objectsList();
+
+    void objectGet();
+
+    void objectWrite();
+
+    void objectDelete();
+
+    void objectCopy();
+
+    void objectMultipartUpload();
+
+    void objectMultipartDownload();
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/NoOpRequestProfiler.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/NoOpRequestProfiler.java
new file mode 100644
index 0000000..2ef2769
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/NoOpRequestProfiler.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.clients.profiler;
+
+public class NoOpRequestProfiler implements IRequestProfiler {
+    public static final IRequestProfiler INSTANCE = new NoOpRequestProfiler();
+
+    private NoOpRequestProfiler() {
+    }
+
+    @Override
+    public void objectsList() {
+        // NoOp
+    }
+
+    @Override
+    public void objectGet() {
+        // NoOp
+    }
+
+    @Override
+    public void objectWrite() {
+        // NoOp
+    }
+
+    @Override
+    public void objectDelete() {
+        // NoOp
+    }
+
+    @Override
+    public void objectCopy() {
+        // NoOp
+    }
+
+    @Override
+    public void objectMultipartUpload() {
+        // NoOp
+    }
+
+    @Override
+    public void objectMultipartDownload() {
+        // NoOp
+    }
+}
diff --git a/asterixdb/asterix-cloud/src/test/java/org/apach/asterix/cloud/s3/LSMS3Test.java b/asterixdb/asterix-cloud/src/test/java/org/apach/asterix/cloud/s3/LSMS3Test.java
index 91b47e2..06286cc 100644
--- a/asterixdb/asterix-cloud/src/test/java/org/apach/asterix/cloud/s3/LSMS3Test.java
+++ b/asterixdb/asterix-cloud/src/test/java/org/apach/asterix/cloud/s3/LSMS3Test.java
@@ -64,7 +64,7 @@
         cleanup();
         client.createBucket(CreateBucketRequest.builder().bucket(PLAYGROUND_CONTAINER).build());
         LOGGER.info("Client created successfully");
-        S3ClientConfig config = new S3ClientConfig(MOCK_SERVER_REGION, MOCK_SERVER_HOSTNAME, "", true);
+        S3ClientConfig config = new S3ClientConfig(MOCK_SERVER_REGION, MOCK_SERVER_HOSTNAME, "", true, 0);
         CLOUD_CLIENT = new S3CloudClient(config);
     }
 
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
index 90c3acb..4b7649a8 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
@@ -19,8 +19,11 @@
 package org.apache.asterix.common.config;
 
 import static org.apache.hyracks.control.common.config.OptionTypes.BOOLEAN;
+import static org.apache.hyracks.control.common.config.OptionTypes.NONNEGATIVE_INTEGER;
 import static org.apache.hyracks.control.common.config.OptionTypes.STRING;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.asterix.common.cloud.CloudCachePolicy;
 import org.apache.hyracks.api.config.IOption;
 import org.apache.hyracks.api.config.IOptionType;
@@ -39,7 +42,8 @@
         CLOUD_STORAGE_REGION(STRING, ""),
         CLOUD_STORAGE_ENDPOINT(STRING, ""),
         CLOUD_STORAGE_ANONYMOUS_AUTH(BOOLEAN, false),
-        CLOUD_STORAGE_CACHE_POLICY(STRING, "lazy");
+        CLOUD_STORAGE_CACHE_POLICY(STRING, "lazy"),
+        CLOUD_PROFILER_LOG_INTERVAL(NONNEGATIVE_INTEGER, 0);
 
         private final IOptionType interpreter;
         private final Object defaultValue;
@@ -59,6 +63,7 @@
                 case CLOUD_STORAGE_ENDPOINT:
                 case CLOUD_STORAGE_ANONYMOUS_AUTH:
                 case CLOUD_STORAGE_CACHE_POLICY:
+                case CLOUD_PROFILER_LOG_INTERVAL:
                     return Section.COMMON;
                 default:
                     return Section.NC;
@@ -84,6 +89,10 @@
                     return "The caching policy (either eager or lazy). 'Eager' caching will download all partitions"
                             + " upon booting, whereas lazy caching will download a file upon request to open it."
                             + " (default: 'lazy')";
+                case CLOUD_PROFILER_LOG_INTERVAL:
+                    return "The waiting time (in minutes) to log cloud request statistics (default: 0, which means"
+                            + " the profiler is disabled by default). The minimum is 1 minute."
+                            + " NOTE: Enabling the profiler could perturb the performance of cloud requests";
                 default:
                     throw new IllegalStateException("NYI: " + this);
             }
@@ -128,4 +137,9 @@
     public CloudCachePolicy getCloudCachePolicy() {
         return CloudCachePolicy.fromName(accessor.getString(Option.CLOUD_STORAGE_CACHE_POLICY));
     }
+
+    public long getProfilerLogInterval() {
+        long interval = TimeUnit.MINUTES.toNanos(accessor.getInt(Option.CLOUD_PROFILER_LOG_INTERVAL));
+        return interval == 0 ? 0 : Math.max(interval, TimeUnit.MINUTES.toNanos(1));
+    }
 }