[ASTERIXDB-3514][EXT]: Support trust auth for parquet + delete assumed creds on collection drop

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
- Support using trusted credentials for reading S3 parquet
  files since it uses different code path to build the client.
- Delete the temporarily generated credentials when the
  collection is dropped.

Ext-ref: MB-63505
Change-Id: I77998a5dfcc304692e12280b7b4018f3593085b9
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19246
Reviewed-by: Hussain Towaileb <hussainht@gmail.com>
Tested-by: Hussain Towaileb <hussainht@gmail.com>
Integration-Tests: Hussain Towaileb <hussainht@gmail.com>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java
index e892d04..9cecaa7 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java
@@ -183,7 +183,7 @@
         this.globalTxManager = globalTxManager;
         this.ioManager = ioManager;
         dataPartitioningProvider = DataPartitioningProvider.create(this);
-        externalCredentialsCache = new ExternalCredentialsCache();
+        externalCredentialsCache = new ExternalCredentialsCache(this);
         externalCredentialsCacheUpdater = new ExternalCredentialsCacheUpdater(this);
     }
 
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java
index 0ddca4e..66fbdec 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java
@@ -23,6 +23,7 @@
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.external.IExternalCredentialsCache;
 import org.apache.asterix.common.metadata.MetadataConstants;
 import org.apache.asterix.external.util.ExternalDataConstants;
@@ -38,8 +39,12 @@
 
     private static final Logger LOGGER = LogManager.getLogger();
     private final ConcurrentMap<String, Pair<Span, Object>> cache = new ConcurrentHashMap<>();
+    private final int awsAssumeRoleDuration;
+    private final double refreshAwsAssumeRoleThreshold;
 
-    public ExternalCredentialsCache() {
+    public ExternalCredentialsCache(IApplicationContext appCtx) {
+        this.awsAssumeRoleDuration = appCtx.getExternalProperties().getAwsAssumeRoleDuration();
+        this.refreshAwsAssumeRoleThreshold = appCtx.getExternalProperties().getAwsRefreshAssumeRoleThreshold();
     }
 
     @Override
@@ -60,6 +65,11 @@
     }
 
     @Override
