[NO ISSUE][*DB][STO] Support validating returned objects from list op

Add new parameter, CLOUD_STORAGE_LIST_EVENTUALLY_CONSISTENT, which
indicates when a cloud provider's list result may include deleted
objects. This allows the returned list to be validated before further
processing.

Ext-ref: MB-66343
Change-Id: I7520dea29d15110e85ff770d6454cb69cdeaec51
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19724
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
index 0253348..d71912c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
@@ -29,6 +29,7 @@
     "cloud.storage.endpoint" : "",
     "cloud.storage.force.path.style" : false,
     "cloud.storage.index.inactive.duration.threshold" : 360,
+    "cloud.storage.list.eventually.consistent" : false,
     "cloud.storage.prefix" : "",
     "cloud.storage.region" : "",
     "cloud.storage.scheme" : "",
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
index 14a4710..f16ae94c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
@@ -29,6 +29,7 @@
     "cloud.storage.endpoint" : "",
     "cloud.storage.force.path.style" : false,
     "cloud.storage.index.inactive.duration.threshold" : 360,
+    "cloud.storage.list.eventually.consistent" : false,
     "cloud.storage.prefix" : "",
     "cloud.storage.region" : "",
     "cloud.storage.scheme" : "",
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
index 8e9b187..9871dfc 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
@@ -29,6 +29,7 @@
     "cloud.storage.endpoint" : "",
     "cloud.storage.force.path.style" : false,
     "cloud.storage.index.inactive.duration.threshold" : 360,
+    "cloud.storage.list.eventually.consistent" : false,
     "cloud.storage.prefix" : "",
     "cloud.storage.region" : "",
     "cloud.storage.scheme" : "",
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
index 5414099..e0449b6 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
@@ -44,16 +44,18 @@
     private final int requestsMaxHttpConnections;
     private final boolean forcePathStyle;
     private final boolean disableSslVerify;
+    private final boolean storageListEventuallyConsistent;
 
     public S3ClientConfig(String region, String endpoint, String prefix, boolean anonymousAuth,
             long profilerLogInterval, int writeBufferSize) {
-        this(region, endpoint, prefix, anonymousAuth, profilerLogInterval, writeBufferSize, 1, 0, 0, 0, false, false);
+        this(region, endpoint, prefix, anonymousAuth, profilerLogInterval, writeBufferSize, 1, 0, 0, 0, false, false,
+                false);
     }
 
     private S3ClientConfig(String region, String endpoint, String prefix, boolean anonymousAuth,
             long profilerLogInterval, int writeBufferSize, long tokenAcquireTimeout, int writeMaxRequestsPerSeconds,
             int readMaxRequestsPerSeconds, int requestsMaxHttpConnections, boolean forcePathStyle,
-            boolean disableSslVerify) {
+            boolean disableSslVerify, boolean storageListEventuallyConsistent) {
         this.region = Objects.requireNonNull(region, "region");
         this.endpoint = endpoint;
         this.prefix = Objects.requireNonNull(prefix, "prefix");
@@ -66,6 +68,7 @@
         this.requestsMaxHttpConnections = requestsMaxHttpConnections;
         this.forcePathStyle = forcePathStyle;
         this.disableSslVerify = disableSslVerify;
+        this.storageListEventuallyConsistent = storageListEventuallyConsistent;
     }
 
     public static S3ClientConfig of(CloudProperties cloudProperties) {
@@ -74,7 +77,8 @@
                 cloudProperties.getProfilerLogInterval(), cloudProperties.getWriteBufferSize(),
                 cloudProperties.getTokenAcquireTimeout(), cloudProperties.getWriteMaxRequestsPerSecond(),
                 cloudProperties.getReadMaxRequestsPerSecond(), cloudProperties.getRequestsMaxHttpConnections(),
-                cloudProperties.isStorageForcePathStyle(), cloudProperties.isStorageDisableSSLVerify());
+                cloudProperties.isStorageForcePathStyle(), cloudProperties.isStorageDisableSSLVerify(),
+                cloudProperties.isStorageListEventuallyConsistent());
     }
 
     public static S3ClientConfig of(Map<String, String> configuration, int writeBufferSize) {
@@ -144,6 +148,10 @@
         return forcePathStyle;
     }
 
+    public boolean isStorageListEventuallyConsistent() {
+        return storageListEventuallyConsistent;
+    }
+
     private boolean isS3Mock() {
         return endpoint != null && !endpoint.isEmpty();
     }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
index e4ba4b4..77d34a2 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
@@ -33,6 +33,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import java.util.function.Function;
 
 import org.apache.asterix.cloud.CloudResettableInputStream;
 import org.apache.asterix.cloud.IWriteBufferProvider;
