[ASTERIXDB-2540][STO] Introduce Disk Write Rate Limiter

- user model changes: yes. Add a new storage option:
     storage.write.rate.limit (default 0)
- storage format changes: no.
- interface changes: no.

Details:
- Introduce a disk write rate limiting mechanism to bound the maximum
disk write bandwidth usage of large merges.
- Disk write limiting is performed for each partition. No change to the
storage format.

Change-Id: If3cb3df1b3c3b4bbee1ba9ec8ab67c357873ef44
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/3455
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index e804d60..a702381 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -33,6 +33,7 @@
 import org.apache.asterix.common.api.IConfigValidatorFactory;
 import org.apache.asterix.common.api.ICoordinationService;
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.api.IDiskWriteRateLimiterProvider;
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.api.IPropertiesFactory;
 import org.apache.asterix.common.api.IReceptionist;
@@ -49,6 +50,7 @@
 import org.apache.asterix.common.config.StorageProperties;
 import org.apache.asterix.common.config.TransactionProperties;
 import org.apache.asterix.common.context.DatasetLifecycleManager;
+import org.apache.asterix.common.context.DiskWriteRateLimiterProvider;
 import org.apache.asterix.common.context.GlobalVirtualBufferCache;
 import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.library.ILibraryManager;
@@ -153,6 +155,7 @@
     private IReceptionist receptionist;
     private ICacheManager cacheManager;
     private IConfigValidator configValidator;
+    private IDiskWriteRateLimiterProvider diskWriteRateLimiterProvider;
 
     public NCAppRuntimeContext(INCServiceContext ncServiceContext, NCExtensionManager extensionManager,
             IPropertiesFactory propertiesFactory) {
@@ -287,6 +290,8 @@
         lccm.register((ILifeCycleComponent) txnSubsystem.getLockManager());
         lccm.register(txnSubsystem.getCheckpointManager());
         lccm.register(libraryManager);
+
+        diskWriteRateLimiterProvider = new DiskWriteRateLimiterProvider();
     }
 
     @Override
@@ -593,4 +598,9 @@
         }
         return ioScheduler;
     }
