[ASTERIXDB-3598][CONF] Add cloud storage configuration

- user model changes: yes
- storage format changes: no
- interface changes: no

Details:

- Add an option to disable SSL verification on the cloud storage
  to be used for testing purposes.
- Add an to force path style access on cloud storage (ASTERIXDB-3599)

Change-Id: I932b85cea015daa91b512b0219d10d4b9075ca9b
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19648
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
index 29061ec..0253348 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
@@ -24,8 +24,10 @@
     "cloud.storage.cache.policy" : "selective",
     "cloud.storage.debug.mode.enabled" : false,
     "cloud.storage.debug.sweep.threshold.size" : 1073741824,
+    "cloud.storage.disable.ssl.verify" : false,
     "cloud.storage.disk.monitor.interval" : 120,
     "cloud.storage.endpoint" : "",
+    "cloud.storage.force.path.style" : false,
     "cloud.storage.index.inactive.duration.threshold" : 360,
     "cloud.storage.prefix" : "",
     "cloud.storage.region" : "",
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
index f2ea15b..14a4710 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
@@ -24,8 +24,10 @@
     "cloud.storage.cache.policy" : "selective",
     "cloud.storage.debug.mode.enabled" : false,
     "cloud.storage.debug.sweep.threshold.size" : 1073741824,
+    "cloud.storage.disable.ssl.verify" : false,
     "cloud.storage.disk.monitor.interval" : 120,
     "cloud.storage.endpoint" : "",
+    "cloud.storage.force.path.style" : false,
     "cloud.storage.index.inactive.duration.threshold" : 360,
     "cloud.storage.prefix" : "",
     "cloud.storage.region" : "",
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
index 685d28b..8e9b187 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
@@ -24,8 +24,10 @@
     "cloud.storage.cache.policy" : "selective",
     "cloud.storage.debug.mode.enabled" : false,
     "cloud.storage.debug.sweep.threshold.size" : 1073741824,
+    "cloud.storage.disable.ssl.verify" : false,
     "cloud.storage.disk.monitor.interval" : 120,
     "cloud.storage.endpoint" : "",
+    "cloud.storage.force.path.style" : false,
     "cloud.storage.index.inactive.duration.threshold" : 360,
     "cloud.storage.prefix" : "",
     "cloud.storage.region" : "",
diff --git a/asterixdb/asterix-cloud/pom.xml b/asterixdb/asterix-cloud/pom.xml
index 419ac4d..efe8673 100644
--- a/asterixdb/asterix-cloud/pom.xml
+++ b/asterixdb/asterix-cloud/pom.xml
@@ -251,6 +251,11 @@
             <artifactId>apache-client</artifactId>
         </dependency>
         <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>netty-nio-client</artifactId>
+            <version>${awsjavasdk.version}</version>
+        </dependency>
+        <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <scope>test</scope>
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
index 20727de..5414099 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
@@ -42,15 +42,18 @@
     private final int readMaxRequestsPerSeconds;
     private final int writeMaxRequestsPerSeconds;
     private final int requestsMaxHttpConnections;
+    private final boolean forcePathStyle;
+    private final boolean disableSslVerify;
 
     public S3ClientConfig(String region, String endpoint, String prefix, boolean anonymousAuth,
             long profilerLogInterval, int writeBufferSize) {
-        this(region, endpoint, prefix, anonymousAuth, profilerLogInterval, writeBufferSize, 1, 0, 0, 0);
+        this(region, endpoint, prefix, anonymousAuth, profilerLogInterval, writeBufferSize, 1, 0, 0, 0, false, false);
     }
 
     private S3ClientConfig(String region, String endpoint, String prefix, boolean anonymousAuth,
             long profilerLogInterval, int writeBufferSize, long tokenAcquireTimeout, int writeMaxRequestsPerSeconds,
-            int readMaxRequestsPerSeconds, int requestsMaxHttpConnections) {
+            int readMaxRequestsPerSeconds, int requestsMaxHttpConnections, boolean forcePathStyle,
+            boolean disableSslVerify) {
         this.region = Objects.requireNonNull(region, "region");
         this.endpoint = endpoint;
         this.prefix = Objects.requireNonNull(prefix, "prefix");
@@ -61,6 +64,8 @@
         this.writeMaxRequestsPerSeconds = writeMaxRequestsPerSeconds;
         this.readMaxRequestsPerSeconds = readMaxRequestsPerSeconds;
         this.requestsMaxHttpConnections = requestsMaxHttpConnections;
+        this.forcePathStyle = forcePathStyle;
+        this.disableSslVerify = disableSslVerify;
     }
 
     public static S3ClientConfig of(CloudProperties cloudProperties) {
@@ -68,7 +73,8 @@
                 cloudProperties.getStoragePrefix(), cloudProperties.isStorageAnonymousAuth(),
                 cloudProperties.getProfilerLogInterval(), cloudProperties.getWriteBufferSize(),
                 cloudProperties.getTokenAcquireTimeout(), cloudProperties.getWriteMaxRequestsPerSecond(),
-                cloudProperties.getReadMaxRequestsPerSecond(), cloudProperties.getRequestsMaxHttpConnections());
+                cloudProperties.getReadMaxRequestsPerSecond(), cloudProperties.getRequestsMaxHttpConnections(),
+                cloudProperties.isStorageForcePathStyle(), cloudProperties.isStorageDisableSSLVerify());
     }
 
     public static S3ClientConfig of(Map<String, String> configuration, int writeBufferSize) {
@@ -130,6 +136,14 @@
         return requestsMaxHttpConnections;
     }
 
