[ASTERIXDB-3581][EXT]: Do not retry GCS SDK requests if thread is interrupted
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Do not retry any GCS requests if the thread
has been interrupted.
Ext-ref: MB-65548
Change-Id: I5deb956bda11ee2ca41a1c05616788b2d697622f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19525
Reviewed-by: Michael Blow <mblow@apache.org>
Tested-by: Hussain Towaileb <hussainht@gmail.com>
Integration-Tests: Hussain Towaileb <hussainht@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 007826b..6416df9 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -233,7 +233,7 @@
IBufferCacheReadContext defaultContext;
if (isCloudDeployment()) {
cloudConfigurator = CloudConfigurator.of(cloudProperties, ioManager, namespacePathResolver,
- getCloudGuardian(cloudProperties), threadExecutor);
+ getCloudGuardian(cloudProperties));
persistenceIOManager = cloudConfigurator.getCloudIoManager();
partitionBootstrapper = cloudConfigurator.getPartitionBootstrapper();
lockNotifier = cloudConfigurator.getLockNotifier();
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 5eafe05..6928b64 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -191,7 +191,7 @@
if (cloudDeployment) {
cloudProperties = new CloudProperties(PropertiesAccessor.getInstance(ccServiceCtx.getAppConfig()));
ioManager = CloudConfigurator.createIOManager(ioManager, cloudProperties, namespacePathResolver,
- getCloudGuardian(cloudProperties), controllerService.getExecutor());
+ getCloudGuardian(cloudProperties));
}
IGlobalTxManager globalTxManager = createGlobalTxManager(ioManager);
appCtx = createApplicationContext(null, globalRecoveryManager, lifecycleCoordinator, Receptionist::new,
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
index 516ede2..8fefcf5 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
@@ -31,7 +31,6 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.ExecutorService;
import java.util.function.Predicate;
import org.apache.asterix.cloud.bulk.DeleteBulkCloudOperation;
@@ -85,13 +84,12 @@
private final List<FileStore> drivePaths;
public AbstractCloudIOManager(IOManager ioManager, CloudProperties cloudProperties,
- INamespacePathResolver nsPathResolver, ICloudGuardian guardian, ExecutorService executor)
- throws HyracksDataException {
+ INamespacePathResolver nsPathResolver, ICloudGuardian guardian) throws HyracksDataException {
super(ioManager.getIODevices(), ioManager.getDeviceComputer(), ioManager.getIOParallelism(),
ioManager.getQueueSize());
this.nsPathResolver = nsPathResolver;
this.bucket = cloudProperties.getStorageBucket();
- cloudClient = CloudClientProvider.getClient(cloudProperties, guardian, executor);
+ cloudClient = CloudClientProvider.getClient(cloudProperties, guardian);
this.guardian = guardian;
int numOfThreads = getIODevices().size() * getIOParallelism();
writeBufferProvider = new WriteBufferProvider(numOfThreads, cloudClient.getWriteBufferSize());
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java
index e494333..dced5b0 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java
@@ -70,12 +70,11 @@
private final long diskCacheMonitoringInterval;
private CloudConfigurator(CloudProperties cloudProperties, IIOManager ioManager,
- INamespacePathResolver nsPathResolver, ICloudGuardian guardian, ExecutorService executor)
- throws HyracksDataException {
+ INamespacePathResolver nsPathResolver, ICloudGuardian guardian) throws HyracksDataException {
this.cloudProperties = cloudProperties;
localIoManager = (IOManager) ioManager;
diskCacheManagerRequired = cloudProperties.getCloudCachePolicy() == CloudCachePolicy.SELECTIVE;
- cloudIOManager = createIOManager(ioManager, cloudProperties, nsPathResolver, guardian, executor);
+ cloudIOManager = createIOManager(ioManager, cloudProperties, nsPathResolver, guardian);
physicalDrive = createPhysicalDrive(diskCacheManagerRequired, cloudProperties, ioManager);
lockNotifier = createLockNotifier(diskCacheManagerRequired);
pageAllocator = createPageAllocator(diskCacheManagerRequired);
@@ -132,22 +131,20 @@
}
public static CloudConfigurator of(CloudProperties cloudProperties, IIOManager ioManager,
- INamespacePathResolver nsPathResolver, ICloudGuardian cloudGuardian, ExecutorService executor)
- throws HyracksDataException {
- return new CloudConfigurator(cloudProperties, ioManager, nsPathResolver, cloudGuardian, executor);
+ INamespacePathResolver nsPathResolver, ICloudGuardian cloudGuardian) throws HyracksDataException {
+ return new CloudConfigurator(cloudProperties, ioManager, nsPathResolver, cloudGuardian);
}
public static AbstractCloudIOManager createIOManager(IIOManager ioManager, CloudProperties cloudProperties,
- INamespacePathResolver nsPathResolver, ICloudGuardian guardian, ExecutorService executor)
- throws HyracksDataException {
+ INamespacePathResolver nsPathResolver, ICloudGuardian guardian) throws HyracksDataException {
IOManager localIoManager = (IOManager) ioManager;
CloudCachePolicy policy = cloudProperties.getCloudCachePolicy();
if (policy == CloudCachePolicy.EAGER) {
- return new EagerCloudIOManager(localIoManager, cloudProperties, nsPathResolver, guardian, executor);
+ return new EagerCloudIOManager(localIoManager, cloudProperties, nsPathResolver, guardian);
}
boolean selective = policy == CloudCachePolicy.SELECTIVE;
- return new LazyCloudIOManager(localIoManager, cloudProperties, nsPathResolver, selective, guardian, executor);
+ return new LazyCloudIOManager(localIoManager, cloudProperties, nsPathResolver, selective, guardian);
}
private static IPhysicalDrive createPhysicalDrive(boolean diskCacheManagerRequired, CloudProperties cloudProperties,
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudManagerProvider.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudManagerProvider.java
index b944161..b49f3ce 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudManagerProvider.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudManagerProvider.java
@@ -18,8 +18,6 @@
*/
package org.apache.asterix.cloud;
-import java.util.concurrent.ExecutorService;
-
import org.apache.asterix.cloud.clients.ICloudGuardian;
import org.apache.asterix.common.api.INamespacePathResolver;
import org.apache.asterix.common.cloud.CloudCachePolicy;
@@ -34,14 +32,13 @@
}
public static IIOManager createIOManager(CloudProperties cloudProperties, IIOManager ioManager,
- INamespacePathResolver nsPathResolver, ICloudGuardian guardian, ExecutorService executor)
- throws HyracksDataException {
+ INamespacePathResolver nsPathResolver, ICloudGuardian guardian) throws HyracksDataException {
IOManager localIoManager = (IOManager) ioManager;
if (cloudProperties.getCloudCachePolicy() == CloudCachePolicy.LAZY) {
- return new LazyCloudIOManager(localIoManager, cloudProperties, nsPathResolver, false, guardian, executor);
+ return new LazyCloudIOManager(localIoManager, cloudProperties, nsPathResolver, false, guardian);
}
- return new EagerCloudIOManager(localIoManager, cloudProperties, nsPathResolver, guardian, executor);
+ return new EagerCloudIOManager(localIoManager, cloudProperties, nsPathResolver, guardian);
}
public static IPartitionBootstrapper getCloudPartitionBootstrapper(IIOManager ioManager) {
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
index 29e5da0..1cb6077 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java
@@ -23,7 +23,6 @@
import java.io.File;
import java.util.Collections;
import java.util.Set;
-import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import org.apache.asterix.cloud.clients.ICloudGuardian;
@@ -49,9 +48,8 @@
private static final Logger LOGGER = LogManager.getLogger();
public EagerCloudIOManager(IOManager ioManager, CloudProperties cloudProperties,
- INamespacePathResolver nsPathResolver, ICloudGuardian guardian, ExecutorService executor)
- throws HyracksDataException {
- super(ioManager, cloudProperties, nsPathResolver, guardian, executor);
+ INamespacePathResolver nsPathResolver, ICloudGuardian guardian) throws HyracksDataException {
+ super(ioManager, cloudProperties, nsPathResolver, guardian);
}
/*
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
index 7a7ea06..1c5efd9 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
@@ -29,7 +29,6 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import org.apache.asterix.cloud.bulk.DeleteBulkCloudOperation;
@@ -71,9 +70,9 @@
private ILazyAccessor accessor;
public LazyCloudIOManager(IOManager ioManager, CloudProperties cloudProperties,
- INamespacePathResolver nsPathResolver, boolean selective, ICloudGuardian guardian, ExecutorService executor)
+ INamespacePathResolver nsPathResolver, boolean selective, ICloudGuardian guardian)
throws HyracksDataException {
- super(ioManager, cloudProperties, nsPathResolver, guardian, executor);
+ super(ioManager, cloudProperties, nsPathResolver, guardian);
accessor = new InitialCloudAccessor(cloudClient, bucket, localIoManager);
puncher = HolePuncherProvider.get(this, cloudProperties, writeBufferProvider);
if (selective) {
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
index a0c8bd0..c98c6b4 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
@@ -18,8 +18,6 @@
*/
package org.apache.asterix.cloud.clients;
-import java.util.concurrent.ExecutorService;
-
import org.apache.asterix.cloud.clients.aws.s3.S3ClientConfig;
import org.apache.asterix.cloud.clients.aws.s3.S3CloudClient;
import org.apache.asterix.cloud.clients.azure.blobstorage.AzBlobStorageClientConfig;
@@ -40,8 +38,8 @@
throw new AssertionError("do not instantiate");
}
- public static ICloudClient getClient(CloudProperties cloudProperties, ICloudGuardian guardian,
- ExecutorService executor) throws HyracksDataException {
+ public static ICloudClient getClient(CloudProperties cloudProperties, ICloudGuardian guardian)
+ throws HyracksDataException {
String storageScheme = cloudProperties.getStorageScheme();
ICloudClient cloudClient;
if (S3.equalsIgnoreCase(storageScheme)) {
@@ -49,7 +47,7 @@
cloudClient = new S3CloudClient(config, guardian);
} else if (GCS.equalsIgnoreCase(storageScheme)) {
GCSClientConfig config = GCSClientConfig.of(cloudProperties);
- cloudClient = new GCSCloudClient(config, guardian, executor);
+ cloudClient = new GCSCloudClient(config, guardian);
} else if (AZ_BLOB.equalsIgnoreCase(storageScheme)) {
AzBlobStorageClientConfig config = AzBlobStorageClientConfig.of(cloudProperties);
cloudClient = new AzBlobStorageCloudClient(config, guardian);
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
index 16fb278..62c1835 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java
@@ -19,6 +19,7 @@
package org.apache.asterix.cloud.clients.google.gcs;
import static org.apache.asterix.cloud.clients.google.gcs.GCSClientConfig.DELETE_BATCH_SIZE;
+import static org.apache.asterix.external.util.google.gcs.GCSConstants.DEFAULT_NO_RETRY_ON_THREAD_INTERRUPT_STRATEGY;
import java.io.FilenameFilter;
import java.io.IOException;
@@ -31,11 +32,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.function.Supplier;
import org.apache.asterix.cloud.IWriteBufferProvider;
import org.apache.asterix.cloud.clients.CloudFile;
@@ -71,7 +67,6 @@
import com.google.cloud.storage.Storage.CopyRequest;
import com.google.cloud.storage.StorageBatch;
import com.google.cloud.storage.StorageBatchResult;
-import com.google.cloud.storage.StorageException;
import com.google.cloud.storage.StorageOptions;
public class GCSCloudClient implements ICloudClient {
@@ -81,15 +76,12 @@
private final ICloudGuardian guardian;
private final IRequestProfilerLimiter profilerLimiter;
private final int writeBufferSize;
- private final ExecutorService executor;
- public GCSCloudClient(GCSClientConfig config, Storage gcsClient, ICloudGuardian guardian,
- ExecutorService executor) {
+ public GCSCloudClient(GCSClientConfig config, Storage gcsClient, ICloudGuardian guardian) {
this.gcsClient = gcsClient;
this.config = config;
this.guardian = guardian;
this.writeBufferSize = config.getWriteBufferSize();
- this.executor = executor;
long profilerInterval = config.getProfilerLogInterval();
GCSRequestRateLimiter limiter = new GCSRequestRateLimiter(config);
if (profilerInterval > 0) {
@@ -100,9 +92,8 @@
guardian.setCloudClient(this);
}
- public GCSCloudClient(GCSClientConfig config, ICloudGuardian guardian, ExecutorService executor)
- throws HyracksDataException {
- this(config, buildClient(config), guardian, executor);
+ public GCSCloudClient(GCSClientConfig config, ICloudGuardian guardian) throws HyracksDataException {
+ this(config, buildClient(config), guardian);
}
@Override
@@ -121,12 +112,11 @@
}
@Override
- public Set<CloudFile> listObjects(String bucket, String path, FilenameFilter filter) throws HyracksDataException {
+ public Set<CloudFile> listObjects(String bucket, String path, FilenameFilter filter) {
guardian.checkReadAccess(bucket, path);
profilerLimiter.objectsList();
- // MB-65432: Storage.list is not interrupt-safe; we need to offload onto another thread
- Page<Blob> blobs = runOpInterruptibly(() -> gcsClient.list(bucket,
- BlobListOption.prefix(config.getPrefix() + path), BlobListOption.fields(Storage.BlobField.SIZE)));
+ Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(config.getPrefix() + path),
+ BlobListOption.fields(Storage.BlobField.SIZE));
Set<CloudFile> files = new HashSet<>();
for (Blob blob : blobs.iterateAll()) {
if (filter.accept(null, IoUtil.getFileNameFromPath(blob.getName()))) {
@@ -148,7 +138,7 @@
from.seek(offset + totalRead);
totalRead += from.read(buffer);
}
- } catch (IOException | StorageException ex) {
+ } catch (IOException | BaseServiceException ex) {
throw HyracksDataException.create(ex);
}
@@ -163,17 +153,14 @@
guardian.checkReadAccess(bucket, path);
profilerLimiter.objectGet();
BlobId blobId = BlobId.of(bucket, config.getPrefix() + path);
- // MB-65432: Storage.readAllBytes is not interrupt-safe; we need to offload onto another thread
- return runOpInterruptibly(() -> {
- try {
- return gcsClient.readAllBytes(blobId);
- } catch (StorageException e) {
- if (e.getCode() == 404) {
- return null;
- }
- throw e;
+ try {
+ return gcsClient.readAllBytes(blobId);
+ } catch (BaseServiceException ex) {
+ if (ex.getCode() == 404) {
+ return null;
}
- });
+ throw HyracksDataException.create(ex);
+ }
}
@Override
@@ -185,38 +172,33 @@
reader = gcsClient.reader(bucket, config.getPrefix() + path).limit(offset + length);
reader.seek(offset);
return Channels.newInputStream(reader);
- } catch (StorageException | IOException ex) {
+ } catch (BaseServiceException | IOException ex) {
throw new RuntimeException(CleanupUtils.close(reader, ex));
}
}
@Override
- public void write(String bucket, String path, byte[] data) throws HyracksDataException {
+ public void write(String bucket, String path, byte[] data) {
guardian.checkWriteAccess(bucket, path);
profilerLimiter.objectWrite();
BlobInfo blobInfo = BlobInfo.newBuilder(bucket, config.getPrefix() + path).build();
- // MB-65432: Storage.create is not interrupt-safe; we need to offload onto another thread
- runOpInterruptibly(() -> gcsClient.create(blobInfo, data));
+ gcsClient.create(blobInfo, data);
}
@Override
- public void copy(String bucket, String srcPath, FileReference destPath) throws HyracksDataException {
+ public void copy(String bucket, String srcPath, FileReference destPath) {
guardian.checkReadAccess(bucket, srcPath);
profilerLimiter.objectsList();
- // MB-65432: Storage.list & copy are not interrupt-safe; we need to offload onto another thread
- runOpInterruptibly(() -> {
- Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(config.getPrefix() + srcPath));
- for (Blob blob : blobs.iterateAll()) {
- profilerLimiter.objectCopy();
- BlobId source = blob.getBlobId();
- String targetName = destPath.getChildPath(IoUtil.getFileNameFromPath(source.getName()));
- BlobId target = BlobId.of(bucket, targetName);
- guardian.checkWriteAccess(bucket, targetName);
- CopyRequest copyReq = CopyRequest.newBuilder().setSource(source).setTarget(target).build();
- gcsClient.copy(copyReq);
- }
- return null;
- });
+ Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(config.getPrefix() + srcPath));
+ for (Blob blob : blobs.iterateAll()) {
+ profilerLimiter.objectCopy();
+ BlobId source = blob.getBlobId();
+ String targetName = destPath.getChildPath(IoUtil.getFileNameFromPath(source.getName()));
+ BlobId target = BlobId.of(bucket, targetName);
+ guardian.checkWriteAccess(bucket, targetName);
+ CopyRequest copyReq = CopyRequest.newBuilder().setSource(source).setTarget(target).build();
+ gcsClient.copy(copyReq);
+ }
}
@Override
@@ -234,8 +216,7 @@
guardian.checkWriteAccess(bucket, blobId.getName());
deleteResponses.add(batchRequest.delete(blobId));
}
- // MB-65432: StorageBatch.submit may not be interrupt-safe; we need to offload onto another thread
- runOpInterruptibly(batchRequest::submit);
+ batchRequest.submit();
Iterator<String> deletePathIter = paths.iterator();
for (StorageBatchResult<Boolean> deleteResponse : deleteResponses) {
String deletedPath = deletePathIter.next();
@@ -250,7 +231,7 @@
}
} catch (BaseServiceException e) {
LOGGER.warn("Failed to delete object {} while deleting {}", deletedPath, paths, e);
- throw new RuntimeDataException(ErrorCode.CLOUD_IO_FAILURE, e, "DELETE", deletedPath,
+ throw RuntimeDataException.create(ErrorCode.CLOUD_IO_FAILURE, e, "DELETE", deletedPath,
paths.toString());
}
}
@@ -259,12 +240,11 @@
}
@Override
- public long getObjectSize(String bucket, String path) throws HyracksDataException {
+ public long getObjectSize(String bucket, String path) {
guardian.checkReadAccess(bucket, path);
profilerLimiter.objectGet();
- // MB-65432: Storage.get is not interrupt-safe; we need to offload onto another thread
- Blob blob = runOpInterruptibly(() -> gcsClient.get(bucket, config.getPrefix() + path,
- Storage.BlobGetOption.fields(Storage.BlobField.SIZE)));
+ Blob blob =
+ gcsClient.get(bucket, config.getPrefix() + path, Storage.BlobGetOption.fields(Storage.BlobField.SIZE));
if (blob == null) {
return 0;
}
@@ -272,22 +252,19 @@
}
@Override
- public boolean exists(String bucket, String path) throws HyracksDataException {
+ public boolean exists(String bucket, String path) {
guardian.checkReadAccess(bucket, path);
profilerLimiter.objectGet();
- // MB-65432: Storage.get is not interrupt-safe; we need to offload onto another thread
- Blob blob = runOpInterruptibly(() -> gcsClient.get(bucket, config.getPrefix() + path,
- Storage.BlobGetOption.fields(Storage.BlobField.values())));
+ Blob blob = gcsClient.get(bucket, config.getPrefix() + path,
+ Storage.BlobGetOption.fields(Storage.BlobField.values()));
return blob != null && blob.exists();
}
@Override
- public boolean isEmptyPrefix(String bucket, String path) throws HyracksDataException {
+ public boolean isEmptyPrefix(String bucket, String path) {
guardian.checkReadAccess(bucket, path);
profilerLimiter.objectsList();
- // MB-65432: Storage.list is not interrupt-safe; we need to offload onto another thread
- Page<Blob> blobs =
- runOpInterruptibly(() -> gcsClient.list(bucket, BlobListOption.prefix(config.getPrefix() + path)));
+ Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(config.getPrefix() + path));
return !blobs.iterateAll().iterator().hasNext();
}
@@ -298,12 +275,10 @@
}
@Override
- public JsonNode listAsJson(ObjectMapper objectMapper, String bucket) throws HyracksDataException {
+ public JsonNode listAsJson(ObjectMapper objectMapper, String bucket) {
guardian.checkReadAccess(bucket, "/");
profilerLimiter.objectsList();
- // MB-65432: Storage.list is not interrupt-safe; we need to offload onto another thread
- Page<Blob> blobs =
- runOpInterruptibly(() -> gcsClient.list(bucket, BlobListOption.fields(Storage.BlobField.SIZE)));
+ Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.fields(Storage.BlobField.SIZE));
ArrayNode objectsInfo = objectMapper.createArrayNode();
List<Blob> objects = new ArrayList<>();
@@ -328,6 +303,7 @@
private static Storage buildClient(GCSClientConfig config) throws HyracksDataException {
StorageOptions.Builder builder = StorageOptions.newBuilder().setCredentials(config.createCredentialsProvider());
+ builder.setStorageRetryStrategy(DEFAULT_NO_RETRY_ON_THREAD_INTERRUPT_STRATEGY);
if (config.getEndpoint() != null && !config.getEndpoint().isEmpty()) {
builder.setHost(config.getEndpoint());
@@ -338,42 +314,4 @@
private String stripCloudPrefix(String objectName) {
return objectName.substring(config.getPrefix().length());
}
-
- private void runOpInterruptibly(Runnable operation) throws HyracksDataException {
- try {
- executor.submit(operation).get();
- } catch (InterruptedException e) {
- throw HyracksDataException.create(e);
- } catch (ExecutionException e) {
- throw HyracksDataException.create(e.getCause());
- }
- }
-
- private <T> T runOpInterruptibly(Supplier<T> operation) throws HyracksDataException {
- Future<T> opTask = executor.submit(operation::get);
- try {
- return opTask.get();
- } catch (InterruptedException e) {
- cancelAndUnwind(opTask);
- throw HyracksDataException.create(e);
- } catch (ExecutionException e) {
- throw HyracksDataException.create(e.getCause());
- }
- }
-
- private static <T> void cancelAndUnwind(Future<T> opTask) {
- opTask.cancel(true);
- while (true) {
- try {
- opTask.get();
- } catch (InterruptedException e1) {
- continue;
- } catch (CancellationException e1) {
- LOGGER.debug("ignoring exception after cancel of op", e1);
- } catch (ExecutionException e1) {
- LOGGER.debug("ignoring exception after cancel of op", e1.getCause());
- }
- return;
- }
- }
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
index cbbaf31..b9e7eee 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.cloud.clients.google.gcs;
+import static org.apache.asterix.external.util.google.gcs.GCSConstants.DEFAULT_NO_RETRY_ON_THREAD_INTERRUPT_STRATEGY;
+
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
@@ -64,6 +66,7 @@
this.ioManager = ioManager;
this.profiler = profiler;
StorageOptions.Builder builder = StorageOptions.newBuilder();
+ builder.setStorageRetryStrategy(DEFAULT_NO_RETRY_ON_THREAD_INTERRUPT_STRATEGY);
if (config.getEndpoint() != null && !config.getEndpoint().isEmpty()) {
builder.setHost(config.getEndpoint());
}
@@ -95,7 +98,6 @@
downloadJobs.add(transferManager.downloadBlobs(entry.getValue(),
downConfig.setDownloadDirectory(entry.getKey()).build()));
}
- // MB-65432: DownloadJob.getDownloadResults is interrupt-safe; no need to offload
downloadJobs.forEach(DownloadJob::getDownloadResults);
}
@@ -122,7 +124,6 @@
}
List<DownloadResult> results;
for (DownloadJob job : downloadJobs) {
- // MB-65432: DownloadJob.getDownloadResults is interrupt-safe; no need to offload
results = job.getDownloadResults();
for (DownloadResult result : results) {
if (result.getStatus() != TransferStatus.SUCCESS) {
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
index 36bb94c..3a34365 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java
@@ -67,8 +67,7 @@
ICloudClient createCloudClient(IApplicationContext appCtx) throws CompilationException {
GCSClientConfig config = GCSClientConfig.of(configuration, writeBufferSize);
return new GCSCloudClient(config, GCSUtils.buildClient(configuration),
- ICloudGuardian.NoOpCloudGuardian.INSTANCE,
- appCtx.getServiceContext().getControllerService().getExecutor());
+ ICloudGuardian.NoOpCloudGuardian.INSTANCE);
}
@Override
diff --git a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
index 2f64c37..d89c872 100644
--- a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
+++ b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java
@@ -18,8 +18,6 @@
*/
package org.apache.asterix.cloud.gcs;
-import java.util.concurrent.Executors;
-
import org.apache.asterix.cloud.AbstractLSMTest;
import org.apache.asterix.cloud.clients.ICloudGuardian;
import org.apache.asterix.cloud.clients.google.gcs.GCSClientConfig;
@@ -54,8 +52,7 @@
int writeBufferSize = StorageUtil.getIntSizeInBytes(5, StorageUtil.StorageUnit.MEGABYTE);
GCSClientConfig config =
new GCSClientConfig(MOCK_SERVER_REGION, MOCK_SERVER_HOSTNAME, true, 0, writeBufferSize, "");
- CLOUD_CLIENT =
- new GCSCloudClient(config, ICloudGuardian.NoOpCloudGuardian.INSTANCE, Executors.newCachedThreadPool());
+ CLOUD_CLIENT = new GCSCloudClient(config, ICloudGuardian.NoOpCloudGuardian.INSTANCE);
}
private static void cleanup() {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java
index 2613f34..739dbde 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java
@@ -18,7 +18,23 @@
*/
package org.apache.asterix.external.util.google.gcs;
+import static org.apache.commons.lang3.exception.ExceptionUtils.getStackTrace;
+
+import java.util.concurrent.CancellationException;
+
+import org.apache.hyracks.api.util.ExceptionUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import com.google.api.gax.retrying.ResultRetryAlgorithm;
+import com.google.api.gax.retrying.TimedAttemptSettings;
+import com.google.cloud.ExceptionHandler;
+import com.google.cloud.storage.StorageRetryStrategy;
+
public class GCSConstants {
+
+ private static final Logger LOGGER = LogManager.getLogger();
+
private GCSConstants() {
throw new AssertionError("do not instantiate");
}
@@ -36,25 +52,87 @@
// hadoop credentials
public static final String HADOOP_AUTH_TYPE = "fs.gs.auth.type";
public static final String HADOOP_AUTH_UNAUTHENTICATED = "UNAUTHENTICATED";
- public static final String HADOOP_AUTH_SERVICE_ACCOUNT_JSON_KEY_FILE = "SERVICE_ACCOUNT_JSON_KEYFILE";
- public static final String HADOOP_AUTH_SERVICE_ACCOUNT_JSON_KEY_FILE_PATH =
- "google.cloud.auth.service.account.json.keyfile";
// gs hadoop parameters
public static final String HADOOP_SUPPORT_COMPRESSED = "fs.gs.inputstream.support.gzip.encoding.enable";
public static final String HADOOP_ENDPOINT = "fs.gs.storage.root.url";
- public static final String HADOOP_MAX_REQUESTS_PER_BATCH = "fs.gs.max.requests.per.batch";
- public static final String HADOOP_BATCH_THREADS = "fs.gs.batch.threads";
- public static class JSON_CREDENTIALS_FIELDS {
+ public static class JsonCredentials {
public static final String PRIVATE_KEY_ID = "private_key_id";
public static final String PRIVATE_KEY = "private_key";
public static final String CLIENT_EMAIL = "client_email";
}
- public static class HADOOP_AUTH_SERVICE_ACCOUNT_JSON_FIELDS {
+ public static class HadoopAuthServiceAccount {
public static final String PRIVATE_KEY_ID = "fs.gs.auth.service.account.private.key.id";
public static final String PRIVATE_KEY = "fs.gs.auth.service.account.private.key";
public static final String CLIENT_EMAIL = "fs.gs.auth.service.account.email";
}
+
+ public static final StorageRetryStrategy DEFAULT_NO_RETRY_ON_THREAD_INTERRUPT_STRATEGY;
+ static {
+ StorageRetryStrategy defaultStrategy = StorageRetryStrategy.getDefaultStorageRetryStrategy();
+ ExceptionHandler defaultIdempotentHandler = (ExceptionHandler) defaultStrategy.getIdempotentHandler();
+ ExceptionHandler defaultNonIdempotentHandler = (ExceptionHandler) defaultStrategy.getNonidempotentHandler();
+
+ ResultRetryAlgorithm<Object> noRetryOnThreadInterruptIdempotentHandler = new ResultRetryAlgorithm<>() {
+ @Override
+ public TimedAttemptSettings createNextAttempt(Throwable prevThrowable, Object prevResponse,
+ TimedAttemptSettings prevSettings) {
+ return defaultIdempotentHandler.createNextAttempt(prevThrowable, prevResponse, prevSettings);
+ }
+
+ @Override
+ public boolean shouldRetry(Throwable prevThrowable, Object prevResponse) throws CancellationException {
+ if (ExceptionUtils.causedByInterrupt(prevThrowable) || Thread.currentThread().isInterrupted()) {
+ interruptRequest(prevThrowable);
+ }
+ return defaultIdempotentHandler.shouldRetry(prevThrowable, prevResponse);
+ }
+ };
+
+ ResultRetryAlgorithm<Object> noRetryOnThreadInterruptNonIdempotentHandler = new ResultRetryAlgorithm<>() {
+ @Override
+ public TimedAttemptSettings createNextAttempt(Throwable prevThrowable, Object prevResponse,
+ TimedAttemptSettings prevSettings) {
+ return defaultNonIdempotentHandler.createNextAttempt(prevThrowable, prevResponse, prevSettings);
+ }
+
+ @Override
+ public boolean shouldRetry(Throwable prevThrowable, Object prevResponse) throws CancellationException {
+ if (ExceptionUtils.causedByInterrupt(prevThrowable) || Thread.currentThread().isInterrupted()) {
+ interruptRequest(prevThrowable);
+ }
+ return defaultNonIdempotentHandler.shouldRetry(prevThrowable, prevResponse);
+ }
+ };
+
+ DEFAULT_NO_RETRY_ON_THREAD_INTERRUPT_STRATEGY = new StorageRetryStrategy() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ResultRetryAlgorithm<?> getIdempotentHandler() {
+ return noRetryOnThreadInterruptIdempotentHandler;
+ }
+
+ @Override
+ public ResultRetryAlgorithm<?> getNonidempotentHandler() {
+ return noRetryOnThreadInterruptNonIdempotentHandler;
+ }
+ };
+ }
+
+ /**
+ * Throwing a CancellationException will cause the GCS client to abort the whole operation, not only stop retrying
+ */
+ private static void interruptRequest(Throwable th) {
+ Thread.currentThread().interrupt();
+ CancellationException ex = new CancellationException("Request was interrupted, aborting retries and request");
+ if (th != null) {
+ ex.initCause(th);
+ }
+ String stackTrace = getStackTrace(ex);
+ LOGGER.debug("Request was interrupted, aborting retries and request\n{}", stackTrace);
+ throw ex;
+ }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
index 481b7ff..d768c23 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
@@ -26,6 +26,7 @@
import static org.apache.asterix.external.util.ExternalDataUtils.validateDeltaTableProperties;
import static org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude;
import static org.apache.asterix.external.util.google.gcs.GCSConstants.APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME;
+import static org.apache.asterix.external.util.google.gcs.GCSConstants.DEFAULT_NO_RETRY_ON_THREAD_INTERRUPT_STRATEGY;
import static org.apache.asterix.external.util.google.gcs.GCSConstants.ENDPOINT_FIELD_NAME;
import static org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_AUTH_TYPE;
import static org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_AUTH_UNAUTHENTICATED;
@@ -94,6 +95,7 @@
String endpoint = configuration.get(ENDPOINT_FIELD_NAME);
StorageOptions.Builder builder = StorageOptions.newBuilder();
+ builder.setStorageRetryStrategy(DEFAULT_NO_RETRY_ON_THREAD_INTERRUPT_STRATEGY);
// default credentials provider
if (applicationDefaultCredentials != null) {
@@ -259,12 +261,12 @@
// Setting these values instead of HADOOP_AUTH_SERVICE_ACCOUNT_JSON_KEY_FILE_PATH is supported
// in com.google.cloud.bigdataoss:util-hadoop only up to version hadoop3-2.2.x and is removed in
// version 3.x.y, which also removed support for hadoop-2
- conf.set(GCSConstants.HADOOP_AUTH_SERVICE_ACCOUNT_JSON_FIELDS.PRIVATE_KEY_ID,
- jsonCreds.get(GCSConstants.JSON_CREDENTIALS_FIELDS.PRIVATE_KEY_ID).asText());
- conf.set(GCSConstants.HADOOP_AUTH_SERVICE_ACCOUNT_JSON_FIELDS.PRIVATE_KEY,
- jsonCreds.get(GCSConstants.JSON_CREDENTIALS_FIELDS.PRIVATE_KEY).asText());
- conf.set(GCSConstants.HADOOP_AUTH_SERVICE_ACCOUNT_JSON_FIELDS.CLIENT_EMAIL,
- jsonCreds.get(GCSConstants.JSON_CREDENTIALS_FIELDS.CLIENT_EMAIL).asText());
+ conf.set(GCSConstants.HadoopAuthServiceAccount.PRIVATE_KEY_ID,
+ jsonCreds.get(GCSConstants.JsonCredentials.PRIVATE_KEY_ID).asText());
+ conf.set(GCSConstants.HadoopAuthServiceAccount.PRIVATE_KEY,
+ jsonCreds.get(GCSConstants.JsonCredentials.PRIVATE_KEY).asText());
+ conf.set(GCSConstants.HadoopAuthServiceAccount.CLIENT_EMAIL,
+ jsonCreds.get(GCSConstants.JsonCredentials.CLIENT_EMAIL).asText());
} catch (JsonProcessingException e) {
throw CompilationException.create(EXTERNAL_SOURCE_ERROR, "Unable to parse Json Credentials",
getMessageOrToString(e));
diff --git a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/exceptions/AlgebricksException.java b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/exceptions/AlgebricksException.java
index 0b7af91..483076b 100644
--- a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/exceptions/AlgebricksException.java
+++ b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/exceptions/AlgebricksException.java
@@ -61,6 +61,10 @@
this.sourceLoc = sourceLoc;
this.nodeId = nodeId;
this.params = params;
+
+ if (cause instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
}
/**
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
index 46c8ec2..50df1cb 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
@@ -43,8 +43,6 @@
}
if (cause instanceof HyracksDataException) {
return (HyracksDataException) cause;
- } else if (cause instanceof InterruptedException) {
- Thread.currentThread().interrupt();
} else if (cause instanceof Error) {
throw (Error) cause;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
index 12f1095..b12c8df 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java
@@ -79,6 +79,10 @@
this.errorCode = intCode;
this.nodeId = nodeId;
this.params = params;
+
+ if (cause instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
}
protected HyracksException(IError errorCode, Throwable cause, SourceLocation sourceLoc, String nodeId,
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ExceptionUtils.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ExceptionUtils.java
index d616eb5..ddba3f0 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ExceptionUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ExceptionUtils.java
@@ -128,6 +128,9 @@
}
public static Throwable getRootCause(Throwable e) {
+ if (e == null) {
+ return null;
+ }
Throwable current = e;
Throwable cause = e.getCause();
while (cause != null && cause != current) {