+
+    @Override
+    public IDiskWriteRateLimiterProvider getDiskWriteRateLimiterProvider() {
+        return diskWriteRateLimiterProvider;
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index f7cbf18..7b737a0 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -32,6 +32,7 @@
 import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
 import org.apache.hyracks.storage.common.IIndex;
 import org.apache.hyracks.storage.common.IResourceLifecycleManager;
+import org.apache.hyracks.storage.common.buffercache.IRateLimiter;
 
 public interface IDatasetLifecycleManager extends IResourceLifecycleManager<IIndex> {
     /**
@@ -104,6 +105,16 @@
     ILSMComponentIdGenerator getComponentIdGenerator(int datasetId, int partition, String path);
 
     /**
+     * creates (if necessary) and returns the rate limiter of a dataset.
+     *
+     * @param datasetId
+     * @param partition
+     * @param path
+     * @return
+     */
+    IRateLimiter getRateLimiter(int datasetId, int partition, long writeRateLimit);
+
+    /**
      * creates (if necessary) and returns the dataset virtual buffer caches.
      *
      * @param datasetId
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDiskWriteRateLimiterProvider.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDiskWriteRateLimiterProvider.java
new file mode 100644
index 0000000..becd655
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDiskWriteRateLimiterProvider.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.api;
+
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.common.IResource;
+import org.apache.hyracks.storage.common.buffercache.IRateLimiter;
+
+public interface IDiskWriteRateLimiterProvider {
+    IRateLimiter getRateLimiter(INCServiceContext serviceCtx, IResource resource) throws HyracksDataException;
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
index 65b587b..8c82979 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
@@ -140,4 +140,9 @@
      * @return the library manager
      */
     ILibraryManager getLibraryManager();
+
+    /**
+     * @return the disk write rate limiter provider
+     */
+    IDiskWriteRateLimiterProvider getDiskWriteRateLimiterProvider();
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
index adf57fd..22393ec 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
@@ -54,7 +54,8 @@
         STORAGE_LSM_BLOOMFILTER_FALSEPOSITIVERATE(DOUBLE, 0.01d),
         STORAGE_COMPRESSION_BLOCK(STRING, "snappy"),
         STORAGE_DISK_FORCE_BYTES(LONG_BYTE_UNIT, StorageUtil.getLongSizeInBytes(16, MEGABYTE)),
-        STORAGE_IO_SCHEDULER(STRING, "greedy");
+        STORAGE_IO_SCHEDULER(STRING, "greedy"),
+        STORAGE_WRITE_RATE_LIMIT(LONG_BYTE_UNIT, 0l);
 
         private final IOptionType interpreter;
         private final Object defaultValue;
@@ -104,6 +105,8 @@
                     return "The maximum acceptable false positive rate for bloom filters associated with LSM indexes";
                 case STORAGE_COMPRESSION_BLOCK:
                     return "The default compression scheme for the storage";
+                case STORAGE_WRITE_RATE_LIMIT:
+                    return "The maximum disk write rate (bytes/s) for each storage partition (disabled if the provided value <= 0)";
                 case STORAGE_DISK_FORCE_BYTES:
                     return "The number of bytes before each disk force (fsync)";
                 case STORAGE_IO_SCHEDULER:
@@ -209,6 +212,10 @@
         return SYSTEM_RESERVED_DATASETS;
     }
 
+    public long getWriteRateLimit() {
+        return accessor.getLong(Option.STORAGE_WRITE_RATE_LIMIT);
+    }
+
     public int getDiskForcePages() {
         return (int) (accessor.getLong(Option.STORAGE_DISK_FORCE_BYTES) / getBufferCachePageSize());
     }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 5ea79b3..b26220d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -55,6 +55,8 @@
 import org.apache.hyracks.storage.common.IIndex;
 import org.apache.hyracks.storage.common.ILocalResourceRepository;
 import org.apache.hyracks.storage.common.LocalResource;
+import org.apache.hyracks.storage.common.buffercache.IRateLimiter;
+import org.apache.hyracks.storage.common.buffercache.SleepRateLimiter;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -310,6 +312,16 @@
     }
 
     @Override
+    public synchronized IRateLimiter getRateLimiter(int datasetId, int partition, long writeRateLimit) {
+        DatasetResource dataset = datasets.get(datasetId);
+        IRateLimiter rateLimiter = dataset.getRateLimiter(partition);
+        if (rateLimiter == null) {
+            rateLimiter = populateRateLimiter(dataset, partition, writeRateLimit);
+        }
+        return rateLimiter;
+    }
+
+    @Override
     public synchronized boolean isRegistered(int datasetId) {
         return datasets.containsKey(datasetId);
     }
@@ -324,6 +336,12 @@
         dataset.setIdGenerator(partition, idGenerator);
     }
 
