[ASTERIXDB-3387][STO] Enable Selective caching policy
- user model changes: yes
- storage format changes: no
- interface changes: no
Details:
Allow user to enable/select Selective caching policy
Change-Id: Ia212db8c1673f6b725eb21fe7035a1e20490d1f6
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18291
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index cd52b9d..1d6a0d8 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -33,7 +33,7 @@
import org.apache.asterix.active.ActiveManager;
import org.apache.asterix.app.result.ResultReader;
-import org.apache.asterix.cloud.CloudManagerProvider;
+import org.apache.asterix.cloud.CloudConfigurator;
import org.apache.asterix.cloud.LocalPartitionBootstrapper;
import org.apache.asterix.common.api.IConfigValidator;
import org.apache.asterix.common.api.IConfigValidatorFactory;
@@ -122,6 +122,10 @@
import org.apache.hyracks.storage.common.buffercache.IPageReplacementStrategy;
import org.apache.hyracks.storage.common.buffercache.context.IBufferCacheReadContext;
import org.apache.hyracks.storage.common.buffercache.context.read.DefaultBufferCacheReadContextProvider;
+import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService;
+import org.apache.hyracks.storage.common.disk.IDiskResourceCacheLockNotifier;
+import org.apache.hyracks.storage.common.disk.NoOpDiskCacheMonitoringService;
+import org.apache.hyracks.storage.common.disk.NoOpDiskResourceCacheLockNotifier;
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
import org.apache.hyracks.storage.common.file.FileMapManager;
import org.apache.hyracks.storage.common.file.ILocalResourceRepositoryFactory;
@@ -180,6 +184,7 @@
private IPartitionBootstrapper partitionBootstrapper;
private final INamespacePathResolver namespacePathResolver;
private final INamespaceResolver namespaceResolver;
+ private IDiskCacheMonitoringService diskCacheService;
public NCAppRuntimeContext(INCServiceContext ncServiceContext, NCExtensionManager extensionManager,
IPropertiesFactory propertiesFactory, INamespaceResolver namespaceResolver,
@@ -211,16 +216,26 @@
IConfigValidatorFactory configValidatorFactory, IReplicationStrategyFactory replicationStrategyFactory,
boolean initialRun) throws IOException {
ioManager = getServiceContext().getIoManager();
- IDiskCachedPageAllocator pageAllocator = DefaultDiskCachedPageAllocator.INSTANCE;
- IBufferCacheReadContext defaultContext = DefaultBufferCacheReadContextProvider.DEFAULT;
+ CloudConfigurator cloudConfigurator;
+ IDiskResourceCacheLockNotifier lockNotifier;
+ IDiskCachedPageAllocator pageAllocator;
+ IBufferCacheReadContext defaultContext;
if (isCloudDeployment()) {
- persistenceIOManager =
- CloudManagerProvider.createIOManager(cloudProperties, ioManager, namespacePathResolver);
- partitionBootstrapper = CloudManagerProvider.getCloudPartitionBootstrapper(persistenceIOManager);
+ cloudConfigurator = CloudConfigurator.of(cloudProperties, ioManager, namespacePathResolver);
+ persistenceIOManager = cloudConfigurator.getCloudIoManager();
+ partitionBootstrapper = cloudConfigurator.getPartitionBootstrapper();
+ lockNotifier = cloudConfigurator.getLockNotifier();
+ pageAllocator = cloudConfigurator.getPageAllocator();
+ defaultContext = cloudConfigurator.getDefaultContext();
} else {
+ cloudConfigurator = null;
persistenceIOManager = ioManager;
partitionBootstrapper = new LocalPartitionBootstrapper(ioManager);
+ lockNotifier = NoOpDiskResourceCacheLockNotifier.INSTANCE;
+ pageAllocator = DefaultDiskCachedPageAllocator.INSTANCE;
+ defaultContext = DefaultBufferCacheReadContextProvider.DEFAULT;
}
+
int ioQueueLen = getServiceContext().getAppConfig().getInt(NCConfig.Option.IO_QUEUE_SIZE);
threadExecutor =
MaintainedThreadNameExecutorService.newCachedThreadPool(getServiceContext().getThreadFactory());
@@ -261,9 +276,8 @@
// Must start vbc now instead of by life cycle component manager (lccm) because lccm happens after
// the metadata bootstrap task
((ILifeCycleComponent) virtualBufferCache).start();
- datasetLifecycleManager =
- new DatasetLifecycleManager(storageProperties, localResourceRepository, txnSubsystem.getLogManager(),
- virtualBufferCache, indexCheckpointManagerProvider, ioManager.getIODevices().size());
+ datasetLifecycleManager = new DatasetLifecycleManager(storageProperties, localResourceRepository,
+ txnSubsystem.getLogManager(), virtualBufferCache, indexCheckpointManagerProvider, lockNotifier);
localResourceRepository.setDatasetLifecycleManager(datasetLifecycleManager);
final String nodeId = getServiceContext().getNodeId();
final Set<Integer> nodePartitions = metadataProperties.getNodePartitions(nodeId);
@@ -300,6 +314,15 @@
fileInfoMap, defaultContext);
}
+ if (cloudConfigurator != null) {
+ diskCacheService =
+ cloudConfigurator.createDiskCacheMonitoringService(getServiceContext(), bufferCache, fileInfoMap);
+ } else {
+ diskCacheService = NoOpDiskCacheMonitoringService.INSTANCE;
+ }
+
+ diskCacheService.start();
+
NodeControllerService ncs = (NodeControllerService) getServiceContext().getControllerService();
FileReference appDir =
ioManager.resolveAbsolutePath(getServiceContext().getServerCtx().getAppDir().getAbsolutePath());
@@ -351,6 +374,7 @@
@Override
public synchronized void preStop() throws Exception {
activeManager.shutdown();
+ diskCacheService.stop();
if (metadataNodeStub != null) {
unexportMetadataNodeStub();
}
@@ -695,6 +719,11 @@
}
@Override
+ public IDiskCacheMonitoringService getDiskCacheService() {
+ return diskCacheService;
+ }
+
+ @Override
public boolean isCloudDeployment() {
return ncServiceContext.getAppConfig().getBoolean(CLOUD_DEPLOYMENT);
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CloudToLocalStorageCachingTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CloudToLocalStorageCachingTask.java
index 6b3257d..547bc8b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CloudToLocalStorageCachingTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CloudToLocalStorageCachingTask.java
@@ -63,6 +63,8 @@
IPartitionBootstrapper bootstrapper = applicationContext.getPartitionBootstrapper();
bootstrapper.bootstrap(storagePartitions, lrs.getOnDiskPartitions(), metadataNode, metadataPartitionId, cleanup,
latestCheckpoint == null);
+ // Report all local resources
+ applicationContext.getDiskCacheService().reportLocalResources(lrs.loadAndGetAllResources());
}
@Override
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index ba3ffcd..cb8b8e5 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -61,7 +61,7 @@
import org.apache.asterix.app.io.PersistedResourceRegistry;
import org.apache.asterix.app.replication.NcLifecycleCoordinator;
import org.apache.asterix.app.result.JobResultCallback;
-import org.apache.asterix.cloud.CloudManagerProvider;
+import org.apache.asterix.cloud.CloudConfigurator;
import org.apache.asterix.common.api.AsterixThreadFactory;
import org.apache.asterix.common.api.IConfigValidatorFactory;
import org.apache.asterix.common.api.INamespacePathResolver;
@@ -186,8 +186,7 @@
CloudProperties cloudProperties = null;
if (cloudDeployment) {
cloudProperties = new CloudProperties(PropertiesAccessor.getInstance(ccServiceCtx.getAppConfig()));
- ioManager =
- (IOManager) CloudManagerProvider.createIOManager(cloudProperties, ioManager, namespacePathResolver);
+ ioManager = CloudConfigurator.createIOManager(ioManager, cloudProperties, namespacePathResolver);
}
IGlobalTxManager globalTxManager = createGlobalTxManager(ioManager);
appCtx = createApplicationContext(null, globalRecoveryManager, lifecycleCoordinator, Receptionist::new,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
index 6888acc..4c06994 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
@@ -11,13 +11,19 @@
"azure.request.timeout" : 120,
"cloud.deployment" : false,
"cloud.profiler.log.interval" : 0,
+ "cloud.storage.allocation.percentage" : 0.8,
"cloud.storage.anonymous.auth" : false,
"cloud.storage.bucket" : "",
"cloud.storage.cache.policy" : "lazy",
+ "cloud.storage.debug.mode.enabled" : false,
+ "cloud.storage.debug.sweep.threshold.size" : 1073741824,
+ "cloud.storage.disk.monitor.interval" : 60,
"cloud.storage.endpoint" : "",
+ "cloud.storage.index.inactive.duration.threshold" : 6,
"cloud.storage.prefix" : "",
"cloud.storage.region" : "",
"cloud.storage.scheme" : "",
+ "cloud.storage.sweep.threshold.percentage" : 0.9,
"compiler\.arrayindex" : true,
"compiler.batch.lookup" : true,
"compiler.cbo" : true,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
index 31daf99..1d64ade 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
@@ -11,13 +11,19 @@
"azure.request.timeout" : 120,
"cloud.deployment" : false,
"cloud.profiler.log.interval" : 0,
+ "cloud.storage.allocation.percentage" : 0.8,
"cloud.storage.anonymous.auth" : false,
"cloud.storage.bucket" : "",
"cloud.storage.cache.policy" : "lazy",
+ "cloud.storage.debug.mode.enabled" : false,
+ "cloud.storage.debug.sweep.threshold.size" : 1073741824,
+ "cloud.storage.disk.monitor.interval" : 60,
"cloud.storage.endpoint" : "",
+ "cloud.storage.index.inactive.duration.threshold" : 6,
"cloud.storage.prefix" : "",
"cloud.storage.region" : "",
"cloud.storage.scheme" : "",
+ "cloud.storage.sweep.threshold.percentage" : 0.9,
"compiler\.arrayindex" : true,
"compiler.batch.lookup" : true,
"compiler.cbo" : true,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
index a47b3be..aff2c77 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
@@ -11,13 +11,19 @@
"azure.request.timeout" : 120,
"cloud.deployment" : false,
"cloud.profiler.log.interval" : 0,
+ "cloud.storage.allocation.percentage" : 0.8,
"cloud.storage.anonymous.auth" : false,
"cloud.storage.bucket" : "",
"cloud.storage.cache.policy" : "lazy",
+ "cloud.storage.debug.mode.enabled" : false,
+ "cloud.storage.debug.sweep.threshold.size" : 1073741824,
+ "cloud.storage.disk.monitor.interval" : 60,
"cloud.storage.endpoint" : "",
+ "cloud.storage.index.inactive.duration.threshold" : 6,
"cloud.storage.prefix" : "",
"cloud.storage.region" : "",
"cloud.storage.scheme" : "",
+ "cloud.storage.sweep.threshold.percentage" : 0.9,
"compiler\.arrayindex" : true,
"compiler.batch.lookup" : true,
"compiler.cbo" : true,
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java
new file mode 100644
index 0000000..a0c1fbb
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud;
+
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.asterix.common.api.INamespacePathResolver;
+import org.apache.asterix.common.cloud.CloudCachePolicy;
+import org.apache.asterix.common.cloud.IPartitionBootstrapper;
+import org.apache.asterix.common.config.CloudProperties;
+import org.apache.asterix.common.utils.StorageConstants;
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.config.IApplicationConfig;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.cloud.buffercache.context.DefaultCloudReadContext;
+import org.apache.hyracks.cloud.buffercache.page.CloudDiskCachedPageAllocator;
+import org.apache.hyracks.cloud.cache.service.CloudDiskCacheMonitoringAndPrefetchingService;
+import org.apache.hyracks.cloud.cache.service.CloudDiskResourceCacheLockNotifier;
+import org.apache.hyracks.cloud.cache.service.DiskCacheSweeperThread;
+import org.apache.hyracks.cloud.filesystem.PhysicalDrive;
+import org.apache.hyracks.control.common.controllers.NCConfig;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.storage.common.buffercache.BufferCache;
+import org.apache.hyracks.storage.common.buffercache.DefaultDiskCachedPageAllocator;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.IDiskCachedPageAllocator;
+import org.apache.hyracks.storage.common.buffercache.context.IBufferCacheReadContext;
+import org.apache.hyracks.storage.common.buffercache.context.read.DefaultBufferCacheReadContextProvider;
+import org.apache.hyracks.storage.common.disk.DummyPhysicalDrive;
+import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService;
+import org.apache.hyracks.storage.common.disk.IDiskResourceCacheLockNotifier;
+import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
+import org.apache.hyracks.storage.common.disk.NoOpDiskCacheMonitoringService;
+import org.apache.hyracks.storage.common.disk.NoOpDiskResourceCacheLockNotifier;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+
+public final class CloudConfigurator {
+ private final CloudProperties cloudProperties;
+ private final IOManager localIoManager;
+ private final AbstractCloudIOManager cloudIOManager;
+ private final IPhysicalDrive physicalDrive;
+ private final IDiskResourceCacheLockNotifier lockNotifier;
+ private final IDiskCachedPageAllocator pageAllocator;
+ private final IBufferCacheReadContext defaultContext;
+ private final boolean diskCacheManagerRequired;
+ private final long diskCacheMonitoringInterval;
+
+ private CloudConfigurator(CloudProperties cloudProperties, IIOManager ioManager,
+ INamespacePathResolver nsPathResolver) throws HyracksDataException {
+ this.cloudProperties = cloudProperties;
+ localIoManager = (IOManager) ioManager;
+ diskCacheManagerRequired = cloudProperties.getCloudCachePolicy() == CloudCachePolicy.SELECTIVE;
+ cloudIOManager = createIOManager(ioManager, cloudProperties, nsPathResolver);
+ physicalDrive = createPhysicalDrive(diskCacheManagerRequired, cloudProperties, ioManager);
+ lockNotifier = createLockNotifier(diskCacheManagerRequired);
+ pageAllocator = createPageAllocator(diskCacheManagerRequired);
+ defaultContext = createDefaultBufferCachePageOpContext(diskCacheManagerRequired, physicalDrive);
+ diskCacheMonitoringInterval = cloudProperties.getStorageDiskMonitorInterval();
+ }
+
+ public IPartitionBootstrapper getPartitionBootstrapper() {
+ return cloudIOManager;
+ }
+
+ public IIOManager getCloudIoManager() {
+ return cloudIOManager;
+ }
+
+ public IDiskResourceCacheLockNotifier getLockNotifier() {
+ return lockNotifier;
+ }
+
+ public IDiskCachedPageAllocator getPageAllocator() {
+ return pageAllocator;
+ }
+
+ public IBufferCacheReadContext getDefaultContext() {
+ return defaultContext;
+ }
+
+ public IDiskCacheMonitoringService createDiskCacheMonitoringService(INCServiceContext serviceContext,
+ IBufferCache bufferCache, Map<Integer, BufferedFileHandle> fileInfoMap) {
+ if (!diskCacheManagerRequired) {
+ return NoOpDiskCacheMonitoringService.INSTANCE;
+ }
+
+ CloudDiskResourceCacheLockNotifier resourceCacheManager = (CloudDiskResourceCacheLockNotifier) lockNotifier;
+ BufferCache diskBufferCache = (BufferCache) bufferCache;
+ int numOfIoDevices = localIoManager.getIODevices().size();
+ IApplicationConfig appConfig = serviceContext.getAppConfig();
+ int ioParallelism = appConfig.getInt(NCConfig.Option.IO_WORKERS_PER_PARTITION);
+ int sweepQueueSize = appConfig.getInt(NCConfig.Option.IO_QUEUE_SIZE);
+ int numOfSweepThreads = ioParallelism * numOfIoDevices;
+ // Ensure at least each sweep thread has one entry in the queue
+ int maxSweepQueueSize = Math.max(numOfSweepThreads, sweepQueueSize);
+ long inactiveThreshold = cloudProperties.getStorageIndexInactiveDurationThreshold();
+ // +1 for the monitorThread
+ ExecutorService executor = Executors.newFixedThreadPool(numOfSweepThreads + 1);
+ DiskCacheSweeperThread monitorThread = new DiskCacheSweeperThread(executor, diskCacheMonitoringInterval,
+ resourceCacheManager, cloudIOManager, numOfSweepThreads, maxSweepQueueSize, physicalDrive,
+ diskBufferCache, fileInfoMap, inactiveThreshold);
+
+ IDiskCacheMonitoringService diskCacheService =
+ new CloudDiskCacheMonitoringAndPrefetchingService(executor, physicalDrive, monitorThread);
+ localIoManager.setSpaceMaker(monitorThread);
+ return diskCacheService;
+ }
+
+ public static CloudConfigurator of(CloudProperties cloudProperties, IIOManager ioManager,
+ INamespacePathResolver nsPathResolver) throws HyracksDataException {
+ return new CloudConfigurator(cloudProperties, ioManager, nsPathResolver);
+ }
+
+ public static AbstractCloudIOManager createIOManager(IIOManager ioManager, CloudProperties cloudProperties,
+ INamespacePathResolver nsPathResolver) throws HyracksDataException {
+ IOManager localIoManager = (IOManager) ioManager;
+ CloudCachePolicy policy = cloudProperties.getCloudCachePolicy();
+ if (policy == CloudCachePolicy.EAGER) {
+ return new EagerCloudIOManager(localIoManager, cloudProperties, nsPathResolver);
+ }
+
+ boolean selective = policy == CloudCachePolicy.SELECTIVE;
+ return new LazyCloudIOManager(localIoManager, cloudProperties, nsPathResolver, selective);
+ }
+
+ private static IPhysicalDrive createPhysicalDrive(boolean diskCacheManagerRequired, CloudProperties cloudProperties,
+ IIOManager ioManager) throws HyracksDataException {
+ if (diskCacheManagerRequired) {
+ double storagePercentage = cloudProperties.getStorageAllocationPercentage();
+ double pressureThreshold = cloudProperties.getStorageSweepThresholdPercentage();
+ long pressureDebugSize = cloudProperties.getStorageDebugSweepThresholdSize();
+ return new PhysicalDrive(ioManager.getIODevices(), pressureThreshold, storagePercentage, pressureDebugSize);
+ }
+
+ return DummyPhysicalDrive.INSTANCE;
+ }
+
+ private static IDiskResourceCacheLockNotifier createLockNotifier(boolean diskCacheManagerRequired) {
+ if (diskCacheManagerRequired) {
+ return new CloudDiskResourceCacheLockNotifier(StorageConstants.METADATA_PARTITION);
+ }
+
+ return NoOpDiskResourceCacheLockNotifier.INSTANCE;
+ }
+
+ private static IDiskCachedPageAllocator createPageAllocator(boolean diskCacheManagerRequired) {
+ if (diskCacheManagerRequired) {
+ return CloudDiskCachedPageAllocator.INSTANCE;
+ }
+ return DefaultDiskCachedPageAllocator.INSTANCE;
+ }
+
+ private static IBufferCacheReadContext createDefaultBufferCachePageOpContext(boolean diskCacheManagerRequired,
+ IPhysicalDrive drive) {
+ if (diskCacheManagerRequired) {
+ return new DefaultCloudReadContext(drive);
+ }
+
+ return DefaultBufferCacheReadContextProvider.DEFAULT;
+ }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
index 04979e6..c993cd3 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
@@ -102,7 +102,7 @@
}
@Override
- public void evict(FileReference directory) {
+ public void evict(String resourcePath) {
throw new UnsupportedOperationException("evict is not supported with Eager caching");
}
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
index 612237a..afe0878 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
@@ -68,11 +68,11 @@
private ILazyAccessor accessor;
public LazyCloudIOManager(IOManager ioManager, CloudProperties cloudProperties,
- INamespacePathResolver nsPathResolver, boolean replaceableAccessor) throws HyracksDataException {
+ INamespacePathResolver nsPathResolver, boolean selective) throws HyracksDataException {
super(ioManager, cloudProperties, nsPathResolver);
accessor = new InitialCloudAccessor(cloudClient, bucket, localIoManager);
puncher = HolePuncherProvider.get(this, cloudProperties, writeBufferProvider);
- if (replaceableAccessor) {
+ if (selective) {
replacer = InitialCloudAccessor.NO_OP_REPLACER;
} else {
replacer = () -> {
@@ -208,8 +208,8 @@
}
@Override
- public void evict(FileReference directory) throws HyracksDataException {
- accessor.doEvict(directory);
+ public void evict(String resourcePath) throws HyracksDataException {
+ accessor.doEvict(resolve(resourcePath));
}
private List<FileReference> resolve(Set<CloudFile> cloudFiles) throws HyracksDataException {
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/ParallelCacher.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/ParallelCacher.java
index 7539aa7..a3079e6 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/ParallelCacher.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/ParallelCacher.java
@@ -180,8 +180,8 @@
@Override
public synchronized void add(Collection<FileReference> files) {
LOGGER.info("Uncache {}", files);
+ // We only can 'uncache' data files
uncachedDataFiles.putAll(getFiles(files, DATA_FILTER));
- uncachedMetadataFiles.putAll(getFiles(files, METADATA_FILTER));
}
@Override
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/SelectiveCloudAccessor.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/SelectiveCloudAccessor.java
index 934468a..218015d 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/SelectiveCloudAccessor.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/SelectiveCloudAccessor.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.cloud.lazy.accessor;
+import static org.apache.asterix.cloud.util.CloudFileUtil.DATA_FILTER;
+
import java.util.Collection;
import java.util.Set;
@@ -50,10 +52,21 @@
throw new IllegalStateException(directory + " is not a directory");
}
- // TODO only delete data files?
- Collection<FileReference> uncachedFiles = UncachedFileReference.toUncached(localIoManager.list(directory));
+ // Get a list of all data files
+ Collection<FileReference> files = localIoManager.list(directory, DATA_FILTER);
+ if (files.isEmpty()) {
+ // Nothing to evict
+ return;
+ }
+
+ // Convert file references to uncached ones
+ Collection<FileReference> uncachedFiles = UncachedFileReference.toUncached(files);
+ // Add all data files to the cacher to indicate they are in a 'cacheable' state (i.e., not downloaded)
cacher.add(uncachedFiles);
- localIoManager.delete(directory);
+ // Delete all data files from the local drive
+ for (FileReference uncachedFile : uncachedFiles) {
+ localIoManager.delete(uncachedFile);
+ }
}
@Override
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/filesystem/HolePuncherProvider.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/filesystem/HolePuncherProvider.java
index 4ea6c46..406f2f3 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/filesystem/HolePuncherProvider.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/filesystem/HolePuncherProvider.java
@@ -18,17 +18,21 @@
*/
package org.apache.asterix.cloud.lazy.filesystem;
+import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.asterix.cloud.AbstractCloudIOManager;
+import org.apache.asterix.cloud.CloudFileHandle;
import org.apache.asterix.cloud.IWriteBufferProvider;
import org.apache.asterix.common.cloud.CloudCachePolicy;
import org.apache.asterix.common.config.CloudProperties;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.cloud.filesystem.FileSystemOperationDispatcherUtil;
public final class HolePuncherProvider {
private static final IHolePuncher UNSUPPORTED = HolePuncherProvider::unsupported;
+ private static final IHolePuncher LINUX = HolePuncherProvider::linuxPunchHole;
private HolePuncherProvider() {
}
@@ -39,13 +43,35 @@
return UNSUPPORTED;
}
- return new DebugHolePuncher(cloudIOManager, bufferProvider);
+ if (FileSystemOperationDispatcherUtil.isLinux()) {
+ return LINUX;
+ } else if (cloudProperties.isStorageDebugModeEnabled()) {
+ // Running on debug mode on a non-Linux box
+ return new DebugHolePuncher(cloudIOManager, bufferProvider);
+ }
+
+ throw new UnsupportedOperationException(
+ "Hole puncher is not supported using " + FileSystemOperationDispatcherUtil.getOSName());
}
private static int unsupported(IFileHandle fileHandle, long offset, long length) {
throw new UnsupportedOperationException("punchHole is not supported");
}
+ private static int linuxPunchHole(IFileHandle fileHandle, long offset, long length) throws HyracksDataException {
+ CloudFileHandle cloudFileHandle = (CloudFileHandle) fileHandle;
+ int fileDescriptor = cloudFileHandle.getFileDescriptor();
+ int blockSize = cloudFileHandle.getBlockSize();
+ int freedSpace = FileSystemOperationDispatcherUtil.punchHole(fileDescriptor, offset, length, blockSize);
+ try {
+ cloudFileHandle.getFileChannel().force(false);
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+
+ return freedSpace;
+ }
+
private static final class DebugHolePuncher implements IHolePuncher {
private final AbstractCloudIOManager cloudIOManager;
private final IWriteBufferProvider bufferProvider;
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
index 8e6becf..888cea1 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
@@ -43,6 +43,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import org.apache.hyracks.storage.common.ILocalResourceRepository;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService;
import org.apache.hyracks.storage.common.file.IResourceIdFactory;
import org.apache.hyracks.util.cache.ICacheManager;
@@ -152,4 +153,9 @@
* @return the disk write rate limiter provider
*/
IDiskWriteRateLimiterProvider getDiskWriteRateLimiterProvider();
+
+ /**
+ * @return disk cache service
+ */
+ IDiskCacheMonitoringService getDiskCacheService();
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
index 4b7649a8..5c23f5c 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
@@ -19,8 +19,12 @@
package org.apache.asterix.common.config;
import static org.apache.hyracks.control.common.config.OptionTypes.BOOLEAN;
+import static org.apache.hyracks.control.common.config.OptionTypes.DOUBLE;
+import static org.apache.hyracks.control.common.config.OptionTypes.LONG_BYTE_UNIT;
import static org.apache.hyracks.control.common.config.OptionTypes.NONNEGATIVE_INTEGER;
+import static org.apache.hyracks.control.common.config.OptionTypes.POSITIVE_INTEGER;
import static org.apache.hyracks.control.common.config.OptionTypes.STRING;
+import static org.apache.hyracks.util.StorageUtil.StorageUnit.GIGABYTE;
import java.util.concurrent.TimeUnit;
@@ -28,6 +32,7 @@
import org.apache.hyracks.api.config.IOption;
import org.apache.hyracks.api.config.IOptionType;
import org.apache.hyracks.api.config.Section;
+import org.apache.hyracks.util.StorageUtil;
public class CloudProperties extends AbstractProperties {
@@ -43,6 +48,14 @@
CLOUD_STORAGE_ENDPOINT(STRING, ""),
CLOUD_STORAGE_ANONYMOUS_AUTH(BOOLEAN, false),
CLOUD_STORAGE_CACHE_POLICY(STRING, "lazy"),
+ // 80% of the total disk space
+ CLOUD_STORAGE_ALLOCATION_PERCENTAGE(DOUBLE, 0.8d),
+ // 90% of the allocated space for storage (i.e., 90% of the 80% of the total disk space)
+ CLOUD_STORAGE_SWEEP_THRESHOLD_PERCENTAGE(DOUBLE, 0.9d),
+ CLOUD_STORAGE_DISK_MONITOR_INTERVAL(POSITIVE_INTEGER, 60),
+ CLOUD_STORAGE_INDEX_INACTIVE_DURATION_THRESHOLD(POSITIVE_INTEGER, 6),
+ CLOUD_STORAGE_DEBUG_MODE_ENABLED(BOOLEAN, false),
+ CLOUD_STORAGE_DEBUG_SWEEP_THRESHOLD_SIZE(LONG_BYTE_UNIT, StorageUtil.getLongSizeInBytes(1, GIGABYTE)),
CLOUD_PROFILER_LOG_INTERVAL(NONNEGATIVE_INTEGER, 0);
private final IOptionType interpreter;
@@ -63,6 +76,12 @@
case CLOUD_STORAGE_ENDPOINT:
case CLOUD_STORAGE_ANONYMOUS_AUTH:
case CLOUD_STORAGE_CACHE_POLICY:
+ case CLOUD_STORAGE_ALLOCATION_PERCENTAGE:
+ case CLOUD_STORAGE_SWEEP_THRESHOLD_PERCENTAGE:
+ case CLOUD_STORAGE_DISK_MONITOR_INTERVAL:
+ case CLOUD_STORAGE_INDEX_INACTIVE_DURATION_THRESHOLD:
+ case CLOUD_STORAGE_DEBUG_SWEEP_THRESHOLD_SIZE:
+ case CLOUD_STORAGE_DEBUG_MODE_ENABLED:
case CLOUD_PROFILER_LOG_INTERVAL:
return Section.COMMON;
default:
@@ -86,9 +105,36 @@
case CLOUD_STORAGE_ANONYMOUS_AUTH:
return "Indicates whether or not anonymous auth should be used for the cloud storage";
case CLOUD_STORAGE_CACHE_POLICY:
- return "The caching policy (either eager or lazy). 'Eager' caching will download all partitions"
- + " upon booting, whereas lazy caching will download a file upon request to open it."
+ return "The caching policy (either eager, lazy or selective). 'eager' caching will download"
+ + "all partitions upon booting, whereas 'lazy' caching will download a file upon"
+ + " request to open it. 'selective' caching will act as the 'lazy' policy; however, "
+ + " it allows to use the local disk(s) as a cache, where pages and indexes can be "
+ + " cached or evicted according to the pressure imposed on the local disks."
+ " (default: 'lazy')";
+ case CLOUD_STORAGE_ALLOCATION_PERCENTAGE:
+ return "The percentage of the total disk space that should be allocated for data storage when the"
+ + " 'selective' caching policy is used. The remaining will act as a buffer for "
+ + " query workspace (i.e., for query operations that require spilling to disk)."
+ + " (default: 80% of the total disk space)";
+ case CLOUD_STORAGE_SWEEP_THRESHOLD_PERCENTAGE:
+ return "The percentage of the used storage space at which the disk sweeper starts freeing space by"
+ + " punching holes in stored indexes or by evicting them entirely, "
+ + " when the 'selective' caching policy is used."
+ + " (default: 90% of the allocated space for storage)";
+ case CLOUD_STORAGE_DISK_MONITOR_INTERVAL:
+ return "The disk monitoring interval time (in seconds): determines how often the system"
+ + " checks for pressure on disk space when using the 'selective' caching policy."
+ + " (default : 60 seconds)";
+ case CLOUD_STORAGE_INDEX_INACTIVE_DURATION_THRESHOLD:
+ return "The duration in minutes to consider an index is inactive. (default: 360 or 6 hours)";
+ case CLOUD_STORAGE_DEBUG_MODE_ENABLED:
+ return "Whether or not the debug mode is enabled when using the 'selective' caching policy."
+ + "(default: false)";
+ case CLOUD_STORAGE_DEBUG_SWEEP_THRESHOLD_SIZE:
+ return "For debugging only. Pressure size will be the current used space + the additional bytes"
+ + " provided by this configuration option instead of using "
+ + " CLOUD_STORAGE_SWEEP_THRESHOLD_PERCENTAGE."
+ + " (default: 0. I.e., CLOUD_STORAGE_SWEEP_THRESHOLD_PERCENTAGE will be used by default)";
case CLOUD_PROFILER_LOG_INTERVAL:
return "The waiting time (in minutes) to log cloud request statistics (default: 0, which means"
+ " the profiler is disabled by default). The minimum is 1 minute."
@@ -138,6 +184,31 @@
return CloudCachePolicy.fromName(accessor.getString(Option.CLOUD_STORAGE_CACHE_POLICY));
}
+ public double getStorageAllocationPercentage() {
+ return accessor.getDouble(Option.CLOUD_STORAGE_ALLOCATION_PERCENTAGE);
+ }
+
+ public double getStorageSweepThresholdPercentage() {
+ return accessor.getDouble(Option.CLOUD_STORAGE_SWEEP_THRESHOLD_PERCENTAGE);
+ }
+
+ public int getStorageDiskMonitorInterval() {
+ return accessor.getInt(Option.CLOUD_STORAGE_DISK_MONITOR_INTERVAL);
+ }
+
+ public long getStorageIndexInactiveDurationThreshold() {
+ int minutes = accessor.getInt(Option.CLOUD_STORAGE_INDEX_INACTIVE_DURATION_THRESHOLD);
+ return TimeUnit.MINUTES.toNanos(minutes);
+ }
+
+ public boolean isStorageDebugModeEnabled() {
+ return accessor.getBoolean(Option.CLOUD_STORAGE_DEBUG_MODE_ENABLED);
+ }
+
+ public long getStorageDebugSweepThresholdSize() {
+ return isStorageDebugModeEnabled() ? accessor.getLong(Option.CLOUD_STORAGE_DEBUG_SWEEP_THRESHOLD_SIZE) : 0L;
+ }
+
public long getProfilerLogInterval() {
long interval = TimeUnit.MINUTES.toNanos(accessor.getInt(Option.CLOUD_PROFILER_LOG_INTERVAL));
return interval == 0 ? 0 : Math.max(interval, TimeUnit.MINUTES.toNanos(1));
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 1a45155..07801a9 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -59,6 +59,7 @@
import org.apache.hyracks.storage.common.LocalResource;
import org.apache.hyracks.storage.common.buffercache.IRateLimiter;
import org.apache.hyracks.storage.common.buffercache.SleepRateLimiter;
+import org.apache.hyracks.storage.common.disk.IDiskResourceCacheLockNotifier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -71,6 +72,7 @@
private final IVirtualBufferCache vbc;
private final ILogManager logManager;
private final LogRecord waitLog;
+ private final IDiskResourceCacheLockNotifier lockNotifier;
private volatile boolean stopped = false;
private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
// all LSM-trees share the same virtual buffer cache list
@@ -78,7 +80,8 @@
public DatasetLifecycleManager(StorageProperties storageProperties, ILocalResourceRepository resourceRepository,
ILogManager logManager, IVirtualBufferCache vbc,
- IIndexCheckpointManagerProvider indexCheckpointManagerProvider, int numPartitions) {
+ IIndexCheckpointManagerProvider indexCheckpointManagerProvider,
+ IDiskResourceCacheLockNotifier lockNotifier) {
this.logManager = logManager;
this.storageProperties = storageProperties;
this.resourceRepository = resourceRepository;
@@ -89,6 +92,7 @@
vbcs.add(vbc);
}
this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
+ this.lockNotifier = lockNotifier;
waitLog = new LogRecord();
waitLog.setLogType(LogType.WAIT_FOR_FLUSHES);
waitLog.computeAndSetLogSize();
@@ -122,6 +126,7 @@
datasetResource = getDatasetLifecycle(did);
}
datasetResource.register(resource, (ILSMIndex) index);
+ lockNotifier.onRegister(resource, index, datasetResource.getIndexInfo(resource.getId()).getPartition());
}
private int getDIDfromResourcePath(String resourcePath) throws HyracksDataException {
@@ -145,7 +150,6 @@
validateDatasetLifecycleManagerState();
int did = getDIDfromResourcePath(resourcePath);
long resourceID = getResourceIDfromResourcePath(resourcePath);
-
DatasetResource dsr = datasets.get(did);
IndexInfo iInfo = dsr == null ? null : dsr.getIndexInfo(resourceID);
@@ -153,6 +157,7 @@
throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST, resourcePath);
}
+ lockNotifier.onUnregister(resourceID);
PrimaryIndexOperationTracker opTracker = dsr.getOpTracker(iInfo.getPartition());
if (iInfo.getReferenceCount() != 0 || (opTracker != null && opTracker.getNumActiveOperations() != 0)) {
if (LOGGER.isErrorEnabled()) {
@@ -190,6 +195,9 @@
int did = getDIDfromResourcePath(resourcePath);
long resourceID = getResourceIDfromResourcePath(resourcePath);
+ // Notify first before opening a resource
+ lockNotifier.onOpen(resourceID);
+
DatasetResource dsr = datasets.get(did);
DatasetInfo dsInfo = dsr.getDatasetInfo();
if (dsInfo == null || !dsInfo.isRegistered()) {
@@ -253,6 +261,7 @@
if (iInfo == null) {
throw HyracksDataException.create(ErrorCode.NO_INDEX_FOUND_WITH_RESOURCE_ID, resourceID);
}
+ lockNotifier.onClose(resourceID);
} finally {
// Regardless of what exception is thrown in the try-block (e.g., line 279),
// we have to un-touch the index and dataset.
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeComponentsProvider.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeComponentsProvider.java
index 04f594e..620417b 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeComponentsProvider.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeComponentsProvider.java
@@ -30,6 +30,7 @@
import org.apache.hyracks.storage.common.ILocalResourceRepository;
import org.apache.hyracks.storage.common.IStorageManager;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService;
import org.apache.hyracks.storage.common.file.IResourceIdFactory;
import com.fasterxml.jackson.databind.JsonNode;
@@ -45,32 +46,32 @@
@Override
public ILSMIOOperationScheduler getIoScheduler(INCServiceContext ctx) {
- return ((INcApplicationContext) ctx.getApplicationContext()).getLSMIOScheduler();
+ return getAppCtx(ctx).getLSMIOScheduler();
}
@Override
public IIOManager getIoManager(INCServiceContext ctx) {
- return ((INcApplicationContext) ctx.getApplicationContext()).getPersistenceIoManager();
+ return getAppCtx(ctx).getPersistenceIoManager();
}
@Override
public IBufferCache getBufferCache(INCServiceContext ctx) {
- return ((INcApplicationContext) ctx.getApplicationContext()).getBufferCache();
+ return getAppCtx(ctx).getBufferCache();
}
@Override
public ILocalResourceRepository getLocalResourceRepository(INCServiceContext ctx) {
- return ((INcApplicationContext) ctx.getApplicationContext()).getLocalResourceRepository();
+ return getAppCtx(ctx).getLocalResourceRepository();
}
@Override
public IDatasetLifecycleManager getLifecycleManager(INCServiceContext ctx) {
- return ((INcApplicationContext) ctx.getApplicationContext()).getDatasetLifecycleManager();
+ return getAppCtx(ctx).getDatasetLifecycleManager();
}
@Override
public IResourceIdFactory getResourceIdFactory(INCServiceContext ctx) {
- return ((INcApplicationContext) ctx.getApplicationContext()).getResourceIdFactory();
+ return getAppCtx(ctx).getResourceIdFactory();
}
@Override
@@ -78,6 +79,16 @@
return registry.getClassIdentifier(getClass(), serialVersionUID);
}
+ @Override
+ public IDiskCacheMonitoringService getDiskCacheMonitoringService(INCServiceContext ctx) {
+ return getAppCtx(ctx).getDiskCacheService();
+
+ }
+
+ private INcApplicationContext getAppCtx(INCServiceContext ctx) {
+ return ((INcApplicationContext) ctx.getApplicationContext());
+ }
+
@SuppressWarnings("squid:S1172") // unused parameter
public static IJsonSerializable fromJson(IPersistedResourceRegistry registry, JsonNode json) {
return RUNTIME_PROVIDER;
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskCacheMonitoringAndPrefetchingService.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskCacheMonitoringAndPrefetchingService.java
index 381d5eb..ba513fa 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskCacheMonitoringAndPrefetchingService.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskCacheMonitoringAndPrefetchingService.java
@@ -18,9 +18,11 @@
*/
package org.apache.hyracks.cloud.cache.service;
+import java.util.Map;
import java.util.concurrent.ExecutorService;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.common.LocalResource;
import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService;
import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
import org.apache.hyracks.storage.common.disk.prefetch.AbstractPrefetchRequest;
@@ -55,6 +57,11 @@
}
@Override
+ public void reportLocalResources(Map<Long, LocalResource> localResources) {
+ monitorThread.reportLocalResources(localResources);
+ }
+
+ @Override
public IPhysicalDrive getPhysicalDrive() {
return drive;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskResourceCacheLockNotifier.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskResourceCacheLockNotifier.java
index 55bab43..036a812 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskResourceCacheLockNotifier.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskResourceCacheLockNotifier.java
@@ -18,67 +18,151 @@
*/
package org.apache.hyracks.cloud.cache.service;
-import org.apache.hyracks.cloud.cache.unit.DatasetUnit;
-import org.apache.hyracks.cloud.cache.unit.IndexUnit;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hyracks.cloud.cache.unit.AbstractIndexUnit;
+import org.apache.hyracks.cloud.cache.unit.SweepableIndexUnit;
+import org.apache.hyracks.cloud.cache.unit.UnsweepableIndexUnit;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.common.IIndex;
import org.apache.hyracks.storage.common.LocalResource;
import org.apache.hyracks.storage.common.disk.IDiskResourceCacheLockNotifier;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
-import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
-import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
-import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMaps;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
-// TODO locking should be revised
public final class CloudDiskResourceCacheLockNotifier implements IDiskResourceCacheLockNotifier {
- private final Int2ObjectMap<DatasetUnit> datasets;
+ private static final Logger LOGGER = LogManager.getLogger();
+ private final int metadataPartition;
+ private final Long2ObjectMap<LocalResource> inactiveResources;
+ private final Long2ObjectMap<UnsweepableIndexUnit> unsweepableIndexes;
+ private final Long2ObjectMap<SweepableIndexUnit> sweepableIndexes;
+ private final ReentrantReadWriteLock evictionLock;
- public CloudDiskResourceCacheLockNotifier() {
- datasets = Int2ObjectMaps.synchronize(new Int2ObjectOpenHashMap<>());
+ public CloudDiskResourceCacheLockNotifier(int metadataPartition) {
+ this.metadataPartition = metadataPartition;
+ inactiveResources = Long2ObjectMaps.synchronize(new Long2ObjectOpenHashMap<>());
+ unsweepableIndexes = Long2ObjectMaps.synchronize(new Long2ObjectOpenHashMap<>());
+ sweepableIndexes = Long2ObjectMaps.synchronize(new Long2ObjectOpenHashMap<>());
+ evictionLock = new ReentrantReadWriteLock();
}
@Override
- public void onRegister(int datasetId, LocalResource localResource, IIndex index) {
+ public void onRegister(LocalResource localResource, IIndex index, int partition) {
ILSMIndex lsmIndex = (ILSMIndex) index;
- if (lsmIndex.getDiskCacheManager().isSweepable()) {
- DatasetUnit datasetUnit = datasets.computeIfAbsent(datasetId, DatasetUnit::new);
- datasetUnit.addIndex(localResource.getId(), lsmIndex);
- }
- }
-
- @Override
- public void onUnregister(int datasetId, long resourceId) {
- DatasetUnit datasetUnit = datasets.get(datasetId);
- if (datasetUnit != null && datasetUnit.dropIndex(resourceId)) {
- datasets.remove(datasetId);
-
- // TODO invalidate eviction plans if the disk is not pressured
- }
- }
-
- @Override
- public void onOpen(int datasetId, long resourceId) {
- DatasetUnit datasetUnit = datasets.get(datasetId);
- if (datasetUnit != null) {
- IndexUnit indexUnit = datasetUnit.getIndex(resourceId);
- if (indexUnit != null) {
- indexUnit.readLock();
+ evictionLock.readLock().lock();
+ try {
+ if (partition != metadataPartition) {
+ long resourceId = localResource.getId();
+ if (lsmIndex.getDiskCacheManager().isSweepable()) {
+ sweepableIndexes.put(resourceId, new SweepableIndexUnit(localResource, lsmIndex));
+ } else {
+ unsweepableIndexes.put(resourceId, new UnsweepableIndexUnit(localResource));
+ }
}
+ inactiveResources.remove(localResource.getId());
+ } finally {
+ evictionLock.readLock().unlock();
}
}
@Override
- public void onClose(int datasetId, long resourceId) {
- DatasetUnit datasetUnit = datasets.get(datasetId);
- if (datasetUnit != null) {
- IndexUnit indexUnit = datasetUnit.getIndex(resourceId);
+ public void onUnregister(long resourceId) {
+ evictionLock.readLock().lock();
+ try {
+ AbstractIndexUnit indexUnit = getUnit(resourceId);
if (indexUnit != null) {
- indexUnit.readUnlock();
+ indexUnit.drop();
+ } else {
+ inactiveResources.remove(resourceId);
}
+ } finally {
+ evictionLock.readLock().unlock();
}
}
- Int2ObjectMap<DatasetUnit> getDatasets() {
- return datasets;
+ private AbstractIndexUnit getUnit(long resourceId) {
+ AbstractIndexUnit indexUnit = sweepableIndexes.get(resourceId);
+ if (indexUnit == null) {
+ indexUnit = unsweepableIndexes.get(resourceId);
+ }
+ return indexUnit;
+ }
+
+ @Override
+ public void onOpen(long resourceId) {
+ evictionLock.readLock().lock();
+ try {
+ AbstractIndexUnit indexUnit = getUnit(resourceId);
+ if (indexUnit == null) {
+ // Metadata resource
+ return;
+ }
+ indexUnit.open();
+ } finally {
+ evictionLock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public void onClose(long resourceId) {
+ evictionLock.readLock().lock();
+ try {
+ AbstractIndexUnit indexUnit = getUnit(resourceId);
+ if (indexUnit == null) {
+ // Metadata resource
+ return;
+ }
+ indexUnit.close();
+ } finally {
+ evictionLock.readLock().unlock();
+ }
+
+ }
+
+ ReentrantReadWriteLock getEvictionLock() {
+ return evictionLock;
+ }
+
+ void reportLocalResources(Map<Long, LocalResource> localResources) {
+ inactiveResources.clear();
+ // First check whatever we had already
+ for (LocalResource lr : localResources.values()) {
+ if (unsweepableIndexes.containsKey(lr.getId()) || sweepableIndexes.containsKey(lr.getId())) {
+ // We already have this resource
+ continue;
+ }
+
+ // Probably a new resource or an old resource that wasn't registered before
+ inactiveResources.put(lr.getId(), lr);
+ }
+
+ removeUnassignedResources(unsweepableIndexes, localResources);
+ removeUnassignedResources(sweepableIndexes, localResources);
+
+ LOGGER.info("Retained active {unsweepable: {}, sweepable: {}} and inactive: {}", unsweepableIndexes,
+ sweepableIndexes, inactiveResources.values().stream()
+ .map(x -> "(id: " + x.getId() + ", path: " + x.getPath() + ")").toList());
+ }
+
+ private void removeUnassignedResources(Long2ObjectMap<?> indexes, Map<Long, LocalResource> localResources) {
+ indexes.long2ObjectEntrySet().removeIf(x -> !localResources.containsKey(x.getLongKey()));
+ }
+
+ Collection<LocalResource> getInactiveResources() {
+ return inactiveResources.values();
+ }
+
+ Collection<UnsweepableIndexUnit> getUnsweepableIndexes() {
+ return unsweepableIndexes.values();
+ }
+
+ void getSweepableIndexes(Collection<SweepableIndexUnit> indexes) {
+ indexes.addAll(sweepableIndexes.values());
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/DiskCacheSweeperThread.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/DiskCacheSweeperThread.java
index 79b978e..f594e06 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/DiskCacheSweeperThread.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/DiskCacheSweeperThread.java
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
@@ -27,11 +28,12 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.IDiskSpaceMaker;
-import org.apache.hyracks.cloud.cache.unit.DatasetUnit;
-import org.apache.hyracks.cloud.cache.unit.IndexUnit;
+import org.apache.hyracks.cloud.cache.unit.SweepableIndexUnit;
+import org.apache.hyracks.cloud.cache.unit.UnsweepableIndexUnit;
import org.apache.hyracks.cloud.io.ICloudIOManager;
import org.apache.hyracks.cloud.sweeper.ISweeper;
import org.apache.hyracks.cloud.sweeper.Sweeper;
+import org.apache.hyracks.storage.common.LocalResource;
import org.apache.hyracks.storage.common.buffercache.BufferCache;
import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
@@ -44,21 +46,29 @@
private final long waitTime;
private final CloudDiskResourceCacheLockNotifier resourceManager;
private final IPhysicalDrive physicalDrive;
- private final List<IndexUnit> indexes;
+ private final List<SweepableIndexUnit> indexes;
+ private final ICloudIOManager cloudIOManager;
private final ISweeper sweeper;
+ private final long inactiveTimeThreshold;
public DiskCacheSweeperThread(ExecutorService executorService, long waitTime,
CloudDiskResourceCacheLockNotifier resourceManager, ICloudIOManager cloudIOManager, int numOfSweepThreads,
int sweepQueueSize, IPhysicalDrive physicalDrive, BufferCache bufferCache,
- Map<Integer, BufferedFileHandle> fileInfoMap) {
+ Map<Integer, BufferedFileHandle> fileInfoMap, long inactiveTimeThreshold) {
this.waitTime = TimeUnit.SECONDS.toMillis(waitTime);
this.resourceManager = resourceManager;
this.physicalDrive = physicalDrive;
+ this.inactiveTimeThreshold = inactiveTimeThreshold;
indexes = new ArrayList<>();
+ this.cloudIOManager = cloudIOManager;
sweeper = new Sweeper(executorService, cloudIOManager, bufferCache, fileInfoMap, numOfSweepThreads,
sweepQueueSize);
}
+ public void reportLocalResources(Map<Long, LocalResource> localResources) {
+ resourceManager.reportLocalResources(localResources);
+ }
+
@Override
public void makeSpaceOrThrow(IOException ioException) throws HyracksDataException {
if (ioException.getMessage().contains("no space")) {
@@ -83,7 +93,7 @@
while (true) {
synchronized (this) {
try {
- sweep();
+ makeSpace();
wait(waitTime);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -94,20 +104,66 @@
}
}
- private void sweep() {
+ private void makeSpace() {
if (physicalDrive.computeAndCheckIsPressured()) {
- for (DatasetUnit dataset : resourceManager.getDatasets().values()) {
- indexes.clear();
- dataset.getIndexes(indexes);
- sweepIndexes(sweeper, indexes);
+ boolean shouldSweep;
+ resourceManager.getEvictionLock().writeLock().lock();
+ try {
+ shouldSweep = evictInactive();
+ } finally {
+ resourceManager.getEvictionLock().writeLock().unlock();
+ }
+
+ if (shouldSweep) {
+ // index eviction didn't help. Sweep!
+ sweep();
}
}
}
+ private boolean evictInactive() {
+ long now = System.nanoTime();
+ Collection<LocalResource> inactiveResources = resourceManager.getInactiveResources();
+ Collection<UnsweepableIndexUnit> unsweepableIndexes = resourceManager.getUnsweepableIndexes();
+ if (inactiveResources.isEmpty() && unsweepableIndexes.isEmpty()) {
+ // return true to run sweep as nothing will be evicted
+ return true;
+ }
+
+ // First evict all resources that were never been registered
+ for (LocalResource resource : inactiveResources) {
+ try {
+ cloudIOManager.evict(resource.getPath());
+ } catch (HyracksDataException e) {
+ LOGGER.error("Failed to evict resource " + resource.getPath(), e);
+ }
+ }
+
+ // Next evict all inactive indexes
+ for (UnsweepableIndexUnit index : unsweepableIndexes) {
+ if (now - index.getLastAccessTime() >= inactiveTimeThreshold) {
+ try {
+ cloudIOManager.evict(index.getPath());
+ } catch (HyracksDataException e) {
+ LOGGER.error("Failed to evict resource " + index.getPath(), e);
+ }
+ }
+ }
+
+ // If disk is still pressured, proceed with sweep
+ return physicalDrive.computeAndCheckIsPressured();
+ }
+
+ private void sweep() {
+ indexes.clear();
+ resourceManager.getSweepableIndexes(indexes);
+ sweepIndexes(sweeper, indexes);
+ }
+
@CriticalPath
- private static void sweepIndexes(ISweeper sweeper, List<IndexUnit> indexes) {
+ private static void sweepIndexes(ISweeper sweeper, List<SweepableIndexUnit> indexes) {
for (int i = 0; i < indexes.size(); i++) {
- IndexUnit index = indexes.get(i);
+ SweepableIndexUnit index = indexes.get(i);
if (!index.isSweeping()) {
try {
sweeper.sweep(index);
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/AbstractIndexUnit.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/AbstractIndexUnit.java
new file mode 100644
index 0000000..1532340
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/AbstractIndexUnit.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.cache.unit;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hyracks.storage.common.LocalResource;
+
+public abstract class AbstractIndexUnit {
+ protected final LocalResource localResource;
+ private final AtomicLong lastAccessTime;
+ private final AtomicInteger openCounter;
+
+ AbstractIndexUnit(LocalResource localResource) {
+ this.localResource = localResource;
+ this.lastAccessTime = new AtomicLong(0);
+ this.openCounter = new AtomicInteger(0);
+ }
+
+ public final void open() {
+ lastAccessTime.set(System.nanoTime());
+ openCounter.get();
+ }
+
+ public final void close() {
+ openCounter.decrementAndGet();
+ }
+
+ public final long getLastAccessTime() {
+ return lastAccessTime.get();
+ }
+
+ public abstract void drop();
+
+ @Override
+ public String toString() {
+ return "(id: " + localResource.getId() + ", path: " + localResource.getPath() + "sweepable: " + isSweepable()
+ + ")";
+ }
+
+ protected abstract boolean isSweepable();
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/DatasetUnit.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/DatasetUnit.java
deleted file mode 100644
index 34b37fb..0000000
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/DatasetUnit.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.cloud.cache.unit;
-
-import java.util.List;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
-
-import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
-import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
-
-public final class DatasetUnit {
- private final int id;
- private final ReentrantReadWriteLock lock;
- /**
- * Maps resourceId to {@link IndexUnit}
- */
- private final Long2ObjectMap<IndexUnit> indexes;
-
- public DatasetUnit(int datasetId) {
- id = datasetId;
- lock = new ReentrantReadWriteLock();
- indexes = new Long2ObjectOpenHashMap<>();
-
- }
-
- public int getId() {
- return id;
- }
-
- public IndexUnit addIndex(long resourceId, ILSMIndex index) {
- writeLock();
- try {
- IndexUnit indexUnit = new IndexUnit(resourceId, index);
- indexes.put(resourceId, indexUnit);
- return indexUnit;
- } finally {
- writeUnlock();
- }
- }
-
- public boolean dropIndex(long resourceId) {
- IndexUnit indexUnit = indexes.remove(resourceId);
- // Signal that the index is being dropped so a sweeper thread does not sweep this index or stops sweeping
- indexUnit.setDropped();
- // Wait for the sweep operation (if running) before allowing the index to be dropped
- indexUnit.waitForSweep();
- return indexUnit.getIndex().isPrimaryIndex();
- }
-
- public IndexUnit getIndex(long resourceId) {
- readLock();
- try {
- return indexes.get(resourceId);
- } finally {
- readUnlock();
- }
- }
-
- /**
- * Return the current indexes
- *
- * @param indexUnits container used to return the current indexes
- */
- public void getIndexes(List<IndexUnit> indexUnits) {
- readLock();
- try {
- indexUnits.addAll(indexes.values());
- } finally {
- readUnlock();
- }
- }
-
- private void readLock() {
- lock.readLock().lock();
- }
-
- private void readUnlock() {
- lock.readLock().unlock();
- }
-
- private void writeLock() {
- lock.writeLock().lock();
- }
-
- private void writeUnlock() {
- lock.writeLock().unlock();
- }
-}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/IndexUnit.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/SweepableIndexUnit.java
similarity index 77%
rename from hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/IndexUnit.java
rename to hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/SweepableIndexUnit.java
index 8f8b412..794cafc 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/IndexUnit.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/SweepableIndexUnit.java
@@ -19,39 +19,40 @@
package org.apache.hyracks.cloud.cache.unit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hyracks.api.util.InvokeUtil;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.common.LocalResource;
-// TODO allow evicting an index entirely
-public final class IndexUnit {
- private final long id;
+public final class SweepableIndexUnit extends AbstractIndexUnit {
private final ILSMIndex index;
private final AtomicBoolean dropped;
private final AtomicBoolean sweeping;
- private final AtomicInteger readCounter;
- public IndexUnit(long resourceId, ILSMIndex index) {
- this.id = resourceId;
+ public SweepableIndexUnit(LocalResource localResource, ILSMIndex index) {
+ super(localResource);
this.index = index;
dropped = new AtomicBoolean(false);
sweeping = new AtomicBoolean(false);
- readCounter = new AtomicInteger(0);
}
- public long getId() {
- return id;
+ @Override
+ public void drop() {
+ // Signal that the index is being dropped so a sweeper thread does not sweep this index or stops sweeping
+ dropped.set(false);
+ // Wait for the sweep operation (if running) before allowing the index to be dropped
+ waitForSweep();
+ }
+
+ @Override
+ protected boolean isSweepable() {
+ return true;
}
public ILSMIndex getIndex() {
return index;
}
- public void setDropped() {
- dropped.set(false);
- }
-
public boolean isDropped() {
return dropped.get();
}
@@ -79,13 +80,4 @@
sweeping.notifyAll();
}
}
-
- public void readLock() {
- readCounter.incrementAndGet();
- }
-
- public void readUnlock() {
- readCounter.decrementAndGet();
- }
-
}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/UnsweepableIndexUnit.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/UnsweepableIndexUnit.java
new file mode 100644
index 0000000..204fe68
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/unit/UnsweepableIndexUnit.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.cache.unit;
+
+import org.apache.hyracks.storage.common.LocalResource;
+
+public final class UnsweepableIndexUnit extends AbstractIndexUnit {
+ public UnsweepableIndexUnit(LocalResource localResource) {
+ super(localResource);
+ }
+
+ @Override
+ public void drop() {
+ // NoOp
+ }
+
+ @Override
+ protected boolean isSweepable() {
+ return false;
+ }
+
+ public String getPath() {
+ return localResource.getPath();
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
index 83b7629..0dca417 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
@@ -22,7 +22,6 @@
import java.nio.ByteBuffer;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IFileHandle;
/**
@@ -86,9 +85,9 @@
int punchHole(IFileHandle fHandle, long offset, long length) throws HyracksDataException;
/**
- * Evict a directory from the local disk cache
+ * Evict a resource from the local disk cache
*
- * @param directory to evict
+ * @param resourcePath to evict
*/
- void evict(FileReference directory) throws HyracksDataException;
+ void evict(String resourcePath) throws HyracksDataException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/ISweeper.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/ISweeper.java
index d064fda..9067a3f 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/ISweeper.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/ISweeper.java
@@ -18,7 +18,7 @@
*/
package org.apache.hyracks.cloud.sweeper;
-import org.apache.hyracks.cloud.cache.unit.IndexUnit;
+import org.apache.hyracks.cloud.cache.unit.SweepableIndexUnit;
import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
/**
@@ -31,5 +31,5 @@
*
* @param indexUnit to sweep
*/
- void sweep(IndexUnit indexUnit) throws InterruptedException;
+ void sweep(SweepableIndexUnit indexUnit) throws InterruptedException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/NoOpSweeper.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/NoOpSweeper.java
index 0410e3b..ca103ab 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/NoOpSweeper.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/NoOpSweeper.java
@@ -18,7 +18,7 @@
*/
package org.apache.hyracks.cloud.sweeper;
-import org.apache.hyracks.cloud.cache.unit.IndexUnit;
+import org.apache.hyracks.cloud.cache.unit.SweepableIndexUnit;
public final class NoOpSweeper implements ISweeper {
public static final ISweeper INSTANCE = new NoOpSweeper();
@@ -27,7 +27,7 @@
}
@Override
- public void sweep(IndexUnit indexUnit) {
+ public void sweep(SweepableIndexUnit indexUnit) {
// NoOp
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/SweepContext.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/SweepContext.java
index 79fdb5a..c709cc6 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/SweepContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/SweepContext.java
@@ -23,7 +23,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.cloud.buffercache.page.CloudCachedPage;
-import org.apache.hyracks.cloud.cache.unit.IndexUnit;
+import org.apache.hyracks.cloud.cache.unit.SweepableIndexUnit;
import org.apache.hyracks.cloud.io.ICloudIOManager;
import org.apache.hyracks.storage.common.buffercache.BufferCache;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
@@ -36,7 +36,7 @@
private final BufferCache bufferCache;
private final Map<Integer, BufferedFileHandle> fileInfoMap;
private final AtomicBoolean shutdown;
- private IndexUnit indexUnit;
+ private SweepableIndexUnit indexUnit;
private BufferedFileHandle handle;
public SweepContext(ICloudIOManager cloudIOManager, BufferCache bufferCache,
@@ -74,11 +74,11 @@
bufferCache.unpin(page, bcOpCtx);
}
- public void setIndexUnit(IndexUnit indexUnit) {
+ public void setIndexUnit(SweepableIndexUnit indexUnit) {
this.indexUnit = indexUnit;
}
- public IndexUnit getIndexUnit() {
+ public SweepableIndexUnit getIndexUnit() {
return indexUnit;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/Sweeper.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/Sweeper.java
index e5d368c..245c957 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/Sweeper.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/Sweeper.java
@@ -26,7 +26,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.util.InvokeUtil;
-import org.apache.hyracks.cloud.cache.unit.IndexUnit;
+import org.apache.hyracks.cloud.cache.unit.SweepableIndexUnit;
import org.apache.hyracks.cloud.io.ICloudIOManager;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.cloud.IIndexDiskCacheManager;
@@ -56,7 +56,7 @@
}
@Override
- public void sweep(IndexUnit indexUnit) throws InterruptedException {
+ public void sweep(SweepableIndexUnit indexUnit) throws InterruptedException {
SweepRequest request = freeRequests.take();
request.reset(indexUnit);
requests.put(request);
@@ -131,7 +131,7 @@
this.context = context;
}
- void reset(IndexUnit indexUnit) {
+ void reset(SweepableIndexUnit indexUnit) {
context.setIndexUnit(indexUnit);
}
@@ -151,7 +151,7 @@
*/
return;
}
- IndexUnit indexUnit = context.getIndexUnit();
+ SweepableIndexUnit indexUnit = context.getIndexUnit();
indexUnit.startSweeping();
try {
ILSMIndex index = indexUnit.getIndex();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
index b9f277a..9909e97 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
@@ -45,6 +45,7 @@
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IDiskSpaceMaker;
import org.apache.hyracks.api.io.IFileDeviceResolver;
import org.apache.hyracks.api.io.IFileHandle;
import org.apache.hyracks.api.io.IIOBulkOperation;
@@ -82,6 +83,8 @@
* Mutables
*/
private int workspaceIndex;
+ // TODO use space make on write
+ private IDiskSpaceMaker spaceMaker;
public IOManager(List<IODeviceHandle> devices, IFileDeviceResolver deviceComputer, int ioParallelism, int queueSize)
throws HyracksDataException {
@@ -112,6 +115,7 @@
for (int i = 0; i < numIoThreads; i++) {
executor.execute(new IoRequestHandler(i, submittedRequests));
}
+ spaceMaker = NoOpDiskSpaceMaker.INSTANCE;
}
public int getQueueSize() {
@@ -596,4 +600,8 @@
public void performBulkOperation(IIOBulkOperation bulkOperation) throws HyracksDataException {
((AbstractBulkOperation) bulkOperation).performOperation();
}
+
+ public void setSpaceMaker(IDiskSpaceMaker spaceMaker) {
+ this.spaceMaker = spaceMaker;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/NoOpDiskSpaceMaker.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/NoOpDiskSpaceMaker.java
new file mode 100644
index 0000000..1527dde
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/NoOpDiskSpaceMaker.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.io;
+
+import java.io.IOException;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IDiskSpaceMaker;
+
+final class NoOpDiskSpaceMaker implements IDiskSpaceMaker {
+ static final IDiskSpaceMaker INSTANCE = new NoOpDiskSpaceMaker();
+
+ private NoOpDiskSpaceMaker() {
+ }
+
+ @Override
+ public void makeSpaceOrThrow(IOException e) throws HyracksDataException {
+ throw HyracksDataException.create(e);
+ }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/BTreeHelperStorageManager.java b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/BTreeHelperStorageManager.java
index bf870b1..426e81d 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/BTreeHelperStorageManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/BTreeHelperStorageManager.java
@@ -26,6 +26,8 @@
import org.apache.hyracks.storage.common.IResourceLifecycleManager;
import org.apache.hyracks.storage.common.IStorageManager;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService;
+import org.apache.hyracks.storage.common.disk.NoOpDiskCacheMonitoringService;
import org.apache.hyracks.storage.common.file.ResourceIdFactory;
public class BTreeHelperStorageManager implements IStorageManager {
@@ -60,4 +62,9 @@
public IResourceLifecycleManager<IIndex> getLifecycleManager(INCServiceContext ctx) {
return RuntimeContext.get(ctx).getIndexLifecycleManager();
}
+
+ @Override
+ public IDiskCacheMonitoringService getDiskCacheMonitoringService(INCServiceContext ctx) {
+ return NoOpDiskCacheMonitoringService.INSTANCE;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeper.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeper.java
index 4932bf0..9fc3b8d 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeper.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeper.java
@@ -27,7 +27,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.cloud.buffercache.page.CloudCachedPage;
-import org.apache.hyracks.cloud.cache.unit.IndexUnit;
+import org.apache.hyracks.cloud.cache.unit.SweepableIndexUnit;
import org.apache.hyracks.cloud.sweeper.SweepContext;
import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnTupleProjector;
@@ -59,7 +59,7 @@
public long sweep(BitSet plan, SweepContext context, IColumnTupleProjector sweepProjector)
throws HyracksDataException {
- IndexUnit indexUnit = context.getIndexUnit();
+ SweepableIndexUnit indexUnit = context.getIndexUnit();
LSMColumnBTree lsmColumnBTree = (LSMColumnBTree) indexUnit.getIndex();
IColumnProjectionInfo projectionInfo = captureSweepableComponents(lsmColumnBTree, sweepProjector);
if (projectionInfo == null) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResource.java
index a0b592a..7ea4e35 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResource.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResource.java
@@ -45,6 +45,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider;
import org.apache.hyracks.storage.common.IStorageManager;
import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory;
+import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -87,13 +88,14 @@
List<IVirtualBufferCache> vbcs = vbcProvider.getVirtualBufferCaches(serviceCtx, file);
ioOpCallbackFactory.initialize(serviceCtx, this);
pageWriteCallbackFactory.initialize(serviceCtx, this);
+ IDiskCacheMonitoringService diskCacheService = storageManager.getDiskCacheMonitoringService(serviceCtx);
return LSMColumnBTreeUtil.createLSMTree(ioManager, vbcs, file, storageManager.getBufferCache(serviceCtx),
typeTraits, cmpFactories, bloomFilterKeyFields, bloomFilterFalsePositiveRate,
mergePolicyFactory.createMergePolicy(mergePolicyProperties, serviceCtx),
opTrackerProvider.getOperationTracker(serviceCtx, this), ioSchedulerProvider.getIoScheduler(serviceCtx),
ioOpCallbackFactory, pageWriteCallbackFactory, btreeFields, metadataPageManagerFactory, false,
serviceCtx.getTracer(), compressorDecompressorFactory, nullTypeTraits, nullIntrospector,
- columnManagerFactory, atomic);
+ columnManagerFactory, atomic, diskCacheService);
}
public static IJsonSerializable fromJson(IPersistedResourceRegistry registry, JsonNode json)
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/LSMColumnBTreeUtil.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/LSMColumnBTreeUtil.java
index 039aaed..6335acd 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/LSMColumnBTreeUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/LSMColumnBTreeUtil.java
@@ -34,6 +34,7 @@
import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnManager;
import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnManagerFactory;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.CloudColumnIndexDiskCacheManager;
import org.apache.hyracks.storage.am.lsm.btree.column.cloud.IColumnIndexDiskCacheManager;
import org.apache.hyracks.storage.am.lsm.btree.column.cloud.NoOpColumnIndexDiskCacheManager;
import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTree;
@@ -55,6 +56,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import org.apache.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService;
import org.apache.hyracks.util.trace.ITracer;
public class LSMColumnBTreeUtil {
@@ -66,13 +68,18 @@
ILSMIOOperationCallbackFactory ioOpCallbackFactory, ILSMPageWriteCallbackFactory pageWriteCallbackFactory,
int[] btreeFields, IMetadataPageManagerFactory freePageManagerFactory, boolean updateAware, ITracer tracer,
ICompressorDecompressorFactory compressorDecompressorFactory, ITypeTraits nullTypeTraits,
- INullIntrospector nullIntrospector, IColumnManagerFactory columnManagerFactory, boolean atomic)
- throws HyracksDataException {
- // Initialize managers
- IColumnManager columnManager = columnManagerFactory.createColumnManager();
- IColumnIndexDiskCacheManager diskCacheManager = NoOpColumnIndexDiskCacheManager.INSTANCE;
+ INullIntrospector nullIntrospector, IColumnManagerFactory columnManagerFactory, boolean atomic,
+ IDiskCacheMonitoringService diskCacheService) throws HyracksDataException {
- //Tuple writers
+ // Initialize managers
+ boolean diskCacheEnabled = diskCacheService.isEnabled();
+ IColumnManager columnManager = columnManagerFactory.createColumnManager();
+ IColumnIndexDiskCacheManager diskCacheManager = diskCacheEnabled
+ ? new CloudColumnIndexDiskCacheManager(columnManager.getNumberOfPrimaryKeys(),
+ columnManager.getMergeColumnProjector(), diskCacheService.getPhysicalDrive())
+ : NoOpColumnIndexDiskCacheManager.INSTANCE;
+
+ // Tuple writers
LSMBTreeTupleWriterFactory insertTupleWriterFactory = new LSMBTreeTupleWriterFactory(typeTraits,
cmpFactories.length, false, updateAware, nullTypeTraits, nullIntrospector);
LSMBTreeTupleWriterFactory deleteTupleWriterFactory = new LSMBTreeTupleWriterFactory(typeTraits,
@@ -82,7 +89,7 @@
LSMBTreeTupleWriterFactory bulkLoadTupleWriterFactory = new LSMBTreeTupleWriterFactory(typeTraits,
cmpFactories.length, false, updateAware, nullTypeTraits, nullIntrospector);
- //Leaf frames
+ // Leaf frames
ITreeIndexFrameFactory flushLeafFrameFactory = new ColumnBTreeLeafFrameFactory(copyTupleWriterFactory,
columnManagerFactory.getFlushColumnTupleReaderWriterFactory());
ITreeIndexFrameFactory mergeLeafFrameFactory = new ColumnBTreeLeafFrameFactory(copyTupleWriterFactory,
@@ -93,7 +100,7 @@
ITreeIndexFrameFactory deleteLeafFrameFactory = new BTreeNSMLeafFrameFactory(deleteTupleWriterFactory);
ITreeIndexFrameFactory interiorFrameFactory = new BTreeNSMInteriorFrameFactory(insertTupleWriterFactory);
- //BTree factory
+ // BTree factory
TreeIndexFactory<ColumnBTree> flushBTreeFactory = new ColumnBTreeFactory(ioManager, diskBufferCache,
freePageManagerFactory, interiorFrameFactory, flushLeafFrameFactory, cmpFactories, typeTraits.length);
TreeIndexFactory<ColumnBTree> mergeBTreeFactory = new ColumnBTreeFactory(ioManager, diskBufferCache,
@@ -102,8 +109,8 @@
new ColumnBTreeFactory(ioManager, diskBufferCache, freePageManagerFactory, interiorFrameFactory,
bulkLoadLeafFrameFactory, cmpFactories, typeTraits.length);
- ILSMIndexFileManager fileNameManager =
- new LSMBTreeFileManager(ioManager, file, flushBTreeFactory, true, compressorDecompressorFactory);
+ ILSMIndexFileManager fileNameManager = new LSMBTreeFileManager(ioManager, file, flushBTreeFactory, true,
+ compressorDecompressorFactory, diskCacheEnabled);
BloomFilterFactory bloomFilterFactory = new BloomFilterFactory(diskBufferCache, bloomFilterKeyFields);
ILSMDiskComponentFactory flushComponentFactory =
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
index 98b82db..e2e49e3 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java
@@ -45,18 +45,20 @@
(dir, name) -> !name.startsWith(".") && name.endsWith(BTREE_SUFFIX);
private final TreeIndexFactory<? extends ITreeIndex> btreeFactory;
private final boolean hasBloomFilter;
-
- public LSMBTreeFileManager(IIOManager ioManager, FileReference file,
- TreeIndexFactory<? extends ITreeIndex> btreeFactory, boolean hasBloomFilter,
- ICompressorDecompressorFactory compressorDecompressorFactory) {
- super(ioManager, file, null, compressorDecompressorFactory);
- this.btreeFactory = btreeFactory;
- this.hasBloomFilter = hasBloomFilter;
- }
+ private final boolean allowHoles;
public LSMBTreeFileManager(IIOManager ioManager, FileReference file,
TreeIndexFactory<? extends ITreeIndex> btreeFactory, boolean hasBloomFilter) {
- this(ioManager, file, btreeFactory, hasBloomFilter, NoOpCompressorDecompressorFactory.INSTANCE);
+ this(ioManager, file, btreeFactory, hasBloomFilter, NoOpCompressorDecompressorFactory.INSTANCE, false);
+ }
+
+ public LSMBTreeFileManager(IIOManager ioManager, FileReference file,
+ TreeIndexFactory<? extends ITreeIndex> btreeFactory, boolean hasBloomFilter,
+ ICompressorDecompressorFactory compressorDecompressorFactory, boolean allowHoles) {
+ super(ioManager, file, null, compressorDecompressorFactory);
+ this.btreeFactory = btreeFactory;
+ this.hasBloomFilter = hasBloomFilter;
+ this.allowHoles = allowHoles;
}
@Override
@@ -180,4 +182,9 @@
return validFiles;
}
+
+ @Override
+ protected boolean areHolesAllowed() {
+ return allowHoles;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java
index 3106b29..6bd236c 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java
@@ -122,7 +122,7 @@
filterManager = new LSMComponentFilterManager(filterFrameFactory);
}
ILSMIndexFileManager fileNameManager = new LSMBTreeFileManager(ioManager, file, diskBTreeFactory,
- hasBloomFilter, compressorDecompressorFactory);
+ hasBloomFilter, compressorDecompressorFactory, false);
ILSMDiskComponentFactory componentFactory;
ILSMDiskComponentFactory bulkLoadComponentFactory;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IStorageManager.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IStorageManager.java
index 9d0880e..47492e8 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IStorageManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IStorageManager.java
@@ -24,6 +24,7 @@
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.api.io.IJsonSerializable;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService;
import org.apache.hyracks.storage.common.file.IResourceIdFactory;
/**
@@ -59,4 +60,10 @@
* @return the resource lifecycle manager
*/
IResourceLifecycleManager<IIndex> getLifecycleManager(INCServiceContext ctx);
+
+ /**
+ * @param ctx the nc service context
+ * @return disk cache monitoring service
+ */
+ IDiskCacheMonitoringService getDiskCacheMonitoringService(INCServiceContext ctx);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskCacheMonitoringService.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskCacheMonitoringService.java
index 5fb7088..b957782 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskCacheMonitoringService.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskCacheMonitoringService.java
@@ -18,6 +18,10 @@
*/
package org.apache.hyracks.storage.common.disk;
+import java.util.Map;
+
+import org.apache.hyracks.storage.common.LocalResource;
+
/**
* Disk cache monitoring service is responsible for monitor the local drives
*/
@@ -38,6 +42,13 @@
boolean isEnabled();
/**
+ * Report all local resources
+ *
+ * @param localResources local resources
+ */
+ void reportLocalResources(Map<Long, LocalResource> localResources);
+
+ /**
* @return physical drive
*/
IPhysicalDrive getPhysicalDrive();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskResourceCacheLockNotifier.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskResourceCacheLockNotifier.java
index fc4645d..1e986b3 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskResourceCacheLockNotifier.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskResourceCacheLockNotifier.java
@@ -31,34 +31,31 @@
* Notify registering a new resource
* Note: this method is not thread-safe outside {@link org.apache.hyracks.storage.common.IResourceLifecycleManager}
*
- * @param datasetId dataset ID
* @param localResource resource to be registered
* @param index of the resource
+ * @param partition partition
*/
- void onRegister(int datasetId, LocalResource localResource, IIndex index);
+ void onRegister(LocalResource localResource, IIndex index, int partition);
/**
* Notify unregistering an existing resource
* Note: this method is not thread-safe outside {@link org.apache.hyracks.storage.common.IResourceLifecycleManager}
*
- * @param datasetId dataset ID
* @param resourceId resource ID
*/
- void onUnregister(int datasetId, long resourceId);
+ void onUnregister(long resourceId);
/**
* Notify opening a resource
*
- * @param datasetId dataset ID
* @param resourceId resource ID
*/
- void onOpen(int datasetId, long resourceId);
+ void onOpen(long resourceId);
/**
* Notify closing a resource
*
- * @param datasetId dataset ID
* @param resourceId resource ID
*/
- void onClose(int datasetId, long resourceId);
+ void onClose(long resourceId);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskCacheMonitoringService.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskCacheMonitoringService.java
index 9a7a5d6..d6155a4 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskCacheMonitoringService.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskCacheMonitoringService.java
@@ -18,6 +18,10 @@
*/
package org.apache.hyracks.storage.common.disk;
+import java.util.Map;
+
+import org.apache.hyracks.storage.common.LocalResource;
+
public final class NoOpDiskCacheMonitoringService implements IDiskCacheMonitoringService {
public static final IDiskCacheMonitoringService INSTANCE = new NoOpDiskCacheMonitoringService();
@@ -40,6 +44,11 @@
}
@Override
+ public void reportLocalResources(Map<Long, LocalResource> localResources) {
+ // NoOp
+ }
+
+ @Override
public IPhysicalDrive getPhysicalDrive() {
return DummyPhysicalDrive.INSTANCE;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskResourceCacheLockNotifier.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskResourceCacheLockNotifier.java
index 3c0d6c9..b83c388 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskResourceCacheLockNotifier.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskResourceCacheLockNotifier.java
@@ -28,22 +28,22 @@
}
@Override
- public void onRegister(int datasetId, LocalResource localResource, IIndex index) {
+ public void onRegister(LocalResource localResource, IIndex index, int partition) {
// NoOp
}
@Override
- public void onUnregister(int datasetId, long resourceId) {
+ public void onUnregister(long resourceId) {
// NoOp
}
@Override
- public void onOpen(int datasetId, long resourceId) {
+ public void onOpen(long resourceId) {
// NoOp
}
@Override
- public void onClose(int datasetId, long resourceId) {
+ public void onClose(long resourceId) {
// NoOp
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManager.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManager.java
index e54ce2c..0b4d2ed 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManager.java
@@ -26,6 +26,8 @@
import org.apache.hyracks.storage.common.IResourceLifecycleManager;
import org.apache.hyracks.storage.common.IStorageManager;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService;
+import org.apache.hyracks.storage.common.disk.NoOpDiskCacheMonitoringService;
import org.apache.hyracks.storage.common.file.ResourceIdFactory;
public class TestStorageManager implements IStorageManager {
@@ -60,4 +62,9 @@
return TestStorageManagerComponentHolder.getIndexLifecycleManager();
}
+ @Override
+ public IDiskCacheMonitoringService getDiskCacheMonitoringService(INCServiceContext ctx) {
+ return NoOpDiskCacheMonitoringService.INSTANCE;
+ }
+
}