Merge commit '79af1625' into 'master'

Change-Id: I82844bcf6a198ff9190b5e496e921877359901b5
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
index 2d5b57d..d83f1ea 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
@@ -55,6 +55,7 @@
 import org.apache.hyracks.algebricks.core.algebra.properties.LocalMemoryRequirements;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
@@ -100,6 +101,11 @@
     }
 
     @Override
+    public void createLocalMemoryRequirements(ILogicalOperator op, PhysicalOptimizationConfig physicalOpConfig) {
+        localMemoryRequirements = LocalMemoryRequirements.variableMemoryBudget(MIN_FRAME_LIMIT_FOR_TEXT_SEARCH);
+    }
+
+    @Override
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
             IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
             throws AlgebricksException {
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java
index c09cb34..574dc8a 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java
@@ -67,6 +67,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.ReplicatePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.StableSortPOperator;
 import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
+import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
 import org.apache.hyracks.api.job.resource.IClusterCapacity;
 import org.junit.Assert;
 import org.junit.Test;
@@ -78,6 +79,7 @@
     private static final int FRAME_SIZE = 32768;
     private static final int PARALLELISM = 10;
     private static final long MAX_BUFFER_PER_CONNECTION = 1L;
+    private static final PhysicalOptimizationConfig physicalConfig = new PhysicalOptimizationConfig();
 
     @Test
     public void noBlockingPlan() throws AlgebricksException {
@@ -336,7 +338,7 @@
     private void assertRequiredMemory(List<PlanStage> stages, long expectedMemory) {
         for (PlanStage stage : stages) {
             for (ILogicalOperator op : stage.getOperators()) {
-                ((AbstractLogicalOperator) op).getPhysicalOperator().createLocalMemoryRequirements(op);
+                ((AbstractLogicalOperator) op).getPhysicalOperator().createLocalMemoryRequirements(op, physicalConfig);
             }
         }
         final IClusterCapacity clusterCapacity =
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
index 805f785..a1736da 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
@@ -9,8 +9,11 @@
     "active\.stop\.timeout" : 3600,
     "active\.suspend\.timeout" : 3600,
     "azure.request.timeout" : 120,
+    "cloud.acquire.token.timeout" : 100,
     "cloud.deployment" : false,
     "cloud.eviction.plan.reevaluate.threshold" : 50,
+    "cloud.max.read.requests.per.second" : 4000,
+    "cloud.max.write.requests.per.second" : 2500,
     "cloud.profiler.log.interval" : 5,
     "cloud.storage.allocation.percentage" : 0.8,
     "cloud.storage.anonymous.auth" : false,
@@ -38,7 +41,11 @@
     "compiler\.indexonly" : true,
     "compiler\.internal\.sanitycheck" : true,
     "compiler\.joinmemory" : 262144,
+    "compiler.min.groupmemory" : 524288,
+    "compiler.min.joinmemory" : 524288,
     "compiler\.min\.memory\.allocation" : true,
+    "compiler.min.sortmemory" : 524288,
+    "compiler.min.windowmemory" : 524288,
     "compiler\.parallelism" : 0,
     "compiler.queryplanshape" : "zigzag",
     "compiler.runtime.memory.overhead" : 5,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
index 35a488a..6f3bc38 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
@@ -9,8 +9,11 @@
     "active\.stop\.timeout" : 3600,
     "active\.suspend\.timeout" : 3600,
     "azure.request.timeout" : 120,
+    "cloud.acquire.token.timeout" : 100,
     "cloud.deployment" : false,
     "cloud.eviction.plan.reevaluate.threshold" : 50,
+    "cloud.max.read.requests.per.second" : 4000,
+    "cloud.max.write.requests.per.second" : 2500,
     "cloud.profiler.log.interval" : 5,
     "cloud.storage.allocation.percentage" : 0.8,
     "cloud.storage.anonymous.auth" : false,
@@ -38,7 +41,11 @@
     "compiler\.indexonly" : true,
     "compiler\.internal\.sanitycheck" : false,
     "compiler\.joinmemory" : 262144,
+    "compiler.min.groupmemory" : 524288,
+    "compiler.min.joinmemory" : 524288,
     "compiler\.min\.memory\.allocation" : true,
+    "compiler.min.sortmemory" : 524288,
+    "compiler.min.windowmemory" : 524288,
     "compiler\.parallelism" : -1,
     "compiler.queryplanshape" : "zigzag",
     "compiler.runtime.memory.overhead" : 5,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
index 13e7a4e..415bdba 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
@@ -9,8 +9,11 @@
     "active\.stop\.timeout" : 3600,
     "active\.suspend\.timeout" : 3600,
     "azure.request.timeout" : 120,
+    "cloud.acquire.token.timeout" : 100,
     "cloud.deployment" : false,
     "cloud.eviction.plan.reevaluate.threshold" : 50,
+    "cloud.max.read.requests.per.second" : 4000,
+    "cloud.max.write.requests.per.second" : 2500,
     "cloud.profiler.log.interval" : 5,
     "cloud.storage.allocation.percentage" : 0.8,
     "cloud.storage.anonymous.auth" : false,
@@ -38,7 +41,11 @@
     "compiler\.indexonly" : true,
     "compiler\.internal\.sanitycheck" : false,
     "compiler\.joinmemory" : 262144,
+    "compiler.min.groupmemory" : 524288,
+    "compiler.min.joinmemory" : 524288,
     "compiler\.min\.memory\.allocation" : true,
+    "compiler.min.sortmemory" : 524288,
+    "compiler.min.windowmemory" : 524288,
     "compiler\.parallelism" : 3,
     "compiler.queryplanshape" : "zigzag",
     "compiler.runtime.memory.overhead" : 5,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/dataset-resources/dataset-resources.6.regex b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/dataset-resources/dataset-resources.6.regex
index 13bbece..7996c33 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/dataset-resources/dataset-resources.6.regex
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/dataset-resources/dataset-resources.6.regex
@@ -1 +1 @@
-/memory\D+240844/
\ No newline at end of file
+/memory\D+688128/
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/dataset-resources/dataset-resources.7.regex b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/dataset-resources/dataset-resources.7.regex
index 13bbece..7996c33 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/dataset-resources/dataset-resources.7.regex
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/dataset-resources/dataset-resources.7.regex
@@ -1 +1 @@
-/memory\D+240844/
\ No newline at end of file
+/memory\D+688128/
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/jobs/jobs.2.regex b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/jobs/jobs.2.regex
index 13bbece..7996c33 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/jobs/jobs.2.regex
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/jobs/jobs.2.regex
@@ -1 +1 @@
-/memory\D+240844/
\ No newline at end of file
+/memory\D+688128/
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/ping/ping.2.regex b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/ping/ping.2.regex
index 13bbece..7996c33 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/ping/ping.2.regex
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/ping/ping.2.regex
@@ -1 +1 @@
-/memory\D+240844/
\ No newline at end of file
+/memory\D+688128/
\ No newline at end of file
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
index 209245b..b208714 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
@@ -25,7 +25,7 @@
 import java.util.Set;
 
 import org.apache.asterix.cloud.IWriteBufferProvider;
-import org.apache.asterix.cloud.clients.profiler.IRequestProfiler;
+import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.control.nc.io.IOManager;
@@ -43,9 +43,9 @@
     int getWriteBufferSize();
 
     /**
-     * @return the requests profiler
+     * @return the requests profiler-limiter
      */
-    IRequestProfiler getProfiler();
+    IRequestProfilerLimiter getProfilerLimiter();
 
     /**
      * Creates a cloud buffered writer
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java
index 2ec5fb5..4e1c0f7 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java
@@ -29,7 +29,7 @@
 import org.apache.asterix.cloud.CloudResettableInputStream;
 import org.apache.asterix.cloud.IWriteBufferProvider;
 import org.apache.asterix.cloud.clients.aws.s3.S3CloudClient;
-import org.apache.asterix.cloud.clients.profiler.IRequestProfiler;
+import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.control.nc.io.IOManager;
@@ -53,8 +53,8 @@
     }
 
     @Override
-    public IRequestProfiler getProfiler() {
-        return cloudClient.getProfiler();
+    public IRequestProfilerLimiter getProfilerLimiter() {
+        return cloudClient.getProfilerLimiter();
     }
 
     @Override
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java
index 940a0a6..53d6546 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java
@@ -25,7 +25,7 @@
 
 import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
 import org.apache.asterix.cloud.clients.ICloudGuardian;
-import org.apache.asterix.cloud.clients.profiler.IRequestProfiler;
+import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.cloud.util.CloudRetryableRequestUtil;
 import org.apache.logging.log4j.LogManager;
@@ -47,7 +47,7 @@
 
     private static final Logger LOGGER = LogManager.getLogger();
     private final S3Client s3Client;
-    private final IRequestProfiler profiler;
+    private final IRequestProfilerLimiter profiler;
     private final ICloudGuardian guardian;
     private final String bucket;
     private final String path;
@@ -56,7 +56,7 @@
     private String uploadId;
     private int partNumber;
 
-    public S3BufferedWriter(S3Client s3client, IRequestProfiler profiler, ICloudGuardian guardian, String bucket,
+    public S3BufferedWriter(S3Client s3client, IRequestProfilerLimiter profiler, ICloudGuardian guardian, String bucket,
             String path) {
         this.s3Client = s3client;
         this.profiler = profiler;
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
index fe73a0a..0b9b15c 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
@@ -38,21 +38,35 @@
     private final boolean anonymousAuth;
     private final long profilerLogInterval;
     private final int writeBufferSize;
+    private final long tokenAcquireTimeout;
+    private final int readMaxRequestsPerSeconds;
+    private final int writeMaxRequestsPerSeconds;
 
     public S3ClientConfig(String region, String endpoint, String prefix, boolean anonymousAuth,
             long profilerLogInterval, int writeBufferSize) {
+        this(region, endpoint, prefix, anonymousAuth, profilerLogInterval, writeBufferSize, 1, 0, 0);
+    }
+
+    private S3ClientConfig(String region, String endpoint, String prefix, boolean anonymousAuth,
+            long profilerLogInterval, int writeBufferSize, long tokenAcquireTimeout, int writeMaxRequestsPerSeconds,
+            int readMaxRequestsPerSeconds) {
         this.region = Objects.requireNonNull(region, "region");
         this.endpoint = endpoint;
         this.prefix = Objects.requireNonNull(prefix, "prefix");
         this.anonymousAuth = anonymousAuth;
         this.profilerLogInterval = profilerLogInterval;
         this.writeBufferSize = writeBufferSize;
+        this.tokenAcquireTimeout = tokenAcquireTimeout;
+        this.writeMaxRequestsPerSeconds = writeMaxRequestsPerSeconds;
+        this.readMaxRequestsPerSeconds = readMaxRequestsPerSeconds;
     }
 
     public static S3ClientConfig of(CloudProperties cloudProperties) {
         return new S3ClientConfig(cloudProperties.getStorageRegion(), cloudProperties.getStorageEndpoint(),
                 cloudProperties.getStoragePrefix(), cloudProperties.isStorageAnonymousAuth(),
-                cloudProperties.getProfilerLogInterval(), cloudProperties.getWriteBufferSize());
+                cloudProperties.getProfilerLogInterval(), cloudProperties.getWriteBufferSize(),
+                cloudProperties.getTokenAcquireTimeout(), cloudProperties.getWriteMaxRequestsPerSecond(),
+                cloudProperties.getReadMaxRequestsPerSecond());
     }
 
     public static S3ClientConfig of(Map<String, String> configuration, int writeBufferSize) {
@@ -98,7 +112,20 @@
         return writeBufferSize;
     }
 
+    public long getTokenAcquireTimeout() {
+        return tokenAcquireTimeout;
+    }
+
+    public int getWriteMaxRequestsPerSeconds() {
+        return writeMaxRequestsPerSeconds;
+    }
+
+    public int getReadMaxRequestsPerSeconds() {
+        return readMaxRequestsPerSeconds;
+    }
+
     private boolean isS3Mock() {
         return endpoint != null && !endpoint.isEmpty();
     }
+
 }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
index a46b61f..24d5fa9 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
@@ -43,9 +43,9 @@
 import org.apache.asterix.cloud.clients.ICloudGuardian;
 import org.apache.asterix.cloud.clients.ICloudWriter;
 import org.apache.asterix.cloud.clients.IParallelDownloader;
-import org.apache.asterix.cloud.clients.profiler.CountRequestProfiler;
-import org.apache.asterix.cloud.clients.profiler.IRequestProfiler;
-import org.apache.asterix.cloud.clients.profiler.NoOpRequestProfiler;
+import org.apache.asterix.cloud.clients.profiler.CountRequestProfilerLimiter;
+import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter;
+import org.apache.asterix.cloud.clients.profiler.RequestLimiterNoOpProfiler;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.util.IoUtil;
@@ -78,7 +78,7 @@
     private final S3ClientConfig config;
     private final S3Client s3Client;
     private final ICloudGuardian guardian;
-    private final IRequestProfiler profiler;
+    private final IRequestProfilerLimiter profiler;
     private final int writeBufferSize;
 
     public S3CloudClient(S3ClientConfig config, ICloudGuardian guardian) {
@@ -91,10 +91,11 @@
         this.guardian = guardian;
         this.writeBufferSize = config.getWriteBufferSize();
         long profilerInterval = config.getProfilerLogInterval();
+        S3RequestRateLimiter limiter = new S3RequestRateLimiter(config);
         if (profilerInterval > 0) {
-            profiler = new CountRequestProfiler(profilerInterval);
+            profiler = new CountRequestProfilerLimiter(profilerInterval, limiter);
         } else {
-            profiler = NoOpRequestProfiler.INSTANCE;
+            profiler = new RequestLimiterNoOpProfiler(limiter);
         }
         guardian.setCloudClient(this);
     }
@@ -105,7 +106,7 @@
     }
 
     @Override
-    public IRequestProfiler getProfiler() {
+    public IRequestProfilerLimiter getProfilerLimiter() {
         return profiler;
     }
 
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
index 005fa5a..2eb9f09 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
@@ -30,7 +30,7 @@
 import java.util.concurrent.ExecutionException;
 
 import org.apache.asterix.cloud.clients.IParallelDownloader;
-import org.apache.asterix.cloud.clients.profiler.IRequestProfiler;
+import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter;
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
@@ -58,9 +58,9 @@
     private final S3AsyncClient s3AsyncClient;
     private final S3TransferManager transferManager;
     private final S3ClientConfig config;
-    private final IRequestProfiler profiler;
+    private final IRequestProfilerLimiter profiler;
 
-    S3ParallelDownloader(String bucket, IOManager ioManager, S3ClientConfig config, IRequestProfiler profiler) {
+    S3ParallelDownloader(String bucket, IOManager ioManager, S3ClientConfig config, IRequestProfilerLimiter profiler) {
         this.bucket = bucket;
         this.ioManager = ioManager;
         this.config = config;
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3RequestRateLimiter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3RequestRateLimiter.java
new file mode 100644
index 0000000..37387a6
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3RequestRateLimiter.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.clients.aws.s3;
+
+import org.apache.asterix.cloud.clients.profiler.limiter.IRateLimiter;
+import org.apache.asterix.cloud.clients.profiler.limiter.IRequestRateLimiter;
+import org.apache.asterix.cloud.clients.profiler.limiter.NoOpRateLimiter;
+import org.apache.asterix.cloud.clients.profiler.limiter.TokenBasedRateLimiter;
+
+public final class S3RequestRateLimiter implements IRequestRateLimiter {
+    private final IRateLimiter writeLimiter;
+    private final IRateLimiter readLimiter;
+
+    public S3RequestRateLimiter(S3ClientConfig config) {
+        long tokenAcquireTimeout = config.getTokenAcquireTimeout();
+        this.writeLimiter = createLimiter(config.getWriteMaxRequestsPerSeconds(), tokenAcquireTimeout);
+        this.readLimiter = createLimiter(config.getReadMaxRequestsPerSeconds(), tokenAcquireTimeout);
+    }
+
+    @Override
+    public void writeRequest() {
+        writeLimiter.acquire();
+    }
+
+    @Override
+    public void readRequest() {
+        readLimiter.acquire();
+    }
+
+    @Override
+    public void listRequest() {
+        // List requests in S3 are considered as PUT
+        writeLimiter.acquire();
+    }
+
+    private static IRateLimiter createLimiter(int maxRequestsPerSecond, long tokeAcquireTimeout) {
+        if (maxRequestsPerSecond > 0) {
+            return new TokenBasedRateLimiter(maxRequestsPerSecond, tokeAcquireTimeout);
+        }
+        return NoOpRateLimiter.INSTANCE;
+    }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
index 2874b4e..de242bd 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
@@ -38,9 +38,10 @@
 import org.apache.asterix.cloud.clients.ICloudGuardian;
 import org.apache.asterix.cloud.clients.ICloudWriter;
 import org.apache.asterix.cloud.clients.IParallelDownloader;
-import org.apache.asterix.cloud.clients.profiler.CountRequestProfiler;
-import org.apache.asterix.cloud.clients.profiler.IRequestProfiler;
-import org.apache.asterix.cloud.clients.profiler.NoOpRequestProfiler;
+import org.apache.asterix.cloud.clients.profiler.CountRequestProfilerLimiter;
+import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter;
+import org.apache.asterix.cloud.clients.profiler.NoOpRequestProfilerLimiter;
+import org.apache.asterix.cloud.clients.profiler.limiter.NoOpRequestLimiter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.util.IoUtil;
@@ -66,7 +67,7 @@
     private final Storage gcsClient;
     private final GCSClientConfig config;
     private final ICloudGuardian guardian;
-    private final IRequestProfiler profiler;
+    private final IRequestProfilerLimiter profilerLimiter;
 
     public GCSCloudClient(GCSClientConfig config, Storage gcsClient, ICloudGuardian guardian) {
         this.gcsClient = gcsClient;
@@ -74,9 +75,9 @@
         this.guardian = guardian;
         long profilerInterval = config.getProfilerLogInterval();
         if (profilerInterval > 0) {
-            profiler = new CountRequestProfiler(profilerInterval);
+            profilerLimiter = new CountRequestProfilerLimiter(profilerInterval, NoOpRequestLimiter.INSTANCE);
         } else {
-            profiler = NoOpRequestProfiler.INSTANCE;
+            profilerLimiter = NoOpRequestProfilerLimiter.INSTANCE;
         }
         guardian.setCloudClient(this);
     }
@@ -91,19 +92,19 @@
     }
 
     @Override
-    public IRequestProfiler getProfiler() {
-        return profiler;
+    public IRequestProfilerLimiter getProfilerLimiter() {
+        return profilerLimiter;
     }
 
     @Override
     public ICloudWriter createWriter(String bucket, String path, IWriteBufferProvider bufferProvider) {
-        return new GCSWriter(bucket, path, gcsClient, profiler);
+        return new GCSWriter(bucket, path, gcsClient, profilerLimiter);
     }
 
     @Override
     public Set<CloudFile> listObjects(String bucket, String path, FilenameFilter filter) {
         guardian.checkReadAccess(bucket, path);
-        profiler.objectsList();
+        profilerLimiter.objectsList();
         Page<Blob> blobs =
                 gcsClient.list(bucket, BlobListOption.prefix(path), BlobListOption.fields(Storage.BlobField.SIZE));
 
@@ -118,7 +119,7 @@
 
     @Override
     public int read(String bucket, String path, long offset, ByteBuffer buffer) throws HyracksDataException {
-        profiler.objectGet();
+        profilerLimiter.objectGet();
         BlobId blobId = BlobId.of(bucket, path);
         long readTo = offset + buffer.remaining();
         int totalRead = 0;
@@ -139,7 +140,7 @@
 
     @Override
     public byte[] readAllBytes(String bucket, String path) {
-        profiler.objectGet();
+        profilerLimiter.objectGet();
         BlobId blobId = BlobId.of(bucket, path);
         try {
             return gcsClient.readAllBytes(blobId);
@@ -150,7 +151,7 @@
 
     @Override
     public InputStream getObjectStream(String bucket, String path, long offset, long length) {
-        profiler.objectGet();
+        profilerLimiter.objectGet();
         try (ReadChannel reader = gcsClient.reader(bucket, path).limit(offset + length)) {
             reader.seek(offset);
             return Channels.newInputStream(reader);
@@ -162,7 +163,7 @@
     @Override
     public void write(String bucket, String path, byte[] data) {
         guardian.checkWriteAccess(bucket, path);
-        profiler.objectWrite();
+        profilerLimiter.objectWrite();
         BlobInfo blobInfo = BlobInfo.newBuilder(bucket, path).build();
         gcsClient.create(blobInfo, data);
     }
@@ -170,9 +171,9 @@
     @Override
     public void copy(String bucket, String srcPath, FileReference destPath) {
         Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(srcPath));
-        profiler.objectsList();
+        profilerLimiter.objectsList();
         for (Blob blob : blobs.iterateAll()) {
-            profiler.objectCopy();
+            profilerLimiter.objectCopy();
             BlobId source = blob.getBlobId();
             String targetName = destPath.getChildPath(IoUtil.getFileNameFromPath(source.getName()));
             BlobId target = BlobId.of(bucket, targetName);
@@ -199,14 +200,14 @@
             }
 
             batchRequest.submit();
-            profiler.objectDelete();
+            profilerLimiter.objectDelete();
         }
     }
 
     @Override
     public long getObjectSize(String bucket, String path) {
         guardian.checkReadAccess(bucket, path);
-        profiler.objectGet();
+        profilerLimiter.objectGet();
         Blob blob = gcsClient.get(bucket, path, Storage.BlobGetOption.fields(Storage.BlobField.SIZE));
         if (blob == null) {
             return 0;
@@ -217,7 +218,7 @@
     @Override
     public boolean exists(String bucket, String path) {
         guardian.checkReadAccess(bucket, path);
-        profiler.objectGet();
+        profilerLimiter.objectGet();
         Blob blob = gcsClient.get(bucket, path, Storage.BlobGetOption.fields(Storage.BlobField.values()));
         return blob != null && blob.exists();
     }
@@ -225,7 +226,7 @@
     @Override
     public boolean isEmptyPrefix(String bucket, String path) {
         guardian.checkReadAccess(bucket, path);
-        profiler.objectsList();
+        profilerLimiter.objectsList();
         Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(path));
         return !blobs.hasNextPage();
     }
@@ -233,13 +234,13 @@
     @Override
     public IParallelDownloader createParallelDownloader(String bucket, IOManager ioManager)
             throws HyracksDataException {
-        return new GCSParallelDownloader(bucket, ioManager, config, profiler);
+        return new GCSParallelDownloader(bucket, ioManager, config, profilerLimiter);
     }
 
     @Override
     public JsonNode listAsJson(ObjectMapper objectMapper, String bucket) {
         guardian.checkReadAccess(bucket, "/");
-        profiler.objectsList();
+        profilerLimiter.objectsList();
         Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.fields(Storage.BlobField.SIZE));
         ArrayNode objectsInfo = objectMapper.createArrayNode();
 
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
index 281a855..0994cea 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
@@ -29,7 +29,7 @@
 import java.util.Set;
 
 import org.apache.asterix.cloud.clients.IParallelDownloader;
-import org.apache.asterix.cloud.clients.profiler.IRequestProfiler;
+import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter;
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
@@ -55,10 +55,10 @@
     private final IOManager ioManager;
     private final Storage gcsClient;
     private final TransferManager transferManager;
-    private final IRequestProfiler profiler;
+    private final IRequestProfilerLimiter profiler;
 
-    public GCSParallelDownloader(String bucket, IOManager ioManager, GCSClientConfig config, IRequestProfiler profiler)
-            throws HyracksDataException {
+    public GCSParallelDownloader(String bucket, IOManager ioManager, GCSClientConfig config,
+            IRequestProfilerLimiter profiler) throws HyracksDataException {
         this.bucket = bucket;
         this.ioManager = ioManager;
         this.profiler = profiler;
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
index d9119a5..41d1a71 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
@@ -24,7 +24,7 @@
 import java.nio.ByteBuffer;
 
 import org.apache.asterix.cloud.clients.ICloudWriter;
-import org.apache.asterix.cloud.clients.profiler.IRequestProfiler;
+import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -38,12 +38,12 @@
     private static final Logger LOGGER = LogManager.getLogger();
     private final String bucket;
     private final String path;
-    private final IRequestProfiler profiler;
+    private final IRequestProfilerLimiter profiler;
     private final Storage gcsClient;
     private WriteChannel writer = null;
     private long writtenBytes;
 
-    public GCSWriter(String bucket, String path, Storage gcsClient, IRequestProfiler profiler) {
+    public GCSWriter(String bucket, String path, Storage gcsClient, IRequestProfilerLimiter profiler) {
         this.bucket = bucket;
         this.path = path;
         this.profiler = profiler;
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/CountRequestProfiler.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/CountRequestProfilerLimiter.java
similarity index 89%
rename from asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/CountRequestProfiler.java
rename to asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/CountRequestProfilerLimiter.java
index 3fc378c..16ffb7a 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/CountRequestProfiler.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/CountRequestProfilerLimiter.java
@@ -20,6 +20,7 @@
 
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.asterix.cloud.clients.profiler.limiter.IRequestRateLimiter;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -27,11 +28,12 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
-public class CountRequestProfiler implements IRequestProfiler {
+public class CountRequestProfilerLimiter implements IRequestProfilerLimiter {
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
     private static final Logger LOGGER = LogManager.getLogger();
     private static final Level LOG_LEVEL = Level.TRACE;
     private final long logInterval;
+    private final IRequestRateLimiter limiter;
     private final AtomicLong listObjectsCounter;
     private final AtomicLong getObjectCounter;
     private final AtomicLong writeObjectCounter;
@@ -41,8 +43,9 @@
     private final AtomicLong multipartDownloadCounter;
     private long lastLogTimestamp;
 
-    public CountRequestProfiler(long logIntervalNanoSec) {
+    public CountRequestProfilerLimiter(long logIntervalNanoSec, IRequestRateLimiter limiter) {
         this.logInterval = logIntervalNanoSec;
+        this.limiter = limiter;
         listObjectsCounter = new AtomicLong();
         getObjectCounter = new AtomicLong();
         writeObjectCounter = new AtomicLong();
@@ -55,42 +58,49 @@
 
     @Override
     public void objectsList() {
+        limiter.listRequest();
         listObjectsCounter.incrementAndGet();
         log();
     }
 
     @Override
     public void objectGet() {
+        limiter.readRequest();
         getObjectCounter.incrementAndGet();
         log();
     }
 
     @Override
     public void objectWrite() {
+        limiter.writeRequest();
         writeObjectCounter.incrementAndGet();
         log();
     }
 
     @Override
     public void objectDelete() {
+        limiter.writeRequest();
         deleteObjectCounter.incrementAndGet();
         log();
     }
 
     @Override
     public void objectCopy() {
+        limiter.writeRequest();
         copyObjectCounter.incrementAndGet();
         log();
     }
 
     @Override
     public void objectMultipartUpload() {
+        limiter.writeRequest();
         multipartUploadCounter.incrementAndGet();
         log();
     }
 
     @Override
     public void objectMultipartDownload() {
+        limiter.readRequest();
         multipartDownloadCounter.incrementAndGet();
         log();
     }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/IRequestProfiler.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/IRequestProfilerLimiter.java
similarity index 96%
rename from asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/IRequestProfiler.java
rename to asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/IRequestProfilerLimiter.java
index a23a78f..b86cd48 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/IRequestProfiler.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/IRequestProfilerLimiter.java
@@ -18,7 +18,7 @@
  */
 package org.apache.asterix.cloud.clients.profiler;
 
-public interface IRequestProfiler {
+public interface IRequestProfilerLimiter {
     void objectsList();
 
     void objectGet();
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/NoOpRequestProfiler.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/NoOpRequestProfilerLimiter.java
similarity index 89%
rename from asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/NoOpRequestProfiler.java
rename to asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/NoOpRequestProfilerLimiter.java
index f7e89c3..ab658f5 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/NoOpRequestProfiler.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/NoOpRequestProfilerLimiter.java
@@ -18,10 +18,10 @@
  */
 package org.apache.asterix.cloud.clients.profiler;
 
-public class NoOpRequestProfiler implements IRequestProfiler {
-    public static final IRequestProfiler INSTANCE = new NoOpRequestProfiler();
+public class NoOpRequestProfilerLimiter implements IRequestProfilerLimiter {
+    public static final IRequestProfilerLimiter INSTANCE = new NoOpRequestProfilerLimiter();
 
-    private NoOpRequestProfiler() {
+    private NoOpRequestProfilerLimiter() {
     }
 
     @Override
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/NoOpRequestProfiler.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/RequestLimiterNoOpProfiler.java
similarity index 77%
copy from asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/NoOpRequestProfiler.java
copy to asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/RequestLimiterNoOpProfiler.java
index f7e89c3..cce2f8e 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/NoOpRequestProfiler.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/RequestLimiterNoOpProfiler.java
@@ -18,45 +18,48 @@
  */
 package org.apache.asterix.cloud.clients.profiler;
 
-public class NoOpRequestProfiler implements IRequestProfiler {
-    public static final IRequestProfiler INSTANCE = new NoOpRequestProfiler();
+import org.apache.asterix.cloud.clients.profiler.limiter.IRequestRateLimiter;
 
-    private NoOpRequestProfiler() {
+public final class RequestLimiterNoOpProfiler implements IRequestProfilerLimiter {
+    private final IRequestRateLimiter limiter;
+
+    public RequestLimiterNoOpProfiler(IRequestRateLimiter limiter) {
+        this.limiter = limiter;
     }
 
     @Override
     public void objectsList() {
-        // NoOp
+        limiter.listRequest();
     }
 
     @Override
     public void objectGet() {
-        // NoOp
+        limiter.readRequest();
     }
 
     @Override
     public void objectWrite() {
-        // NoOp
+        limiter.writeRequest();
     }
 
     @Override
     public void objectDelete() {
-        // NoOp
+        limiter.writeRequest();
     }
 
     @Override
     public void objectCopy() {
-        // NoOp
+        limiter.writeRequest();
     }
 
     @Override
     public void objectMultipartUpload() {
-        // NoOp
+        limiter.writeRequest();
     }
 
     @Override
     public void objectMultipartDownload() {
-        // NoOp
+        limiter.writeRequest();
     }
 
     @Override
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/IRequestProfiler.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/limiter/IRateLimiter.java
similarity index 61%
copy from asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/IRequestProfiler.java
copy to asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/limiter/IRateLimiter.java
index a23a78f..741735b 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/IRequestProfiler.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/limiter/IRateLimiter.java
@@ -16,35 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.cloud.clients.profiler;
+package org.apache.asterix.cloud.clients.profiler.limiter;
 
-public interface IRequestProfiler {
-    void objectsList();
-
-    void objectGet();
-
-    void objectWrite();
-
-    void objectDelete();
-
-    void objectCopy();
-
-    void objectMultipartUpload();
-
-    void objectMultipartDownload();
-
-    long objectsListCount();
-
-    long objectGetCount();
-
-    long objectWriteCount();
-
-    long objectDeleteCount();
-
-    long objectCopyCount();
-
-    long objectMultipartUploadCount();
-
-    long objectMultipartDownloadCount();
-
+/**
+ * Rate limiter
+ */
+public interface IRateLimiter {
+    /**
+     * Acquire permit or wait if rate limit exceeded
+     */
+    void acquire();
 }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/IRequestProfiler.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/limiter/IRequestRateLimiter.java
similarity index 61%
copy from asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/IRequestProfiler.java
copy to asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/limiter/IRequestRateLimiter.java
index a23a78f..98b2eab 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/IRequestProfiler.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/limiter/IRequestRateLimiter.java
@@ -16,35 +16,25 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.cloud.clients.profiler;
+package org.apache.asterix.cloud.clients.profiler.limiter;
 
-public interface IRequestProfiler {
-    void objectsList();
+/**
+ * Rate limiter for Cloud request. If the number of requests per seconds exceeds the provided limit, then
+ * the requester threads will be throttled.
+ */
+public interface IRequestRateLimiter {
+    /**
+     * Perform a write request
+     */
+    void writeRequest();
 
-    void objectGet();
+    /**
+     * Perform a read request
+     */
+    void readRequest();
 
-    void objectWrite();
-
-    void objectDelete();
-
-    void objectCopy();
-
-    void objectMultipartUpload();
-
-    void objectMultipartDownload();
-
-    long objectsListCount();
-
-    long objectGetCount();
-
-    long objectWriteCount();
-
-    long objectDeleteCount();
-
-    long objectCopyCount();
-
-    long objectMultipartUploadCount();
-
-    long objectMultipartDownloadCount();
-
+    /**
+     * Perform a list request
+     */
+    void listRequest();
 }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/IRequestProfiler.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/limiter/NoOpRateLimiter.java
similarity index 61%
copy from asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/IRequestProfiler.java
copy to asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/limiter/NoOpRateLimiter.java
index a23a78f..4dd8c88 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/IRequestProfiler.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/limiter/NoOpRateLimiter.java
@@ -16,35 +16,16 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.cloud.clients.profiler;
+package org.apache.asterix.cloud.clients.profiler.limiter;
 
-public interface IRequestProfiler {
-    void objectsList();
+public class NoOpRateLimiter implements IRateLimiter {
+    public static final NoOpRateLimiter INSTANCE = new NoOpRateLimiter();
 
-    void objectGet();
+    private NoOpRateLimiter() {
+    }
 
-    void objectWrite();
-
-    void objectDelete();
-
-    void objectCopy();
-
-    void objectMultipartUpload();
-
-    void objectMultipartDownload();
-
-    long objectsListCount();
-
-    long objectGetCount();
-
-    long objectWriteCount();
-
-    long objectDeleteCount();
-
-    long objectCopyCount();
-
-    long objectMultipartUploadCount();
-
-    long objectMultipartDownloadCount();
-
+    @Override
+    public void acquire() {
+        // NoOp
+    }
 }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/IRequestProfiler.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/limiter/NoOpRequestLimiter.java
similarity index 61%
copy from asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/IRequestProfiler.java
copy to asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/limiter/NoOpRequestLimiter.java
index a23a78f..ea89a2e 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/IRequestProfiler.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/limiter/NoOpRequestLimiter.java
@@ -16,35 +16,26 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.cloud.clients.profiler;
+package org.apache.asterix.cloud.clients.profiler.limiter;
 
-public interface IRequestProfiler {
-    void objectsList();
+public final class NoOpRequestLimiter implements IRequestRateLimiter {
+    public static final IRequestRateLimiter INSTANCE = new NoOpRequestLimiter();
 
-    void objectGet();
+    private NoOpRequestLimiter() {
+    }
 
-    void objectWrite();
+    @Override
+    public void writeRequest() {
+        // NoOp
+    }
 
-    void objectDelete();
+    @Override
+    public void readRequest() {
+        // NoOp
+    }
 
-    void objectCopy();
-
-    void objectMultipartUpload();
-
-    void objectMultipartDownload();
-
-    long objectsListCount();
-
-    long objectGetCount();
-
-    long objectWriteCount();
-
-    long objectDeleteCount();
-
-    long objectCopyCount();
-
-    long objectMultipartUploadCount();
-
-    long objectMultipartDownloadCount();
-
+    @Override
+    public void listRequest() {
+        // NoOp
+    }
 }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/limiter/TokenBasedRateLimiter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/limiter/TokenBasedRateLimiter.java
new file mode 100644
index 0000000..a0273fb
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/profiler/limiter/TokenBasedRateLimiter.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.clients.profiler.limiter;
+
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public final class TokenBasedRateLimiter implements IRateLimiter {
+    private static final Logger LOGGER = LogManager.getLogger();
+    private static final long SECOND_NANO = TimeUnit.SECONDS.toNanos(1);
+    private final long acquireTimeoutNano;
+    private final int maxTokensPerSecond;
+    private final Semaphore semaphore;
+    private final AtomicLong lastRefillTime;
+
+    /**
+     * Token-based request limiter
+     *
+     * @param maxRequestsPerSecond maximum number of requests per seconds
+     * @param acquireTimeoutMillis timeout to refill and retry acquiring a token
+     */
+    public TokenBasedRateLimiter(int maxRequestsPerSecond, long acquireTimeoutMillis) {
+        this.maxTokensPerSecond = maxRequestsPerSecond;
+        this.acquireTimeoutNano = TimeUnit.MILLISECONDS.toNanos(acquireTimeoutMillis);
+        this.semaphore = new Semaphore(maxRequestsPerSecond);
+        this.lastRefillTime = new AtomicLong(System.nanoTime());
+    }
+
+    @Override
+    public void acquire() {
+        while (true) {
+            refillTokens();
+            try {
+                if (semaphore.tryAcquire(acquireTimeoutNano, TimeUnit.NANOSECONDS)) {
+                    return;
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                LOGGER.debug("Interrupted while waiting for acquiring a request token", e);
+                return;
+            }
+        }
+    }
+
+    private void refillTokens() {
+        long refillTime = lastRefillTime.get();
+        long now = System.nanoTime();
+        long elapsedTime = now - refillTime;
+        if (elapsedTime > SECOND_NANO && lastRefillTime.compareAndSet(refillTime, now)) {
+            int delta = maxTokensPerSecond - semaphore.availablePermits();
+            if (delta > 0) {
+                semaphore.release(delta);
+            }
+        }
+    }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
index a4d3b7a..5e8d22d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
@@ -58,6 +58,9 @@
         CLOUD_STORAGE_DEBUG_MODE_ENABLED(BOOLEAN, false),
         CLOUD_STORAGE_DEBUG_SWEEP_THRESHOLD_SIZE(LONG_BYTE_UNIT, StorageUtil.getLongSizeInBytes(1, GIGABYTE)),
         CLOUD_PROFILER_LOG_INTERVAL(NONNEGATIVE_INTEGER, 5),
+        CLOUD_ACQUIRE_TOKEN_TIMEOUT(POSITIVE_INTEGER, 100),
+        CLOUD_MAX_WRITE_REQUESTS_PER_SECOND(NONNEGATIVE_INTEGER, 2500),
+        CLOUD_MAX_READ_REQUESTS_PER_SECOND(NONNEGATIVE_INTEGER, 4000),
         CLOUD_WRITE_BUFFER_SIZE(
                 getRangedIntegerType(5, Integer.MAX_VALUE),
                 StorageUtil.getIntSizeInBytes(8, StorageUtil.StorageUnit.MEGABYTE)),
@@ -88,6 +91,9 @@
                 case CLOUD_STORAGE_DEBUG_SWEEP_THRESHOLD_SIZE:
                 case CLOUD_STORAGE_DEBUG_MODE_ENABLED:
                 case CLOUD_PROFILER_LOG_INTERVAL:
+                case CLOUD_ACQUIRE_TOKEN_TIMEOUT:
+                case CLOUD_MAX_WRITE_REQUESTS_PER_SECOND:
+                case CLOUD_MAX_READ_REQUESTS_PER_SECOND:
                 case CLOUD_WRITE_BUFFER_SIZE:
                 case CLOUD_EVICTION_PLAN_REEVALUATE_THRESHOLD:
                     return Section.COMMON;
@@ -146,6 +152,13 @@
                     return "The waiting time (in minutes) to log cloud request statistics (default: 0, which means"
                             + " the profiler is disabled by default). The minimum is 1 minute."
                             + " NOTE: Enabling the profiler could perturb the performance of cloud requests";
+                case CLOUD_ACQUIRE_TOKEN_TIMEOUT:
+                    return "The waiting time (in milliseconds) if a requesting thread failed to acquire a token if the"
+                            + " rate limit of cloud requests exceeded (default: 100, min: 1, and max: 5000)";
+                case CLOUD_MAX_WRITE_REQUESTS_PER_SECOND:
+                    return "The maximum number of write requests per second (default: 2500, 0 means unlimited)";
+                case CLOUD_MAX_READ_REQUESTS_PER_SECOND:
+                    return "The maximum number of read requests per second (default: 4000, 0 means unlimited)";
                 case CLOUD_WRITE_BUFFER_SIZE:
                     return "The write buffer size in bytes. (default: 8MB, min: 5MB)";
                 case CLOUD_EVICTION_PLAN_REEVALUATE_THRESHOLD:
@@ -225,6 +238,19 @@
         return interval == 0 ? 0 : Math.max(interval, TimeUnit.MINUTES.toNanos(1));
     }
 
+    public long getTokenAcquireTimeout() {
+        int time = accessor.getInt(Option.CLOUD_PROFILER_LOG_INTERVAL);
+        return Math.max(time, 5000);
+    }
+
+    public int getWriteMaxRequestsPerSecond() {
+        return accessor.getInt(Option.CLOUD_MAX_WRITE_REQUESTS_PER_SECOND);
+    }
+
+    public int getReadMaxRequestsPerSecond() {
+        return accessor.getInt(Option.CLOUD_MAX_READ_REQUESTS_PER_SECOND);
+    }
+
     public int getWriteBufferSize() {
         return accessor.getInt(Option.CLOUD_WRITE_BUFFER_SIZE);
     }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
index cfe7ce8..8ca5c94 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
@@ -18,6 +18,10 @@
  */
 package org.apache.asterix.common.config;
 
+import static org.apache.asterix.common.config.OptimizationConfUtil.MIN_FRAME_LIMIT_FOR_GROUP_BY;
+import static org.apache.asterix.common.config.OptimizationConfUtil.MIN_FRAME_LIMIT_FOR_JOIN;
+import static org.apache.asterix.common.config.OptimizationConfUtil.MIN_FRAME_LIMIT_FOR_SORT;
+import static org.apache.asterix.common.config.OptimizationConfUtil.MIN_FRAME_LIMIT_FOR_WINDOW;
 import static org.apache.hyracks.control.common.config.OptionTypes.BOOLEAN;
 import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER;
 import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER_BYTE_UNIT;
@@ -66,6 +70,22 @@
                 INTEGER_BYTE_UNIT,
                 StorageUtil.getIntSizeInBytes(32, KILOBYTE),
                 "The page size (in bytes) for computation"),
+        COMPILER_MIN_SORTMEMORY(
+                LONG_BYTE_UNIT,
+                StorageUtil.getLongSizeInBytes(512, KILOBYTE),
+                "The min memory budget (in bytes) for a sort operator instance in a partition"),
+        COMPILER_MIN_JOINMEMORY(
+                LONG_BYTE_UNIT,
+                StorageUtil.getLongSizeInBytes(512, KILOBYTE),
+                "The min memory budget (in bytes) for a join operator instance in a partition"),
+        COMPILER_MIN_GROUPMEMORY(
+                LONG_BYTE_UNIT,
+                StorageUtil.getLongSizeInBytes(512, KILOBYTE),
+                "The min memory budget (in bytes) for a group by operator instance in a partition"),
+        COMPILER_MIN_WINDOWMEMORY(
+                LONG_BYTE_UNIT,
+                StorageUtil.getLongSizeInBytes(512, KILOBYTE),
+                "The min memory budget (in bytes) for a window operator instance in a partition"),
         COMPILER_PARALLELISM(
                 INTEGER,
                 COMPILER_PARALLELISM_AS_STORAGE,
@@ -240,6 +260,26 @@
         return accessor.getLong(Option.COMPILER_TEXTSEARCHMEMORY);
     }
 
+    public int getMinSortMemoryFrames() {
+        int numFrames = (int) accessor.getLong(Option.COMPILER_MIN_SORTMEMORY) / getFrameSize();
+        return Math.max(numFrames, MIN_FRAME_LIMIT_FOR_SORT);
+    }
+
+    public int getMinJoinMemoryFrames() {
+        int numFrames = (int) accessor.getLong(Option.COMPILER_MIN_JOINMEMORY) / getFrameSize();
+        return Math.max(numFrames, MIN_FRAME_LIMIT_FOR_JOIN);
+    }
+
+    public int getMinGroupMemoryFrames() {
+        int numFrames = (int) accessor.getLong(Option.COMPILER_MIN_GROUPMEMORY) / getFrameSize();
+        return Math.max(numFrames, MIN_FRAME_LIMIT_FOR_GROUP_BY);
+    }
+
+    public int getMinWindowMemoryFrames() {
+        int numFrames = (int) accessor.getLong(Option.COMPILER_MIN_WINDOWMEMORY) / getFrameSize();
+        return Math.max(numFrames, MIN_FRAME_LIMIT_FOR_WINDOW);
+    }
+
     public int getFrameSize() {
         return accessor.getInt(Option.COMPILER_FRAMESIZE);
     }
@@ -315,7 +355,7 @@
 
     public int getSortMemoryFrames() {
         int numFrames = (int) getSortMemorySize() / getFrameSize();
-        return Math.max(numFrames, OptimizationConfUtil.MIN_FRAME_LIMIT_FOR_SORT);
+        return Math.max(numFrames, MIN_FRAME_LIMIT_FOR_SORT);
     }
 
     public boolean isColumnFilter() {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java
index c83b86e..160c04d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java
@@ -40,9 +40,9 @@
 public class OptimizationConfUtil {
 
     public static final int MIN_FRAME_LIMIT_FOR_SORT = AbstractStableSortPOperator.MIN_FRAME_LIMIT_FOR_SORT;
-    private static final int MIN_FRAME_LIMIT_FOR_GROUP_BY = AbstractGroupByPOperator.MIN_FRAME_LIMIT_FOR_GROUP_BY;
-    private static final int MIN_FRAME_LIMIT_FOR_JOIN = AbstractJoinPOperator.MIN_FRAME_LIMIT_FOR_JOIN;
-    private static final int MIN_FRAME_LIMIT_FOR_WINDOW = WindowPOperator.MIN_FRAME_LIMIT_FOR_WINDOW;
+    public static final int MIN_FRAME_LIMIT_FOR_GROUP_BY = AbstractGroupByPOperator.MIN_FRAME_LIMIT_FOR_GROUP_BY;
+    public static final int MIN_FRAME_LIMIT_FOR_JOIN = AbstractJoinPOperator.MIN_FRAME_LIMIT_FOR_JOIN;
+    public static final int MIN_FRAME_LIMIT_FOR_WINDOW = WindowPOperator.MIN_FRAME_LIMIT_FOR_WINDOW;
     public static final int MIN_FRAME_LIMIT_FOR_TEXT_SEARCH = 5; // see InvertedIndexPOperator
 
     private OptimizationConfUtil() {
@@ -119,6 +119,10 @@
         physOptConf.setForceJoinOrderMode(forceJoinOrder);
         physOptConf.setQueryPlanShapeMode(queryPlanShape);
         physOptConf.setColumnFilter(columnFilter);
+        physOptConf.setMinSortFrames(compilerProperties.getMinSortMemoryFrames());
+        physOptConf.setMinJoinFrames(compilerProperties.getMinJoinMemoryFrames());
+        physOptConf.setMinGroupFrames(compilerProperties.getMinGroupMemoryFrames());
+        physOptConf.setMinWindowFrames(compilerProperties.getMinWindowMemoryFrames());
 
         // We should have already validated the parameter names at this point...
         Set<String> filteredParameterNames = new HashSet<>(parameterNames);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java
index b0624e8..d1b1815 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IPhysicalOperator.java
@@ -25,6 +25,7 @@
 import org.apache.hyracks.algebricks.core.algebra.properties.LocalMemoryRequirements;
 import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
 
 public interface IPhysicalOperator {
 
@@ -56,6 +57,8 @@
 
     public void createLocalMemoryRequirements(ILogicalOperator op);
 
+    public void createLocalMemoryRequirements(ILogicalOperator op, PhysicalOptimizationConfig physicalOpConfig);
+
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
             IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
             throws AlgebricksException;
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractGroupByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractGroupByPOperator.java
index 99d87c3..bb8c488 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractGroupByPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractGroupByPOperator.java
@@ -32,6 +32,7 @@
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import org.apache.hyracks.algebricks.core.algebra.properties.LocalMemoryRequirements;
+import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
 import org.apache.hyracks.api.exceptions.ErrorCode;
 
 public abstract class AbstractGroupByPOperator extends AbstractPhysicalOperator {
@@ -86,6 +87,11 @@
     }
 
     @Override
+    public void createLocalMemoryRequirements(ILogicalOperator op, PhysicalOptimizationConfig physicalOpConfig) {
+        localMemoryRequirements = LocalMemoryRequirements.variableMemoryBudget(physicalOpConfig.getMinGroupFrames());
+    }
+
+    @Override
     public String toString() {
         return getOperatorTag().toString() + columnList;
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractJoinPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractJoinPOperator.java
index c300392..bf2b612 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractJoinPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractJoinPOperator.java
@@ -22,6 +22,7 @@
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind;
 import org.apache.hyracks.algebricks.core.algebra.properties.LocalMemoryRequirements;
+import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
 
 public abstract class AbstractJoinPOperator extends AbstractPhysicalOperator {
 
@@ -65,4 +66,9 @@
     public void createLocalMemoryRequirements(ILogicalOperator op) {
         localMemoryRequirements = LocalMemoryRequirements.variableMemoryBudget(MIN_FRAME_LIMIT_FOR_JOIN);
     }
+
+    @Override
+    public void createLocalMemoryRequirements(ILogicalOperator op, PhysicalOptimizationConfig physicalOpConfig) {
+        localMemoryRequirements = LocalMemoryRequirements.variableMemoryBudget(physicalOpConfig.getMinJoinFrames());
+    }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
index 4bc7502..298ad38 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
@@ -42,6 +42,7 @@
 import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import org.apache.hyracks.algebricks.core.jobgen.impl.PlanCompiler;
+import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
 import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
 import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
@@ -105,6 +106,11 @@
     }
 
     @Override
+    public void createLocalMemoryRequirements(ILogicalOperator op, PhysicalOptimizationConfig physicalOpConfig) {
+        localMemoryRequirements = LocalMemoryRequirements.fixedMemoryBudget(1);
+    }
+
+    @Override
     public void disableJobGenBelowMe() {
         this.disableJobGenBelow = true;
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
index f941bdb..ff45322 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractStableSortPOperator.java
@@ -47,6 +47,7 @@
 import org.apache.hyracks.algebricks.core.algebra.properties.OrderedPartitionedProperty;
 import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
 import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
 import org.apache.hyracks.dataflow.common.data.partition.range.RangeMap;
 
 public abstract class AbstractStableSortPOperator extends AbstractPhysicalOperator {
@@ -187,4 +188,9 @@
     public void createLocalMemoryRequirements(ILogicalOperator op) {
         localMemoryRequirements = LocalMemoryRequirements.variableMemoryBudget(MIN_FRAME_LIMIT_FOR_SORT);
     }
+
+    @Override
+    public void createLocalMemoryRequirements(ILogicalOperator op, PhysicalOptimizationConfig physicalOpConfig) {
+        localMemoryRequirements = LocalMemoryRequirements.variableMemoryBudget(physicalOpConfig.getMinSortFrames());
+    }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
index 0e9191f..9e25d85 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
@@ -28,6 +28,7 @@
 import org.apache.hyracks.algebricks.core.algebra.properties.LocalMemoryRequirements;
 import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
 import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import org.apache.hyracks.algebricks.runtime.operators.win.AbstractWindowRuntimeFactory;
@@ -69,6 +70,11 @@
     }
 
     @Override
+    public void createLocalMemoryRequirements(ILogicalOperator op, PhysicalOptimizationConfig physicalOpConfig) {
+        localMemoryRequirements = LocalMemoryRequirements.variableMemoryBudget(physicalOpConfig.getMinWindowFrames());
+    }
+
+    @Override
     protected AbstractWindowRuntimeFactory createRuntimeFactory(WindowOperator winOp, int[] partitionColumnsList,
             IBinaryComparatorFactory[] partitionComparatorFactories,
             IBinaryComparatorFactory[] orderComparatorFactories, IScalarEvaluatorFactory[] frameValueExprEvals,
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowStreamPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowStreamPOperator.java
index 78983b9..ead5f56 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowStreamPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowStreamPOperator.java
@@ -28,6 +28,7 @@
 import org.apache.hyracks.algebricks.core.algebra.properties.LocalMemoryRequirements;
 import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
 import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import org.apache.hyracks.algebricks.runtime.operators.win.AbstractWindowRuntimeFactory;
@@ -55,6 +56,11 @@
     }
 
     @Override
+    public void createLocalMemoryRequirements(ILogicalOperator op, PhysicalOptimizationConfig physicalOpConfig) {
+        localMemoryRequirements = LocalMemoryRequirements.fixedMemoryBudget(MEM_SIZE_IN_FRAMES_FOR_WINDOW_STREAM);
+    }
+
+    @Override
     protected AbstractWindowRuntimeFactory createRuntimeFactory(WindowOperator winOp, int[] partitionColumnsList,
             IBinaryComparatorFactory[] partitionComparatorFactories,
             IBinaryComparatorFactory[] orderComparatorFactories, IScalarEvaluatorFactory[] frameValueExprEvals,
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
index d167153..11171a1 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
@@ -18,6 +18,11 @@
  */
 package org.apache.hyracks.algebricks.core.rewriter.base;
 
+import static org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractGroupByPOperator.MIN_FRAME_LIMIT_FOR_GROUP_BY;
+import static org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractJoinPOperator.MIN_FRAME_LIMIT_FOR_JOIN;
+import static org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractStableSortPOperator.MIN_FRAME_LIMIT_FOR_SORT;
+import static org.apache.hyracks.algebricks.core.algebra.operators.physical.WindowPOperator.MIN_FRAME_LIMIT_FOR_WINDOW;
+
 import java.util.Properties;
 
 import org.apache.hyracks.algebricks.core.config.AlgebricksConfig;
@@ -53,6 +58,10 @@
     private static final String FORCE_JOIN_ORDER = "FORCE_JOIN_ORDER";
     private static final String QUERY_PLAN_SHAPE = "QUERY_PLAN_SHAPE";
     private static final String COLUMN_FILTER = "COLUMN_FILTER";
+    private static final String MIN_SORT_FRAMES = "MIN_SORT_FRAMES";
+    private static final String MIN_JOIN_FRAMES = "MIN_JOIN_FRAMES";
+    private static final String MIN_GROUP_FRAMES = "MIN_GROUP_FRAMES";
+    private static final String MIN_WINDOW_FRAMES = "MIN_WINDOW_FRAMES";
 
     private final Properties properties = new Properties();
 
@@ -66,6 +75,11 @@
         setInt(DEFAULT_HASH_GROUP_TABLE_SIZE, 10485767);
         setInt(DEFAULT_EXTERNAL_GROUP_TABLE_SIZE, 10485767);
         setInt(DEFAULT_IN_MEM_HASH_JOIN_TABLE_SIZE, 10485767);
+
+        setInt(MIN_SORT_FRAMES, MIN_FRAME_LIMIT_FOR_SORT);
+        setInt(MIN_JOIN_FRAMES, MIN_FRAME_LIMIT_FOR_JOIN);
+        setInt(MIN_GROUP_FRAMES, MIN_FRAME_LIMIT_FOR_GROUP_BY);
+        setInt(MIN_WINDOW_FRAMES, MIN_FRAME_LIMIT_FOR_WINDOW);
     }
 
     public int getFrameSize() {
@@ -169,6 +183,54 @@
         setInt(DEFAULT_IN_MEM_HASH_JOIN_TABLE_SIZE, tableSize);
     }
 
+    public int getMinSortFrames() {
+        return getInt(MIN_SORT_FRAMES, MIN_FRAME_LIMIT_FOR_SORT);
+    }
+
+    public void setMinSortFrames(int minSortFrames) {
+        if (minSortFrames < MIN_FRAME_LIMIT_FOR_SORT) {
+            throw new IllegalArgumentException(
+                    "Minimum sort frames is " + MIN_FRAME_LIMIT_FOR_SORT + ", got " + minSortFrames);
+        }
+        setInt(MIN_SORT_FRAMES, minSortFrames);
+    }
+
+    public int getMinJoinFrames() {
+        return getInt(MIN_JOIN_FRAMES, MIN_FRAME_LIMIT_FOR_JOIN);
+    }
+
+    public void setMinJoinFrames(int minJoinFrames) {
+        if (minJoinFrames < MIN_FRAME_LIMIT_FOR_JOIN) {
+            throw new IllegalArgumentException(
+                    "Minimum join frames is " + MIN_FRAME_LIMIT_FOR_JOIN + ", got " + minJoinFrames);
+        }
+        setInt(MIN_JOIN_FRAMES, minJoinFrames);
+    }
+
+    public int getMinGroupFrames() {
+        return getInt(MIN_GROUP_FRAMES, MIN_FRAME_LIMIT_FOR_GROUP_BY);
+    }
+
+    public void setMinGroupFrames(int minGroupFrames) {
+        if (minGroupFrames < MIN_FRAME_LIMIT_FOR_GROUP_BY) {
+            throw new IllegalArgumentException(
+                    "Minimum group frames is " + MIN_FRAME_LIMIT_FOR_GROUP_BY + ", got " + minGroupFrames);
+        }
+        setInt(MIN_GROUP_FRAMES, minGroupFrames);
+    }
+
+    public int getMinWindowFrames() {
+        return getInt(MIN_WINDOW_FRAMES, MIN_FRAME_LIMIT_FOR_WINDOW);
+    }
+
+    public void setMinWindowFrames(int minWindowFrames) {
+        if (minWindowFrames < MIN_FRAME_LIMIT_FOR_WINDOW) {
+            throw new IllegalArgumentException(
+                    "Minimum window frames is " + MIN_FRAME_LIMIT_FOR_WINDOW + ", got " + minWindowFrames);
+        }
+        setInt(MIN_WINDOW_FRAMES, minWindowFrames);
+    }
+
     public boolean getSortParallel() {
         return getBoolean(SORT_PARALLEL, AlgebricksConfig.SORT_PARALLEL_DEFAULT);
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetMemoryRequirementsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetMemoryRequirementsRule.java
index 4e859e4..0b87eae 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetMemoryRequirementsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetMemoryRequirementsRule.java
@@ -75,7 +75,7 @@
 /**
  * Set memory requirements for all operators as follows:
  * <ol>
- * <li>First call {@link IPhysicalOperator#createLocalMemoryRequirements(ILogicalOperator)}
+ * <li>First call {@link IPhysicalOperator#createLocalMemoryRequirements(ILogicalOperator, PhysicalOptimizationConfig)}
  *     to initialize each operator's {@link LocalMemoryRequirements} with minimal memory budget required by
  *     that operator</li>
  * <li>Then increase memory requirements for certain operators as specified by {@link PhysicalOptimizationConfig}</li>
@@ -97,19 +97,23 @@
         if (physOp.getLocalMemoryRequirements() != null) {
             return false;
         }
-        computeLocalMemoryRequirements(op, createMemoryRequirementsConfigurator(context));
+        computeLocalMemoryRequirements(op, createMemoryRequirementsConfigurator(context), context);
         return true;
     }
 
     private void computeLocalMemoryRequirements(AbstractLogicalOperator op,
-            ILogicalOperatorVisitor<Void, Void> memoryRequirementsVisitor) throws AlgebricksException {
+            ILogicalOperatorVisitor<Void, Void> memoryRequirementsVisitor, IOptimizationContext context)
+            throws AlgebricksException {
         IPhysicalOperator physOp = op.getPhysicalOperator();
         if (physOp.getLocalMemoryRequirements() == null) {
-            physOp.createLocalMemoryRequirements(op);
-            if (physOp.getLocalMemoryRequirements() == null) {
-                throw new IllegalStateException(physOp.getOperatorTag().toString());
-            }
-            if (memoryRequirementsVisitor != null) {
+            if (memoryRequirementsVisitor == null) {
+                // null means forcing the min memory budget from the physical optimization config
+                physOp.createLocalMemoryRequirements(op, context.getPhysicalOptimizationConfig());
+            } else {
+                physOp.createLocalMemoryRequirements(op);
+                if (physOp.getLocalMemoryRequirements() == null) {
+                    throw new IllegalStateException(physOp.getOperatorTag().toString());
+                }
                 op.accept(memoryRequirementsVisitor, null);
             }
         }
@@ -117,13 +121,14 @@
             AbstractOperatorWithNestedPlans nested = (AbstractOperatorWithNestedPlans) op;
             for (ILogicalPlan p : nested.getNestedPlans()) {
                 for (Mutable<ILogicalOperator> root : p.getRoots()) {
-                    computeLocalMemoryRequirements((AbstractLogicalOperator) root.getValue(),
-                            memoryRequirementsVisitor);
+                    computeLocalMemoryRequirements((AbstractLogicalOperator) root.getValue(), memoryRequirementsVisitor,
+                            context);
                 }
             }
         }
         for (Mutable<ILogicalOperator> opRef : op.getInputs()) {
-            computeLocalMemoryRequirements((AbstractLogicalOperator) opRef.getValue(), memoryRequirementsVisitor);
+            computeLocalMemoryRequirements((AbstractLogicalOperator) opRef.getValue(), memoryRequirementsVisitor,
+                    context);
         }
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index 766fb26..59a4da4 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -157,6 +157,7 @@
     INVALID_STRING_UNICODE(127),
     UNSUPPORTED_WRITE_SPEC(128),
     JOB_REJECTED(129),
+    FRAME_BIGGER_THAN_SORT_MEMORY(130),
 
     // Compilation error codes.
     RULECOLLECTION_NOT_INSTANCE_OF_LIST(10000),
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index fa52bc6..7da6bbd 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -147,6 +147,7 @@
 127 = Decoding error - %1$s
 128 = Unsupported copy to specification: PARTITION BY %1$s, ORDER BY %2$s
 129 = Job %1$s not run. Cluster is not accepting jobs
+130 = Frame data=%1$s (requiring %2$s) is bigger than the sort budget. Used=%3$s, max=%4$s. Please increase the sort memory budget.
 
 10000 = The given rule collection %1$s is not an instance of the List class.
 10001 = Cannot compose partition constraint %1$s with %2$s
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java
index 20924d5..d4dfae1 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java
@@ -31,6 +31,7 @@
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -150,9 +151,8 @@
             return true;
         }
         if (getFrameCount() == 0) {
-            throw new HyracksDataException("The required memory=" + requiredMemory + " for the frame data="
-                    + inputTupleAccessor.getBuffer().capacity() + " is too big for the sorting buffer. Used="
-                    + totalMemoryUsed + ", max=" + maxSortMemory + ", please allocate bigger buffer size");
+            throw HyracksDataException.create(ErrorCode.FRAME_BIGGER_THAN_SORT_MEMORY,
+                    inputTupleAccessor.getBuffer().capacity(), requiredMemory, totalMemoryUsed, maxSortMemory);
         }
         return false;
     }