+    private IRateLimiter populateRateLimiter(DatasetResource dataset, int partition, long writeRateLimit) {
+        IRateLimiter rateLimiter = SleepRateLimiter.create(writeRateLimit);
+        dataset.setRateLimiter(partition, rateLimiter);
+        return rateLimiter;
+    }
+
     private void validateDatasetLifecycleManagerState() throws HyracksDataException {
         if (stopped) {
             throw new HyracksDataException(DatasetLifecycleManager.class.getSimpleName() + " was stopped.");
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
index 8844d41..54e1976 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
@@ -28,6 +28,7 @@
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.common.LocalResource;
+import org.apache.hyracks.storage.common.buffercache.IRateLimiter;
 
 /**
  * A dataset can be in one of two states { EVICTED , LOADED }.
@@ -46,11 +47,13 @@
 
     private final Map<Integer, PrimaryIndexOperationTracker> datasetPrimaryOpTrackers;
     private final Map<Integer, ILSMComponentIdGenerator> datasetComponentIdGenerators;
+    private final Map<Integer, IRateLimiter> datasetRateLimiters;
 
     public DatasetResource(DatasetInfo datasetInfo) {
         this.datasetInfo = datasetInfo;
         this.datasetPrimaryOpTrackers = new HashMap<>();
         this.datasetComponentIdGenerators = new HashMap<>();
+        this.datasetRateLimiters = new HashMap<>();
     }
 
     public boolean isRegistered() {
@@ -124,6 +127,10 @@
         return datasetComponentIdGenerators.get(partition);
     }
 
+    public IRateLimiter getRateLimiter(int partition) {
+        return datasetRateLimiters.get(partition);
+    }
+
     public void setPrimaryIndexOperationTracker(int partition, PrimaryIndexOperationTracker opTracker) {
         if (datasetPrimaryOpTrackers.containsKey(partition)) {
             throw new IllegalStateException(
@@ -139,6 +146,13 @@
         datasetComponentIdGenerators.put(partition, idGenerator);
     }
 
+    public void setRateLimiter(int partition, IRateLimiter rateLimiter) {
+        if (datasetRateLimiters.containsKey(partition)) {
+            throw new IllegalStateException("RateLimiter has already been set for partition " + partition);
+        }
+        datasetRateLimiters.put(partition, rateLimiter);
+    }
+
     @Override
     public int compareTo(DatasetResource o) {
         return datasetInfo.compareTo(o.datasetInfo);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DiskWriteRateLimiterProvider.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DiskWriteRateLimiterProvider.java
new file mode 100644
index 0000000..7d07cfa
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DiskWriteRateLimiterProvider.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.common.context;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.asterix.common.api.IDiskWriteRateLimiterProvider;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.common.IResource;
+import org.apache.hyracks.storage.common.buffercache.IRateLimiter;
+import org.apache.hyracks.storage.common.buffercache.SleepRateLimiter;
+
+public class DiskWriteRateLimiterProvider implements IDiskWriteRateLimiterProvider {
+    // stores the write rate limiter for each NC partition
+    private final Map<Integer, IRateLimiter> limiters = new HashMap<>();
+
+    @Override
+    public synchronized IRateLimiter getRateLimiter(INCServiceContext serviceCtx, IResource resource)
+            throws HyracksDataException {
+        int partition = StoragePathUtil.getPartitionNumFromRelativePath(resource.getPath());
+        IRateLimiter limiter = limiters.get(partition);
+        if (limiter == null) {
+            INcApplicationContext appCtx = (INcApplicationContext) serviceCtx.getApplicationContext();
+            long writeRateLimit = appCtx.getStorageProperties().getWriteRateLimit();
+            limiter = SleepRateLimiter.create(writeRateLimit);
+            limiters.put(partition, limiter);
+        }
+        return limiter;
+    }
+
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIndexPageWriteCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIndexPageWriteCallbackFactory.java
index 27f18cf..7452ae6 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIndexPageWriteCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIndexPageWriteCallbackFactory.java
@@ -28,6 +28,7 @@
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMIndexPageWriteCallback;
 import org.apache.hyracks.storage.common.IResource;
 import org.apache.hyracks.storage.common.buffercache.IPageWriteCallback;
+import org.apache.hyracks.storage.common.buffercache.IRateLimiter;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -38,15 +39,21 @@
 
     protected transient int pagesPerForce;
 
+    protected transient IRateLimiter rateLimiter;
+
+    public LSMIndexPageWriteCallbackFactory() {
+    }
+
     @Override
     public void initialize(INCServiceContext ncCtx, IResource resource) throws HyracksDataException {
         INcApplicationContext appCtx = (INcApplicationContext) ncCtx.getApplicationContext();
         pagesPerForce = appCtx.getStorageProperties().getDiskForcePages();
+        rateLimiter = appCtx.getDiskWriteRateLimiterProvider().getRateLimiter(ncCtx, resource);
     }
 
     @Override
     public IPageWriteCallback createPageWriteCallback() throws HyracksDataException {
-        return new LSMIndexPageWriteCallback(pagesPerForce);
+        return new LSMIndexPageWriteCallback(rateLimiter, pagesPerForce);
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexPageWriteCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexPageWriteCallback.java
index 991968f..0ad7033 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexPageWriteCallback.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexPageWriteCallback.java
@@ -22,15 +22,18 @@
 import org.apache.hyracks.storage.common.IIndexBulkLoader;
 import org.apache.hyracks.storage.common.buffercache.ICachedPage;
 import org.apache.hyracks.storage.common.buffercache.IPageWriteCallback;
+import org.apache.hyracks.storage.common.buffercache.IRateLimiter;
 
 public class LSMIndexPageWriteCallback implements IPageWriteCallback {
 
+    private final IRateLimiter rateLimiter;
     private final int pagesPerForce;
     private IIndexBulkLoader bulkLoader;
     private long totalWrittenPages;
     private int totalForces;
 
-    public LSMIndexPageWriteCallback(int pagesPerForce) {
+    public LSMIndexPageWriteCallback(IRateLimiter rateLimiter, int pagesPerForce) {
+        this.rateLimiter = rateLimiter;
         this.pagesPerForce = pagesPerForce;
     }
 
@@ -39,10 +42,15 @@
         this.bulkLoader = bulkLoader;
     }
 
+    public void beforeWrite(ICachedPage page) throws HyracksDataException {
+        rateLimiter.request(page.getFrameSizeMultiplier());
+    }
+
     @Override
     public void afterWrite(ICachedPage page) throws HyracksDataException {
         totalWrittenPages++;
         if (pagesPerForce > 0 && totalWrittenPages % pagesPerForce == 0) {
+            // perform a force
             bulkLoader.force();
             totalForces++;
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml b/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml
index eb453f6..d0dfbf4 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml
@@ -71,5 +71,9 @@
       <artifactId>snappy-java</artifactId>
       <version>1.1.7.1</version>
     </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
   </dependencies>
 </project>
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FIFOLocalWriter.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FIFOLocalWriter.java
index 5f8bb4a..e74fe59 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FIFOLocalWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FIFOLocalWriter.java
@@ -40,6 +40,7 @@
     public void write(ICachedPage page) {
         CachedPage cPage = (CachedPage) page;
         try {
+            callback.beforeWrite(cPage);
             bufferCache.write(cPage);
             callback.afterWrite(cPage);
         } catch (Exception e) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IPageWriteCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IPageWriteCallback.java
index 3d2bda9..f4ea6cf 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IPageWriteCallback.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IPageWriteCallback.java
@@ -30,6 +30,14 @@
     void initialize(IIndexBulkLoader bulkLoader);
 
     /**
+     * Notify that a page is about to be written
+     *
+     * @param page
+     * @throws HyracksDataException
+     */
+    void beforeWrite(ICachedPage page) throws HyracksDataException;
+
+    /**
      * Notify that a page has been written
      *
      * @param page
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IRateLimiter.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IRateLimiter.java
new file mode 100644
index 0000000..b7433ae
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IRateLimiter.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.buffercache;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface IRateLimiter {
+
+    void setRate(double ratePerSecond);
+
+    void request(int permits) throws HyracksDataException;
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/NoOpRateLimiter.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/NoOpRateLimiter.java
new file mode 100644
index 0000000..ac0a1a9
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/NoOpRateLimiter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.buffercache;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class NoOpRateLimiter implements IRateLimiter {
+
+    public static final NoOpRateLimiter INSTANCE = new NoOpRateLimiter();
+
+    private NoOpRateLimiter() {
+    }
+
+    @Override
+    public void setRate(double ratePerSecond) {
+        // no op
+    }
+
+    @Override
+    public void request(int tokens) throws HyracksDataException {
+        // no op
+    }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/SleepRateLimiter.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/SleepRateLimiter.java
new file mode 100644
index 0000000..4d0ca92
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/SleepRateLimiter.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.buffercache;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+import com.google.common.util.concurrent.RateLimiter;
+
+/**
+ * A wrapper of the RateLimiter implementation from {@link RateLimiter}
+ *
+ */
+public class SleepRateLimiter implements IRateLimiter {
+    /**
+     * Defines the maximum storage capacity of the rate limiter, i.e., the number of permits it stores.
+     * Maybe make configurable in the future
+     */
+    private static final double MAX_BURST_SECONDS = 1.0;
+
+    private final RateLimiter rateLimiterImpl;
+
+    public static IRateLimiter create(long ratePerSecond) {
+        if (ratePerSecond > 0) {
+            return new SleepRateLimiter(ratePerSecond, MAX_BURST_SECONDS);
+        } else {
+            return NoOpRateLimiter.INSTANCE;
+        }
+    }
+
+    public SleepRateLimiter(long ratePerSecond, double maxBurstSeconds) {
+        rateLimiterImpl = RateLimiter.create(ratePerSecond, (long) (maxBurstSeconds * 1000), TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void setRate(double ratePerSecond) {
+        rateLimiterImpl.setRate(ratePerSecond);
+    }
+
+    @Override
+    public void request(int permits) throws HyracksDataException {
+        rateLimiterImpl.acquire(permits);
+    }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreePageWriteCallbackTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreePageWriteCallbackTest.java
index c0dc6b0..4e59ab0 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreePageWriteCallbackTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreePageWriteCallbackTest.java
@@ -42,6 +42,8 @@
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMIndexPageWriteCallback;
 import org.apache.hyracks.storage.common.IResource;
 import org.apache.hyracks.storage.common.buffercache.IPageWriteCallback;
+import org.apache.hyracks.storage.common.buffercache.IRateLimiter;
+import org.apache.hyracks.storage.common.buffercache.SleepRateLimiter;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -55,7 +57,22 @@
 
     private final int PAGES_PER_FORCE = 16;
 
+    private int pageCounter = 0;
     private LSMIndexPageWriteCallback lastCallback = null;
+    private final IRateLimiter testLimiter = new IRateLimiter() {
+        IRateLimiter limiter = SleepRateLimiter.create(100 * 1000);
+
+        @Override
+        public void setRate(double ratePerSecond) {
+
+        }
+
+        @Override
+        public void request(int permits) throws HyracksDataException {
+            limiter.request(permits);
+            pageCounter++;
+        }
+    };
 
     private final ILSMPageWriteCallbackFactory pageWriteCallbackFactory = new ILSMPageWriteCallbackFactory() {
         private static final long serialVersionUID = 1L;
@@ -67,7 +84,7 @@
 
         @Override
         public IPageWriteCallback createPageWriteCallback() throws HyracksDataException {
-            lastCallback = new LSMIndexPageWriteCallback(PAGES_PER_FORCE);
+            lastCallback = new LSMIndexPageWriteCallback(testLimiter, PAGES_PER_FORCE);
             return lastCallback;
         }
     };
@@ -134,6 +151,7 @@
                     ctx.getIndex().activate();
                 }
             }
+            pageCounter = 0;
             ILSMIndexAccessor accessor = (ILSMIndexAccessor) ctx.getIndexAccessor();
             ILSMIOOperation mergeOp = accessor.scheduleMerge(((LSMBTree) ctx.getIndex()).getDiskComponents());
             mergeOp.addCompleteListener(op -> {
@@ -141,6 +159,7 @@
                     long numPages = op.getNewComponent().getComponentSize()
                             / harness.getDiskBufferCache().getPageSizeWithHeader() - 1;
                     // we skipped the metadata page for simplicity
+                    Assert.assertEquals(numPages, pageCounter);
                     Assert.assertEquals(numPages / PAGES_PER_FORCE, lastCallback.getTotalForces());
                 }
             });