+    public boolean isDisableSslVerify() {
+        return disableSslVerify;
+    }
+
+    public boolean isForcePathStyle() {
+        return forcePathStyle;
+    }
+
     private boolean isS3Mock() {
         return endpoint != null && !endpoint.isEmpty();
     }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
index 01a8b02..e4ba4b4 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
@@ -26,7 +26,6 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -63,6 +62,8 @@
 
 import software.amazon.awssdk.core.ResponseInputStream;
 import software.amazon.awssdk.core.sync.RequestBody;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.http.SdkHttpConfigurationOption;
 import software.amazon.awssdk.http.apache.ApacheHttpClient;
 import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.s3.S3Client;
@@ -79,6 +80,7 @@
 import software.amazon.awssdk.services.s3.model.PutObjectRequest;
 import software.amazon.awssdk.services.s3.model.S3Error;
 import software.amazon.awssdk.services.s3.model.S3Object;
+import software.amazon.awssdk.utils.AttributeMap;
 
 @ThreadSafe
 public final class S3CloudClient implements ICloudClient {
@@ -326,19 +328,21 @@
         S3ClientBuilder builder = S3Client.builder();
         builder.credentialsProvider(config.createCredentialsProvider());
         builder.region(Region.of(config.getRegion()));
+        builder.forcePathStyle(config.isForcePathStyle());
+
+        AttributeMap.Builder customHttpConfigBuilder = AttributeMap.builder();
         if (config.getRequestsMaxHttpConnections() > 0) {
-            builder.httpClientBuilder(
-                    ApacheHttpClient.builder().maxConnections(config.getRequestsMaxHttpConnections()));
+            customHttpConfigBuilder.put(SdkHttpConfigurationOption.MAX_CONNECTIONS,
+                    config.getRequestsMaxHttpConnections());
         }
         if (config.getEndpoint() != null && !config.getEndpoint().isEmpty()) {
-            URI uri;
-            try {
-                uri = new URI(config.getEndpoint());
-            } catch (URISyntaxException ex) {
-                throw new IllegalArgumentException(ex);
-            }
-            builder.endpointOverride(uri);
+            builder.endpointOverride(URI.create(config.getEndpoint()));
         }
+        if (config.isDisableSslVerify()) {
+            customHttpConfigBuilder.put(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, true);
+        }
+        SdkHttpClient httpClient = ApacheHttpClient.builder().buildWithDefaults(customHttpConfigBuilder.build());
+        builder.httpClient(httpClient);
         return builder.build();
     }
 
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
index 2eb9f09..b5259e9 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
@@ -37,6 +37,9 @@
 import org.apache.hyracks.control.nc.io.IOManager;
 import org.apache.hyracks.util.annotations.ThreadSafe;
 
+import software.amazon.awssdk.http.SdkHttpConfigurationOption;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
 import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.s3.S3AsyncClient;
 import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
@@ -50,6 +53,7 @@
 import software.amazon.awssdk.transfer.s3.model.DownloadFileRequest;
 import software.amazon.awssdk.transfer.s3.model.FailedFileDownload;
 import software.amazon.awssdk.transfer.s3.model.FileDownload;
