[ATERIXDB-2975][EXT] Add support to Azure Data Lake external dataset

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Add suppor to azure data lake as external datasets

Change-Id: Idd6a414d6f412e541ad66cffe7a7d2e02abf3695
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/13643
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Hussain Towaileb <hussainht@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestConstants.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestConstants.java
index 875b0f6..f721aab 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestConstants.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestConstants.java
@@ -38,38 +38,38 @@
     // Azure blob storage constants and placeholders
     public static class Azure {
         // account name
-        public static final String ACCOUNT_NAME_PLACEHOLDER = "%azureblob-accountname%";
+        public static final String ACCOUNT_NAME_PLACEHOLDER = "%azure-accountname%";
         public static final String AZURITE_ACCOUNT_NAME_DEFAULT = "devstoreaccount1";
 
         // account key
-        public static final String ACCOUNT_KEY_PLACEHOLDER = "%azureblob-accountkey%";
+        public static final String ACCOUNT_KEY_PLACEHOLDER = "%azure-accountkey%";
         public static final String AZURITE_ACCOUNT_KEY_DEFAULT =
                 "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==";
 
         // SAS token: this is generated and assigned at runtime at the start of the test
-        public static final String SAS_TOKEN_PLACEHOLDER = "%azureblob-sas%";
+        public static final String SAS_TOKEN_PLACEHOLDER = "%azure-sas%";
         public static String sasToken = "";
 
         // blob endpoint
-        public static final String BLOB_ENDPOINT_PLACEHOLDER = "%azureblob-endpoint%";
+        public static final String BLOB_ENDPOINT_PLACEHOLDER = "%azure-endpoint%";
         public static final String BLOB_ENDPOINT_DEFAULT = "http://localhost:10000/" + AZURITE_ACCOUNT_NAME_DEFAULT;
 
-        public static final String MANAGED_IDENTITY_ID_PLACEHOLDER = "%azureblob-managedidentityid%";
+        public static final String MANAGED_IDENTITY_ID_PLACEHOLDER = "%azure-managedidentityid%";
         public static final String MANAGED_IDENTITY_ID_DEFAULT = "myManagedIdentityId";
 
-        public static final String CLIENT_ID_PLACEHOLDER = "%azureblob-clientid%";
+        public static final String CLIENT_ID_PLACEHOLDER = "%azure-clientid%";
         public static final String CLIENT_ID_DEFAULT = "myClientId";
 
-        public static final String CLIENT_SECRET_PLACEHOLDER = "%azureblob-clientsecret%";
+        public static final String CLIENT_SECRET_PLACEHOLDER = "%azure-clientsecret%";
         public static final String CLIENT_SECRET_DEFAULT = "myClientSecret";
 
-        public static final String CLIENT_CERTIFICATE_PLACEHOLDER = "%azureblob-clientcertificate%";
+        public static final String CLIENT_CERTIFICATE_PLACEHOLDER = "%azure-clientcertificate%";
         public static final String CLIENT_CERTIFICATE_DEFAULT = "myClientCertificate";
 
-        public static final String CLIENT_CERTIFICATE_PASSWORD_PLACEHOLDER = "%azureblob-clientcertificatepassword%";
+        public static final String CLIENT_CERTIFICATE_PASSWORD_PLACEHOLDER = "%azure-clientcertificatepassword%";
         public static final String CLIENT_CERTIFICATE_PASSWORD_DEFAULT = "myClientCertificatePassword";
 
-        public static final String TENANT_ID_PLACEHOLDER = "%azureblob-tenantid%";
+        public static final String TENANT_ID_PLACEHOLDER = "%azure-tenantid%";
         public static final String TENANT_ID_DEFAULT = "myTenantId";
 
         // azure template and default template
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/azure_blob_storage/auth-methods/anonymous-no-auth-public-access-allowed/test.000.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/azure_blob_storage/auth-methods/anonymous-no-auth-public-access-allowed/test.000.ddl.sqlpp
index 026696b..a665d05 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/azure_blob_storage/auth-methods/anonymous-no-auth-public-access-allowed/test.000.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/azure_blob_storage/auth-methods/anonymous-no-auth-public-access-allowed/test.000.ddl.sqlpp
@@ -27,7 +27,7 @@
 
 drop dataset test if exists;
 CREATE EXTERNAL DATASET test(test) USING AZUREBLOB (
-("endpoint"="%azureblob-endpoint%"),
+("endpoint"="%azure-endpoint%"),
 ("container"="public-access"),
 ("definition"="json-data/reviews/single-line/json"),
 ("format"="json")
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/azure_blob_storage/auth-methods/anonymous-no-auth-public-access-not-allowed/test.000.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/azure_blob_storage/auth-methods/anonymous-no-auth-public-access-not-allowed/test.000.ddl.sqlpp
index d8cbaa1..22385f2 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/azure_blob_storage/auth-methods/anonymous-no-auth-public-access-not-allowed/test.000.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/azure_blob_storage/auth-methods/anonymous-no-auth-public-access-not-allowed/test.000.ddl.sqlpp
@@ -28,7 +28,7 @@
 // bad case: no auth method is provided
 drop dataset test if exists;
 CREATE EXTERNAL DATASET test(test) USING AZUREBLOB (
-("endpoint"="%azureblob-endpoint%"),
+("endpoint"="%azure-endpoint%"),
 ("container"="playground"),
 ("definition"="json-data/reviews/single-line/json"),
 ("format"="json")
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/azure_blob_storage/auth-methods/invalid-auth-methods/test.000.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/azure_blob_storage/auth-methods/invalid-auth-methods/test.000.ddl.sqlpp
index f0225aa..a2f680f 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/azure_blob_storage/auth-methods/invalid-auth-methods/test.000.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/azure_blob_storage/auth-methods/invalid-auth-methods/test.000.ddl.sqlpp
@@ -28,10 +28,10 @@
 // bad case: more than one authentication method is provided at once
 drop dataset test if exists;
 CREATE EXTERNAL DATASET test(test) USING AZUREBLOB (
-("accountName"="%azureblob-accountname%"),
-("%azureblob-credentialsname-1%"="%azureblob-credentialsvalue-1%"),
-("%azureblob-credentialsname-2%"="%azureblob-credentialsvalue-2%"),
-("endpoint"="%azureblob-endpoint%"),
+("accountName"="%azure-accountname%"),
+("%azure-credentialsname-1%"="%azure-credentialsvalue-1%"),
+("%azure-credentialsname-2%"="%azure-credentialsvalue-2%"),
+("endpoint"="%azure-endpoint%"),
 ("container"="playground"),
 ("definition"="json-data/reviews/single-line/json"),
 ("format"="json")
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/azure_blob_storage/auth-methods/valid-auth-methods/test.000.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/azure_blob_storage/auth-methods/valid-auth-methods/test.000.ddl.sqlpp
index 01a2373..ca750fa 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/azure_blob_storage/auth-methods/valid-auth-methods/test.000.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/azure_blob_storage/auth-methods/valid-auth-methods/test.000.ddl.sqlpp
@@ -27,9 +27,9 @@
 
 drop dataset test if exists;
 CREATE EXTERNAL DATASET test(test) USING AZUREBLOB (
-("accountName"="%azureblob-accountname%"),
-("%azureblob-credentialsname%"="%azureblob-credentialsvalue%"),
-("endpoint"="%azureblob-endpoint%"),
+("accountName"="%azure-accountname%"),
+("%azure-credentialsname%"="%azure-credentialsvalue%"),
+("endpoint"="%azure-endpoint%"),
 ("container"="playground"),
 ("definition"="json-data/reviews/single-line/json"),
 ("format"="json")
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_azure_blob_storage.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_azure_blob_storage.xml
index 8010b57..8844368 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_azure_blob_storage.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_azure_blob_storage.xml
@@ -22,37 +22,37 @@
     <test-case FilePath="external-dataset/azure_blob_storage/auth-methods">
       <compilation-unit name="valid-auth-methods">
         <placeholder name="azureblob-credentialsname" value="accountKey" />
-        <placeholder name="azureblob-credentialsvalue" value="%azureblob-accountkey%" />
+        <placeholder name="azureblob-credentialsvalue" value="%azure-accountkey%" />
         <output-dir compare="Text">valid-auth-methods</output-dir>
       </compilation-unit>
     </test-case>
     <test-case FilePath="external-dataset/azure_blob_storage/auth-methods">
       <compilation-unit name="valid-auth-methods">
         <placeholder name="azureblob-credentialsname" value="sharedAccessSignature" />
-        <placeholder name="azureblob-credentialsvalue" value="%azureblob-sas%" />
+        <placeholder name="azureblob-credentialsvalue" value="%azure-sas%" />
         <output-dir compare="Text">valid-auth-methods</output-dir>
       </compilation-unit>
     </test-case>
     <test-case FilePath="external-dataset/azure_blob_storage/auth-methods">
       <compilation-unit name="valid-auth-methods">
         <placeholder name="azureblob-credentialsname" value="connectionString" />
-        <placeholder name="azureblob-credentialsvalue" value="%azureblob-connectionstringaccountkey%" />
+        <placeholder name="azureblob-credentialsvalue" value="%azure-connectionstringaccountkey%" />
         <output-dir compare="Text">valid-auth-methods</output-dir>
       </compilation-unit>
     </test-case>
     <test-case FilePath="external-dataset/azure_blob_storage/auth-methods">
       <compilation-unit name="valid-auth-methods">
         <placeholder name="azureblob-credentialsname" value="connectionString" />
-        <placeholder name="azureblob-credentialsvalue" value="%azureblob-connectionstringsas%" />
+        <placeholder name="azureblob-credentialsvalue" value="%azure-connectionstringsas%" />
         <output-dir compare="Text">valid-auth-methods</output-dir>
       </compilation-unit>
     </test-case>
     <test-case FilePath="external-dataset/azure_blob_storage/auth-methods">
       <compilation-unit name="invalid-auth-methods">
         <placeholder name="azureblob-credentialsname-1" value="accountKey" />
-        <placeholder name="azureblob-credentialsvalue-1" value="%azureblob-accountkey%" />
+        <placeholder name="azureblob-credentialsvalue-1" value="%azure-accountkey%" />
         <placeholder name="azureblob-credentialsname-2" value="connectionString" />
-        <placeholder name="azureblob-credentialsvalue-2" value="%azureblob-connectionstringaccountkey%" />
+        <placeholder name="azureblob-credentialsvalue-2" value="%azure-connectionstringaccountkey%" />
         <output-dir compare="Text">invalid-auth-methods</output-dir>
         <expected-error>ASX1138: Only a single authentication method is allowed: connectionString, accountName &amp; accountKey, or accountName &amp; sharedAccessSignature</expected-error>
       </compilation-unit>
@@ -60,9 +60,9 @@
     <test-case FilePath="external-dataset/azure_blob_storage/auth-methods">
       <compilation-unit name="invalid-auth-methods">
         <placeholder name="azureblob-credentialsname-1" value="sharedAccessSignature" />
-        <placeholder name="azureblob-credentialsvalue-1" value="%azureblob-sas%" />
+        <placeholder name="azureblob-credentialsvalue-1" value="%azure-sas%" />
         <placeholder name="azureblob-credentialsname-2" value="connectionString" />
-        <placeholder name="azureblob-credentialsvalue-2" value="%azureblob-connectionstringaccountkey%" />
+        <placeholder name="azureblob-credentialsvalue-2" value="%azure-connectionstringaccountkey%" />
         <output-dir compare="Text">invalid-auth-methods</output-dir>
         <expected-error>ASX1138: Only a single authentication method is allowed: connectionString, accountName &amp; accountKey, or accountName &amp; sharedAccessSignature</expected-error>
       </compilation-unit>
diff --git a/asterixdb/asterix-external-data/pom.xml b/asterixdb/asterix-external-data/pom.xml
index a6684d3..9a1f31c 100644
--- a/asterixdb/asterix-external-data/pom.xml
+++ b/asterixdb/asterix-external-data/pom.xml
@@ -461,6 +461,10 @@
     </dependency>
     <dependency>
       <groupId>com.azure</groupId>
+      <artifactId>azure-storage-file-datalake</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.azure</groupId>
       <artifactId>azure-identity</artifactId>
     </dependency>
     <dependency>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStream.java
similarity index 96%
rename from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStream.java
rename to asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStream.java
index c7bbcbf..b402f25 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStream.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.input.record.reader.azure;
+package org.apache.asterix.external.input.record.reader.azure.blob;
 
 import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
 
@@ -83,7 +83,7 @@
 
     private BlobServiceClient buildAzureClient(Map<String, String> configuration) throws HyracksDataException {
         try {
-            return ExternalDataUtils.Azure.buildAzureClient(configuration);
+            return ExternalDataUtils.Azure.buildAzureBlobClient(configuration);
         } catch (CompilationException ex) {
             throw HyracksDataException.create(ex);
         }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java
similarity index 97%
rename from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStreamFactory.java
rename to asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java
index b9e46a1..3a9ab1c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.input.record.reader.azure;
+package org.apache.asterix.external.input.record.reader.azure.blob;
 
 import java.util.Comparator;
 import java.util.List;
@@ -52,7 +52,7 @@
         // Ensure the validity of include/exclude
         ExternalDataUtils.validateIncludeExclude(configuration);
         IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration);
-        BlobServiceClient blobServiceClient = ExternalDataUtils.Azure.buildAzureClient(configuration);
+        BlobServiceClient blobServiceClient = ExternalDataUtils.Azure.buildAzureBlobClient(configuration);
         List<BlobItem> filesOnly = ExternalDataUtils.Azure.listBlobItem(blobServiceClient, configuration,
                 includeExcludeMatcher, warningCollector);
 
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobReaderFactory.java
similarity index 97%
rename from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobReaderFactory.java
rename to asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobReaderFactory.java
index 27e1b02..525ee63 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobReaderFactory.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.input.record.reader.azure;
+package org.apache.asterix.external.input.record.reader.azure.blob;
 
 import java.util.Collections;
 import java.util.List;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStream.java
similarity index 70%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStream.java
copy to asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStream.java
index c7bbcbf..f0c185e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStream.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.input.record.reader.azure;
+package org.apache.asterix.external.input.record.reader.azure.datalake;
 
 import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
 
@@ -34,18 +34,19 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.util.LogRedactionUtil;
 
-import com.azure.storage.blob.BlobClient;
-import com.azure.storage.blob.BlobContainerClient;
-import com.azure.storage.blob.BlobServiceClient;
 import com.azure.storage.blob.models.BlobErrorCode;
-import com.azure.storage.blob.models.BlobStorageException;
+import com.azure.storage.file.datalake.DataLakeFileClient;
+import com.azure.storage.file.datalake.DataLakeFileSystemClient;
+import com.azure.storage.file.datalake.DataLakeServiceClient;
+import com.azure.storage.file.datalake.models.DataLakeStorageException;
 
-public class AzureBlobInputStream extends AbstractExternalInputStream {
+public class AzureDataLakeInputStream extends AbstractExternalInputStream {
 
-    private final BlobServiceClient client;
+    private final DataLakeServiceClient client;
     private final String container;
 
-    public AzureBlobInputStream(Map<String, String> configuration, List<String> filePaths) throws HyracksDataException {
+    public AzureDataLakeInputStream(Map<String, String> configuration, List<String> filePaths)
+            throws HyracksDataException {
         super(configuration, filePaths);
         this.client = buildAzureClient(configuration);
         this.container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
@@ -54,19 +55,20 @@
     @Override
     protected boolean getInputStream() throws IOException {
         String fileName = filePaths.get(nextFileIndex);
-        BlobContainerClient blobContainerClient;
-        BlobClient blob;
+        DataLakeFileSystemClient fileSystemClient;
+        DataLakeFileClient fileClient;
         try {
-            blobContainerClient = client.getBlobContainerClient(container);
-            blob = blobContainerClient.getBlobClient(filePaths.get(nextFileIndex));
-            in = blob.openInputStream();
+            fileSystemClient = client.getFileSystemClient(container);
+            fileClient = fileSystemClient.getFileClient(filePaths.get(nextFileIndex));
+            in = fileClient.openInputStream().getInputStream();
 
             // Use gzip stream if needed
             String lowerCaseFileName = fileName.toLowerCase();
             if (lowerCaseFileName.endsWith(".gz") || lowerCaseFileName.endsWith(".gzip")) {
                 in = new GZIPInputStream(in, ExternalDataConstants.DEFAULT_BUFFER_SIZE);
             }
-        } catch (BlobStorageException ex) {
+        } catch (DataLakeStorageException ex) {
+            // TODO(htowaileb): need to find the right error for Azure Data Lake
             if (ex.getErrorCode().equals(BlobErrorCode.BLOB_NOT_FOUND)) {
                 LOGGER.debug(() -> "Key " + LogRedactionUtil.userData(filePaths.get(nextFileIndex)) + " was not "
                         + "found in container " + container);
@@ -81,9 +83,9 @@
         return true;
     }
 
-    private BlobServiceClient buildAzureClient(Map<String, String> configuration) throws HyracksDataException {
+    private DataLakeServiceClient buildAzureClient(Map<String, String> configuration) throws HyracksDataException {
         try {
-            return ExternalDataUtils.Azure.buildAzureClient(configuration);
+            return ExternalDataUtils.Azure.buildAzureDatalakeClient(configuration);
         } catch (CompilationException ex) {
             throw HyracksDataException.create(ex);
         }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java
new file mode 100644
index 0000000..6bb103b
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.azure.datalake;
+
+import static org.apache.asterix.external.util.ExternalDataConstants.Azure.RECURSIVE_FIELD_NAME;
+import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.function.BiPredicate;
+import java.util.regex.Matcher;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.Warning;
+
+import com.azure.core.http.rest.PagedIterable;
+import com.azure.storage.file.datalake.DataLakeFileSystemClient;
+import com.azure.storage.file.datalake.DataLakeServiceClient;
+import com.azure.storage.file.datalake.models.ListPathsOptions;
+import com.azure.storage.file.datalake.models.PathItem;
+
+public class AzureDataLakeInputStreamFactory extends AbstractExternalInputStreamFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) throws HyracksDataException {
+        return new AzureDataLakeInputStream(configuration, partitionWorkLoadsBasedOnSize.get(partition).getFilePaths());
+    }
+
+    @Override
+    public void configure(IServiceContext ctx, Map<String, String> configuration, IWarningCollector warningCollector)
+            throws AlgebricksException {
+        super.configure(ctx, configuration, warningCollector);
+
+        String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+
+        List<PathItem> filesOnly = new ArrayList<>();
+
+        // Ensure the validity of include/exclude
+        ExternalDataUtils.validateIncludeExclude(configuration);
+
+        DataLakeServiceClient client = ExternalDataUtils.Azure.buildAzureDatalakeClient(configuration);
+        DataLakeFileSystemClient fileSystemClient;
+        try {
+            fileSystemClient = client.getFileSystemClient(container);
+
+            // Get all objects in a container and extract the paths to files
+            ListPathsOptions listOptions = new ListPathsOptions();
+            boolean recursive = Boolean.parseBoolean(configuration.get(RECURSIVE_FIELD_NAME));
+            listOptions.setRecursive(recursive);
+            listOptions.setPath(ExternalDataUtils.getPrefix(configuration, false));
+            PagedIterable<PathItem> pathItems = fileSystemClient.listPaths(listOptions, null);
+
+            // Collect the paths to files only
+            IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration);
+            collectAndFilterFiles(pathItems, includeExcludeMatcher.getPredicate(),
+                    includeExcludeMatcher.getMatchersList(), filesOnly);
+
+            // Warn if no files are returned
+            if (filesOnly.isEmpty() && warningCollector.shouldWarn()) {
+                Warning warning = Warning.of(null, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
+                warningCollector.warn(warning);
+            }
+
+            // Distribute work load amongst the partitions
+            distributeWorkLoad(filesOnly, getPartitionsCount());
+        } catch (Exception ex) {
+            throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
+        }
+    }
+
+    /**
+     * Collects and filters the files only, and excludes any folders
+     *
+     * @param items     storage items
+     * @param predicate predicate to test with for file filtration
+     * @param matchers  include/exclude matchers to test against
+     * @param filesOnly List containing the files only (excluding folders)
+     */
+    private void collectAndFilterFiles(Iterable<PathItem> items, BiPredicate<List<Matcher>, String> predicate,
+            List<Matcher> matchers, List<PathItem> filesOnly) {
+        for (PathItem item : items) {
+            String uri = item.getName();
+
+            // skip folders
+            if (uri.endsWith("/")) {
+                continue;
+            }
+
+            // No filter, add file
+            if (predicate.test(matchers, uri)) {
+                filesOnly.add(item);
+            }
+        }
+    }
+
+    /**
+     * To efficiently utilize the parallelism, work load will be distributed amongst the partitions based on the file
+     * size.
+     * <p>
+     * Example:
+     * File1 1mb, File2 300kb, File3 300kb, File4 300kb
+     * <p>
+     * Distribution:
+     * Partition1: [File1]
+     * Partition2: [File2, File3, File4]
+     *
+     * @param items           items
+     * @param partitionsCount Partitions count
+     */
+    private void distributeWorkLoad(List<PathItem> items, int partitionsCount) {
+        PriorityQueue<PartitionWorkLoadBasedOnSize> workloadQueue = new PriorityQueue<>(partitionsCount,
+                Comparator.comparingLong(PartitionWorkLoadBasedOnSize::getTotalSize));
+
+        // Prepare the workloads based on the number of partitions
+        for (int i = 0; i < partitionsCount; i++) {
+            workloadQueue.add(new PartitionWorkLoadBasedOnSize());
+        }
+
+        for (PathItem object : items) {
+            PartitionWorkLoadBasedOnSize workload = workloadQueue.poll();
+            workload.addFilePath(object.getName(), object.getContentLength());
+            workloadQueue.add(workload);
+        }
+        partitionWorkLoadsBasedOnSize.addAll(workloadQueue);
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeReaderFactory.java
similarity index 91%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobReaderFactory.java
copy to asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeReaderFactory.java
index 27e1b02..594bacf 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeReaderFactory.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.input.record.reader.azure;
+package org.apache.asterix.external.input.record.reader.azure.datalake;
 
 import java.util.Collections;
 import java.util.List;
@@ -31,12 +31,12 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
 
-public class AzureBlobReaderFactory extends StreamRecordReaderFactory {
+public class AzureDataLakeReaderFactory extends StreamRecordReaderFactory {
 
     private static final long serialVersionUID = 1L;
 
     private static final List<String> recordReaderNames =
-            Collections.singletonList(ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_BLOB);
+            Collections.singletonList(ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_DATA_LAKE);
 
     @Override
     public List<String> getRecordReaderNames() {
@@ -64,7 +64,7 @@
         this.configuration = configuration;
 
         // Stream factory
-        streamFactory = new AzureBlobInputStreamFactory();
+        streamFactory = new AzureDataLakeInputStreamFactory();
         streamFactory.configure(ctx, configuration, warningCollector);
 
         // record reader
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
index b2b667a..0f9f484 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
@@ -45,8 +45,7 @@
     @Override
     public void configure(IServiceContext serviceCtx, Map<String, String> configuration,
             IWarningCollector warningCollector) throws AlgebricksException, HyracksDataException {
-        //We need to the client to parse connectionString
-        BlobServiceClient blobServiceClient = ExternalDataUtils.Azure.buildAzureClient(configuration);
+        BlobServiceClient blobServiceClient = ExternalDataUtils.Azure.buildAzureBlobClient(configuration);
         //Get endpoint
         String endPoint = extractEndPoint(blobServiceClient.getAccountUrl());
         //Get path
@@ -117,7 +116,7 @@
     }
 
     private static void appendFileURI(StringBuilder builder, String container, String endPoint, BlobItem file) {
-        builder.append(ExternalDataConstants.AzureBlob.HADOOP_AZURE_BLOB_PROTOCOL);
+        builder.append(ExternalDataConstants.Azure.HADOOP_AZURE_BLOB_PROTOCOL);
         builder.append("://");
         builder.append(container);
         builder.append('@');
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 9589212..ec25997 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -142,6 +142,7 @@
     public static final String KEY_ADAPTER_NAME_HTTP = "http_adapter";
     public static final String KEY_ADAPTER_NAME_AWS_S3 = "S3";
     public static final String KEY_ADAPTER_NAME_AZURE_BLOB = "AZUREBLOB";
+    public static final String KEY_ADAPTER_NAME_AZURE_DATA_LAKE = "AZUREDATALAKE";
     public static final String KEY_ADAPTER_NAME_GCS = "GCS";
 
     /**
@@ -344,8 +345,12 @@
 
     }
 
-    public static class AzureBlob {
-        private AzureBlob() {
+    /*
+     * Note: Azure Blob and Azure Datalake use identical authentication, so they are using the same properties.
+     * If they end up diverging, then properties for AzureBlob and AzureDataLake need to be created.
+     */
+    public static class Azure {
+        private Azure() {
             throw new AssertionError("do not instantiate");
         }
 
@@ -363,6 +368,18 @@
         public static final String CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME = "clientCertificatePassword";
         public static final String ENDPOINT_FIELD_NAME = "endpoint";
 
+        // Specific Azure data lake property
+        /*
+        The behavior of Data Lake (true file system) is to read the files of the specified prefix only, example:
+        storage/myData/personal/file1.json
+        storage/myData/personal/file2.json
+        storage/myData/file3.json
+        If the prefix used is "myData", then only the file file3.json is read. However, if the property "recursive"
+        is set to "true" when creating the external dataset, then it goes recursively overall the paths, and the result
+        is file1.json, file2.json and file3.json.
+         */
+        public static final String RECURSIVE_FIELD_NAME = "recursive";
+
         /*
          * Hadoop-Azure
          */
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 09cf433..6be2896 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -38,19 +38,19 @@
 import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.HADOOP_SESSION_TOKEN;
 import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.HADOOP_TEMP_ACCESS;
 import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.SECRET_ACCESS_KEY_FIELD_NAME;
-import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.ACCOUNT_KEY_FIELD_NAME;
-import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.ACCOUNT_NAME_FIELD_NAME;
-import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.CLIENT_CERTIFICATE_FIELD_NAME;
-import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME;
-import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.CLIENT_ID_FIELD_NAME;
-import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.CLIENT_SECRET_FIELD_NAME;
-import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.ENDPOINT_FIELD_NAME;
-import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.HADOOP_AZURE_BLOB_PROTOCOL;
-import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.HADOOP_AZURE_FS_ACCOUNT_KEY;
-import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.HADOOP_AZURE_FS_SAS;
-import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.MANAGED_IDENTITY_ID_FIELD_NAME;
-import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.SHARED_ACCESS_SIGNATURE_FIELD_NAME;
-import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.TENANT_ID_FIELD_NAME;
+import static org.apache.asterix.external.util.ExternalDataConstants.Azure.ACCOUNT_KEY_FIELD_NAME;
+import static org.apache.asterix.external.util.ExternalDataConstants.Azure.ACCOUNT_NAME_FIELD_NAME;
+import static org.apache.asterix.external.util.ExternalDataConstants.Azure.CLIENT_CERTIFICATE_FIELD_NAME;
+import static org.apache.asterix.external.util.ExternalDataConstants.Azure.CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME;
+import static org.apache.asterix.external.util.ExternalDataConstants.Azure.CLIENT_ID_FIELD_NAME;
+import static org.apache.asterix.external.util.ExternalDataConstants.Azure.CLIENT_SECRET_FIELD_NAME;
+import static org.apache.asterix.external.util.ExternalDataConstants.Azure.ENDPOINT_FIELD_NAME;
+import static org.apache.asterix.external.util.ExternalDataConstants.Azure.HADOOP_AZURE_BLOB_PROTOCOL;
+import static org.apache.asterix.external.util.ExternalDataConstants.Azure.HADOOP_AZURE_FS_ACCOUNT_KEY;
+import static org.apache.asterix.external.util.ExternalDataConstants.Azure.HADOOP_AZURE_FS_SAS;
+import static org.apache.asterix.external.util.ExternalDataConstants.Azure.MANAGED_IDENTITY_ID_FIELD_NAME;
+import static org.apache.asterix.external.util.ExternalDataConstants.Azure.SHARED_ACCESS_SIGNATURE_FIELD_NAME;
+import static org.apache.asterix.external.util.ExternalDataConstants.Azure.TENANT_ID_FIELD_NAME;
 import static org.apache.asterix.external.util.ExternalDataConstants.GCS.JSON_CREDENTIALS_FIELD_NAME;
 import static org.apache.asterix.external.util.ExternalDataConstants.KEY_ADAPTER_NAME_GCS;
 import static org.apache.asterix.external.util.ExternalDataConstants.KEY_DELIMITER;
@@ -133,6 +133,11 @@
 import com.azure.storage.blob.models.BlobItem;
 import com.azure.storage.blob.models.ListBlobsOptions;
 import com.azure.storage.common.StorageSharedKeyCredential;
+import com.azure.storage.file.datalake.DataLakeFileSystemClient;
+import com.azure.storage.file.datalake.DataLakeServiceClient;
+import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
+import com.azure.storage.file.datalake.models.ListPathsOptions;
+import com.azure.storage.file.datalake.models.PathItem;
 import com.google.api.gax.paging.Page;
 import com.google.auth.oauth2.ServiceAccountCredentials;
 import com.google.cloud.storage.Blob;
@@ -594,7 +599,10 @@
                 AwsS3.validateProperties(configuration, srcLoc, collector);
                 break;
             case ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_BLOB:
-                Azure.validateProperties(configuration, srcLoc, collector);
+                Azure.validateAzureBlobProperties(configuration, srcLoc, collector);
+                break;
+            case ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_DATA_LAKE:
+                Azure.validateAzureDataLakeProperties(configuration, srcLoc, collector);
                 break;
             case KEY_ADAPTER_NAME_GCS:
                 GCS.validateProperties(configuration, srcLoc, collector);
@@ -715,9 +723,13 @@
      * @param configuration configuration
      */
     public static String getPrefix(Map<String, String> configuration) {
+        return getPrefix(configuration, true);
+    }
+
+    public static String getPrefix(Map<String, String> configuration, boolean appendSlash) {
         String definition = configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME);
         if (definition != null && !definition.isEmpty()) {
-            return definition + (!definition.endsWith("/") ? "/" : "");
+            return appendSlash ? definition + (!definition.endsWith("/") ? "/" : "") : definition;
         }
         return "";
     }
@@ -1229,6 +1241,10 @@
         }
     }
 
+    /*
+     * Note: Azure Blob and Azure Datalake use identical authentication, so they are using the same properties.
+     * If they end up diverging, then properties for AzureBlob and AzureDataLake need to be created.
+     */
     public static class Azure {
         private Azure() {
             throw new AssertionError("do not instantiate");
@@ -1240,7 +1256,7 @@
          * @param configuration properties
          * @return client
          */
-        public static BlobServiceClient buildAzureClient(Map<String, String> configuration)
+        public static BlobServiceClient buildAzureBlobClient(Map<String, String> configuration)
                 throws CompilationException {
             String managedIdentityId = configuration.get(MANAGED_IDENTITY_ID_FIELD_NAME);
             String accountName = configuration.get(ACCOUNT_NAME_FIELD_NAME);
@@ -1361,6 +1377,157 @@
                             pemCertificate.invoke(certificate, certificateContent, clientCertificatePassword);
                         }
                     } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException ex) {
+                        throw new CompilationException(EXTERNAL_SOURCE_ERROR, ex.getMessage());
+                    }
+                    builder.credential(certificate.build());
+                }
+            }
+
+            // If client id is not present, ensure client secret, certificate, tenant id and client certificate
+            // password are not present
+            if (clientId == null) {
+                Optional<String> provided = getFirstNotNull(configuration, CLIENT_SECRET_FIELD_NAME,
+                        CLIENT_CERTIFICATE_FIELD_NAME, CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, TENANT_ID_FIELD_NAME);
+                if (provided.isPresent()) {
+                    throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, provided.get(),
+                            SHARED_ACCESS_SIGNATURE_FIELD_NAME);
+                }
+            }
+
+            try {
+                return builder.buildClient();
+            } catch (Exception ex) {
+                throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
+            }
+        }
+
+        /**
+         * Builds the Azure data lake storage account using the provided configuration
+         *
+         * @param configuration properties
+         * @return client
+         */
+        public static DataLakeServiceClient buildAzureDatalakeClient(Map<String, String> configuration)
+                throws CompilationException {
+            String managedIdentityId = configuration.get(MANAGED_IDENTITY_ID_FIELD_NAME);
+            String accountName = configuration.get(ACCOUNT_NAME_FIELD_NAME);
+            String accountKey = configuration.get(ACCOUNT_KEY_FIELD_NAME);
+            String sharedAccessSignature = configuration.get(SHARED_ACCESS_SIGNATURE_FIELD_NAME);
+            String tenantId = configuration.get(TENANT_ID_FIELD_NAME);
+            String clientId = configuration.get(CLIENT_ID_FIELD_NAME);
+            String clientSecret = configuration.get(CLIENT_SECRET_FIELD_NAME);
+            String clientCertificate = configuration.get(CLIENT_CERTIFICATE_FIELD_NAME);
+            String clientCertificatePassword = configuration.get(CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME);
+            String endpoint = configuration.get(ENDPOINT_FIELD_NAME);
+
+            // Client builder
+            DataLakeServiceClientBuilder builder = new DataLakeServiceClientBuilder();
+
+            // Endpoint is required
+            if (endpoint == null) {
+                throw new CompilationException(PARAMETERS_REQUIRED, ENDPOINT_FIELD_NAME);
+            }
+            builder.endpoint(endpoint);
+
+            // Shared Key
+            if (accountName != null || accountKey != null) {
+                if (accountName == null) {
+                    throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCOUNT_NAME_FIELD_NAME,
+                            ACCOUNT_KEY_FIELD_NAME);
+                }
+
+                if (accountKey == null) {
+                    throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCOUNT_KEY_FIELD_NAME,
+                            ACCOUNT_NAME_FIELD_NAME);
+                }
+
+                Optional<String> provided = getFirstNotNull(configuration, SHARED_ACCESS_SIGNATURE_FIELD_NAME,
+                        MANAGED_IDENTITY_ID_FIELD_NAME, CLIENT_ID_FIELD_NAME, CLIENT_SECRET_FIELD_NAME,
+                        CLIENT_CERTIFICATE_FIELD_NAME, CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, TENANT_ID_FIELD_NAME);
+                if (provided.isPresent()) {
+                    throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, provided.get(),
+                            ACCOUNT_KEY_FIELD_NAME);
+                }
+                StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName, accountKey);
+                builder.credential(credential);
+            }
+
+            // Shared access signature
+            if (sharedAccessSignature != null) {
+                Optional<String> provided = getFirstNotNull(configuration, MANAGED_IDENTITY_ID_FIELD_NAME,
+                        CLIENT_ID_FIELD_NAME, CLIENT_SECRET_FIELD_NAME, CLIENT_CERTIFICATE_FIELD_NAME,
+                        CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, TENANT_ID_FIELD_NAME);
+                if (provided.isPresent()) {
+                    throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, provided.get(),
+                            SHARED_ACCESS_SIGNATURE_FIELD_NAME);
+                }
+                AzureSasCredential credential = new AzureSasCredential(sharedAccessSignature);
+                builder.credential(credential);
+            }
+
+            // Managed Identity auth
+            if (managedIdentityId != null) {
+                Optional<String> provided = getFirstNotNull(configuration, CLIENT_ID_FIELD_NAME,
+                        CLIENT_SECRET_FIELD_NAME, CLIENT_CERTIFICATE_FIELD_NAME, CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME,
+                        TENANT_ID_FIELD_NAME);
+                if (provided.isPresent()) {
+                    throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, provided.get(),
+                            MANAGED_IDENTITY_ID_FIELD_NAME);
+                }
+                builder.credential(new ManagedIdentityCredentialBuilder().clientId(managedIdentityId).build());
+            }
+
+            // Client secret & certificate auth
+            if (clientId != null) {
+                // Both (or neither) client secret and client secret were provided, only one is allowed
+                if ((clientSecret == null) == (clientCertificate == null)) {
+                    if (clientSecret != null) {
+                        throw new CompilationException(PARAMETERS_NOT_ALLOWED_AT_SAME_TIME, CLIENT_SECRET_FIELD_NAME,
+                                CLIENT_CERTIFICATE_FIELD_NAME);
+                    } else {
+                        throw new CompilationException(REQUIRED_PARAM_OR_PARAM_IF_PARAM_IS_PRESENT,
+                                CLIENT_SECRET_FIELD_NAME, CLIENT_CERTIFICATE_FIELD_NAME, CLIENT_ID_FIELD_NAME);
+                    }
+                }
+
+                // Tenant ID is required
+                if (tenantId == null) {
+                    throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, TENANT_ID_FIELD_NAME,
+                            CLIENT_ID_FIELD_NAME);
+                }
+
+                // Client certificate password is not allowed if client secret is used
+                if (clientCertificatePassword != null && clientSecret != null) {
+                    throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT,
+                            CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, CLIENT_SECRET_FIELD_NAME);
+                }
+
+                // Use AD authentication
+                if (clientSecret != null) {
+                    ClientSecretCredentialBuilder secret = new ClientSecretCredentialBuilder();
+                    secret.clientId(clientId);
+                    secret.tenantId(tenantId);
+                    secret.clientSecret(clientSecret);
+                    builder.credential(secret.build());
+                } else {
+                    // Certificate
+                    ClientCertificateCredentialBuilder certificate = new ClientCertificateCredentialBuilder();
+                    certificate.clientId(clientId);
+                    certificate.tenantId(tenantId);
+                    try {
+                        InputStream certificateContent = new ByteArrayInputStream(clientCertificate.getBytes(UTF_8));
+                        if (clientCertificatePassword == null) {
+                            Method pemCertificate = ClientCertificateCredentialBuilder.class
+                                    .getDeclaredMethod("pemCertificate", InputStream.class);
+                            pemCertificate.setAccessible(true);
+                            pemCertificate.invoke(certificate, certificateContent);
+                        } else {
+                            Method pemCertificate = ClientCertificateCredentialBuilder.class
+                                    .getDeclaredMethod("pfxCertificate", InputStream.class, String.class);
+                            pemCertificate.setAccessible(true);
+                            pemCertificate.invoke(certificate, certificateContent, clientCertificatePassword);
+                        }
+                    } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException ex) {
                         throw new CompilationException(EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
                     }
                     builder.credential(certificate.build());
@@ -1451,7 +1618,7 @@
          * @param configuration properties
          * @throws CompilationException Compilation exception
          */
-        public static void validateProperties(Map<String, String> configuration, SourceLocation srcLoc,
+        public static void validateAzureBlobProperties(Map<String, String> configuration, SourceLocation srcLoc,
                 IWarningCollector collector) throws CompilationException {
 
             // check if the format property is present
@@ -1465,7 +1632,7 @@
             BlobServiceClient blobServiceClient;
             try {
                 String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
-                blobServiceClient = buildAzureClient(configuration);
+                blobServiceClient = buildAzureBlobClient(configuration);
                 BlobContainerClient blobContainer = blobServiceClient.getBlobContainerClient(container);
 
                 // Get all objects in a container and extract the paths to files
@@ -1485,6 +1652,45 @@
         }
 
         /**
+         * Validate external dataset properties
+         *
+         * @param configuration properties
+         * @throws CompilationException Compilation exception
+         */
+        public static void validateAzureDataLakeProperties(Map<String, String> configuration, SourceLocation srcLoc,
+                IWarningCollector collector) throws CompilationException {
+
+            // check if the format property is present
+            if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
+                throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT);
+            }
+
+            validateIncludeExclude(configuration);
+
+            // Check if the bucket is present
+            DataLakeServiceClient dataLakeServiceClient;
+            try {
+                String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+                dataLakeServiceClient = buildAzureDatalakeClient(configuration);
+                DataLakeFileSystemClient fileSystemClient = dataLakeServiceClient.getFileSystemClient(container);
+
+                // Get all objects in a container and extract the paths to files
+                ListPathsOptions listPathsOptions = new ListPathsOptions();
+                listPathsOptions.setPath(getPrefix(configuration));
+                Iterable<PathItem> blobItems = fileSystemClient.listPaths(listPathsOptions, null);
+
+                if (!blobItems.iterator().hasNext() && collector.shouldWarn()) {
+                    Warning warning = Warning.of(srcLoc, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
+                    collector.warn(warning);
+                }
+            } catch (CompilationException ex) {
+                throw ex;
+            } catch (Exception ex) {
+                throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
+            }
+        }
+
+        /**
          * Builds the Azure Blob storage client using the provided configuration
          *
          * @param configuration properties
@@ -1492,10 +1698,8 @@
          */
         public static void configureAzureHdfsJobConf(JobConf conf, Map<String, String> configuration, String endPoint) {
             String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
-            String accountName = configuration.get(ACCOUNT_NAME_FIELD_NAME);
             String accountKey = configuration.get(ACCOUNT_KEY_FIELD_NAME);
             String sharedAccessSignature = configuration.get(SHARED_ACCESS_SIGNATURE_FIELD_NAME);
-            String endpoint = configuration.get(ENDPOINT_FIELD_NAME);
 
             //Disable caching S3 FileSystem
             HDFSUtils.disableHadoopFileSystemCache(conf, HADOOP_AZURE_BLOB_PROTOCOL);
diff --git a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
index 7d3f901..d551c5b 100644
--- a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
+++ b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
@@ -21,7 +21,8 @@
 org.apache.asterix.external.input.record.reader.stream.StreamRecordReaderFactory
 org.apache.asterix.external.input.record.reader.http.HttpServerRecordReaderFactory
 org.apache.asterix.external.input.record.reader.aws.AwsS3ReaderFactory
-org.apache.asterix.external.input.record.reader.azure.AzureBlobReaderFactory
 org.apache.asterix.external.input.record.reader.aws.parquet.AwsS3ParquetReaderFactory
 org.apache.asterix.external.input.record.reader.gcs.GCSReaderFactory
-org.apache.asterix.external.input.record.reader.azure.parquet.AzureBlobParquetReaderFactory
+org.apache.asterix.external.input.record.reader.azure.blob.AzureBlobReaderFactory
+org.apache.asterix.external.input.record.reader.azure.datalake.AzureDataLakeReaderFactory
+org.apache.asterix.external.input.record.reader.azure.parquet.AzureBlobParquetReaderFactory
\ No newline at end of file
diff --git a/asterixdb/asterix-server/pom.xml b/asterixdb/asterix-server/pom.xml
index d33004f..1d08361 100644
--- a/asterixdb/asterix-server/pom.xml
+++ b/asterixdb/asterix-server/pom.xml
@@ -226,6 +226,7 @@
                 <gav>com.azure:azure-storage-blob:12.12.0</gav>
                 <gav>com.azure:azure-storage-common:12.12.0</gav>
                 <gav>com.azure:azure-storage-internal-avro:12.0.5</gav>
+                <gav>com.azure:azure-storage-file-datalake:12.7.0</gav>
               </gavs>
               <noticeUrl>https://raw.githubusercontent.com/Azure/azure-sdk-for-java/master/NOTICE.txt</noticeUrl>
               <url>https://raw.githubusercontent.com/Azure/azure-sdk-for-java/master/LICENSE.txt</url>
diff --git a/asterixdb/pom.xml b/asterixdb/pom.xml
index b93138f..7d449fa 100644
--- a/asterixdb/pom.xml
+++ b/asterixdb/pom.xml
@@ -90,7 +90,8 @@
     <awsjavasdk.version>2.10.83</awsjavasdk.version>
     <parquet.version>1.12.0</parquet.version>
     <hadoop-awsjavasdk.version>1.12.1</hadoop-awsjavasdk.version>
-    <azurejavasdk.version>12.12.0</azurejavasdk.version>
+    <azureblobjavasdk.version>12.12.0</azureblobjavasdk.version>
+    <azuredatalakejavasdk.version>12.7.0</azuredatalakejavasdk.version>
     <gcsjavasdk.version>1.114.0</gcsjavasdk.version>
     <hadoop-azuresdk.version>8.6.6</hadoop-azuresdk.version>
 
@@ -1583,7 +1584,7 @@
       <dependency>
         <groupId>com.azure</groupId>
         <artifactId>azure-storage-blob</artifactId>
-        <version>${azurejavasdk.version}</version>
+        <version>${azureblobjavasdk.version}</version>
         <exclusions>
           <exclusion>
             <groupId>io.netty</groupId>
@@ -1654,7 +1655,7 @@
       <dependency>
         <groupId>com.azure</groupId>
         <artifactId>azure-storage-common</artifactId>
-        <version>${azurejavasdk.version}</version>
+        <version>${azureblobjavasdk.version}</version>
         <exclusions>
           <exclusion>
             <groupId>io.netty</groupId>
@@ -1730,6 +1731,13 @@
         <version>${gcsjavasdk.version}</version>
       </dependency>
       <!-- Google Cloud Storage end -->
+      <!-- Azure Data Lake start -->
+      <dependency>
+        <groupId>com.azure</groupId>
+        <artifactId>azure-storage-file-datalake</artifactId>
+        <version>${azuredatalakejavasdk.version}</version>
+      </dependency>
+      <!-- Azure Data Lake end -->
       <dependency>
         <groupId>org.mindrot</groupId>
         <artifactId>jbcrypt</artifactId>
diff --git a/asterixdb/src/main/appended-resources/supplemental-models.xml b/asterixdb/src/main/appended-resources/supplemental-models.xml
index 93fe3a0..1f1d4fc 100644
--- a/asterixdb/src/main/appended-resources/supplemental-models.xml
+++ b/asterixdb/src/main/appended-resources/supplemental-models.xml
@@ -652,6 +652,20 @@
     </project>
   </supplement>
 
+  <!-- com.azure does not contain any embedded LICENSE or NOTICE file -->
+  <!-- see https://github.com/Azure/azure-sdk-for-java -->
+  <supplement>
+    <project>
+      <groupId>com.azure</groupId>
+      <artifactId>azure-storage-file-datalake</artifactId>
+      <properties>
+        <license.ignoreMissingEmbeddedLicense>12.7.0</license.ignoreMissingEmbeddedLicense>
+        <license.ignoreMissingEmbeddedNotice>12.7.0</license.ignoreMissingEmbeddedNotice>
+        <license.ignoreLicenseOverride>12.7.0</license.ignoreLicenseOverride>
+      </properties>
+    </project>
+  </supplement>
+
   <supplement>
     <project>
       <groupId>com.azure</groupId>