+    public void deleteCredentials(String name) {
+        cache.remove(name);
+    }
+
+    @Override
     public String getName(Map<String, String> configuration) {
         String database = configuration.get(ExternalDataConstants.KEY_DATASET_DATABASE);
         if (database == null) {
@@ -78,20 +88,19 @@
     }
 
     private void doUpdateAwsCache(Map<String, String> configuration, AwsSessionCredentials credentials) {
-        // TODO(htowaileb): Set default expiration value
         String name = getName(configuration);
-        cache.put(name, Pair.of(Span.start(15, TimeUnit.MINUTES), credentials));
+        cache.put(name, Pair.of(Span.start(awsAssumeRoleDuration, TimeUnit.SECONDS), credentials));
         LOGGER.info("Received and cached new credentials for {}", name);
     }
 
     /**
-     * Refresh if the remaining time is half or less than the total expiration time
+     * Refresh if the remaining time is less than the configured refresh percentage
      *
      * @param span expiration span
-     * @return true if the remaining time is half or less than the total expiration time, false otherwise
+     * @return true if the remaining time is less than the configured refresh percentage, false otherwise
      */
     private boolean needsRefresh(Span span) {
-        // TODO(htowaileb): At what % (and should be configurable?) do we decide it's better to refresh credentials
-        return (double) span.remaining(TimeUnit.MINUTES) / span.getSpan(TimeUnit.MINUTES) < 0.5;
+        return (double) span.remaining(TimeUnit.SECONDS)
+                / span.getSpan(TimeUnit.SECONDS) < refreshAwsAssumeRoleThreshold;
     }
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 8c2a0ab..343baf0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -216,7 +216,7 @@
         cacheManager = new CacheManager();
         this.namespacePathResolver = namespacePathResolver;
         this.namespaceResolver = namespaceResolver;
-        this.externalCredentialsCache = new ExternalCredentialsCache();
+        this.externalCredentialsCache = new ExternalCredentialsCache(this);
         this.externalCredentialsCacheUpdater = new ExternalCredentialsCacheUpdater(this);
     }
 
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index ed93838..4ee674e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -2448,6 +2448,8 @@
                     sourceLoc, EnumSet.of(DropOption.IF_EXISTS), requestParameters.isForceDropDataset());
 
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue());
+            appCtx.getExternalCredentialsCache()
+                    .deleteCredentials(String.join(".", databaseName, dataverseName.getCanonicalForm(), datasetName));
             return true;
         } catch (Exception e) {
             LOGGER.error("failed to drop dataset; executing compensating operations", e);
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
index 7c963a7..29a4a6e 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
@@ -8,6 +8,8 @@
     "active\.memory\.global\.budget" : 67108864,
     "active\.stop\.timeout" : 3600,
     "active\.suspend\.timeout" : 3600,
+    "aws.assume.role.duration" : 900,
+    "aws.refresh.assume.role.threshold" : 0.5,
     "azure.request.timeout" : 120,
     "cloud.acquire.token.timeout" : 100,
     "cloud.deployment" : false,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
index 50927b4..dbbf83f 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
@@ -8,6 +8,8 @@
     "active\.memory\.global\.budget" : 67108864,
     "active\.stop\.timeout" : 3600,
     "active\.suspend\.timeout" : 3600,
+    "aws.assume.role.duration" : 900,
+    "aws.refresh.assume.role.threshold" : 0.5,
     "azure.request.timeout" : 120,
     "cloud.acquire.token.timeout" : 100,
     "cloud.deployment" : false,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
index 1b035a0..4778cb8 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
@@ -8,6 +8,8 @@
     "active\.memory\.global\.budget" : 67108864,
     "active\.stop\.timeout" : 3600,
     "active\.suspend\.timeout" : 3600,
+    "aws.assume.role.duration" : 900,
+    "aws.refresh.assume.role.threshold" : 0.5,
     "azure.request.timeout" : 120,
     "cloud.acquire.token.timeout" : 100,
     "cloud.deployment" : false,
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java
index 8aa4532..f6559f0 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.common.config;
 
+import static org.apache.hyracks.control.common.config.OptionTypes.DOUBLE;
 import static org.apache.hyracks.control.common.config.OptionTypes.LEVEL;
 import static org.apache.hyracks.control.common.config.OptionTypes.NONNEGATIVE_INTEGER;
 import static org.apache.hyracks.control.common.config.OptionTypes.POSITIVE_INTEGER;
@@ -52,7 +53,16 @@
                 "The maximum accepted web request size in bytes"),
         REQUESTS_ARCHIVE_SIZE(NONNEGATIVE_INTEGER, 1000, "The maximum number of archived requests to maintain"),
         LIBRARY_DEPLOY_TIMEOUT(POSITIVE_INTEGER, 1800, "Timeout to upload a UDF in seconds"),
-        AZURE_REQUEST_TIMEOUT(POSITIVE_INTEGER, 120, "Timeout for Azure client requests in seconds");
+        AZURE_REQUEST_TIMEOUT(POSITIVE_INTEGER, 120, "Timeout for Azure client requests in seconds"),
+        AWS_ASSUME_ROLE_DURATION(
+                POSITIVE_INTEGER,
+                900,
+                "AWS assuming role duration in seconds. "
+                        + "Range from 900 seconds (15 mins) to 43200 seconds (12 hours)"),
+        AWS_REFRESH_ASSUME_ROLE_THRESHOLD(
+                DOUBLE,
+                .5,
+                "Percentage of left duration before assume role credentials " + "needs to be refreshed");
 
         private final IOptionType type;
         private final Object defaultValue;
@@ -80,6 +90,8 @@
                 case MAX_WEB_REQUEST_SIZE:
                 case LIBRARY_DEPLOY_TIMEOUT:
                 case AZURE_REQUEST_TIMEOUT:
+                case AWS_ASSUME_ROLE_DURATION:
+                case AWS_REFRESH_ASSUME_ROLE_THRESHOLD:
                     return Section.COMMON;
                 case CC_JAVA_OPTS:
                 case NC_JAVA_OPTS:
@@ -160,4 +172,12 @@
     public int getAzureRequestTimeout() {
         return accessor.getInt(Option.AZURE_REQUEST_TIMEOUT);
     }
