[NO ISSUE][*DB][CLOUD] Ensure interrupts are not lost during GCS ops
Ext-ref: MB-65432
Change-Id: Iab4b86fe3966a2d0b509c9bd871f792c3e35ff23
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19464
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Tested-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 343baf0..de4a066 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -225,13 +225,15 @@
IConfigValidatorFactory configValidatorFactory, IReplicationStrategyFactory replicationStrategyFactory,
boolean initialRun) throws IOException {
ioManager = getServiceContext().getIoManager();
+ threadExecutor =
+ MaintainedThreadNameExecutorService.newCachedThreadPool(getServiceContext().getThreadFactory());
CloudConfigurator cloudConfigurator;
IDiskResourceCacheLockNotifier lockNotifier;
IDiskCachedPageAllocator pageAllocator;
IBufferCacheReadContext defaultContext;
if (isCloudDeployment()) {
cloudConfigurator = CloudConfigurator.of(cloudProperties, ioManager, namespacePathResolver,
- getCloudGuardian(cloudProperties));
+ getCloudGuardian(cloudProperties), threadExecutor);
persistenceIOManager = cloudConfigurator.getCloudIoManager();
partitionBootstrapper = cloudConfigurator.getPartitionBootstrapper();
lockNotifier = cloudConfigurator.getLockNotifier();
@@ -247,8 +249,6 @@
}
int ioQueueLen = getServiceContext().getAppConfig().getInt(NCConfig.Option.IO_QUEUE_SIZE);
- threadExecutor =
- MaintainedThreadNameExecutorService.newCachedThreadPool(getServiceContext().getThreadFactory());
ICacheMemoryAllocator bufferAllocator = new HeapBufferAllocator();
IPageCleanerPolicy pcp = new DelayPageCleanerPolicy(600000);
IPageReplacementStrategy prs = new ClockPageReplacementStrategy(bufferAllocator, pageAllocator,
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 78f574b..179b996 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -131,7 +131,7 @@
* not supported, yet.
*/
@Override
- public SystemState getSystemState() throws ACIDException {
+ public SystemState getSystemState() throws ACIDException, HyracksDataException {
//read checkpoint file
Checkpoint checkpointObject = checkpointManager.getLatest();
if (checkpointObject == null) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 6928b64..5eafe05 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -191,7 +191,7 @@
if (cloudDeployment) {
cloudProperties = new CloudProperties(PropertiesAccessor.getInstance(ccServiceCtx.getAppConfig()));
ioManager = CloudConfigurator.createIOManager(ioManager, cloudProperties, namespacePathResolver,
- getCloudGuardian(cloudProperties));
+ getCloudGuardian(cloudProperties), controllerService.getExecutor());
}
IGlobalTxManager globalTxManager = createGlobalTxManager(ioManager);
appCtx = createApplicationContext(null, globalRecoveryManager, lifecycleCoordinator, Receptionist::new,
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index 6f8126b..0da4e0d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -370,7 +370,7 @@
return nodeStatus == NodeStatus.IDLE && (primaryCc == null || primaryCc.equals(registeredCc));
}
- private SystemState getCurrentSystemState() {
+ private SystemState getCurrentSystemState() throws HyracksDataException {
final NodeProperties nodeProperties = runtimeContext.getNodeProperties();
IRecoveryManager recoveryMgr = runtimeContext.getTransactionSubsystem().getRecoveryManager();
SystemState state = recoveryMgr.getSystemState();
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
index 912ca47..516ede2 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
@@ -31,6 +31,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
import java.util.function.Predicate;
import org.apache.asterix.cloud.bulk.DeleteBulkCloudOperation;
@@ -84,12 +85,13 @@
private final List<FileStore> drivePaths;
public AbstractCloudIOManager(IOManager ioManager, CloudProperties cloudProperties,
- INamespacePathResolver nsPathResolver, ICloudGuardian guardian) throws HyracksDataException {
+ INamespacePathResolver nsPathResolver, ICloudGuardian guardian, ExecutorService executor)
+ throws HyracksDataException {
super(ioManager.getIODevices(), ioManager.getDeviceComputer(), ioManager.getIOParallelism(),
ioManager.getQueueSize());
this.nsPathResolver = nsPathResolver;
this.bucket = cloudProperties.getStorageBucket();
- cloudClient = CloudClientProvider.getClient(cloudProperties, guardian);
+ cloudClient = CloudClientProvider.getClient(cloudProperties, guardian, executor);
this.guardian = guardian;
int numOfThreads = getIODevices().size() * getIOParallelism();
writeBufferProvider = new WriteBufferProvider(numOfThreads, cloudClient.getWriteBufferSize());
@@ -106,7 +108,7 @@
*/
@Override
- public SystemState getSystemStateOnMissingCheckpoint() {
+ public SystemState getSystemStateOnMissingCheckpoint() throws HyracksDataException {
Set<CloudFile> existingMetadataFiles = getCloudMetadataPartitionFiles();
CloudFile bootstrapMarkerPath = CloudFile.of(StoragePathUtil.getBootstrapMarkerRelativePath(nsPathResolver));
if (existingMetadataFiles.isEmpty() || existingMetadataFiles.contains(bootstrapMarkerPath)) {
@@ -436,7 +438,7 @@
* @param key the key where the bytes will be written
* @param bytes the bytes to write
*/
- public final void put(String key, byte[] bytes) {
+ public final void put(String key, byte[] bytes) throws HyracksDataException {
cloudClient.write(bucket, key, bytes);
}
@@ -444,7 +446,7 @@
return cloudClient;
}
- private Set<CloudFile> getCloudMetadataPartitionFiles() {
+ private Set<CloudFile> getCloudMetadataPartitionFiles() throws HyracksDataException {
String metadataNamespacePath = StoragePathUtil.getNamespacePath(nsPathResolver,
MetadataConstants.METADATA_NAMESPACE, METADATA_PARTITION);
return cloudClient.listObjects(bucket, metadataNamespacePath, IoUtil.NO_OP_FILTER);
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java
index dced5b0..e494333 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java
@@ -70,11 +70,12 @@
private final long diskCacheMonitoringInterval;
private CloudConfigurator(CloudProperties cloudProperties, IIOManager ioManager,
- INamespacePathResolver nsPathResolver, ICloudGuardian guardian) throws HyracksDataException {
+ INamespacePathResolver nsPathResolver, ICloudGuardian guardian, ExecutorService executor)
+ throws HyracksDataException {
this.cloudProperties = cloudProperties;
localIoManager = (IOManager) ioManager;
diskCacheManagerRequired = cloudProperties.getCloudCachePolicy() == CloudCachePolicy.SELECTIVE;
- cloudIOManager = createIOManager(ioManager, cloudProperties, nsPathResolver, guardian);
+ cloudIOManager = createIOManager(ioManager, cloudProperties, nsPathResolver, guardian, executor);
physicalDrive = createPhysicalDrive(diskCacheManagerRequired, cloudProperties, ioManager);
lockNotifier = createLockNotifier(diskCacheManagerRequired);
pageAllocator = createPageAllocator(diskCacheManagerRequired);
@@ -131,20 +132,22 @@
}
public static CloudConfigurator of(CloudProperties cloudProperties, IIOManager ioManager,
- INamespacePathResolver nsPathResolver, ICloudGuardian cloudGuardian) throws HyracksDataException {
- return new CloudConfigurator(cloudProperties, ioManager, nsPathResolver, cloudGuardian);
+ INamespacePathResolver nsPathResolver, ICloudGuardian cloudGuardian, ExecutorService executor)
+ throws HyracksDataException {
+ return new CloudConfigurator(cloudProperties, ioManager, nsPathResolver, cloudGuardian, executor);
}
public static AbstractCloudIOManager createIOManager(IIOManager ioManager, CloudProperties cloudProperties,
- INamespacePathResolver nsPathResolver, ICloudGuardian guardian) throws HyracksDataException {
+ INamespacePathResolver nsPathResolver, ICloudGuardian guardian, ExecutorService executor)
+ throws HyracksDataException {
IOManager localIoManager = (IOManager) ioManager;
CloudCachePolicy policy = cloudProperties.getCloudCachePolicy();
if (policy == CloudCachePolicy.EAGER) {
- return new EagerCloudIOManager(localIoManager, cloudProperties, nsPathResolver, guardian);
+ return new EagerCloudIOManager(localIoManager, cloudProperties, nsPathResolver, guardian, executor);
}
boolean selective = policy == CloudCachePolicy.SELECTIVE;
- return new LazyCloudIOManager(localIoManager, cloudProperties, nsPathResolver, selective, guardian);
+ return new LazyCloudIOManager(localIoManager, cloudProperties, nsPathResolver, selective, guardian, executor);
}
private static IPhysicalDrive createPhysicalDrive(boolean diskCacheManagerRequired, CloudProperties cloudProperties,
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudManagerProvider.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudManagerProvider.java
index b49f3ce..b944161 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudManagerProvider.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudManagerProvider.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.cloud;
+import java.util.concurrent.ExecutorService;
+
import org.apache.asterix.cloud.clients.ICloudGuardian;
import org.apache.asterix.common.api.INamespacePathResolver;
import org.apache.asterix.common.cloud.CloudCachePolicy;
@@ -32,13 +34,14 @@
}
public static IIOManager createIOManager(CloudProperties cloudProperties, IIOManager ioManager,
- INamespacePathResolver nsPathResolver, ICloudGuardian guardian) throws HyracksDataException {
+ INamespacePathResolver nsPathResolver, ICloudGuardian guardian, ExecutorService executor)
+ throws HyracksDataException {
IOManager localIoManager = (IOManager) ioManager;
if (cloudProperties.getCloudCachePolicy() == CloudCachePolicy.LAZY) {
- return new LazyCloudIOManager(localIoManager, cloudProperties, nsPathResolver, false, guardian);
+ return new LazyCloudIOManager(localIoManager, cloudProperties, nsPathResolver, false, guardian, executor);
}
- return new EagerCloudIOManager(localIoManager, cloudProperties, nsPathResolver, guardian);
+ return new EagerCloudIOManager(localIoManager, cloudProperties, nsPathResolver, guardian, executor);
}
public static IPartitionBootstrapper getCloudPartitionBootstrapper(IIOManager ioManager) {
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
index 1cb6077..29e5da0 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
@@ -23,6 +23,7 @@
import java.io.File;
import java.util.Collections;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import org.apache.asterix.cloud.clients.ICloudGuardian;
@@ -48,8 +49,9 @@
private static final Logger LOGGER = LogManager.getLogger();
public EagerCloudIOManager(IOManager ioManager, CloudProperties cloudProperties,
- INamespacePathResolver nsPathResolver, ICloudGuardian guardian) throws HyracksDataException {
- super(ioManager, cloudProperties, nsPathResolver, guardian);
+ INamespacePathResolver nsPathResolver, ICloudGuardian guardian, ExecutorService executor)
+ throws HyracksDataException {
+ super(ioManager, cloudProperties, nsPathResolver, guardian, executor);
}
/*
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
index 1c5efd9..7a7ea06 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
@@ -29,6 +29,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import org.apache.asterix.cloud.bulk.DeleteBulkCloudOperation;
@@ -70,9 +71,9 @@
private ILazyAccessor accessor;
public LazyCloudIOManager(IOManager ioManager, CloudProperties cloudProperties,
- INamespacePathResolver nsPathResolver, boolean selective, ICloudGuardian guardian)
+ INamespacePathResolver nsPathResolver, boolean selective, ICloudGuardian guardian, ExecutorService executor)
throws HyracksDataException {
- super(ioManager, cloudProperties, nsPathResolver, guardian);
+ super(ioManager, cloudProperties, nsPathResolver, guardian, executor);
accessor = new InitialCloudAccessor(cloudClient, bucket, localIoManager);
puncher = HolePuncherProvider.get(this, cloudProperties, writeBufferProvider);
if (selective) {
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
index c98c6b4..a0c8bd0 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.cloud.clients;
+import java.util.concurrent.ExecutorService;
+
import org.apache.asterix.cloud.clients.aws.s3.S3ClientConfig;
import org.apache.asterix.cloud.clients.aws.s3.S3CloudClient;
import org.apache.asterix.cloud.clients.azure.blobstorage.AzBlobStorageClientConfig;
@@ -38,8 +40,8 @@
throw new AssertionError("do not instantiate");
}
- public static ICloudClient getClient(CloudProperties cloudProperties, ICloudGuardian guardian)
- throws HyracksDataException {
+ public static ICloudClient getClient(CloudProperties cloudProperties, ICloudGuardian guardian,
+ ExecutorService executor) throws HyracksDataException {
String storageScheme = cloudProperties.getStorageScheme();
ICloudClient cloudClient;
if (S3.equalsIgnoreCase(storageScheme)) {
@@ -47,7 +49,7 @@
cloudClient = new S3CloudClient(config, guardian);
} else if (GCS.equalsIgnoreCase(storageScheme)) {
GCSClientConfig config = GCSClientConfig.of(cloudProperties);
- cloudClient = new GCSCloudClient(config, guardian);
+ cloudClient = new GCSCloudClient(config, guardian, executor);
} else if (AZ_BLOB.equalsIgnoreCase(storageScheme)) {
AzBlobStorageClientConfig config = AzBlobStorageClientConfig.of(cloudProperties);
cloudClient = new AzBlobStorageCloudClient(config, guardian);
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
index fd82944..d332944 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
@@ -65,7 +65,7 @@
* @param filter filter to apply
* @return file names returned after applying the file name filter
*/
- Set<CloudFile> listObjects(String bucket, String path, FilenameFilter filter);
+ Set<CloudFile> listObjects(String bucket, String path, FilenameFilter filter) throws HyracksDataException;
/**
* Performs a range-read from the specified bucket and path starting at the offset. The amount read is equal to the
@@ -107,7 +107,7 @@
* @param path path
* @param data data
*/
- void write(String bucket, String path, byte[] data);
+ void write(String bucket, String path, byte[] data) throws HyracksDataException;
/**
* Copies an object from the source path to the destination path
@@ -116,7 +116,7 @@
* @param srcPath source path
* @param destPath destination path
*/
- void copy(String bucket, String srcPath, FileReference destPath);
+ void copy(String bucket, String srcPath, FileReference destPath) throws HyracksDataException;
/**
* Deletes all objects at the specified bucket and paths
@@ -162,7 +162,7 @@
* @param bucket bucket name
* @return {@link JsonNode} with stored objects' information
*/
- JsonNode listAsJson(ObjectMapper objectMapper, String bucket);
+ JsonNode listAsJson(ObjectMapper objectMapper, String bucket) throws HyracksDataException;
/**
* Performs any necessary closing and cleaning up
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java
index 28fa53e..2f61b31 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java
@@ -66,7 +66,7 @@
}
@Override
- public Set<CloudFile> listObjects(String bucket, String path, FilenameFilter filter) {
+ public Set<CloudFile> listObjects(String bucket, String path, FilenameFilter filter) throws HyracksDataException {
return cloudClient.listObjects(bucket, path, filter);
}
@@ -88,12 +88,12 @@
}
@Override
- public void write(String bucket, String path, byte[] data) {
+ public void write(String bucket, String path, byte[] data) throws HyracksDataException {
cloudClient.write(bucket, path, data);
}
@Override
- public void copy(String bucket, String srcPath, FileReference destPath) {
+ public void copy(String bucket, String srcPath, FileReference destPath) throws HyracksDataException {
cloudClient.copy(bucket, srcPath, destPath);
}
@@ -127,7 +127,7 @@
}
@Override
- public JsonNode listAsJson(ObjectMapper objectMapper, String bucket) {
+ public JsonNode listAsJson(ObjectMapper objectMapper, String bucket) throws HyracksDataException {
return cloudClient.listAsJson(objectMapper, bucket);
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
index 924f34a..99cda9e 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
@@ -31,6 +31,9 @@
import java.util.Iterator;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
import org.apache.asterix.cloud.IWriteBufferProvider;
import org.apache.asterix.cloud.clients.CloudFile;
@@ -76,12 +79,15 @@
private final ICloudGuardian guardian;
private final IRequestProfilerLimiter profilerLimiter;
private final int writeBufferSize;
+ private final ExecutorService executor;
- public GCSCloudClient(GCSClientConfig config, Storage gcsClient, ICloudGuardian guardian) {
+ public GCSCloudClient(GCSClientConfig config, Storage gcsClient, ICloudGuardian guardian,
+ ExecutorService executor) {
this.gcsClient = gcsClient;
this.config = config;
this.guardian = guardian;
this.writeBufferSize = config.getWriteBufferSize();
+ this.executor = executor;
long profilerInterval = config.getProfilerLogInterval();
GCSRequestRateLimiter limiter = new GCSRequestRateLimiter(config);
if (profilerInterval > 0) {
@@ -92,8 +98,9 @@
guardian.setCloudClient(this);
}
- public GCSCloudClient(GCSClientConfig config, ICloudGuardian guardian) throws HyracksDataException {
- this(config, buildClient(config), guardian);
+ public GCSCloudClient(GCSClientConfig config, ICloudGuardian guardian, ExecutorService executor)
+ throws HyracksDataException {
+ this(config, buildClient(config), guardian, executor);
}
@Override
@@ -112,12 +119,12 @@
}
@Override
- public Set<CloudFile> listObjects(String bucket, String path, FilenameFilter filter) {
+ public Set<CloudFile> listObjects(String bucket, String path, FilenameFilter filter) throws HyracksDataException {
guardian.checkReadAccess(bucket, path);
profilerLimiter.objectsList();
- Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(config.getPrefix() + path),
- BlobListOption.fields(Storage.BlobField.SIZE));
-
+ // MB-65432: Storage.list is not interrupt-safe; we need to offload onto another thread
+ Page<Blob> blobs = runOpInterruptibly(() -> gcsClient.list(bucket,
+ BlobListOption.prefix(config.getPrefix() + path), BlobListOption.fields(Storage.BlobField.SIZE)));
Set<CloudFile> files = new HashSet<>();
for (Blob blob : blobs.iterateAll()) {
if (filter.accept(null, IoUtil.getFileNameFromPath(blob.getName()))) {
@@ -150,15 +157,21 @@
}
@Override
- public byte[] readAllBytes(String bucket, String path) {
+ public byte[] readAllBytes(String bucket, String path) throws HyracksDataException {
guardian.checkReadAccess(bucket, path);
profilerLimiter.objectGet();
BlobId blobId = BlobId.of(bucket, config.getPrefix() + path);
- try {
- return gcsClient.readAllBytes(blobId);
- } catch (StorageException e) {
- return null;
- }
+ // MB-65432: Storage.readAllBytes is not interrupt-safe; we need to offload onto another thread
+ return runOpInterruptibly(() -> {
+ try {
+ return gcsClient.readAllBytes(blobId);
+ } catch (StorageException e) {
+ if (e.getCode() == 404) {
+ return null;
+ }
+ throw e;
+ }
+ });
}
@Override
@@ -176,27 +189,32 @@
}
@Override
- public void write(String bucket, String path, byte[] data) {
+ public void write(String bucket, String path, byte[] data) throws HyracksDataException {
guardian.checkWriteAccess(bucket, path);
profilerLimiter.objectWrite();
BlobInfo blobInfo = BlobInfo.newBuilder(bucket, config.getPrefix() + path).build();
- gcsClient.create(blobInfo, data);
+ // MB-65432: Storage.create is not interrupt-safe; we need to offload onto another thread
+ runOpInterruptibly(() -> gcsClient.create(blobInfo, data));
}
@Override
- public void copy(String bucket, String srcPath, FileReference destPath) {
+ public void copy(String bucket, String srcPath, FileReference destPath) throws HyracksDataException {
guardian.checkReadAccess(bucket, srcPath);
profilerLimiter.objectsList();
- Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(config.getPrefix() + srcPath));
- for (Blob blob : blobs.iterateAll()) {
- profilerLimiter.objectCopy();
- BlobId source = blob.getBlobId();
- String targetName = destPath.getChildPath(IoUtil.getFileNameFromPath(source.getName()));
- BlobId target = BlobId.of(bucket, targetName);
- guardian.checkWriteAccess(bucket, targetName);
- CopyRequest copyReq = CopyRequest.newBuilder().setSource(source).setTarget(target).build();
- gcsClient.copy(copyReq);
- }
+ // MB-65432: Storage.list & copy are not interrupt-safe; we need to offload onto another thread
+ runOpInterruptibly(() -> {
+ Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(config.getPrefix() + srcPath));
+ for (Blob blob : blobs.iterateAll()) {
+ profilerLimiter.objectCopy();
+ BlobId source = blob.getBlobId();
+ String targetName = destPath.getChildPath(IoUtil.getFileNameFromPath(source.getName()));
+ BlobId target = BlobId.of(bucket, targetName);
+ guardian.checkWriteAccess(bucket, targetName);
+ CopyRequest copyReq = CopyRequest.newBuilder().setSource(source).setTarget(target).build();
+ gcsClient.copy(copyReq);
+ }
+ return null;
+ });
}
@Override
@@ -206,17 +224,16 @@
}
List<StorageBatchResult<Boolean>> deleteResponses = new ArrayList<>();
- StorageBatch batchRequest;
Iterator<String> pathIter = paths.iterator();
while (pathIter.hasNext()) {
- batchRequest = gcsClient.batch();
+ StorageBatch batchRequest = gcsClient.batch();
for (int i = 0; pathIter.hasNext() && i < DELETE_BATCH_SIZE; i++) {
BlobId blobId = BlobId.of(bucket, config.getPrefix() + pathIter.next());
guardian.checkWriteAccess(bucket, blobId.getName());
deleteResponses.add(batchRequest.delete(blobId));
}
-
- batchRequest.submit();
+ // MB-65432: StorageBatch.submit may not be interrupt-safe; we need to offload onto another thread
+ runOpInterruptibly(batchRequest::submit);
Iterator<String> deletePathIter = paths.iterator();
for (StorageBatchResult<Boolean> deleteResponse : deleteResponses) {
String deletedPath = deletePathIter.next();
@@ -240,11 +257,12 @@
}
@Override
- public long getObjectSize(String bucket, String path) {
+ public long getObjectSize(String bucket, String path) throws HyracksDataException {
guardian.checkReadAccess(bucket, path);
profilerLimiter.objectGet();
- Blob blob =
- gcsClient.get(bucket, config.getPrefix() + path, Storage.BlobGetOption.fields(Storage.BlobField.SIZE));
+ // MB-65432: Storage.get is not interrupt-safe; we need to offload onto another thread
+ Blob blob = runOpInterruptibly(() -> gcsClient.get(bucket, config.getPrefix() + path,
+ Storage.BlobGetOption.fields(Storage.BlobField.SIZE)));
if (blob == null) {
return 0;
}
@@ -252,19 +270,22 @@
}
@Override
- public boolean exists(String bucket, String path) {
+ public boolean exists(String bucket, String path) throws HyracksDataException {
guardian.checkReadAccess(bucket, path);
profilerLimiter.objectGet();
- Blob blob = gcsClient.get(bucket, config.getPrefix() + path,
- Storage.BlobGetOption.fields(Storage.BlobField.values()));
+ // MB-65432: Storage.get is not interrupt-safe; we need to offload onto another thread
+ Blob blob = runOpInterruptibly(() -> gcsClient.get(bucket, config.getPrefix() + path,
+ Storage.BlobGetOption.fields(Storage.BlobField.values())));
return blob != null && blob.exists();
}
@Override
- public boolean isEmptyPrefix(String bucket, String path) {
+ public boolean isEmptyPrefix(String bucket, String path) throws HyracksDataException {
guardian.checkReadAccess(bucket, path);
profilerLimiter.objectsList();
- Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(config.getPrefix() + path));
+ // MB-65432: Storage.list is not interrupt-safe; we need to offload onto another thread
+ Page<Blob> blobs =
+ runOpInterruptibly(() -> gcsClient.list(bucket, BlobListOption.prefix(config.getPrefix() + path)));
return !blobs.iterateAll().iterator().hasNext();
}
@@ -275,10 +296,12 @@
}
@Override
- public JsonNode listAsJson(ObjectMapper objectMapper, String bucket) {
+ public JsonNode listAsJson(ObjectMapper objectMapper, String bucket) throws HyracksDataException {
guardian.checkReadAccess(bucket, "/");
profilerLimiter.objectsList();
- Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.fields(Storage.BlobField.SIZE));
+ // MB-65432: Storage.list is not interrupt-safe; we need to offload onto another thread
+ Page<Blob> blobs =
+ runOpInterruptibly(() -> gcsClient.list(bucket, BlobListOption.fields(Storage.BlobField.SIZE)));
ArrayNode objectsInfo = objectMapper.createArrayNode();
List<Blob> objects = new ArrayList<>();
@@ -313,4 +336,24 @@
private String stripCloudPrefix(String objectName) {
return objectName.substring(config.getPrefix().length());
}
+
+ private void runOpInterruptibly(Runnable operation) throws HyracksDataException {
+ try {
+ executor.submit(operation).get();
+ } catch (InterruptedException e) {
+ throw HyracksDataException.create(e);
+ } catch (ExecutionException e) {
+ throw HyracksDataException.create(e.getCause());
+ }
+ }
+
+ private <T> T runOpInterruptibly(Supplier<T> operation) throws HyracksDataException {
+ try {
+ return executor.submit(operation::get).get();
+ } catch (InterruptedException e) {
+ throw HyracksDataException.create(e);
+ } catch (ExecutionException e) {
+ throw HyracksDataException.create(e.getCause());
+ }
+ }
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
index 0d30120..cbbaf31 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
@@ -33,6 +33,7 @@
import org.apache.commons.io.FileUtils;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.util.InvokeUtil;
import org.apache.hyracks.control.nc.io.IOManager;
import com.google.api.gax.paging.Page;
@@ -50,7 +51,6 @@
public class GCSParallelDownloader implements IParallelDownloader {
- // private static final Logger LOGGER = LogManager.getLogger();
private final String bucket;
private final IOManager ioManager;
private final Storage gcsClient;
@@ -95,6 +95,7 @@
downloadJobs.add(transferManager.downloadBlobs(entry.getValue(),
downConfig.setDownloadDirectory(entry.getKey()).build()));
}
+ // MB-65432: DownloadJob.getDownloadResults is interrupt-safe; no need to offload
downloadJobs.forEach(DownloadJob::getDownloadResults);
}
@@ -121,6 +122,7 @@
}
List<DownloadResult> results;
for (DownloadJob job : downloadJobs) {
+ // MB-65432: DownloadJob.getDownloadResults is interrupt-safe; no need to offload
results = job.getDownloadResults();
for (DownloadResult result : results) {
if (result.getStatus() != TransferStatus.SUCCESS) {
@@ -134,12 +136,7 @@
@Override
public void close() throws HyracksDataException {
- try {
- transferManager.close();
- gcsClient.close();
- } catch (Exception e) {
- throw HyracksDataException.create(e);
- }
+ InvokeUtil.tryWithCleanupsAsHyracks(transferManager::close, gcsClient::close);
}
private <K, V> void addToMap(Map<K, List<V>> map, K key, V value) {
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
index 89d8fd5..f240ad4 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
@@ -108,9 +108,10 @@
profiler.objectMultipartUpload();
try {
writer.close();
- writer = null;
} catch (IOException | RuntimeException e) {
throw HyracksDataException.create(e);
+ } finally {
+ writer = null;
}
log("FINISHED");
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
index 63a8366..0aa1f87 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
@@ -65,7 +65,8 @@
ICloudClient createCloudClient(IApplicationContext appCtx) throws CompilationException {
GCSClientConfig config = GCSClientConfig.of(configuration, writeBufferSize);
return new GCSCloudClient(config, GCSUtils.buildClient(configuration),
- ICloudGuardian.NoOpCloudGuardian.INSTANCE);
+ ICloudGuardian.NoOpCloudGuardian.INSTANCE,
+ appCtx.getServiceContext().getControllerService().getExecutor());
}
@Override
diff --git a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
index d89c872..2f64c37 100644
--- a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
+++ b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.cloud.gcs;
+import java.util.concurrent.Executors;
+
import org.apache.asterix.cloud.AbstractLSMTest;
import org.apache.asterix.cloud.clients.ICloudGuardian;
import org.apache.asterix.cloud.clients.google.gcs.GCSClientConfig;
@@ -52,7 +54,8 @@
int writeBufferSize = StorageUtil.getIntSizeInBytes(5, StorageUtil.StorageUnit.MEGABYTE);
GCSClientConfig config =
new GCSClientConfig(MOCK_SERVER_REGION, MOCK_SERVER_HOSTNAME, true, 0, writeBufferSize, "");
- CLOUD_CLIENT = new GCSCloudClient(config, ICloudGuardian.NoOpCloudGuardian.INSTANCE);
+ CLOUD_CLIENT =
+ new GCSCloudClient(config, ICloudGuardian.NoOpCloudGuardian.INSTANCE, Executors.newCachedThreadPool());
}
private static void cleanup() {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cloud/IPartitionBootstrapper.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cloud/IPartitionBootstrapper.java
index 73a1392..88e88ab 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cloud/IPartitionBootstrapper.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cloud/IPartitionBootstrapper.java
@@ -34,7 +34,7 @@
* @return the systems state in case the checkpoint upon calling {@link IRecoveryManager#getSystemState()}
* is missing the checkpoint file
*/
- IRecoveryManager.SystemState getSystemStateOnMissingCheckpoint();
+ IRecoveryManager.SystemState getSystemStateOnMissingCheckpoint() throws HyracksDataException;
/**
* Bootstraps the node's partitions directory by doing the following:
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
index aef2215..a5f79ac 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
@@ -61,7 +61,7 @@
* @return SystemState The state of the system
* @throws ACIDException
*/
- SystemState getSystemState() throws ACIDException;
+ SystemState getSystemState() throws ACIDException, HyracksDataException;
/**
* Rolls back a transaction.
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
index fdda793..46c8ec2 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
@@ -45,6 +45,8 @@
return (HyracksDataException) cause;
} else if (cause instanceof InterruptedException) {
Thread.currentThread().interrupt();
+ } else if (cause instanceof Error) {
+ throw (Error) cause;
}
return new HyracksDataException(cause);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
index e7970d3..ff0a3c5 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
@@ -216,6 +216,39 @@
@SuppressWarnings({ "squid:S1181", "squid:S1193", "ConstantConditions", "UnreachableCode" })
// catching Throwable, instanceofs, false-positive unreachable code
+ public static void tryWithCleanupsAsHyracks(ThrowingAction action, ThrowingAction... cleanups)
+ throws HyracksDataException {
+ Throwable savedT = null;
+ boolean suppressedInterrupted = false;
+ try {
+ action.run();
+ } catch (Throwable t) {
+ savedT = t;
+ } finally {
+ for (ThrowingAction cleanup : cleanups) {
+ try {
+ cleanup.run();
+ } catch (Throwable t) {
+ if (savedT != null) {
+ savedT.addSuppressed(t);
+ suppressedInterrupted |= t instanceof InterruptedException;
+ } else {
+ savedT = t;
+ }
+ }
+ }
+ }
+ if (savedT == null) {
+ return;
+ }
+ if (suppressedInterrupted) {
+ Thread.currentThread().interrupt();
+ }
+ throw HyracksDataException.create(savedT);
+ }
+
+ @SuppressWarnings({ "squid:S1181", "squid:S1193", "ConstantConditions", "UnreachableCode" })
+ // catching Throwable, instanceofs, false-positive unreachable code
public static void tryWithCleanups(ThrowingAction action, ThrowingAction... cleanups) throws Exception {
Throwable savedT = null;
boolean suppressedInterrupted = false;