[NO ISSUE][*DB][STO] Support validating returned objects from list op
Add new parameter, CLOUD_STORAGE_LIST_EVENTUALLY_CONSISTENT, which
indicates when a cloud provider's list result may include deleted
objects. This allows the returned list to be validated before further
processing.
Ext-ref: MB-66343
Change-Id: I7520dea29d15110e85ff770d6454cb69cdeaec51
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19724
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
index 0253348..d71912c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
@@ -29,6 +29,7 @@
"cloud.storage.endpoint" : "",
"cloud.storage.force.path.style" : false,
"cloud.storage.index.inactive.duration.threshold" : 360,
+ "cloud.storage.list.eventually.consistent" : false,
"cloud.storage.prefix" : "",
"cloud.storage.region" : "",
"cloud.storage.scheme" : "",
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
index 14a4710..f16ae94c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
@@ -29,6 +29,7 @@
"cloud.storage.endpoint" : "",
"cloud.storage.force.path.style" : false,
"cloud.storage.index.inactive.duration.threshold" : 360,
+ "cloud.storage.list.eventually.consistent" : false,
"cloud.storage.prefix" : "",
"cloud.storage.region" : "",
"cloud.storage.scheme" : "",
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
index 8e9b187..9871dfc 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
@@ -29,6 +29,7 @@
"cloud.storage.endpoint" : "",
"cloud.storage.force.path.style" : false,
"cloud.storage.index.inactive.duration.threshold" : 360,
+ "cloud.storage.list.eventually.consistent" : false,
"cloud.storage.prefix" : "",
"cloud.storage.region" : "",
"cloud.storage.scheme" : "",
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
index 5414099..e0449b6 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
@@ -44,16 +44,18 @@
private final int requestsMaxHttpConnections;
private final boolean forcePathStyle;
private final boolean disableSslVerify;
+ private final boolean storageListEventuallyConsistent;
public S3ClientConfig(String region, String endpoint, String prefix, boolean anonymousAuth,
long profilerLogInterval, int writeBufferSize) {
- this(region, endpoint, prefix, anonymousAuth, profilerLogInterval, writeBufferSize, 1, 0, 0, 0, false, false);
+ this(region, endpoint, prefix, anonymousAuth, profilerLogInterval, writeBufferSize, 1, 0, 0, 0, false, false,
+ false);
}
private S3ClientConfig(String region, String endpoint, String prefix, boolean anonymousAuth,
long profilerLogInterval, int writeBufferSize, long tokenAcquireTimeout, int writeMaxRequestsPerSeconds,
int readMaxRequestsPerSeconds, int requestsMaxHttpConnections, boolean forcePathStyle,
- boolean disableSslVerify) {
+ boolean disableSslVerify, boolean storageListEventuallyConsistent) {
this.region = Objects.requireNonNull(region, "region");
this.endpoint = endpoint;
this.prefix = Objects.requireNonNull(prefix, "prefix");
@@ -66,6 +68,7 @@
this.requestsMaxHttpConnections = requestsMaxHttpConnections;
this.forcePathStyle = forcePathStyle;
this.disableSslVerify = disableSslVerify;
+ this.storageListEventuallyConsistent = storageListEventuallyConsistent;
}
public static S3ClientConfig of(CloudProperties cloudProperties) {
@@ -74,7 +77,8 @@
cloudProperties.getProfilerLogInterval(), cloudProperties.getWriteBufferSize(),
cloudProperties.getTokenAcquireTimeout(), cloudProperties.getWriteMaxRequestsPerSecond(),
cloudProperties.getReadMaxRequestsPerSecond(), cloudProperties.getRequestsMaxHttpConnections(),
- cloudProperties.isStorageForcePathStyle(), cloudProperties.isStorageDisableSSLVerify());
+ cloudProperties.isStorageForcePathStyle(), cloudProperties.isStorageDisableSSLVerify(),
+ cloudProperties.isStorageListEventuallyConsistent());
}
public static S3ClientConfig of(Map<String, String> configuration, int writeBufferSize) {
@@ -144,6 +148,10 @@
return forcePathStyle;
}
+ public boolean isStorageListEventuallyConsistent() {
+ return storageListEventuallyConsistent;
+ }
+
private boolean isS3Mock() {
return endpoint != null && !endpoint.isEmpty();
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
index e4ba4b4..77d34a2 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
@@ -33,6 +33,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Set;
+import java.util.function.Function;
import org.apache.asterix.cloud.CloudResettableInputStream;
import org.apache.asterix.cloud.IWriteBufferProvider;
@@ -132,7 +133,8 @@
guardian.checkReadAccess(bucket, path);
profiler.objectsList();
path = config.isLocalS3Provider() ? encodeURI(path) : path;
- return filterAndGet(listS3Objects(s3Client, bucket, config.getPrefix() + path), filter);
+ return ensureListConsistent(filterAndGet(listS3Objects(s3Client, bucket, config.getPrefix() + path), filter),
+ bucket, CloudFile::getPath);
}
@Override
@@ -221,7 +223,15 @@
String destKey = destPath.getChildPath(IoUtil.getFileNameFromPath(srcKey));
CopyObjectRequest copyReq = CopyObjectRequest.builder().sourceBucket(bucket).sourceKey(srcKey)
.destinationBucket(bucket).destinationKey(config.getPrefix() + destKey).build();
- s3Client.copyObject(copyReq);
+ try {
+ s3Client.copyObject(copyReq);
+ } catch (NoSuchKeyException ex) {
+ if (config.isStorageListEventuallyConsistent()) {
+ LOGGER.warn("ignoring 404 on copy of {} since list is configured as eventually consistent", srcKey);
+ } else {
+ throw ex;
+ }
+ }
}
}
@@ -300,7 +310,8 @@
@Override
public JsonNode listAsJson(ObjectMapper objectMapper, String bucket) {
- List<S3Object> objects = listS3Objects(s3Client, bucket, config.getPrefix());
+ List<S3Object> objects =
+ ensureListConsistent(listS3Objects(s3Client, bucket, config.getPrefix()), bucket, S3Object::key);
ArrayNode objectsInfo = objectMapper.createArrayNode();
objects.sort((x, y) -> String.CASE_INSENSITIVE_ORDER.compare(x.key(), y.key()));
@@ -357,4 +368,24 @@
}
return files;
}
+
+ private <T, C extends Collection<T>> C ensureListConsistent(C cloudFiles, String bucket,
+ Function<T, String> pathExtractor) {
+ if (config.isStorageListEventuallyConsistent()) {
+ return cloudFiles;
+ }
+ Iterator<T> iterator = cloudFiles.iterator();
+ while (iterator.hasNext()) {
+ String path = pathExtractor.apply(iterator.next());
+ try {
+ if (!exists(bucket, path)) {
+ LOGGER.warn("Removing non-existent file from list result: {}", path);
+ iterator.remove();
+ }
+ } catch (HyracksDataException e) {
+ LOGGER.warn("Ignoring exception on exists check on {}", path, e);
+ }
+ }
+ return cloudFiles;
+ }
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
index 1af4824..4354e63 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
@@ -67,7 +67,8 @@
CLOUD_EVICTION_PLAN_REEVALUATE_THRESHOLD(POSITIVE_INTEGER, 50),
CLOUD_REQUESTS_MAX_HTTP_CONNECTIONS(POSITIVE_INTEGER, 1000),
CLOUD_STORAGE_FORCE_PATH_STYLE(BOOLEAN, false),
- CLOUD_STORAGE_DISABLE_SSL_VERIFY(BOOLEAN, false);
+ CLOUD_STORAGE_DISABLE_SSL_VERIFY(BOOLEAN, false),
+ CLOUD_STORAGE_LIST_EVENTUALLY_CONSISTENT(BOOLEAN, false);
private final IOptionType interpreter;
private final Object defaultValue;
@@ -102,6 +103,7 @@
case CLOUD_REQUESTS_MAX_HTTP_CONNECTIONS:
case CLOUD_STORAGE_FORCE_PATH_STYLE:
case CLOUD_STORAGE_DISABLE_SSL_VERIFY:
+ case CLOUD_STORAGE_LIST_EVENTUALLY_CONSISTENT:
return Section.COMMON;
default:
return Section.NC;
@@ -177,6 +179,9 @@
case CLOUD_STORAGE_DISABLE_SSL_VERIFY:
return "Indicates whether or not to disable SSL certificate verification on the cloud storage. "
+ "(default: false)";
+ case CLOUD_STORAGE_LIST_EVENTUALLY_CONSISTENT:
+ return "Indicates whether or not deleted objects may be contained in list operations for some time"
+ + "after they are deleted. (default: false)";
default:
throw new IllegalStateException("NYI: " + this);
}
@@ -284,4 +289,8 @@
public boolean isStorageDisableSSLVerify() {
return accessor.getBoolean(Option.CLOUD_STORAGE_DISABLE_SSL_VERIFY);
}
+
+ public boolean isStorageListEventuallyConsistent() {
+ return accessor.getBoolean(Option.CLOUD_STORAGE_LIST_EVENTUALLY_CONSISTENT);
+ }
}