[ASTERIXDB-2944][EXT] Ensure the size of S3 connection pool
- user mode changes: no
- storage format changes: no
- interface changes: no
Details:
- We set the S3 connection pool size to be the number of partitions.
Change-Id: I1e1ce66cc7cd39cc81d004f90c36871ad31a685f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/12763
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wael Alkowaileet <wael.y.k@gmail.com>
Reviewed-by: Hussain Towaileb <hussainht@gmail.com>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
index c9eb489..803e657 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
@@ -51,7 +51,8 @@
//Configure Hadoop S3 input splits
JobConf conf = createHdfsConf(serviceCtx, configuration);
- ExternalDataUtils.AwsS3.configureAwsS3HdfsJobConf(conf, configuration);
+ int numberOfPartitions = getPartitionConstraint().getLocations().length;
+ ExternalDataUtils.AwsS3.configureAwsS3HdfsJobConf(conf, configuration, numberOfPartitions);
configureHdfsConf(conf, configuration);
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 51865f0..7445fbd 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -320,9 +320,17 @@
public static final String HADOOP_ACCESS_KEY_ID = "fs.s3a.access.key";
public static final String HADOOP_SECRET_ACCESS_KEY = "fs.s3a.secret.key";
public static final String HADOOP_SESSION_TOKEN = "fs.s3a.session.token";
- public static final String HADOOP_PATH_STYLE_ACCESS = "fs.s3a.path.style.access";
public static final String HADOOP_REGION = "fs.s3a.region";
public static final String HADOOP_SERVICE_END_POINT = "fs.s3a.endpoint";
+
+ /*
+ * Internal configurations
+ */
+ //Allows accessing directories as file system path
+ public static final String HADOOP_PATH_STYLE_ACCESS = "fs.s3a.path.style.access";
+ //The number of maximum HTTP connections in connection pool
+ public static final String HADOOP_S3_CONNECTION_POOL_SIZE = "fs.s3a.connection.maximum";
+ //S3 used protocol
public static final String HADOOP_S3_PROTOCOL = "s3a";
//Hadoop credentials provider key
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 750ff13..59f2382 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -25,6 +25,7 @@
import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.HADOOP_ANONYMOUS_ACCESS;
import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.HADOOP_CREDENTIAL_PROVIDER_KEY;
import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.HADOOP_PATH_STYLE_ACCESS;
+import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.HADOOP_S3_CONNECTION_POOL_SIZE;
import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.HADOOP_SECRET_ACCESS_KEY;
import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.HADOOP_SESSION_TOKEN;
import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.HADOOP_TEMP_ACCESS;
@@ -121,7 +122,6 @@
import software.amazon.awssdk.services.s3.model.S3Response;
public class ExternalDataUtils {
-
private static final Map<ATypeTag, IValueParserFactory> valueParserFactoryMap = new EnumMap<>(ATypeTag.class);
private static final int DEFAULT_MAX_ARGUMENT_SZ = 1024 * 1024;
private static final int HEADER_FUDGE = 64;
@@ -833,9 +833,11 @@
/**
* Builds the S3 client using the provided configuration
*
- * @param configuration properties
+ * @param configuration properties
+ * @param numberOfPartitions number of partitions in the cluster
*/
- public static void configureAwsS3HdfsJobConf(JobConf conf, Map<String, String> configuration) {
+ public static void configureAwsS3HdfsJobConf(JobConf conf, Map<String, String> configuration,
+ int numberOfPartitions) {
String accessKeyId = configuration.get(ExternalDataConstants.AwsS3.ACCESS_KEY_ID_FIELD_NAME);
String secretAccessKey = configuration.get(ExternalDataConstants.AwsS3.SECRET_ACCESS_KEY_FIELD_NAME);
String sessionToken = configuration.get(ExternalDataConstants.AwsS3.SESSION_TOKEN_FIELD_NAME);
@@ -867,6 +869,11 @@
*/
conf.set(HADOOP_PATH_STYLE_ACCESS, ExternalDataConstants.TRUE);
+ /*
+ * Set the size of S3 connection pool to be the number of partitions
+ */
+ conf.set(HADOOP_S3_CONNECTION_POOL_SIZE, String.valueOf(numberOfPartitions));
+
if (serviceEndpoint != null) {
// Validation of the URL should be done at hadoop-aws level
conf.set(ExternalDataConstants.AwsS3.HADOOP_SERVICE_END_POINT, serviceEndpoint);