+
+    public int getAwsAssumeRoleDuration() {
+        return accessor.getInt(Option.AWS_ASSUME_ROLE_DURATION);
+    }
+
+    public double getAwsRefreshAssumeRoleThreshold() {
+        return accessor.getDouble(Option.AWS_REFRESH_ASSUME_ROLE_THRESHOLD);
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java
index 245b350..c603893 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalCredentialsCache.java
@@ -39,6 +39,13 @@
     void updateCache(Map<String, String> configuration, Map<String, String> credentials);
 
     /**
+     * Deletes the cache for the provided entity name
+     *
+     * @param name name of the entity for which the credentials are to be deleted
+     */
+    void deleteCredentials(String name);
+
+    /**
      * Returns the name of the entity which the cached credentials belong to
      *
      * @param configuration configuration containing external collection details
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
index 138b364..f950feb 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.external.input.record.reader.aws;
 
 import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
+import static org.apache.hyracks.util.LogRedactionUtil.userData;
 
 import java.io.IOException;
 import java.util.List;
@@ -37,7 +38,6 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.util.CleanupUtils;
-import org.apache.hyracks.util.LogRedactionUtil;
 
 import software.amazon.awssdk.core.ResponseInputStream;
 import software.amazon.awssdk.core.exception.SdkException;
@@ -48,12 +48,11 @@
 
 public class AwsS3InputStream extends AbstractExternalInputStream {
 
-    // Configuration
+    private static final int MAX_RETRIES = 5; // We will retry 5 times in case of internal error from AWS S3 service
     private final IApplicationContext ncAppCtx;
     private final String bucket;
-    private final S3Client s3Client;
+    private S3Client s3Client;
     private ResponseInputStream<?> s3InStream;
-    private static final int MAX_RETRIES = 5; // We will retry 5 times in case of internal error from AWS S3 service
 
     public AwsS3InputStream(IApplicationContext ncAppCtx, Map<String, String> configuration, List<String> filePaths,
             IExternalFilterValueEmbedder valueEmbedder) throws HyracksDataException {
@@ -85,7 +84,7 @@
      *
      * @return true
      */
-    private boolean doGetInputStream(GetObjectRequest request) throws RuntimeDataException {
+    private boolean doGetInputStream(GetObjectRequest request) throws HyracksDataException {
         int retries = 0;
         while (retries < MAX_RETRIES) {
             try {
@@ -93,14 +92,18 @@
                 in = s3InStream;
                 break;
             } catch (NoSuchKeyException ex) {
-                LOGGER.debug(() -> "Key " + LogRedactionUtil.userData(request.key()) + " was not found in bucket "
-                        + request.bucket());
+                LOGGER.debug(() -> "Key " + userData(request.key()) + " was not found in bucket {}" + request.bucket());
                 return false;
             } catch (S3Exception ex) {
-                if (!shouldRetry(ex.awsErrorDetails().errorCode(), retries++)) {
+                if (S3AuthUtils.isArnAssumedRoleExpiredToken(configuration, ex.awsErrorDetails().errorCode())) {
+                    LOGGER.debug(() -> "Expired AWS assume role session, will attempt to refresh the session");
+                    rebuildAwsS3Client(configuration);
+                    LOGGER.debug(() -> "Successfully refreshed AWS assume role session");
+                } else if (shouldRetry(ex.awsErrorDetails().errorCode(), retries++)) {
+                    LOGGER.debug(() -> "S3 retryable error: " + userData(ex.getMessage()));
+                } else {
                     throw new RuntimeDataException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex));
                 }
-                LOGGER.debug(() -> "S3 retryable error: " + LogRedactionUtil.userData(ex.getMessage()));
 
                 // Backoff for 1 sec for the first 2 retries, and 2 seconds from there onward
                 try {
@@ -149,4 +152,8 @@
             throw HyracksDataException.create(ex);
         }
     }
+
+    private void rebuildAwsS3Client(Map<String, String> configuration) throws HyracksDataException {
+        s3Client = buildAwsS3Client(configuration);
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
index ba0d0f4..7d678a9 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
@@ -24,6 +24,8 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.aws.s3.S3Utils;
 import org.apache.hadoop.mapred.JobConf;
@@ -34,8 +36,9 @@
             Collections.singletonList(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3);
 
     @Override
-    protected void configureJobConf(JobConf conf, Map<String, String> configuration) {
-        configureAwsS3HdfsJobConf(conf, configuration);
+    protected void configureJobConf(IApplicationContext appCtx, JobConf conf, Map<String, String> configuration)
+            throws CompilationException {
+        configureAwsS3HdfsJobConf(appCtx, conf, configuration);
     }
 
     @Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
index 790db8c..e3313f7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
@@ -30,6 +30,7 @@
 import java.util.PriorityQueue;
 import java.util.Set;
 
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.ErrorCode;
@@ -84,8 +85,8 @@
         return locationConstraints;
     }
 
-    protected abstract void configureJobConf(JobConf conf, Map<String, String> configuration)
-            throws AlgebricksException;
+    protected abstract void configureJobConf(IApplicationContext appCtx, JobConf conf,
+            Map<String, String> configuration) throws AlgebricksException;
 
     protected abstract String getTablePath(Map<String, String> configuration) throws AlgebricksException;
 
@@ -95,7 +96,7 @@
             throws AlgebricksException, HyracksDataException {
         JobConf conf = new JobConf();
         ICcApplicationContext appCtx = (ICcApplicationContext) serviceCtx.getApplicationContext();
-        configureJobConf(conf, configuration);
+        configureJobConf(appCtx, conf, configuration);
         confFactory = new ConfFactory(conf);
         String tableMetadataPath = getTablePath(configuration);
         Engine engine = DefaultEngine.create(conf);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
index 7ddbab91..2d92e10 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
@@ -87,7 +87,9 @@
         try {
             JobConf conf = prepareHDFSConf(serviceCtx, configuration, filterEvaluatorFactory);
             int numberOfPartitions = getPartitionConstraint().getLocations().length;
-            configureAwsS3HdfsJobConf(conf, configuration, numberOfPartitions);
+
+            IApplicationContext appCtx = (IApplicationContext) serviceCtx.getApplicationContext();
+            configureAwsS3HdfsJobConf(appCtx, conf, configuration, numberOfPartitions);
             configureHdfsConf(conf, configuration);
         } catch (SdkException | SdkBaseException ex) {
             throw new RuntimeDataException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex));
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/delta/GCSDeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/delta/GCSDeltaReaderFactory.java
index ee88569..db7673c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/delta/GCSDeltaReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/delta/GCSDeltaReaderFactory.java
@@ -22,6 +22,7 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.external.input.record.reader.aws.delta.DeltaReaderFactory;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.google.gcs.GCSUtils;
@@ -34,7 +35,8 @@
             Collections.singletonList(ExternalDataConstants.KEY_ADAPTER_NAME_GCS);
 
     @Override
-    protected void configureJobConf(JobConf conf, Map<String, String> configuration) throws AlgebricksException {
+    protected void configureJobConf(IApplicationContext appCtx, JobConf conf, Map<String, String> configuration)
+            throws AlgebricksException {
         GCSUtils.configureHdfsJobConf(conf, configuration);
     }
 
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 6767f93..82b5dad 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -519,12 +519,13 @@
         }
     }
 
-    public static void validateDeltaTableExists(Map<String, String> configuration) throws AlgebricksException {
+    public static void validateDeltaTableExists(IApplicationContext appCtx, Map<String, String> configuration)
+            throws AlgebricksException {
         String tableMetadataPath = null;
         JobConf conf = new JobConf();
         if (configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE)
                 .equals(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3)) {
-            configureAwsS3HdfsJobConf(conf, configuration);
+            configureAwsS3HdfsJobConf(appCtx, conf, configuration);
             tableMetadataPath = S3Utils.getPath(configuration);
         } else if (configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE)
                 .equals(ExternalDataConstants.KEY_ADAPTER_NAME_GCS)) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
index 45988e8..2ed16a1 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
@@ -28,20 +28,28 @@
 import static org.apache.asterix.external.util.ExternalDataUtils.validateDeltaTableProperties;
 import static org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude;
 import static org.apache.asterix.external.util.aws.s3.S3Constants.ACCESS_KEY_ID_FIELD_NAME;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.ERROR_EXPIRED_TOKEN;
 import static org.apache.asterix.external.util.aws.s3.S3Constants.ERROR_INTERNAL_ERROR;
 import static org.apache.asterix.external.util.aws.s3.S3Constants.ERROR_METHOD_NOT_IMPLEMENTED;
 import static org.apache.asterix.external.util.aws.s3.S3Constants.ERROR_SLOW_DOWN;
 import static org.apache.asterix.external.util.aws.s3.S3Constants.EXTERNAL_ID_FIELD_NAME;
 import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ACCESS_KEY_ID;
-import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ANONYMOUS_ACCESS;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ANONYMOUS;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUMED_ROLE;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUME_ROLE_ARN;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUME_ROLE_EXTERNAL_ID;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUME_ROLE_SESSION_DURATION;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUME_ROLE_SESSION_NAME;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CREDENTIALS_TO_ASSUME_ROLE_KEY;
 import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_CREDENTIAL_PROVIDER_KEY;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_INSTANCE_PROFILE;
 import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_PATH_STYLE_ACCESS;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_REGION;
 import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_S3_CONNECTION_POOL_SIZE;
-import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_S3_PROTOCOL;
 import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_SECRET_ACCESS_KEY;
 import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_SERVICE_END_POINT;
 import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_SESSION_TOKEN;
-import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_TEMP_ACCESS;
+import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_SIMPLE;
 import static org.apache.asterix.external.util.aws.s3.S3Constants.INSTANCE_PROFILE_FIELD_NAME;
 import static org.apache.asterix.external.util.aws.s3.S3Constants.REGION_FIELD_NAME;
 import static org.apache.asterix.external.util.aws.s3.S3Constants.ROLE_ARN_FIELD_NAME;
@@ -64,7 +72,6 @@
 import org.apache.asterix.common.external.IExternalCredentialsCacheUpdater;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataPrefix;
-import org.apache.asterix.external.util.HDFSUtils;
 import org.apache.hadoop.fs.s3a.Constants;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -94,6 +101,14 @@
 import software.amazon.awssdk.services.sts.model.Credentials;
 
 public class S3AuthUtils {
+    enum AuthenticationType {
+        ANONYMOUS,
+        ARN,
+        INSTANCE_PROFILE,
+        ACCESS_KEYS,
+        BAD_AUTHENTICATION
+    }
+
     private S3AuthUtils() {
         throw new AssertionError("do not instantiate");
     }
@@ -102,6 +117,10 @@
         return errorCode.equals(ERROR_INTERNAL_ERROR) || errorCode.equals(ERROR_SLOW_DOWN);
     }
 
+    public static boolean isArnAssumedRoleExpiredToken(Map<String, String> configuration, String errorCode) {
+        return ERROR_EXPIRED_TOKEN.equals(errorCode) && getAuthenticationType(configuration) == AuthenticationType.ARN;
+    }
+
     /**
      * Builds the S3 client using the provided configuration
      *
@@ -119,7 +138,6 @@
 
         S3ClientBuilder builder = S3Client.builder();
         builder.region(region);
-        builder.crossRegionAccessEnabled(true);
         builder.credentialsProvider(credentialsProvider);
 
         // Validate the service endpoint if present
@@ -142,28 +160,26 @@
 
     public static AwsCredentialsProvider buildCredentialsProvider(IApplicationContext appCtx,
             Map<String, String> configuration) throws CompilationException {
-        String arnRole = configuration.get(ROLE_ARN_FIELD_NAME);
-        String externalId = configuration.get(EXTERNAL_ID_FIELD_NAME);
-        String instanceProfile = configuration.get(INSTANCE_PROFILE_FIELD_NAME);
-        String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
-        String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
-
-        if (noAuth(configuration)) {
-            return AnonymousCredentialsProvider.create();
-        } else if (arnRole != null) {
-            return getTrustAccountCredentials(appCtx, configuration);
-        } else if (instanceProfile != null) {
-            return getInstanceProfileCredentials(configuration);
-        } else if (accessKeyId != null || secretAccessKey != null) {
-            return getAccessKeyCredentials(configuration);
-        } else {
-            if (externalId != null) {
-                throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ROLE_ARN_FIELD_NAME,
-                        EXTERNAL_ID_FIELD_NAME);
-            } else {
-                throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCESS_KEY_ID_FIELD_NAME,
-                        SESSION_TOKEN_FIELD_NAME);
-            }
+        AuthenticationType authenticationType = getAuthenticationType(configuration);
+        switch (authenticationType) {
+            case ANONYMOUS:
+                return AnonymousCredentialsProvider.create();
+            case ARN:
+                return getTrustAccountCredentials(appCtx, configuration);
+            case INSTANCE_PROFILE:
+                return getInstanceProfileCredentials(configuration);
+            case ACCESS_KEYS:
+                return getAccessKeyCredentials(configuration);
+            default:
+                // missing required creds, report correct error message
+                String externalId = configuration.get(EXTERNAL_ID_FIELD_NAME);
+                if (externalId != null) {
+                    throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ROLE_ARN_FIELD_NAME,
+                            EXTERNAL_ID_FIELD_NAME);
+                } else {
+                    throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCESS_KEY_ID_FIELD_NAME,
+                            SESSION_TOKEN_FIELD_NAME);
+                }
         }
     }
 
@@ -177,6 +193,25 @@
         return selectedRegion.get();
     }
 
+    private static AuthenticationType getAuthenticationType(Map<String, String> configuration) {
+        String roleArn = configuration.get(ROLE_ARN_FIELD_NAME);
+        String instanceProfile = configuration.get(INSTANCE_PROFILE_FIELD_NAME);
+        String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
+        String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
+
+        if (noAuth(configuration)) {
+            return AuthenticationType.ANONYMOUS;
+        } else if (roleArn != null) {
+            return AuthenticationType.ARN;
+        } else if (instanceProfile != null) {
+            return AuthenticationType.INSTANCE_PROFILE;
+        } else if (accessKeyId != null || secretAccessKey != null) {
+            return AuthenticationType.ACCESS_KEYS;
+        } else {
+            return AuthenticationType.BAD_AUTHENTICATION;
+        }
+    }
+
     private static boolean noAuth(Map<String, String> configuration) {
         return getNonNull(configuration, INSTANCE_PROFILE_FIELD_NAME, ROLE_ARN_FIELD_NAME, EXTERNAL_ID_FIELD_NAME,
                 ACCESS_KEY_ID_FIELD_NAME, SECRET_ACCESS_KEY_FIELD_NAME, SESSION_TOKEN_FIELD_NAME) == null;
@@ -327,64 +362,83 @@
         return null;
     }
 
+    public static void configureAwsS3HdfsJobConf(IApplicationContext appCtx, JobConf conf,
+            Map<String, String> configuration) throws CompilationException {
+        configureAwsS3HdfsJobConf(appCtx, conf, configuration, 0);
+    }
+
     /**
      * Builds the S3 client using the provided configuration
      *
+     * @param appCtx application context
      * @param configuration      properties
      * @param numberOfPartitions number of partitions in the cluster
      */
-    public static void configureAwsS3HdfsJobConf(JobConf conf, Map<String, String> configuration) {
-        configureAwsS3HdfsJobConf(conf, configuration, 0);
-    }
-
-    public static void configureAwsS3HdfsJobConf(JobConf conf, Map<String, String> configuration,
-            int numberOfPartitions) {
-        String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
-        String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
-        String sessionToken = configuration.get(SESSION_TOKEN_FIELD_NAME);
+    public static void configureAwsS3HdfsJobConf(IApplicationContext appCtx, JobConf jobConf,
+            Map<String, String> configuration, int numberOfPartitions) throws CompilationException {
+        setHadoopCredentials(jobConf, configuration);
         String serviceEndpoint = configuration.get(SERVICE_END_POINT_FIELD_NAME);
-
-        //Disable caching S3 FileSystem
-        HDFSUtils.disableHadoopFileSystemCache(conf, HADOOP_S3_PROTOCOL);
-
-        /*
-         * Authentication Methods:
-         * 1- Anonymous: no accessKeyId and no secretAccessKey
-         * 2- Temporary: has to provide accessKeyId, secretAccessKey and sessionToken
-         * 3- Private: has to provide accessKeyId and secretAccessKey
-         */
-        if (accessKeyId == null) {
-            //Tells hadoop-aws it is an anonymous access
-            conf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_ANONYMOUS_ACCESS);
+        Region region = validateAndGetRegion(configuration.get(REGION_FIELD_NAME));
+        jobConf.set(HADOOP_REGION, region.toString());
+        if (serviceEndpoint != null) {
+            // Validation of the URL should be done at hadoop-aws level
+            jobConf.set(HADOOP_SERVICE_END_POINT, serviceEndpoint);
         } else {
-            conf.set(HADOOP_ACCESS_KEY_ID, accessKeyId);
-            conf.set(HADOOP_SECRET_ACCESS_KEY, secretAccessKey);
-            if (sessionToken != null) {
-                conf.set(HADOOP_SESSION_TOKEN, sessionToken);
-                //Tells hadoop-aws it is a temporary access
-                conf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_TEMP_ACCESS);
-            }
+            //Region is ignored and buckets could be found by the central endpoint
+            jobConf.set(HADOOP_SERVICE_END_POINT, Constants.CENTRAL_ENDPOINT);
         }
 
         /*
          * This is to allow S3 definition to have path-style form. Should always be true to match the current
          * way we access files in S3
          */
-        conf.set(HADOOP_PATH_STYLE_ACCESS, ExternalDataConstants.TRUE);
+        jobConf.set(HADOOP_PATH_STYLE_ACCESS, ExternalDataConstants.TRUE);
 
         /*
          * Set the size of S3 connection pool to be the number of partitions
          */
         if (numberOfPartitions != 0) {
-            conf.set(HADOOP_S3_CONNECTION_POOL_SIZE, String.valueOf(numberOfPartitions));
+            jobConf.set(HADOOP_S3_CONNECTION_POOL_SIZE, String.valueOf(numberOfPartitions));
         }
+    }
 
-        if (serviceEndpoint != null) {
-            // Validation of the URL should be done at hadoop-aws level
-            conf.set(HADOOP_SERVICE_END_POINT, serviceEndpoint);
-        } else {
-            //Region is ignored and buckets could be found by the central endpoint
-            conf.set(HADOOP_SERVICE_END_POINT, Constants.CENTRAL_ENDPOINT);
+    /**
+     * Sets the credentials provider type and the credentials to hadoop based on the provided configuration
+     *
+     * @param jobConf hadoop job config
+     * @param configuration external details configuration
+     */
+    private static void setHadoopCredentials(JobConf jobConf, Map<String, String> configuration) {
+        AuthenticationType authenticationType = getAuthenticationType(configuration);
+        switch (authenticationType) {
+            case ANONYMOUS:
+                jobConf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_ANONYMOUS);
+                break;
+            case ARN:
+                jobConf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_ASSUMED_ROLE);
+                jobConf.set(HADOOP_ASSUME_ROLE_ARN, configuration.get(ROLE_ARN_FIELD_NAME));
+                jobConf.set(HADOOP_ASSUME_ROLE_EXTERNAL_ID, configuration.get(EXTERNAL_ID_FIELD_NAME));
+                jobConf.set(HADOOP_ASSUME_ROLE_SESSION_NAME, "parquet-" + UUID.randomUUID());
+                jobConf.set(HADOOP_ASSUME_ROLE_SESSION_DURATION, "15m");
+
+                // TODO: this assumes basic keys always, also support if we use InstanceProfile to assume a role
+                jobConf.set(HADOOP_CREDENTIALS_TO_ASSUME_ROLE_KEY, HADOOP_SIMPLE);
+                jobConf.set(HADOOP_ACCESS_KEY_ID, configuration.get(ACCESS_KEY_ID_FIELD_NAME));
+                jobConf.set(HADOOP_SECRET_ACCESS_KEY, configuration.get(SECRET_ACCESS_KEY_FIELD_NAME));
+                break;
+            case INSTANCE_PROFILE:
+                jobConf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_INSTANCE_PROFILE);
+                break;
+            case ACCESS_KEYS:
+                jobConf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_SIMPLE);
+                jobConf.set(HADOOP_ACCESS_KEY_ID, configuration.get(ACCESS_KEY_ID_FIELD_NAME));
+                jobConf.set(HADOOP_SECRET_ACCESS_KEY, configuration.get(SECRET_ACCESS_KEY_FIELD_NAME));
+                if (configuration.get(SESSION_TOKEN_FIELD_NAME) != null) {
+                    jobConf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_SESSION_TOKEN);
+                    jobConf.set(HADOOP_SESSION_TOKEN, configuration.get(SESSION_TOKEN_FIELD_NAME));
+                }
+                break;
+            case BAD_AUTHENTICATION:
         }
     }
 
