[MULTIPLE ISSUES][STO] Multiple fixes for cloud caching
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- ASTERIXDB-3424: Disk cache sweeper can interfere
with rebalance
- ASTERIXDB-3425: Use allocated storage quota to
determine whether or not to persist
non-column pages
- Other fixes:
- Make punchHole() uninterruptible
- Allow plan reevaluation despite the disk is
being pressured or not
- Log the page info when the buffer cache's
page read operation fails
- Fix API tests failures (ASTERIXDB-3423)
- Ignore CloudStorageGCSTest
Change-Id: Ida79a61940ed8944ed9fb44ac7d8815d865b73e6
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18361
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CloudToLocalStorageCachingTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CloudToLocalStorageCachingTask.java
index 547bc8b..b47703a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CloudToLocalStorageCachingTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CloudToLocalStorageCachingTask.java
@@ -29,6 +29,7 @@
import org.apache.hyracks.api.control.CcId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.service.IControllerService;
+import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -55,16 +56,26 @@
INcApplicationContext applicationContext = (INcApplicationContext) cs.getApplicationContext();
PersistentLocalResourceRepository lrs =
(PersistentLocalResourceRepository) applicationContext.getLocalResourceRepository();
+ IDiskCacheMonitoringService diskService = applicationContext.getDiskCacheService();
- String nodeId = applicationContext.getServiceContext().getNodeId();
- LOGGER.info("Initializing Node {} with storage partitions: {}", nodeId, storagePartitions);
+ // Pause all disk caching activities
+ diskService.pause();
+ try {
+ String nodeId = applicationContext.getServiceContext().getNodeId();
+ LOGGER.info("Initializing Node {} with storage partitions: {}", nodeId, storagePartitions);
- Checkpoint latestCheckpoint = applicationContext.getTransactionSubsystem().getCheckpointManager().getLatest();
- IPartitionBootstrapper bootstrapper = applicationContext.getPartitionBootstrapper();
- bootstrapper.bootstrap(storagePartitions, lrs.getOnDiskPartitions(), metadataNode, metadataPartitionId, cleanup,
- latestCheckpoint == null);
- // Report all local resources
- applicationContext.getDiskCacheService().reportLocalResources(lrs.loadAndGetAllResources());
+ Checkpoint latestCheckpoint =
+ applicationContext.getTransactionSubsystem().getCheckpointManager().getLatest();
+ IPartitionBootstrapper bootstrapper = applicationContext.getPartitionBootstrapper();
+ bootstrapper.bootstrap(storagePartitions, lrs.getOnDiskPartitions(), metadataNode, metadataPartitionId,
+ cleanup, latestCheckpoint == null);
+
+ // Report all local resources
+ diskService.reportLocalResources(lrs.loadAndGetAllResources());
+ } finally {
+ // Resume all disk caching activities
+ diskService.resume();
+ }
}
@Override
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageGCSTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageGCSTest.java
index 89a4781..3a03445 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageGCSTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageGCSTest.java
@@ -36,6 +36,7 @@
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.FixMethodOrder;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.MethodSorters;
@@ -54,6 +55,7 @@
*/
@RunWith(Parameterized.class)
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+@Ignore
public class CloudStorageGCSTest {
private static final Logger LOGGER = LogManager.getLogger();
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
index 92df8f0..93b92bc 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
@@ -10,7 +10,7 @@
"active\.suspend\.timeout" : 3600,
"azure.request.timeout" : 120,
"cloud.deployment" : false,
- "cloud.profiler.log.interval" : 0,
+ "cloud.profiler.log.interval" : 5,
"cloud.storage.allocation.percentage" : 0.8,
"cloud.storage.anonymous.auth" : false,
"cloud.storage.bucket" : "",
@@ -24,6 +24,7 @@
"cloud.storage.region" : "",
"cloud.storage.scheme" : "",
"cloud.storage.sweep.threshold.percentage" : 0.9,
+ "cloud.write.buffer.size" : 8388608,
"compiler\.arrayindex" : true,
"compiler.batch.lookup" : true,
"compiler.cbo" : true,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
index 4d37887..a5e70a5 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
@@ -10,7 +10,7 @@
"active\.suspend\.timeout" : 3600,
"azure.request.timeout" : 120,
"cloud.deployment" : false,
- "cloud.profiler.log.interval" : 0,
+ "cloud.profiler.log.interval" : 5,
"cloud.storage.allocation.percentage" : 0.8,
"cloud.storage.anonymous.auth" : false,
"cloud.storage.bucket" : "",
@@ -24,6 +24,7 @@
"cloud.storage.region" : "",
"cloud.storage.scheme" : "",
"cloud.storage.sweep.threshold.percentage" : 0.9,
+ "cloud.write.buffer.size" : 8388608,
"compiler\.arrayindex" : true,
"compiler.batch.lookup" : true,
"compiler.cbo" : true,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
index 3189a3c..cc1db05 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
@@ -10,7 +10,7 @@
"active\.suspend\.timeout" : 3600,
"azure.request.timeout" : 120,
"cloud.deployment" : false,
- "cloud.profiler.log.interval" : 0,
+ "cloud.profiler.log.interval" : 5,
"cloud.storage.allocation.percentage" : 0.8,
"cloud.storage.anonymous.auth" : false,
"cloud.storage.bucket" : "",
@@ -24,6 +24,7 @@
"cloud.storage.region" : "",
"cloud.storage.scheme" : "",
"cloud.storage.sweep.threshold.percentage" : 0.9,
+ "cloud.write.buffer.size" : 8388608,
"compiler\.arrayindex" : true,
"compiler.batch.lookup" : true,
"compiler.cbo" : true,
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/SelectiveCloudAccessor.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/SelectiveCloudAccessor.java
index d8aecc1..56eaeeb 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/SelectiveCloudAccessor.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/SelectiveCloudAccessor.java
@@ -30,6 +30,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.api.util.InvokeUtil;
import org.apache.hyracks.control.nc.io.IOManager;
public class SelectiveCloudAccessor extends ReplaceableCloudAccessor {
@@ -43,7 +44,14 @@
@Override
public int doPunchHole(IFileHandle fileHandle, long offset, long length) throws HyracksDataException {
- return puncher.punchHole(fileHandle, offset, length);
+ try {
+ // Ensure that punching a hole cannot be interrupted
+ InvokeUtil.doExUninterruptibly(() -> puncher.punchHole(fileHandle, offset, length));
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ }
+
+ return (int) length;
}
@Override
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/filesystem/HolePuncherProvider.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/filesystem/HolePuncherProvider.java
index eb125f3..91de5ae 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/filesystem/HolePuncherProvider.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/filesystem/HolePuncherProvider.java
@@ -56,22 +56,20 @@
return new DebugHolePuncher(cloudIOManager, bufferProvider);
}
- private static int unsupported(IFileHandle fileHandle, long offset, long length) {
+ private static void unsupported(IFileHandle fileHandle, long offset, long length) {
throw new UnsupportedOperationException("punchHole is not supported");
}
- private static int linuxPunchHole(IFileHandle fileHandle, long offset, long length) throws HyracksDataException {
+ private static void linuxPunchHole(IFileHandle fileHandle, long offset, long length) throws HyracksDataException {
CloudFileHandle cloudFileHandle = (CloudFileHandle) fileHandle;
int fileDescriptor = cloudFileHandle.getFileDescriptor();
int blockSize = cloudFileHandle.getBlockSize();
- int freedSpace = FileSystemOperationDispatcherUtil.punchHole(fileDescriptor, offset, length, blockSize);
+ FileSystemOperationDispatcherUtil.punchHole(fileDescriptor, offset, length, blockSize);
try {
cloudFileHandle.getFileChannel().force(false);
} catch (IOException e) {
throw HyracksDataException.create(e);
}
-
- return freedSpace;
}
private static final class DebugHolePuncher implements IHolePuncher {
@@ -84,9 +82,8 @@
}
@Override
- public int punchHole(IFileHandle fileHandle, long offset, long length) throws HyracksDataException {
+ public void punchHole(IFileHandle fileHandle, long offset, long length) throws HyracksDataException {
ByteBuffer buffer = acquireAndPrepareBuffer(length);
- int totalWritten = 0;
try {
long remaining = length;
long position = offset;
@@ -94,14 +91,11 @@
int written = cloudIOManager.localWriter(fileHandle, position, buffer);
position += written;
remaining -= written;
- totalWritten += written;
buffer.limit((int) Math.min(remaining, buffer.capacity()));
}
} finally {
bufferProvider.recycle(buffer);
}
-
- return totalWritten;
}
private ByteBuffer acquireAndPrepareBuffer(long length) {
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/filesystem/IHolePuncher.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/filesystem/IHolePuncher.java
index 369c247..dc6981c 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/filesystem/IHolePuncher.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/filesystem/IHolePuncher.java
@@ -34,5 +34,5 @@
* @param offset starting offset
* @param length length
*/
- int punchHole(IFileHandle file, long offset, long length) throws HyracksDataException;
+ void punchHole(IFileHandle file, long offset, long length) throws HyracksDataException;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskCacheMonitoringAndPrefetchingService.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskCacheMonitoringAndPrefetchingService.java
index ba513fa..4564856 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskCacheMonitoringAndPrefetchingService.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/CloudDiskCacheMonitoringAndPrefetchingService.java
@@ -48,10 +48,21 @@
@Override
public void stop() {
+ monitorThread.pause();
executor.shutdown();
}
@Override
+ public void pause() {
+ monitorThread.pause();
+ }
+
+ @Override
+ public void resume() {
+ monitorThread.resume();
+ }
+
+ @Override
public boolean isEnabled() {
return true;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/DiskCacheSweeperThread.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/DiskCacheSweeperThread.java
index e466758..1444354 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/DiskCacheSweeperThread.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/cache/service/DiskCacheSweeperThread.java
@@ -25,6 +25,7 @@
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.IDiskSpaceMaker;
@@ -49,6 +50,7 @@
private final ICloudIOManager cloudIOManager;
private final Sweeper sweeper;
private final long inactiveTimeThreshold;
+ private final AtomicBoolean paused;
public DiskCacheSweeperThread(ExecutorService executorService, long waitTime,
CloudDiskResourceCacheLockNotifier resourceManager, ICloudIOManager cloudIOManager, int numOfSweepThreads,
@@ -60,14 +62,30 @@
this.inactiveTimeThreshold = inactiveTimeThreshold;
indexes = new ArrayList<>();
this.cloudIOManager = cloudIOManager;
- sweeper = new Sweeper(executorService, cloudIOManager, bufferCache, fileInfoMap, numOfSweepThreads,
+ paused = new AtomicBoolean();
+ sweeper = new Sweeper(paused, executorService, cloudIOManager, bufferCache, fileInfoMap, numOfSweepThreads,
sweepQueueSize);
}
+ public void pause() {
+ LOGGER.info("Pausing Sweeper threads...");
+ // Synchronize to ensure the thread is not sweeping
+ synchronized (this) {
+ // Set paused to ignore all future sweep requests
+ paused.set(true);
+ }
+ sweeper.waitForRunningRequests();
+ LOGGER.info("Sweeper threads have been paused successfully");
+ }
+
public void reportLocalResources(Map<Long, LocalResource> localResources) {
resourceManager.reportLocalResources(localResources);
}
+ public void resume() {
+ paused.set(false);
+ }
+
@Override
public void makeSpaceOrThrow(IOException ioException) throws HyracksDataException {
if (ioException.getMessage().contains("no space")) {
@@ -95,15 +113,23 @@
makeSpace();
wait(waitTime);
} catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+ LOGGER.warn("DiskCacheSweeperThread interrupted", e);
+ break;
} finally {
notifyAll();
}
}
}
+
+ Thread.currentThread().interrupt();
}
private void makeSpace() {
+ if (paused.get()) {
+ LOGGER.warn("DiskCacheSweeperThread is paused");
+ return;
+ }
+
if (physicalDrive.computeAndCheckIsPressured()) {
boolean shouldSweep;
resourceManager.getEvictionLock().writeLock().lock();
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/filesystem/PhysicalDrive.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/filesystem/PhysicalDrive.java
index e4fefa5..0253b65 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/filesystem/PhysicalDrive.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/filesystem/PhysicalDrive.java
@@ -26,7 +26,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.IODeviceHandle;
@@ -40,37 +40,45 @@
public final class PhysicalDrive implements IPhysicalDrive {
private static final Logger LOGGER = LogManager.getLogger();
private final List<FileStore> drivePaths;
- private final long pressureSize;
- private final AtomicBoolean pressured;
+ private final DiskSpace diskSpace;
+ private final AtomicLong usedSpace;
public PhysicalDrive(List<IODeviceHandle> deviceHandles, double pressureThreshold, double storagePercentage,
long pressureDebugSize) throws HyracksDataException {
drivePaths = getDrivePaths(deviceHandles);
- pressureSize = getPressureSize(drivePaths, pressureThreshold, storagePercentage, pressureDebugSize);
- pressured = new AtomicBoolean();
+ diskSpace = getPressureSize(drivePaths, pressureThreshold, storagePercentage, pressureDebugSize);
+ usedSpace = new AtomicLong();
computeAndCheckIsPressured();
}
@Override
public boolean computeAndCheckIsPressured() {
long usedSpace = getUsedSpace();
- boolean isPressured = usedSpace > pressureSize;
- pressured.set(isPressured);
+ long pressureCapacity = diskSpace.getPressureCapacity();
+ boolean isPressured = usedSpace > pressureCapacity;
+ this.usedSpace.set(usedSpace);
if (isPressured) {
LOGGER.info("Used space: {}, pressureCapacity: {} (isPressured: {})",
- StorageUtil.toHumanReadableSize(usedSpace), StorageUtil.toHumanReadableSize(pressureSize), true);
+ StorageUtil.toHumanReadableSize(usedSpace), StorageUtil.toHumanReadableSize(pressureCapacity),
+ true);
} else {
LOGGER.debug("Used space: {}, pressureCapacity: {} (isPressured: {})",
- StorageUtil.toHumanReadableSize(usedSpace), StorageUtil.toHumanReadableSize(pressureSize), false);
+ StorageUtil.toHumanReadableSize(usedSpace), StorageUtil.toHumanReadableSize(pressureCapacity),
+ false);
}
return isPressured;
}
@Override
+ public boolean isUnpressured() {
+ return usedSpace.get() <= diskSpace.getPressureCapacity();
+ }
+
+ @Override
public boolean hasSpace() {
- return !pressured.get();
+ return usedSpace.get() < diskSpace.getPressureCapacity();
}
private long getUsedSpace() {
@@ -86,8 +94,8 @@
return totalUsedSpace;
}
- private static long getPressureSize(List<FileStore> drivePaths, double pressureThreshold, double storagePercentage,
- long pressureDebugSize) throws HyracksDataException {
+ private static DiskSpace getPressureSize(List<FileStore> drivePaths, double pressureThreshold,
+ double storagePercentage, long pressureDebugSize) throws HyracksDataException {
long totalCapacity = 0;
long totalUsedSpace = 0;
@@ -106,7 +114,7 @@
StorageUtil.toHumanReadableSize(totalCapacity), StorageUtil.toHumanReadableSize(allocatedCapacity),
StorageUtil.toHumanReadableSize(pressureCapacity), StorageUtil.toHumanReadableSize(totalUsedSpace));
- return pressureCapacity;
+ return new DiskSpace(allocatedCapacity, pressureCapacity);
}
private static List<FileStore> getDrivePaths(List<IODeviceHandle> deviceHandles) throws HyracksDataException {
@@ -146,4 +154,22 @@
throw HyracksDataException.create(e);
}
}
+
+ private static class DiskSpace {
+ private final long allocatedCapacity;
+ private final long pressureCapacity;
+
+ private DiskSpace(long allocatedCapacity, long pressureCapacity) {
+ this.allocatedCapacity = allocatedCapacity;
+ this.pressureCapacity = pressureCapacity;
+ }
+
+ public long getAllocatedCapacity() {
+ return allocatedCapacity;
+ }
+
+ public long getPressureCapacity() {
+ return pressureCapacity;
+ }
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/SweepContext.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/SweepContext.java
index c709cc6..973d9a1 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/SweepContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/SweepContext.java
@@ -35,16 +35,16 @@
private final ICloudIOManager cloudIOManager;
private final BufferCache bufferCache;
private final Map<Integer, BufferedFileHandle> fileInfoMap;
- private final AtomicBoolean shutdown;
+ private final AtomicBoolean stop;
private SweepableIndexUnit indexUnit;
private BufferedFileHandle handle;
public SweepContext(ICloudIOManager cloudIOManager, BufferCache bufferCache,
- Map<Integer, BufferedFileHandle> fileInfoMap, AtomicBoolean shutdown) {
+ Map<Integer, BufferedFileHandle> fileInfoMap, AtomicBoolean stop) {
this.cloudIOManager = cloudIOManager;
this.bufferCache = bufferCache;
this.fileInfoMap = fileInfoMap;
- this.shutdown = shutdown;
+ this.stop = stop;
}
@Override
@@ -97,6 +97,6 @@
* @return true if it should stop, false otherwise
*/
public boolean stopSweeping() {
- return shutdown.get() || indexUnit.isDropped();
+ return stop.get() || indexUnit.isDropped();
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/Sweeper.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/Sweeper.java
index 6e12b79..49174bd 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/Sweeper.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/sweeper/Sweeper.java
@@ -37,18 +37,19 @@
public final class Sweeper {
private static final Logger LOGGER = LogManager.getLogger();
- private static final SweepRequest POISON = new SweepRequest();
+ private final int queueSize;
private final BlockingQueue<SweepRequest> requests;
private final BlockingQueue<SweepRequest> freeRequests;
- private final AtomicBoolean shutdown;
- public Sweeper(ExecutorService executor, ICloudIOManager cloudIOManager, BufferCache bufferCache,
- Map<Integer, BufferedFileHandle> fileInfoMap, int numOfSweepThreads, int queueSize) {
+ public Sweeper(AtomicBoolean paused, ExecutorService executor, ICloudIOManager cloudIOManager,
+ BufferCache bufferCache, Map<Integer, BufferedFileHandle> fileInfoMap, int numOfSweepThreads,
+ int queueSize) {
+ this.queueSize = queueSize;
requests = new ArrayBlockingQueue<>(queueSize);
freeRequests = new ArrayBlockingQueue<>(queueSize);
- shutdown = new AtomicBoolean(false);
for (int i = 0; i < queueSize; i++) {
- freeRequests.add(new SweepRequest(new SweepContext(cloudIOManager, bufferCache, fileInfoMap, shutdown)));
+ SweepContext context = new SweepContext(cloudIOManager, bufferCache, fileInfoMap, paused);
+ freeRequests.add(new SweepRequest(context));
}
for (int i = 0; i < numOfSweepThreads; i++) {
executor.execute(new SweepThread(requests, freeRequests, i));
@@ -61,12 +62,14 @@
requests.put(request);
}
- public void shutdown() {
- shutdown.set(true);
- requests.clear();
- freeRequests.clear();
- requests.offer(POISON);
- // TODO wait for threads to terminate
+ public void waitForRunningRequests() {
+ // Wait for all running requests if any
+ while (freeRequests.size() != queueSize) {
+ synchronized (freeRequests) {
+ LOGGER.warn("Waiting for {} running sweep requests", queueSize - freeRequests.size());
+ InvokeUtil.doUninterruptibly(freeRequests::wait);
+ }
+ }
}
private static class SweepThread implements Runnable {
@@ -88,44 +91,31 @@
SweepRequest request = null;
try {
request = requests.take();
- if (isPoison(request)) {
- break;
- }
request.handle();
} catch (InterruptedException e) {
- LOGGER.warn("Ignoring interrupt. Sweep threads should never be interrupted.");
+ LOGGER.warn("Sweep thread interrupted");
+ break;
} catch (Throwable t) {
LOGGER.error("Sweep failed", t);
} finally {
- if (request != null && request != POISON) {
+ if (request != null) {
freeRequests.add(request);
}
- }
- }
- }
-
- private boolean isPoison(SweepRequest request) {
- if (request == POISON) {
- LOGGER.info("Exiting");
- InvokeUtil.doUninterruptibly(() -> requests.put(POISON));
- if (Thread.interrupted()) {
- LOGGER.error("Ignoring interrupt. Sweep threads should never be interrupted.");
+ synchronized (freeRequests) {
+ // Notify if pause() is waiting for all requests to finish
+ freeRequests.notify();
+ }
}
- return true;
}
- return false;
+ Thread.currentThread().interrupt();
}
}
private static class SweepRequest {
private final SweepContext context;
- SweepRequest() {
- this(null);
- }
-
SweepRequest(SweepContext context) {
this.context = context;
}
@@ -150,6 +140,7 @@
*/
return;
}
+
SweepableIndexUnit indexUnit = context.getIndexUnit();
indexUnit.startSweeping();
try {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/CloudColumnIndexDiskCacheManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/CloudColumnIndexDiskCacheManager.java
index 38a1713..7187496 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/CloudColumnIndexDiskCacheManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/CloudColumnIndexDiskCacheManager.java
@@ -71,7 +71,7 @@
public IColumnReadContext createReadContext(IColumnProjectionInfo projectionInfo) {
ColumnProjectorType projectorType = projectionInfo.getProjectorType();
if (projectorType == ColumnProjectorType.QUERY) {
- planner.access(projectionInfo, drive.hasSpace());
+ planner.access(projectionInfo);
} else if (projectorType == ColumnProjectorType.MODIFY) {
planner.setIndexedColumns(projectionInfo);
// Requested (and indexed) columns will be persisted if space permits
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
index f28b2fd..2679d33 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
@@ -114,13 +114,13 @@
* - The page is not being evicted (cloudOnly)
* - The page is not planned for eviction (evictable)
* - The operation is not a merge operation (the component will be deleted anyway)
- * - The disk has space
+ * - The disk is not pressured
*
* Note: 'empty' can be false while 'cloudOnly is true'. We cannot read from disk as the page can be
* evicted at any moment. In other words, the sweeper told us that it is going to evict this page; hence
* 'cloudOnly' is true.
*/
- boolean persist = empty && !cloudOnly && !evictable && operation != MERGE && drive.hasSpace();
+ boolean persist = empty && !cloudOnly && !evictable && operation != MERGE && drive.isUnpressured();
readFromStream(ioManager, fileHandle, header, cPage, persist);
} else {
/*
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/write/CloudColumnWriteContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/write/CloudColumnWriteContext.java
index dcde833..604d6cd 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/write/CloudColumnWriteContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/write/CloudColumnWriteContext.java
@@ -67,8 +67,8 @@
@Override
public void startWritingColumn(int columnIndex, boolean overlapping) {
- if (drive.hasSpace() || indexedColumns.contains(columnIndex)) {
- // The current column should be persisted locally if free disk space permits
+ if (drive.isUnpressured() || indexedColumns.contains(columnIndex)) {
+ // The current column will be persisted locally if free disk has space (drive.hasSpace() returns true)
currentContext = DefaultBufferCacheWriteContext.INSTANCE;
} else if (plan.get(columnIndex)) {
// This column was planned for eviction, do not persist.
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepPlanner.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepPlanner.java
index 9549f4f..3875812 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepPlanner.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepPlanner.java
@@ -64,7 +64,6 @@
private long[] lastAccesses;
private double punchableThreshold;
- private long lastSweepTs;
private int numberOfSweptColumns;
private int numberOfCloudRequests;
@@ -107,16 +106,16 @@
return new IntOpenHashSet(indexedColumns);
}
- public synchronized void access(IColumnProjectionInfo projectionInfo, boolean hasSpace) {
- resetPlanIfNeeded(hasSpace);
+ public synchronized void access(IColumnProjectionInfo projectionInfo) {
+ resetPlanIfNeeded();
long accessTime = clock.getCurrentTime();
lastAccess = accessTime;
int numberOfColumns = projectionInfo.getNumberOfProjectedColumns();
boolean requireCloudAccess = false;
for (int i = 0; i < numberOfColumns; i++) {
int columnIndex = projectionInfo.getColumnIndex(i);
+ // columnIndex can be -1 when accessing a non-existing column (i.e., not known by the schema)
if (columnIndex >= 0) {
- // columnIndex can be -1 when accessing a non-existing column (i.e., not known by the schema)
lastAccesses[columnIndex] = accessTime;
requireCloudAccess |= numberOfSweptColumns > 0 && plan.get(columnIndex);
}
@@ -152,8 +151,6 @@
punchableThreshold *= PUNCHABLE_THRESHOLD_DECREMENT;
iter++;
}
- // Register the plan time
- lastSweepTs = clock.getCurrentTime();
// Add the number of evictable columns
numberOfSweptColumns += numberOfEvictableColumns;
if (numberOfEvictableColumns > 0) {
@@ -237,8 +234,8 @@
return numberOfEvictableColumns;
}
- private void resetPlanIfNeeded(boolean hasSpace) {
- if (!hasSpace || numberOfCloudRequests < REEVALUATE_PLAN_THRESHOLD) {
+ private void resetPlanIfNeeded() {
+ if (numberOfCloudRequests < REEVALUATE_PLAN_THRESHOLD) {
return;
}
@@ -249,10 +246,10 @@
int columnIndex = reevaluatedPlan.nextSetBit(i);
if (!plan.get(columnIndex)) {
// the plan contains a stale column. Invalidate!
+ LOGGER.info("Re-planning to evict {} columns. Old plan: {} new plan: {}", numberOfEvictableColumns,
+ plan, reevaluatedPlan);
plan.clear();
plan.or(reevaluatedPlan);
- LOGGER.info("Re-planning to evict {} columns. The newly evictable columns are {}",
- numberOfEvictableColumns, plan);
break;
}
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepPlannerTest.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepPlannerTest.java
index 6bace98..a5a87c4 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepPlannerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepPlannerTest.java
@@ -56,7 +56,7 @@
// Project 3 columns
projectedColumns(numberOfPrimaryKeys, numberOfColumns, 3, projectedColumns);
// access the projected columns (max 10 times)
- access(planner, info, true, 10);
+ access(planner, info, 10);
// Advance clock
clock.advance(10);
@@ -69,7 +69,7 @@
// Project another 3 columns
projectedColumns(numberOfPrimaryKeys, numberOfColumns, 3, projectedColumns);
// access the projected columns
- access(planner, info, true, 100);
+ access(planner, info, 100);
// At this point, the plan should change
BitSet newKeptColumns = new BitSet();
@@ -89,10 +89,10 @@
System.out.println("Kept columns: " + keptColumns);
}
- private void access(ColumnSweepPlanner planner, DummyColumnProjectionInfo info, boolean hasSpace, int bound) {
+ private void access(ColumnSweepPlanner planner, DummyColumnProjectionInfo info, int bound) {
int numberOfAccesses = RANDOM.nextInt(1, bound);
for (int i = 0; i < numberOfAccesses; i++) {
- planner.access(info, hasSpace);
+ planner.access(info);
clock.advance(1);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
index 271afad..d2a6fc0 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java
@@ -567,7 +567,13 @@
private void read(CachedPage cPage, IBufferCacheReadContext context) throws HyracksDataException {
BufferedFileHandle fInfo = getFileHandle(cPage);
cPage.buffer.clear();
- fInfo.read(cPage, context);
+ try {
+ fInfo.read(cPage, context);
+ } catch (Throwable e) {
+ LOGGER.error("Error while reading a page {} in file {}", cPage, fInfo);
+ throw e;
+ }
+
final IThreadStats threadStats = statsSubscribers.get(Thread.currentThread());
if (threadStats != null && context.incrementStats()) {
threadStats.coldRead();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/DummyPhysicalDrive.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/DummyPhysicalDrive.java
index af4162b..ad9a940 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/DummyPhysicalDrive.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/DummyPhysicalDrive.java
@@ -30,6 +30,11 @@
}
@Override
+ public boolean isUnpressured() {
+ return false;
+ }
+
+ @Override
public boolean hasSpace() {
return true;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskCacheMonitoringService.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskCacheMonitoringService.java
index b957782..4151c77 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskCacheMonitoringService.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IDiskCacheMonitoringService.java
@@ -37,6 +37,16 @@
void stop();
/**
+ * Pause all disk caching activities
+ */
+ void pause();
+
+ /**
+ * Resume all disk caching activities
+ */
+ void resume();
+
+ /**
* @return whether the monitoring service is enabled
*/
boolean isEnabled();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IPhysicalDrive.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IPhysicalDrive.java
index d2aaec1..6818140 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IPhysicalDrive.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/IPhysicalDrive.java
@@ -33,7 +33,13 @@
boolean computeAndCheckIsPressured();
/**
- * @return if drive is still has free space (i.e., not pressured)
+ * @return true if the drive is not pressured, false otherwise
+ */
+ boolean isUnpressured();
+
+ /**
+ * @return true if the disk has free space, false otherwise
+ * Note: disk could be pressured but still has free space for storage
*/
boolean hasSpace();
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskCacheMonitoringService.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskCacheMonitoringService.java
index d6155a4..5cbf5cb 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskCacheMonitoringService.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/disk/NoOpDiskCacheMonitoringService.java
@@ -39,6 +39,16 @@
}
@Override
+ public void pause() {
+ // NoOp
+ }
+
+ @Override
+ public void resume() {
+ // NoOp
+ }
+
+ @Override
public boolean isEnabled() {
return false;
}