[ASTERIXDB-2540][STO] Introduce Disk Write Rate Limiter
- user model changes: yes. Add a new storage option:
storage.write.rate.limit (default 0)
- storage format changes: no.
- interface changes: no.
Details:
- Introduce a disk write rate limiting mechanism to bound the maximum
disk write bandwidth usage of large merges.
- Disk write limiting is performed for each partition. No change to the
storage format.
Change-Id: If3cb3df1b3c3b4bbee1ba9ec8ab67c357873ef44
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/3455
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index e804d60..a702381 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -33,6 +33,7 @@
import org.apache.asterix.common.api.IConfigValidatorFactory;
import org.apache.asterix.common.api.ICoordinationService;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.api.IDiskWriteRateLimiterProvider;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.api.IPropertiesFactory;
import org.apache.asterix.common.api.IReceptionist;
@@ -49,6 +50,7 @@
import org.apache.asterix.common.config.StorageProperties;
import org.apache.asterix.common.config.TransactionProperties;
import org.apache.asterix.common.context.DatasetLifecycleManager;
+import org.apache.asterix.common.context.DiskWriteRateLimiterProvider;
import org.apache.asterix.common.context.GlobalVirtualBufferCache;
import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.library.ILibraryManager;
@@ -153,6 +155,7 @@
private IReceptionist receptionist;
private ICacheManager cacheManager;
private IConfigValidator configValidator;
+ private IDiskWriteRateLimiterProvider diskWriteRateLimiterProvider;
public NCAppRuntimeContext(INCServiceContext ncServiceContext, NCExtensionManager extensionManager,
IPropertiesFactory propertiesFactory) {
@@ -287,6 +290,8 @@
lccm.register((ILifeCycleComponent) txnSubsystem.getLockManager());
lccm.register(txnSubsystem.getCheckpointManager());
lccm.register(libraryManager);
+
+ diskWriteRateLimiterProvider = new DiskWriteRateLimiterProvider();
}
@Override
@@ -593,4 +598,9 @@
}
return ioScheduler;
}
+
+ @Override
+ public IDiskWriteRateLimiterProvider getDiskWriteRateLimiterProvider() {
+ return diskWriteRateLimiterProvider;
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index f7cbf18..7b737a0 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -32,6 +32,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import org.apache.hyracks.storage.common.IIndex;
import org.apache.hyracks.storage.common.IResourceLifecycleManager;
+import org.apache.hyracks.storage.common.buffercache.IRateLimiter;
public interface IDatasetLifecycleManager extends IResourceLifecycleManager<IIndex> {
/**
@@ -104,6 +105,16 @@
ILSMComponentIdGenerator getComponentIdGenerator(int datasetId, int partition, String path);
/**
+ * creates (if necessary) and returns the rate limiter of a dataset.
+ *
+ * @param datasetId
+ * @param partition
+ * @param path
+ * @return
+ */
+ IRateLimiter getRateLimiter(int datasetId, int partition, long writeRateLimit);
+
+ /**
* creates (if necessary) and returns the dataset virtual buffer caches.
*
* @param datasetId
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDiskWriteRateLimiterProvider.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDiskWriteRateLimiterProvider.java
new file mode 100644
index 0000000..becd655
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDiskWriteRateLimiterProvider.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.api;
+
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.common.IResource;
+import org.apache.hyracks.storage.common.buffercache.IRateLimiter;
+
+public interface IDiskWriteRateLimiterProvider {
+ IRateLimiter getRateLimiter(INCServiceContext serviceCtx, IResource resource) throws HyracksDataException;
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
index 65b587b..8c82979 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
@@ -140,4 +140,9 @@
* @return the library manager
*/
ILibraryManager getLibraryManager();
+
+ /**
+ * @return the disk write rate limiter provider
+ */
+ IDiskWriteRateLimiterProvider getDiskWriteRateLimiterProvider();
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
index adf57fd..22393ec 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
@@ -54,7 +54,8 @@
STORAGE_LSM_BLOOMFILTER_FALSEPOSITIVERATE(DOUBLE, 0.01d),
STORAGE_COMPRESSION_BLOCK(STRING, "snappy"),
STORAGE_DISK_FORCE_BYTES(LONG_BYTE_UNIT, StorageUtil.getLongSizeInBytes(16, MEGABYTE)),
- STORAGE_IO_SCHEDULER(STRING, "greedy");
+ STORAGE_IO_SCHEDULER(STRING, "greedy"),
+ STORAGE_WRITE_RATE_LIMIT(LONG_BYTE_UNIT, 0l);
private final IOptionType interpreter;
private final Object defaultValue;
@@ -104,6 +105,8 @@
return "The maximum acceptable false positive rate for bloom filters associated with LSM indexes";
case STORAGE_COMPRESSION_BLOCK:
return "The default compression scheme for the storage";
+ case STORAGE_WRITE_RATE_LIMIT:
+ return "The maximum disk write rate (bytes/s) for each storage partition (disabled if the provided value <= 0)";
case STORAGE_DISK_FORCE_BYTES:
return "The number of bytes before each disk force (fsync)";
case STORAGE_IO_SCHEDULER:
@@ -209,6 +212,10 @@
return SYSTEM_RESERVED_DATASETS;
}
+ public long getWriteRateLimit() {
+ return accessor.getLong(Option.STORAGE_WRITE_RATE_LIMIT);
+ }
+
public int getDiskForcePages() {
return (int) (accessor.getLong(Option.STORAGE_DISK_FORCE_BYTES) / getBufferCachePageSize());
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 5ea79b3..b26220d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -55,6 +55,8 @@
import org.apache.hyracks.storage.common.IIndex;
import org.apache.hyracks.storage.common.ILocalResourceRepository;
import org.apache.hyracks.storage.common.LocalResource;
+import org.apache.hyracks.storage.common.buffercache.IRateLimiter;
+import org.apache.hyracks.storage.common.buffercache.SleepRateLimiter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -310,6 +312,16 @@
}
@Override
+ public synchronized IRateLimiter getRateLimiter(int datasetId, int partition, long writeRateLimit) {
+ DatasetResource dataset = datasets.get(datasetId);
+ IRateLimiter rateLimiter = dataset.getRateLimiter(partition);
+ if (rateLimiter == null) {
+ rateLimiter = populateRateLimiter(dataset, partition, writeRateLimit);
+ }
+ return rateLimiter;
+ }
+
+ @Override
public synchronized boolean isRegistered(int datasetId) {
return datasets.containsKey(datasetId);
}
@@ -324,6 +336,12 @@
dataset.setIdGenerator(partition, idGenerator);
}
+ private IRateLimiter populateRateLimiter(DatasetResource dataset, int partition, long writeRateLimit) {
+ IRateLimiter rateLimiter = SleepRateLimiter.create(writeRateLimit);
+ dataset.setRateLimiter(partition, rateLimiter);
+ return rateLimiter;
+ }
+
private void validateDatasetLifecycleManagerState() throws HyracksDataException {
if (stopped) {
throw new HyracksDataException(DatasetLifecycleManager.class.getSimpleName() + " was stopped.");
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
index 8844d41..54e1976 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
@@ -28,6 +28,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.common.LocalResource;
+import org.apache.hyracks.storage.common.buffercache.IRateLimiter;
/**
* A dataset can be in one of two states { EVICTED , LOADED }.
@@ -46,11 +47,13 @@
private final Map<Integer, PrimaryIndexOperationTracker> datasetPrimaryOpTrackers;
private final Map<Integer, ILSMComponentIdGenerator> datasetComponentIdGenerators;
+ private final Map<Integer, IRateLimiter> datasetRateLimiters;
public DatasetResource(DatasetInfo datasetInfo) {
this.datasetInfo = datasetInfo;
this.datasetPrimaryOpTrackers = new HashMap<>();
this.datasetComponentIdGenerators = new HashMap<>();
+ this.datasetRateLimiters = new HashMap<>();
}
public boolean isRegistered() {
@@ -124,6 +127,10 @@
return datasetComponentIdGenerators.get(partition);
}
+ public IRateLimiter getRateLimiter(int partition) {
+ return datasetRateLimiters.get(partition);
+ }
+
public void setPrimaryIndexOperationTracker(int partition, PrimaryIndexOperationTracker opTracker) {
if (datasetPrimaryOpTrackers.containsKey(partition)) {
throw new IllegalStateException(
@@ -139,6 +146,13 @@
datasetComponentIdGenerators.put(partition, idGenerator);
}
+ public void setRateLimiter(int partition, IRateLimiter rateLimiter) {
+ if (datasetRateLimiters.containsKey(partition)) {
+ throw new IllegalStateException("RateLimiter has already been set for partition " + partition);
+ }
+ datasetRateLimiters.put(partition, rateLimiter);
+ }
+
@Override
public int compareTo(DatasetResource o) {
return datasetInfo.compareTo(o.datasetInfo);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DiskWriteRateLimiterProvider.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DiskWriteRateLimiterProvider.java
new file mode 100644
index 0000000..7d07cfa
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DiskWriteRateLimiterProvider.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.common.context;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.asterix.common.api.IDiskWriteRateLimiterProvider;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.common.IResource;
+import org.apache.hyracks.storage.common.buffercache.IRateLimiter;
+import org.apache.hyracks.storage.common.buffercache.SleepRateLimiter;
+
+public class DiskWriteRateLimiterProvider implements IDiskWriteRateLimiterProvider {
+ // stores the write rate limiter for each NC partition
+ private final Map<Integer, IRateLimiter> limiters = new HashMap<>();
+
+ @Override
+ public synchronized IRateLimiter getRateLimiter(INCServiceContext serviceCtx, IResource resource)
+ throws HyracksDataException {
+ int partition = StoragePathUtil.getPartitionNumFromRelativePath(resource.getPath());
+ IRateLimiter limiter = limiters.get(partition);
+ if (limiter == null) {
+ INcApplicationContext appCtx = (INcApplicationContext) serviceCtx.getApplicationContext();
+ long writeRateLimit = appCtx.getStorageProperties().getWriteRateLimit();
+ limiter = SleepRateLimiter.create(writeRateLimit);
+ limiters.put(partition, limiter);
+ }
+ return limiter;
+ }
+
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIndexPageWriteCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIndexPageWriteCallbackFactory.java
index 27f18cf..7452ae6 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIndexPageWriteCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIndexPageWriteCallbackFactory.java
@@ -28,6 +28,7 @@
import org.apache.hyracks.storage.am.lsm.common.impls.LSMIndexPageWriteCallback;
import org.apache.hyracks.storage.common.IResource;
import org.apache.hyracks.storage.common.buffercache.IPageWriteCallback;
+import org.apache.hyracks.storage.common.buffercache.IRateLimiter;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -38,15 +39,21 @@
protected transient int pagesPerForce;
+ protected transient IRateLimiter rateLimiter;
+
+ public LSMIndexPageWriteCallbackFactory() {
+ }
+
@Override
public void initialize(INCServiceContext ncCtx, IResource resource) throws HyracksDataException {
INcApplicationContext appCtx = (INcApplicationContext) ncCtx.getApplicationContext();
pagesPerForce = appCtx.getStorageProperties().getDiskForcePages();
+ rateLimiter = appCtx.getDiskWriteRateLimiterProvider().getRateLimiter(ncCtx, resource);
}
@Override
public IPageWriteCallback createPageWriteCallback() throws HyracksDataException {
- return new LSMIndexPageWriteCallback(pagesPerForce);
+ return new LSMIndexPageWriteCallback(rateLimiter, pagesPerForce);
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexPageWriteCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexPageWriteCallback.java
index 991968f..0ad7033 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexPageWriteCallback.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexPageWriteCallback.java
@@ -22,15 +22,18 @@
import org.apache.hyracks.storage.common.IIndexBulkLoader;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
import org.apache.hyracks.storage.common.buffercache.IPageWriteCallback;
+import org.apache.hyracks.storage.common.buffercache.IRateLimiter;
public class LSMIndexPageWriteCallback implements IPageWriteCallback {
+ private final IRateLimiter rateLimiter;
private final int pagesPerForce;
private IIndexBulkLoader bulkLoader;
private long totalWrittenPages;
private int totalForces;
- public LSMIndexPageWriteCallback(int pagesPerForce) {
+ public LSMIndexPageWriteCallback(IRateLimiter rateLimiter, int pagesPerForce) {
+ this.rateLimiter = rateLimiter;
this.pagesPerForce = pagesPerForce;
}
@@ -39,10 +42,15 @@
this.bulkLoader = bulkLoader;
}
+ public void beforeWrite(ICachedPage page) throws HyracksDataException {
+ rateLimiter.request(page.getFrameSizeMultiplier());
+ }
+
@Override
public void afterWrite(ICachedPage page) throws HyracksDataException {
totalWrittenPages++;
if (pagesPerForce > 0 && totalWrittenPages % pagesPerForce == 0) {
+ // perform a force
bulkLoader.force();
totalForces++;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml b/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml
index eb453f6..d0dfbf4 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml
@@ -71,5 +71,9 @@
<artifactId>snappy-java</artifactId>
<version>1.1.7.1</version>
</dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FIFOLocalWriter.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FIFOLocalWriter.java
index 5f8bb4a..e74fe59 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FIFOLocalWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FIFOLocalWriter.java
@@ -40,6 +40,7 @@
public void write(ICachedPage page) {
CachedPage cPage = (CachedPage) page;
try {
+ callback.beforeWrite(cPage);
bufferCache.write(cPage);
callback.afterWrite(cPage);
} catch (Exception e) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IPageWriteCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IPageWriteCallback.java
index 3d2bda9..f4ea6cf 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IPageWriteCallback.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IPageWriteCallback.java
@@ -30,6 +30,14 @@
void initialize(IIndexBulkLoader bulkLoader);
/**
+ * Notify that a page is about to be written
+ *
+ * @param page
+ * @throws HyracksDataException
+ */
+ void beforeWrite(ICachedPage page) throws HyracksDataException;
+
+ /**
* Notify that a page has been written
*
* @param page
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IRateLimiter.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IRateLimiter.java
new file mode 100644
index 0000000..b7433ae
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IRateLimiter.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.buffercache;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface IRateLimiter {
+
+ void setRate(double ratePerSecond);
+
+ void request(int permits) throws HyracksDataException;
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/NoOpRateLimiter.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/NoOpRateLimiter.java
new file mode 100644
index 0000000..ac0a1a9
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/NoOpRateLimiter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.buffercache;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class NoOpRateLimiter implements IRateLimiter {
+
+ public static final NoOpRateLimiter INSTANCE = new NoOpRateLimiter();
+
+ private NoOpRateLimiter() {
+ }
+
+ @Override
+ public void setRate(double ratePerSecond) {
+ // no op
+ }
+
+ @Override
+ public void request(int tokens) throws HyracksDataException {
+ // no op
+ }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/SleepRateLimiter.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/SleepRateLimiter.java
new file mode 100644
index 0000000..4d0ca92
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/SleepRateLimiter.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.buffercache;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+import com.google.common.util.concurrent.RateLimiter;
+
+/**
+ * A wrapper of the RateLimiter implementation from {@link RateLimiter}
+ *
+ */
+public class SleepRateLimiter implements IRateLimiter {
+ /**
+ * Defines the maximum storage capacity of the rate limiter, i.e., the number of permits it stores.
+ * Maybe make configurable in the future
+ */
+ private static final double MAX_BURST_SECONDS = 1.0;
+
+ private final RateLimiter rateLimiterImpl;
+
+ public static IRateLimiter create(long ratePerSecond) {
+ if (ratePerSecond > 0) {
+ return new SleepRateLimiter(ratePerSecond, MAX_BURST_SECONDS);
+ } else {
+ return NoOpRateLimiter.INSTANCE;
+ }
+ }
+
+ public SleepRateLimiter(long ratePerSecond, double maxBurstSeconds) {
+ rateLimiterImpl = RateLimiter.create(ratePerSecond, (long) (maxBurstSeconds * 1000), TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void setRate(double ratePerSecond) {
+ rateLimiterImpl.setRate(ratePerSecond);
+ }
+
+ @Override
+ public void request(int permits) throws HyracksDataException {
+ rateLimiterImpl.acquire(permits);
+ }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreePageWriteCallbackTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreePageWriteCallbackTest.java
index c0dc6b0..4e59ab0 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreePageWriteCallbackTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreePageWriteCallbackTest.java
@@ -42,6 +42,8 @@
import org.apache.hyracks.storage.am.lsm.common.impls.LSMIndexPageWriteCallback;
import org.apache.hyracks.storage.common.IResource;
import org.apache.hyracks.storage.common.buffercache.IPageWriteCallback;
+import org.apache.hyracks.storage.common.buffercache.IRateLimiter;
+import org.apache.hyracks.storage.common.buffercache.SleepRateLimiter;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -55,7 +57,22 @@
private final int PAGES_PER_FORCE = 16;
+ private int pageCounter = 0;
private LSMIndexPageWriteCallback lastCallback = null;
+ private final IRateLimiter testLimiter = new IRateLimiter() {
+ IRateLimiter limiter = SleepRateLimiter.create(100 * 1000);
+
+ @Override
+ public void setRate(double ratePerSecond) {
+
+ }
+
+ @Override
+ public void request(int permits) throws HyracksDataException {
+ limiter.request(permits);
+ pageCounter++;
+ }
+ };
private final ILSMPageWriteCallbackFactory pageWriteCallbackFactory = new ILSMPageWriteCallbackFactory() {
private static final long serialVersionUID = 1L;
@@ -67,7 +84,7 @@
@Override
public IPageWriteCallback createPageWriteCallback() throws HyracksDataException {
- lastCallback = new LSMIndexPageWriteCallback(PAGES_PER_FORCE);
+ lastCallback = new LSMIndexPageWriteCallback(testLimiter, PAGES_PER_FORCE);
return lastCallback;
}
};
@@ -134,6 +151,7 @@
ctx.getIndex().activate();
}
}
+ pageCounter = 0;
ILSMIndexAccessor accessor = (ILSMIndexAccessor) ctx.getIndexAccessor();
ILSMIOOperation mergeOp = accessor.scheduleMerge(((LSMBTree) ctx.getIndex()).getDiskComponents());
mergeOp.addCompleteListener(op -> {
@@ -141,6 +159,7 @@
long numPages = op.getNewComponent().getComponentSize()
/ harness.getDiskBufferCache().getPageSizeWithHeader() - 1;
// we skipped the metadata page for simplicity
+ Assert.assertEquals(numPages, pageCounter);
Assert.assertEquals(numPages / PAGES_PER_FORCE, lastCallback.getTotalForces());
}
});