@@ -477,7 +531,7 @@
         }
         if (isDeltaTable(configuration)) {
             try {
-                validateDeltaTableExists(configuration);
+                validateDeltaTableExists(appCtx, configuration);
             } catch (AlgebricksException e) {
                 throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, e);
             }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java
index 126c868..95c5040 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java
@@ -23,6 +23,7 @@
         throw new AssertionError("do not instantiate");
     }
 
+    // Authentication specific parameters
     public static final String REGION_FIELD_NAME = "region";
     public static final String INSTANCE_PROFILE_FIELD_NAME = "instanceProfile";
     public static final String ACCESS_KEY_ID_FIELD_NAME = "accessKeyId";
@@ -36,11 +37,15 @@
     public static final String ERROR_INTERNAL_ERROR = "InternalError";
     public static final String ERROR_SLOW_DOWN = "SlowDown";
     public static final String ERROR_METHOD_NOT_IMPLEMENTED = "NotImplemented";
+    public static final String ERROR_EXPIRED_TOKEN = "ExpiredToken";
 
     /*
      * Hadoop-AWS
-     * AWS connectors for s3 and s3n are deprecated.
      */
+    public static final String HADOOP_ASSUME_ROLE_ARN = "fs.s3a.assumed.role.arn";
+    public static final String HADOOP_ASSUME_ROLE_EXTERNAL_ID = "fs.s3a.assumed.role.external.id";
+    public static final String HADOOP_ASSUME_ROLE_SESSION_NAME = "fs.s3a.assumed.role.session.name";
+    public static final String HADOOP_ASSUME_ROLE_SESSION_DURATION = "fs.s3a.assumed.role.session.duration";
     public static final String HADOOP_ACCESS_KEY_ID = "fs.s3a.access.key";
     public static final String HADOOP_SECRET_ACCESS_KEY = "fs.s3a.secret.key";
     public static final String HADOOP_SESSION_TOKEN = "fs.s3a.session.token";