@@ -132,7 +133,8 @@
         guardian.checkReadAccess(bucket, path);
         profiler.objectsList();
         path = config.isLocalS3Provider() ? encodeURI(path) : path;
-        return filterAndGet(listS3Objects(s3Client, bucket, config.getPrefix() + path), filter);
+        return ensureListConsistent(filterAndGet(listS3Objects(s3Client, bucket, config.getPrefix() + path), filter),
+                bucket, CloudFile::getPath);
     }
 
     @Override
@@ -221,7 +223,15 @@
             String destKey = destPath.getChildPath(IoUtil.getFileNameFromPath(srcKey));
             CopyObjectRequest copyReq = CopyObjectRequest.builder().sourceBucket(bucket).sourceKey(srcKey)
                     .destinationBucket(bucket).destinationKey(config.getPrefix() + destKey).build();
-            s3Client.copyObject(copyReq);
+            try {
+                s3Client.copyObject(copyReq);
+            } catch (NoSuchKeyException ex) {
+                if (config.isStorageListEventuallyConsistent()) {
+                    LOGGER.warn("ignoring 404 on copy of {} since list is configured as eventually consistent", srcKey);
+                } else {
+                    throw ex;
+                }
+            }
         }
     }
 
@@ -300,7 +310,8 @@
 
     @Override
     public JsonNode listAsJson(ObjectMapper objectMapper, String bucket) {
-        List<S3Object> objects = listS3Objects(s3Client, bucket, config.getPrefix());
+        List<S3Object> objects =
+                ensureListConsistent(listS3Objects(s3Client, bucket, config.getPrefix()), bucket, S3Object::key);
         ArrayNode objectsInfo = objectMapper.createArrayNode();
 
         objects.sort((x, y) -> String.CASE_INSENSITIVE_ORDER.compare(x.key(), y.key()));
@@ -357,4 +368,24 @@
         }
         return files;
     }
+
+    private <T, C extends Collection<T>> C ensureListConsistent(C cloudFiles, String bucket,
+            Function<T, String> pathExtractor) {
+        if (config.isStorageListEventuallyConsistent()) {
+            return cloudFiles;
+        }
+        Iterator<T> iterator = cloudFiles.iterator();
+        while (iterator.hasNext()) {
+            String path = pathExtractor.apply(iterator.next());
+            try {
+                if (!exists(bucket, path)) {
+                    LOGGER.warn("Removing non-existent file from list result: {}", path);
+                    iterator.remove();
+                }
+            } catch (HyracksDataException e) {
+                LOGGER.warn("Ignoring exception on exists check on {}", path, e);
+            }
+        }
+        return cloudFiles;
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
index 1af4824..4354e63 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
@@ -67,7 +67,8 @@
         CLOUD_EVICTION_PLAN_REEVALUATE_THRESHOLD(POSITIVE_INTEGER, 50),
         CLOUD_REQUESTS_MAX_HTTP_CONNECTIONS(POSITIVE_INTEGER, 1000),
         CLOUD_STORAGE_FORCE_PATH_STYLE(BOOLEAN, false),
-        CLOUD_STORAGE_DISABLE_SSL_VERIFY(BOOLEAN, false);
+        CLOUD_STORAGE_DISABLE_SSL_VERIFY(BOOLEAN, false),
+        CLOUD_STORAGE_LIST_EVENTUALLY_CONSISTENT(BOOLEAN, false);
 
         private final IOptionType interpreter;
         private final Object defaultValue;
@@ -102,6 +103,7 @@
                 case CLOUD_REQUESTS_MAX_HTTP_CONNECTIONS:
                 case CLOUD_STORAGE_FORCE_PATH_STYLE:
                 case CLOUD_STORAGE_DISABLE_SSL_VERIFY:
+                case CLOUD_STORAGE_LIST_EVENTUALLY_CONSISTENT:
                     return Section.COMMON;
                 default:
                     return Section.NC;
@@ -177,6 +179,9 @@
                 case CLOUD_STORAGE_DISABLE_SSL_VERIFY:
                     return "Indicates whether or not to disable SSL certificate verification on the cloud storage. "
                             + "(default: false)";
+                case CLOUD_STORAGE_LIST_EVENTUALLY_CONSISTENT:
+                    return "Indicates whether or not deleted objects may be contained in list operations for some time"
+                            + "after they are deleted. (default: false)";
                 default:
                     throw new IllegalStateException("NYI: " + this);
             }
@@ -284,4 +289,8 @@
     public boolean isStorageDisableSSLVerify() {
         return accessor.getBoolean(Option.CLOUD_STORAGE_DISABLE_SSL_VERIFY);
     }
+
+    public boolean isStorageListEventuallyConsistent() {
+        return accessor.getBoolean(Option.CLOUD_STORAGE_LIST_EVENTUALLY_CONSISTENT);
+    }
 }