[NO ISSUE][EXT] Set Azure request timeout
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- add azure_request_timeout configurable property
- default timeout to 120 seconds
- catch exceptions from external input stream and
wrap in a RuntimeDataException to avoid halt due
to non-serializable exceptions from external sources
Change-Id: Iebf988384b0bc5d6ae7688c65747227dbde062b1
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/15483
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index f7da31d..06458ce 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -65,6 +65,7 @@
import org.apache.asterix.app.result.fields.ResultHandlePrinter;
import org.apache.asterix.app.result.fields.ResultsPrinter;
import org.apache.asterix.app.result.fields.StatusPrinter;
+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.api.IClientRequest;
import org.apache.asterix.common.api.IMetadataLockManager;
import org.apache.asterix.common.api.IRequestTracker;
@@ -829,7 +830,8 @@
metadataProvider, mdTxnCtx);
ExternalDataUtils.normalize(properties);
ExternalDataUtils.validate(properties);
- validateExternalDatasetProperties(externalDetails, properties, dd.getSourceLocation(), mdTxnCtx);
+ validateExternalDatasetProperties(externalDetails, properties, dd.getSourceLocation(), mdTxnCtx,
+ appCtx);
datasetDetails = new ExternalDatasetDetails(externalDetails.getAdapter(), properties, new Date(),
TransactionState.COMMIT);
break;
@@ -4773,13 +4775,13 @@
}
protected void validateExternalDatasetProperties(ExternalDetailsDecl externalDetails,
- Map<String, String> properties, SourceLocation srcLoc, MetadataTransactionContext mdTxnCtx)
- throws AlgebricksException, HyracksDataException {
+ Map<String, String> properties, SourceLocation srcLoc, MetadataTransactionContext mdTxnCtx,
+ IApplicationContext appCtx) throws AlgebricksException, HyracksDataException {
// Validate adapter specific properties
String adapter = externalDetails.getAdapter();
Map<String, String> details = new HashMap<>(properties);
details.put(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE, adapter);
- validateAdapterSpecificProperties(details, srcLoc);
+ validateAdapterSpecificProperties(details, srcLoc, appCtx);
}
/**
@@ -4787,9 +4789,9 @@
*
* @param configuration external source properties
*/
- protected void validateAdapterSpecificProperties(Map<String, String> configuration, SourceLocation srcLoc)
- throws CompilationException {
- ExternalDataUtils.validateAdapterSpecificProperties(configuration, srcLoc, warningCollector);
+ protected void validateAdapterSpecificProperties(Map<String, String> configuration, SourceLocation srcLoc,
+ IApplicationContext appCtx) throws CompilationException {
+ ExternalDataUtils.validateAdapterSpecificProperties(configuration, srcLoc, warningCollector, appCtx);
}
protected enum CreateResult {
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
index bf736b4..1805e7a 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
@@ -8,6 +8,7 @@
"active\.memory\.global\.budget" : 67108864,
"active\.stop\.timeout" : 3600,
"active\.suspend\.timeout" : 3600,
+ "azure.request.timeout" : 120,
"compiler\.arrayindex" : true,
"compiler\.external\.field\.pushdown" : true,
"compiler\.framesize" : 32768,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
index d52cedd..743347a 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
@@ -8,6 +8,7 @@
"active\.memory\.global\.budget" : 67108864,
"active\.stop\.timeout" : 3600,
"active\.suspend\.timeout" : 3600,
+ "azure.request.timeout" : 120,
"compiler\.arrayindex" : true,
"compiler\.external\.field\.pushdown" : true,
"compiler\.framesize" : 32768,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
index 4f5267f..4359bd9 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
@@ -8,6 +8,7 @@
"active\.memory\.global\.budget" : 67108864,
"active\.stop\.timeout" : 3600,
"active\.suspend\.timeout" : 3600,
+ "azure.request.timeout" : 120,
"compiler\.arrayindex" : true,
"compiler\.external\.field\.pushdown" : true,
"compiler\.framesize" : 32768,
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java
index 46258bf..515aad6 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java
@@ -51,7 +51,8 @@
StorageUtil.getIntSizeInBytes(200, StorageUtil.StorageUnit.MEGABYTE),
"The maximum accepted web request size in bytes"),
REQUESTS_ARCHIVE_SIZE(NONNEGATIVE_INTEGER, 50, "The maximum number of archived requests to maintain"),
- LIBRARY_DEPLOY_TIMEOUT(POSITIVE_INTEGER, 1800, "Timeout to upload a UDF in seconds");
+ LIBRARY_DEPLOY_TIMEOUT(POSITIVE_INTEGER, 1800, "Timeout to upload a UDF in seconds"),
+ AZURE_REQUEST_TIMEOUT(POSITIVE_INTEGER, 120, "Timeout for Azure client requests in seconds");
private final IOptionType type;
private final Object defaultValue;
@@ -78,6 +79,7 @@
case MAX_WAIT_ACTIVE_CLUSTER:
case MAX_WEB_REQUEST_SIZE:
case LIBRARY_DEPLOY_TIMEOUT:
+ case AZURE_REQUEST_TIMEOUT:
return Section.COMMON;
case CC_JAVA_OPTS:
case NC_JAVA_OPTS:
@@ -155,4 +157,7 @@
return accessor.getInt(Option.LIBRARY_DEPLOY_TIMEOUT);
}
+ public int getAzureRequestTimeout() {
+ return accessor.getInt(Option.AZURE_REQUEST_TIMEOUT);
+ }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStream.java
index b402f25..cdb3834 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStream.java
@@ -25,6 +25,7 @@
import java.util.Map;
import java.util.zip.GZIPInputStream;
+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
@@ -45,9 +46,10 @@
private final BlobServiceClient client;
private final String container;
- public AzureBlobInputStream(Map<String, String> configuration, List<String> filePaths) throws HyracksDataException {
+ public AzureBlobInputStream(IApplicationContext appCtx, Map<String, String> configuration, List<String> filePaths)
+ throws HyracksDataException {
super(configuration, filePaths);
- this.client = buildAzureClient(configuration);
+ this.client = buildAzureClient(appCtx, configuration);
this.container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
}
@@ -81,9 +83,10 @@
return true;
}
- private BlobServiceClient buildAzureClient(Map<String, String> configuration) throws HyracksDataException {
+ private BlobServiceClient buildAzureClient(IApplicationContext appCtx, Map<String, String> configuration)
+ throws HyracksDataException {
try {
- return ExternalDataUtils.Azure.buildAzureBlobClient(configuration);
+ return ExternalDataUtils.Azure.buildAzureBlobClient(appCtx, configuration);
} catch (CompilationException ex) {
throw HyracksDataException.create(ex);
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java
index bf904a4..064b319 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java
@@ -23,6 +23,7 @@
import java.util.Map;
import java.util.PriorityQueue;
+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
import org.apache.asterix.external.util.ExternalDataUtils;
@@ -41,7 +42,10 @@
@Override
public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) throws HyracksDataException {
- return new AzureBlobInputStream(configuration, partitionWorkLoadsBasedOnSize.get(partition).getFilePaths());
+ IApplicationContext appCtx =
+ (IApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext();
+ return new AzureBlobInputStream(appCtx, configuration,
+ partitionWorkLoadsBasedOnSize.get(partition).getFilePaths());
}
@Override
@@ -49,10 +53,11 @@
throws AlgebricksException {
super.configure(ctx, configuration, warningCollector);
+ IApplicationContext appCtx = (IApplicationContext) ctx.getApplicationContext();
// Ensure the validity of include/exclude
ExternalDataUtils.validateIncludeExclude(configuration);
IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration);
- BlobServiceClient blobServiceClient = ExternalDataUtils.Azure.buildAzureBlobClient(configuration);
+ BlobServiceClient blobServiceClient = ExternalDataUtils.Azure.buildAzureBlobClient(appCtx, configuration);
List<BlobItem> filesOnly = ExternalDataUtils.Azure.listBlobItems(blobServiceClient, configuration,
includeExcludeMatcher, warningCollector);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStream.java
index b7d142f..e34d188 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStream.java
@@ -25,6 +25,7 @@
import java.util.Map;
import java.util.zip.GZIPInputStream;
+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
@@ -45,10 +46,10 @@
private final DataLakeServiceClient client;
private final String container;
- public AzureDataLakeInputStream(Map<String, String> configuration, List<String> filePaths)
- throws HyracksDataException {
+ public AzureDataLakeInputStream(IApplicationContext appCtx, Map<String, String> configuration,
+ List<String> filePaths) throws HyracksDataException {
super(configuration, filePaths);
- this.client = buildAzureClient(configuration);
+ this.client = buildAzureClient(appCtx, configuration);
this.container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
}
@@ -82,9 +83,10 @@
return true;
}
- private DataLakeServiceClient buildAzureClient(Map<String, String> configuration) throws HyracksDataException {
+ private DataLakeServiceClient buildAzureClient(IApplicationContext appCtx, Map<String, String> configuration)
+ throws HyracksDataException {
try {
- return ExternalDataUtils.Azure.buildAzureDatalakeClient(configuration);
+ return ExternalDataUtils.Azure.buildAzureDatalakeClient(appCtx, configuration);
} catch (CompilationException ex) {
throw HyracksDataException.create(ex);
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java
index e145e1f..e9f8d4c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java
@@ -23,6 +23,7 @@
import java.util.Map;
import java.util.PriorityQueue;
+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
import org.apache.asterix.external.util.ExternalDataUtils;
@@ -41,7 +42,10 @@
@Override
public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) throws HyracksDataException {
- return new AzureDataLakeInputStream(configuration, partitionWorkLoadsBasedOnSize.get(partition).getFilePaths());
+ IApplicationContext appCtx =
+ (IApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext();
+ return new AzureDataLakeInputStream(appCtx, configuration,
+ partitionWorkLoadsBasedOnSize.get(partition).getFilePaths());
}
@Override
@@ -49,10 +53,11 @@
throws AlgebricksException {
super.configure(ctx, configuration, warningCollector);
+ IApplicationContext appCtx = (IApplicationContext) ctx.getApplicationContext();
// Ensure the validity of include/exclude
ExternalDataUtils.validateIncludeExclude(configuration);
IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration);
- DataLakeServiceClient client = ExternalDataUtils.Azure.buildAzureDatalakeClient(configuration);
+ DataLakeServiceClient client = ExternalDataUtils.Azure.buildAzureDatalakeClient(appCtx, configuration);
List<PathItem> filesOnly = ExternalDataUtils.Azure.listDatalakePathItems(client, configuration,
includeExcludeMatcher, warningCollector);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
index 1f82dae..c2251df 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
@@ -23,6 +23,7 @@
import java.util.Map;
import java.util.Set;
+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.external.input.HDFSDataSourceFactory;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
@@ -45,7 +46,8 @@
@Override
public void configure(IServiceContext serviceCtx, Map<String, String> configuration,
IWarningCollector warningCollector) throws AlgebricksException, HyracksDataException {
- BlobServiceClient blobServiceClient = ExternalDataUtils.Azure.buildAzureBlobClient(configuration);
+ IApplicationContext appCtx = (IApplicationContext) serviceCtx.getApplicationContext();
+ BlobServiceClient blobServiceClient = ExternalDataUtils.Azure.buildAzureBlobClient(appCtx, configuration);
//Get endpoint
String endPoint = extractEndPoint(blobServiceClient.getAccountUrl());
//Get path
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java
index 8474a74..db87868 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java
@@ -23,6 +23,7 @@
import java.util.Map;
import java.util.Set;
+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.external.input.HDFSDataSourceFactory;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
@@ -45,7 +46,9 @@
@Override
public void configure(IServiceContext serviceCtx, Map<String, String> configuration,
IWarningCollector warningCollector) throws AlgebricksException, HyracksDataException {
- DataLakeServiceClient dataLakeServiceClient = ExternalDataUtils.Azure.buildAzureDatalakeClient(configuration);
+ IApplicationContext appCtx = (IApplicationContext) serviceCtx.getApplicationContext();
+ DataLakeServiceClient dataLakeServiceClient =
+ ExternalDataUtils.Azure.buildAzureDatalakeClient(appCtx, configuration);
//Get endpoint
String endPoint = extractEndPoint(dataLakeServiceClient.getAccountUrl());
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AbstractMultipleInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AbstractMultipleInputStream.java
index 8f032d8..18ef150 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AbstractMultipleInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/AbstractMultipleInputStream.java
@@ -21,10 +21,13 @@
import java.io.IOException;
import java.io.InputStream;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.api.IStreamNotificationHandler;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.ExceptionUtils;
/**
* Base class for a source stream that is composed of multiple separate input streams. Reading proceeds one stream at
@@ -54,25 +57,30 @@
@Override
public final int read(byte[] b, int off, int len) throws IOException {
- if (in == null) {
- if (!advance()) {
- return -1;
+ try {
+ if (in == null) {
+ if (!advance()) {
+ return -1;
+ }
}
+ int result = in.read(b, off, len);
+ if (result < 0 && (lastByte != ExternalDataConstants.BYTE_LF)
+ && (lastByte != ExternalDataConstants.BYTE_CR)) {
+ // return a new line at the end of every file <--Might create problems for some cases
+ // depending on the parser implementation-->
+ lastByte = ExternalDataConstants.BYTE_LF;
+ b[off] = ExternalDataConstants.BYTE_LF;
+ return 1;
+ }
+ while ((result < 0) && advance()) {
+ result = in.read(b, off, len);
+ }
+ if (result > 0) {
+ lastByte = b[(off + result) - 1];
+ }
+ return result;
+ } catch (Exception e) {
+ throw RuntimeDataException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, ExceptionUtils.getMessageOrToString(e));
}
- int result = in.read(b, off, len);
- if (result < 0 && (lastByte != ExternalDataConstants.BYTE_LF) && (lastByte != ExternalDataConstants.BYTE_CR)) {
- // return a new line at the end of every file <--Might create problems for some cases
- // depending on the parser implementation-->
- lastByte = ExternalDataConstants.BYTE_LF;
- b[off] = ExternalDataConstants.BYTE_LF;
- return 1;
- }
- while ((result < 0) && advance()) {
- result = in.read(b, off, len);
- }
- if (result > 0) {
- lastByte = b[(off + result) - 1];
- }
- return result;
}
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 22040e2..8e38eed 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -90,6 +90,7 @@
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
@@ -139,6 +140,7 @@
import com.azure.storage.blob.models.BlobItem;
import com.azure.storage.blob.models.ListBlobsOptions;
import com.azure.storage.common.StorageSharedKeyCredential;
+import com.azure.storage.common.policy.RequestRetryOptions;
import com.azure.storage.file.datalake.DataLakeFileSystemClient;
import com.azure.storage.file.datalake.DataLakeServiceClient;
import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
@@ -597,7 +599,7 @@
* @param configuration properties
*/
public static void validateAdapterSpecificProperties(Map<String, String> configuration, SourceLocation srcLoc,
- IWarningCollector collector) throws CompilationException {
+ IWarningCollector collector, IApplicationContext appCtx) throws CompilationException {
String type = configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE);
switch (type) {
@@ -605,10 +607,10 @@
AwsS3.validateProperties(configuration, srcLoc, collector);
break;
case ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_BLOB:
- Azure.validateAzureBlobProperties(configuration, srcLoc, collector);
+ Azure.validateAzureBlobProperties(configuration, srcLoc, collector, appCtx);
break;
case ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_DATA_LAKE:
- Azure.validateAzureDataLakeProperties(configuration, srcLoc, collector);
+ Azure.validateAzureDataLakeProperties(configuration, srcLoc, collector, appCtx);
break;
case KEY_ADAPTER_NAME_GCS:
GCS.validateProperties(configuration, srcLoc, collector);
@@ -1277,8 +1279,8 @@
* @param configuration properties
* @return client
*/
- public static BlobServiceClient buildAzureBlobClient(Map<String, String> configuration)
- throws CompilationException {
+ public static BlobServiceClient buildAzureBlobClient(IApplicationContext appCtx,
+ Map<String, String> configuration) throws CompilationException {
String managedIdentityId = configuration.get(MANAGED_IDENTITY_ID_FIELD_NAME);
String accountName = configuration.get(ACCOUNT_NAME_FIELD_NAME);
String accountKey = configuration.get(ACCOUNT_KEY_FIELD_NAME);
@@ -1292,6 +1294,9 @@
// Client builder
BlobServiceClientBuilder builder = new BlobServiceClientBuilder();
+ int timeout = appCtx.getExternalProperties().getAzureRequestTimeout();
+ RequestRetryOptions requestRetryOptions = new RequestRetryOptions(null, null, timeout, null, null, null);
+ builder.retryOptions(requestRetryOptions);
// Endpoint is required
if (endpoint == null) {
@@ -1428,8 +1433,8 @@
* @param configuration properties
* @return client
*/
- public static DataLakeServiceClient buildAzureDatalakeClient(Map<String, String> configuration)
- throws CompilationException {
+ public static DataLakeServiceClient buildAzureDatalakeClient(IApplicationContext appCtx,
+ Map<String, String> configuration) throws CompilationException {
String managedIdentityId = configuration.get(MANAGED_IDENTITY_ID_FIELD_NAME);
String accountName = configuration.get(ACCOUNT_NAME_FIELD_NAME);
String accountKey = configuration.get(ACCOUNT_KEY_FIELD_NAME);
@@ -1443,6 +1448,9 @@
// Client builder
DataLakeServiceClientBuilder builder = new DataLakeServiceClientBuilder();
+ int timeout = appCtx.getExternalProperties().getAzureRequestTimeout();
+ RequestRetryOptions requestRetryOptions = new RequestRetryOptions(null, null, timeout, null, null, null);
+ builder.retryOptions(requestRetryOptions);
// Endpoint is required
if (endpoint == null) {
@@ -1702,7 +1710,7 @@
* @throws CompilationException Compilation exception
*/
public static void validateAzureBlobProperties(Map<String, String> configuration, SourceLocation srcLoc,
- IWarningCollector collector) throws CompilationException {
+ IWarningCollector collector, IApplicationContext appCtx) throws CompilationException {
// check if the format property is present
if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
@@ -1715,7 +1723,7 @@
BlobServiceClient blobServiceClient;
try {
String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
- blobServiceClient = buildAzureBlobClient(configuration);
+ blobServiceClient = buildAzureBlobClient(appCtx, configuration);
BlobContainerClient blobContainer = blobServiceClient.getBlobContainerClient(container);
// Get all objects in a container and extract the paths to files
@@ -1741,7 +1749,7 @@
* @throws CompilationException Compilation exception
*/
public static void validateAzureDataLakeProperties(Map<String, String> configuration, SourceLocation srcLoc,
- IWarningCollector collector) throws CompilationException {
+ IWarningCollector collector, IApplicationContext appCtx) throws CompilationException {
// check if the format property is present
if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
@@ -1754,7 +1762,7 @@
DataLakeServiceClient dataLakeServiceClient;
try {
String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
- dataLakeServiceClient = buildAzureDatalakeClient(configuration);
+ dataLakeServiceClient = buildAzureDatalakeClient(appCtx, configuration);
DataLakeFileSystemClient fileSystemClient = dataLakeServiceClient.getFileSystemClient(container);
// Get all objects in a container and extract the paths to files