@@ -58,10 +63,14 @@
     //S3 used protocol
     public static final String HADOOP_S3_PROTOCOL = "s3a";
 
-    //Hadoop credentials provider key
+    // hadoop credentials provider key
     public static final String HADOOP_CREDENTIAL_PROVIDER_KEY = "fs.s3a.aws.credentials.provider";
-    //Anonymous credential provider
-    public static final String HADOOP_ANONYMOUS_ACCESS = "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider";
-    //Temporary credential provider
-    public static final String HADOOP_TEMP_ACCESS = "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider";
+    public static final String HADOOP_CREDENTIALS_TO_ASSUME_ROLE_KEY = "fs.s3a.assumed.role.credentials.provider";
+
+    // credential providers
+    public static final String HADOOP_ANONYMOUS = "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider";
+    public static final String HADOOP_ASSUMED_ROLE = "org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider";
+    public static final String HADOOP_INSTANCE_PROFILE = "org.apache.hadoop.fs.s3a.auth.IAMInstanceCredentialsProvider";
+    public static final String HADOOP_SIMPLE = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider";
+    public static final String HADOOP_TEMPORARY = "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider";
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
index bfd35fc..481b7ff 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
@@ -222,17 +222,17 @@
         }
     }
 
+    public static void configureHdfsJobConf(JobConf conf, Map<String, String> configuration)
+            throws AlgebricksException {
+        configureHdfsJobConf(conf, configuration, 0);
+    }
+
     /**
      * Builds the client using the provided configuration
      *
      * @param configuration      properties
      * @param numberOfPartitions number of partitions in the cluster
      */