+import software.amazon.awssdk.utils.AttributeMap;
 
 @ThreadSafe
 class S3ParallelDownloader implements IParallelDownloader {
@@ -170,24 +174,28 @@
     }
 
     private static S3AsyncClient createAsyncClient(S3ClientConfig config) {
-        if (config.isLocalS3Provider()) {
-            // CRT client is not supported by S3Mock
-            return createS3AsyncClient(config);
-        } else {
-            // CRT could provide a better performance when used with an actual S3
+        // CRT client is not supported by all local S3 providers, but provides a better performance with AWS S3
+        if (!config.isLocalS3Provider()) {
             return createS3CrtAsyncClient(config);
         }
+        return createS3AsyncClient(config);
     }
 
     private static S3AsyncClient createS3AsyncClient(S3ClientConfig config) {
         S3AsyncClientBuilder builder = S3AsyncClient.builder();
         builder.credentialsProvider(config.createCredentialsProvider());
         builder.region(Region.of(config.getRegion()));
-
+        builder.forcePathStyle(config.isForcePathStyle());
+        AttributeMap.Builder customHttpConfigBuilder = AttributeMap.builder();
         if (config.getEndpoint() != null && !config.getEndpoint().isEmpty()) {
             builder.endpointOverride(URI.create(config.getEndpoint()));
         }
-
+        if (config.isDisableSslVerify()) {
+            customHttpConfigBuilder.put(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, true);
+        }
+        SdkAsyncHttpClient nettyHttpClient =
+                NettyNioAsyncHttpClient.builder().buildWithDefaults(customHttpConfigBuilder.build());
+        builder.httpClient(nettyHttpClient);
         return builder.build();
     }
 
@@ -195,11 +203,10 @@
         S3CrtAsyncClientBuilder builder = S3AsyncClient.crtBuilder();
         builder.credentialsProvider(config.createCredentialsProvider());
         builder.region(Region.of(config.getRegion()));
-
+        builder.forcePathStyle(config.isForcePathStyle());
         if (config.getEndpoint() != null && !config.getEndpoint().isEmpty()) {
             builder.endpointOverride(URI.create(config.getEndpoint()));
         }
-
         return builder.build();
     }
 
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
index 1e3fe75..1af4824 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
@@ -65,7 +65,9 @@
                 getRangedIntegerType(5, Integer.MAX_VALUE),
                 StorageUtil.getIntSizeInBytes(8, StorageUtil.StorageUnit.MEGABYTE)),
         CLOUD_EVICTION_PLAN_REEVALUATE_THRESHOLD(POSITIVE_INTEGER, 50),
-        CLOUD_REQUESTS_MAX_HTTP_CONNECTIONS(POSITIVE_INTEGER, 1000);
+        CLOUD_REQUESTS_MAX_HTTP_CONNECTIONS(POSITIVE_INTEGER, 1000),
+        CLOUD_STORAGE_FORCE_PATH_STYLE(BOOLEAN, false),
+        CLOUD_STORAGE_DISABLE_SSL_VERIFY(BOOLEAN, false);
 
         private final IOptionType interpreter;
         private final Object defaultValue;
@@ -98,6 +100,8 @@
                 case CLOUD_WRITE_BUFFER_SIZE:
                 case CLOUD_EVICTION_PLAN_REEVALUATE_THRESHOLD:
                 case CLOUD_REQUESTS_MAX_HTTP_CONNECTIONS:
+                case CLOUD_STORAGE_FORCE_PATH_STYLE:
+                case CLOUD_STORAGE_DISABLE_SSL_VERIFY:
                     return Section.COMMON;
                 default:
                     return Section.NC;
@@ -167,6 +171,12 @@
                     return "The number of cloud reads for re-evaluating an eviction plan. (default: 50)";
                 case CLOUD_REQUESTS_MAX_HTTP_CONNECTIONS:
                     return "The maximum number of HTTP connections to use for cloud requests per node. (default: 1000)";
+                case CLOUD_STORAGE_FORCE_PATH_STYLE:
+                    return "Indicates whether or not to force path style when accessing the cloud storage. (default:"
+                            + " false)";
+                case CLOUD_STORAGE_DISABLE_SSL_VERIFY:
+                    return "Indicates whether or not to disable SSL certificate verification on the cloud storage. "
+                            + "(default: false)";
                 default:
                     throw new IllegalStateException("NYI: " + this);
             }
@@ -266,4 +276,12 @@
     public int getRequestsMaxHttpConnections() {
         return accessor.getInt(Option.CLOUD_REQUESTS_MAX_HTTP_CONNECTIONS);
     }
+
+    public boolean isStorageForcePathStyle() {
+        return accessor.getBoolean(Option.CLOUD_STORAGE_FORCE_PATH_STYLE);
+    }
+
+    public boolean isStorageDisableSSLVerify() {
+        return accessor.getBoolean(Option.CLOUD_STORAGE_DISABLE_SSL_VERIFY);
+    }
 }