-    public static void configureHdfsJobConf(JobConf conf, Map<String, String> configuration)
-            throws AlgebricksException {
-        configureHdfsJobConf(conf, configuration, 0);
-    }
-
     public static void configureHdfsJobConf(JobConf conf, Map<String, String> configuration, int numberOfPartitions)
             throws AlgebricksException {
         String jsonCredentials = configuration.get(JSON_CREDENTIALS_FIELD_NAME);
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/DeltaTopicPartitionDistributionTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/DeltaTopicPartitionDistributionTest.java
index 1bc8eb8..9ad93df 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/DeltaTopicPartitionDistributionTest.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/awss3/DeltaTopicPartitionDistributionTest.java
@@ -25,6 +25,7 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.external.input.record.reader.aws.delta.DeltaReaderFactory;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -48,7 +49,7 @@
         List<Row> scanFiles = createMockRows(rowCount);
         DeltaReaderFactory d = new DeltaReaderFactory() {
             @Override
-            protected void configureJobConf(JobConf conf, Map<String, String> configuration)
+            protected void configureJobConf(IApplicationContext appCtx, JobConf conf, Map<String, String> configuration)
                     throws AlgebricksException {
 
             }
@@ -75,7 +76,7 @@
         List<Row> scanFiles = createMockRows(rowCount);
         DeltaReaderFactory d = new DeltaReaderFactory() {
             @Override
-            protected void configureJobConf(JobConf conf, Map<String, String> configuration)
+            protected void configureJobConf(IApplicationContext appCtx, JobConf conf, Map<String, String> configuration)
                     throws AlgebricksException {
 
             }
@@ -102,7 +103,7 @@
         List<Row> scanFiles = createMockRows(rowCount);
         DeltaReaderFactory d = new DeltaReaderFactory() {
             @Override
-            protected void configureJobConf(JobConf conf, Map<String, String> configuration)
+            protected void configureJobConf(IApplicationContext appCtx, JobConf conf, Map<String, String> configuration)
                     throws AlgebricksException {
 
             }