ASTERIXDB-3379 Provide Azure Blob Storage support for the Engine

- user model changes: no
- storage format changes: no
- interface changes: yes

Ext-ref: MB-63021

Details:
- To provide compute/storage separation, the engine now supports reading its data from, and writing it to, Azure Blob Storage.
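- For reference, a deployment selects the new backend through the cloud
  storage settings exercised by the test configuration added in this change
  (src/test/resources/cc-cloud-storage-azblob.conf); the endpoint and bucket
  below are local Azurite test values, not production settings:

      [common]
      cloud.deployment=true
      cloud.storage.scheme=azblob
      cloud.storage.bucket=cloud-storage-container
      cloud.storage.endpoint=http://127.0.0.1:15055
      cloud.storage.cache.policy=lazy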

Change-Id: Ie9deece8ba98a5b8be7c817baea286cd50d88e68
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18235
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml
index 63bb7da..ecd91d5 100644
--- a/asterixdb/asterix-app/pom.xml
+++ b/asterixdb/asterix-app/pom.xml
@@ -513,7 +513,7 @@
     <profile>
       <id>asterix-gerrit-asterix-app</id>
       <properties>
-        <test.excludes>**/CloudStorageTest.java,**/CloudStorageGCSTest.java,**/SqlppExecutionWithCancellationTest.java,**/DmlTest.java,**/RepeatedTest.java,**/SqlppExecutionTest.java,**/SqlppExecutionColumnTest.java,**/*StaticPartitioning*Test.java,**/*Ssl*Test.java,**/Podman*.java,**/*AnalyzedExecutionTest.java,**/SqlppProfiledExecutionTest.java,**/CloudPythonTest.java</test.excludes>
+        <test.excludes>**/CloudStorageTest.java,**/CloudStorageGCSTest.java,**/SqlppExecutionWithCancellationTest.java,**/DmlTest.java,**/RepeatedTest.java,**/SqlppExecutionTest.java,**/SqlppExecutionColumnTest.java,**/*StaticPartitioning*Test.java,**/*Ssl*Test.java,**/Podman*.java,**/*AnalyzedExecutionTest.java,**/SqlppProfiledExecutionTest.java,**/CloudPythonTest.java,**/CloudStorageAzTest.java</test.excludes>
         <itest.excludes>**/*.java</itest.excludes>
       </properties>
       <build>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageAzTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageAzTest.java
new file mode 100644
index 0000000..aec3898
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageAzTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.cloud_storage;
+
+import static org.apache.asterix.api.common.LocalCloudUtil.CLOUD_STORAGE_BUCKET;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.test.runtime.LangExecutionUtil;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.asterix.testframework.xml.Description;
+import org.apache.asterix.testframework.xml.TestCase;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.MethodSorters;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.BlobServiceClientBuilder;
+import com.azure.storage.common.StorageSharedKeyCredential;
+
+/**
+ * Runs the cloud storage test suite against an Azure Blob Storage (Azurite) deployment.
+ */
+@RunWith(Parameterized.class)
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class CloudStorageAzTest {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    private final TestCaseContext tcCtx;
+    private static final String SUITE_TESTS = "testsuite_cloud_storage.xml";
+    private static final String ONLY_TESTS = "testsuite_cloud_storage_only.xml";
+    private static final String CONFIG_FILE_NAME = "src/test/resources/cc-cloud-storage-azblob.conf";
+    private static final String DELTA_RESULT_PATH = "results_cloud";
+    private static final String EXCLUDED_TESTS = "MP";
+
+    public CloudStorageAzTest(TestCaseContext tcCtx) {
+        this.tcCtx = tcCtx;
+    }
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        String endpointString = "http://127.0.0.1:15055/devstoreaccount1/" + CLOUD_STORAGE_BUCKET;
+        final String accKey =
+                "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==";
+        final String accName = "devstoreaccount1";
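+        // devstoreaccount1 and its key are the fixed, publicly documented Azurite
+        // emulator credentials used only for local testing.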
+
+        BlobServiceClient blobServiceClient = new BlobServiceClientBuilder().endpoint(endpointString)
+                .credential(new StorageSharedKeyCredential(accName, accKey)).buildClient();
+
+        cleanup(blobServiceClient);
+        initialize(blobServiceClient);
+
+        // Unlike the GCS test's Storage client, BlobServiceClient needs no explicit
+        // close; the Azure SDK manages the underlying connection pool.
+
+        TestExecutor testExecutor = new TestExecutor(DELTA_RESULT_PATH);
+        testExecutor.executorId = "cloud";
+        testExecutor.stripSubstring = "//DB:";
+        LangExecutionUtil.setUp(CONFIG_FILE_NAME, testExecutor);
+        System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, CONFIG_FILE_NAME);
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        LangExecutionUtil.tearDown();
+    }
+
+    @Parameters(name = "CloudStorageAzTest {index}: {0}")
+    public static Collection<Object[]> tests() throws Exception {
+        return LangExecutionUtil.tests(ONLY_TESTS, SUITE_TESTS);
+    }
+
+    @Test
+    public void test() throws Exception {
+        List<TestCase.CompilationUnit> cu = tcCtx.getTestCase().getCompilationUnit();
+        Assume.assumeTrue(cu.size() > 1 || !EXCLUDED_TESTS.equals(getText(cu.get(0).getDescription())));
+        LangExecutionUtil.test(tcCtx);
+    }
+
+    private static String getText(Description description) {
+        return description == null ? "" : description.getValue();
+    }
+
+    private static void cleanup(BlobServiceClient blobServiceClient) {
+        blobServiceClient.deleteBlobContainerIfExists(CLOUD_STORAGE_BUCKET);
+    }
+
+    private static void initialize(BlobServiceClient blobServiceClient) {
+        blobServiceClient.createBlobContainerIfNotExists(CLOUD_STORAGE_BUCKET);
+    }
+}
diff --git a/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-azblob.conf b/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-azblob.conf
new file mode 100644
index 0000000..d11cb5e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-azblob.conf
@@ -0,0 +1,72 @@
+; Licensed to the Apache Software Foundation (ASF) under one
+; or more contributor license agreements.  See the NOTICE file
+; distributed with this work for additional information
+; regarding copyright ownership.  The ASF licenses this file
+; to you under the Apache License, Version 2.0 (the
+; "License"); you may not use this file except in compliance
+; with the License.  You may obtain a copy of the License at
+;
+;   http://www.apache.org/licenses/LICENSE-2.0
+;
+; Unless required by applicable law or agreed to in writing,
+; software distributed under the License is distributed on an
+; "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+; KIND, either express or implied.  See the License for the
+; specific language governing permissions and limitations
+; under the License.
+
+[nc/asterix_nc1]
+txn.log.dir=target/tmp/asterix_nc1/txnlog
+core.dump.dir=target/tmp/asterix_nc1/coredump
+iodevices=target/tmp/asterix_nc1/iodevice1
+iodevices=../asterix-server/target/tmp/asterix_nc1/iodevice2
+nc.api.port=19004
+#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5006
+
+[nc/asterix_nc2]
+ncservice.port=9091
+txn.log.dir=target/tmp/asterix_nc2/txnlog
+core.dump.dir=target/tmp/asterix_nc2/coredump
+iodevices=target/tmp/asterix_nc2/iodevice1,../asterix-server/target/tmp/asterix_nc2/iodevice2
+nc.api.port=19005
+#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5007
+
+[nc]
+credential.file=src/test/resources/security/passwd
+python.cmd.autolocate=true
+python.env=FOO=BAR=BAZ,BAR=BAZ
+address=127.0.0.1
+command=asterixnc
+app.class=org.apache.asterix.hyracks.bootstrap.NCApplication
+jvm.args=-Xmx4096m -Dnode.Resolver="org.apache.asterix.external.util.IdentitiyResolverFactory"
+storage.buffercache.size=128MB
+storage.memorycomponent.globalbudget=512MB
+
+[cc]
+address = 127.0.0.1
+app.class=org.apache.asterix.hyracks.bootstrap.CCApplication
+heartbeat.period=2000
+heartbeat.max.misses=25
+credential.file=src/test/resources/security/passwd
+
+[common]
+log.dir = logs/
+log.level = INFO
+compiler.framesize=32KB
+compiler.sortmemory=320KB
+compiler.groupmemory=160KB
+compiler.joinmemory=256KB
+compiler.textsearchmemory=160KB
+compiler.windowmemory=192KB
+compiler.internal.sanitycheck=true
+messaging.frame.size=4096
+messaging.frame.count=512
+cloud.deployment=true
+storage.buffercache.pagesize=32KB
+storage.partitioning=static
+cloud.storage.scheme=azblob
+cloud.storage.bucket=cloud-storage-container
+cloud.storage.region=us-east-2
+cloud.storage.endpoint=http://127.0.0.1:15055
+cloud.storage.anonymous.auth=true
+cloud.storage.cache.policy=lazy
\ No newline at end of file
diff --git a/asterixdb/asterix-cloud/pom.xml b/asterixdb/asterix-cloud/pom.xml
index 659a6d7..b10929f 100644
--- a/asterixdb/asterix-cloud/pom.xml
+++ b/asterixdb/asterix-cloud/pom.xml
@@ -16,181 +16,193 @@
  ! specific language governing permissions and limitations
  ! under the License.
  !-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <artifactId>apache-asterixdb</artifactId>
-    <groupId>org.apache.asterix</groupId>
-    <version>0.9.10-SNAPSHOT</version>
-  </parent>
-  <artifactId>asterix-cloud</artifactId>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <artifactId>apache-asterixdb</artifactId>
+        <groupId>org.apache.asterix</groupId>
+        <version>0.9.10-SNAPSHOT</version>
+    </parent>
+    <artifactId>asterix-cloud</artifactId>
 
-  <licenses>
-    <license>
-      <name>Apache License, Version 2.0</name>
-      <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
-      <distribution>repo</distribution>
-      <comments>A business-friendly OSS license</comments>
-    </license>
-  </licenses>
+    <licenses>
+        <license>
+            <name>Apache License, Version 2.0</name>
+            <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+            <distribution>repo</distribution>
+            <comments>A business-friendly OSS license</comments>
+        </license>
+    </licenses>
 
-  <properties>
-    <root.dir>${basedir}/..</root.dir>
-  </properties>
+    <properties>
+        <root.dir>${basedir}/..</root.dir>
+    </properties>
 
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.rat</groupId>
-        <artifactId>apache-rat-plugin</artifactId>
-        <executions>
-          <execution>
-            <id>default</id>
-            <phase>validate</phase>
-            <goals>
-              <goal>check</goal>
-            </goals>
-            <configuration>
-              <licenses>
-                <license implementation="org.apache.rat.analysis.license.ApacheSoftwareLicense20"/>
-              </licenses>
-              <excludes combine.children="append">
-                <exclude>src/test/resources/result/**</exclude>
-              </excludes>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <groupId>com.googlecode.maven-download-plugin</groupId>
-        <artifactId>download-maven-plugin</artifactId>
-        <version>1.4.2</version>
-        <executions>
-          <execution>
-            <id>install-fake-gcs</id>
-            <phase>${gcs.download.stage}</phase>
-            <goals>
-              <goal>wget</goal>
-            </goals>
-            <configuration>
-              <url>https://github.com/fsouza/fake-gcs-server/releases/download/v1.48.0/fake-gcs-server_1.48.0_Linux_amd64.tar.gz</url>
-              <outputFileName>fake-gcs-server_1.48.0_Linux_amd64.tar.gz</outputFileName>
-              <outputDirectory>${project.build.directory}</outputDirectory>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-antrun-plugin</artifactId>
-        <executions>
-          <execution>
-            <id>extract-gcs</id>
-            <phase>${gcs.install.stage}</phase>
-            <configuration>
-              <target>
-                <echo message="Extracting fake-gcs-server" />
-                <mkdir dir="${project.build.directory}/fake-gcs-server" />
-                <gunzip src="${project.build.directory}/fake-gcs-server_1.48.0_Linux_amd64.tar.gz" dest="${project.build.directory}/fake-gcs-server_1.48.0_Linux_amd64.tar" />
-                <untar src="${project.build.directory}/fake-gcs-server_1.48.0_Linux_amd64.tar" dest="${project.build.directory}/fake-gcs-server" />
-                <chmod file="${project.build.directory}/fake-gcs-server/fake-gcs-server" perm="ugo+rx" />
-              </target>
-            </configuration>
-            <goals>
-              <goal>run</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-      <groupId>org.codehaus.mojo</groupId>
-      <artifactId>exec-maven-plugin</artifactId>
-        <executions>
-          <execution>
-          <id>fake-gcs-server</id>
-          <phase>${gcs.stage}</phase>
-          <goals>
-            <goal>exec</goal>
-          </goals>
-          <configuration>
-            <executable>${project.build.directory}/fake-gcs-server/fake-gcs-server</executable>
-            <workingDirectory>${project.build.directory}/fake-gcs-server</workingDirectory>
-            <arguments>
-              <argument>-port</argument>
-              <argument>4443</argument>
-              <argument>-scheme</argument>
-              <argument>http</argument>
-              <argument>-host</argument>
-              <argument>127.0.0.1</argument>
-              <argument>-log-level</argument>
-              <argument>error</argument>
-              <argument>-filesystem-root</argument>
-              <argument>${project.build.directory}/fake-gcs-server/storage</argument>
-            </arguments>
-            <async>true</async>
-          </configuration>
-          </execution>
-        </executions>
-      </plugin>
-    </plugins>
-  </build>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>default</id>
+                        <phase>validate</phase>
+                        <goals>
+                            <goal>check</goal>
+                        </goals>
+                        <configuration>
+                            <licenses>
+                                <license implementation="org.apache.rat.analysis.license.ApacheSoftwareLicense20"/>
+                            </licenses>
+                            <excludes combine.children="append">
+                                <exclude>src/test/resources/result/**</exclude>
+                            </excludes>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>com.googlecode.maven-download-plugin</groupId>
+                <artifactId>download-maven-plugin</artifactId>
+                <version>1.4.2</version>
+                <executions>
+                    <execution>
+                        <id>install-fake-gcs</id>
+                        <phase>${gcs.download.stage}</phase>
+                        <goals>
+                            <goal>wget</goal>
+                        </goals>
+                        <configuration>
+                            <url>
+                                https://github.com/fsouza/fake-gcs-server/releases/download/v1.48.0/fake-gcs-server_1.48.0_Linux_amd64.tar.gz
+                            </url>
+                            <outputFileName>fake-gcs-server_1.48.0_Linux_amd64.tar.gz</outputFileName>
+                            <outputDirectory>${project.build.directory}</outputDirectory>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-antrun-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>extract-gcs</id>
+                        <phase>${gcs.install.stage}</phase>
+                        <configuration>
+                            <target>
+                                <echo message="Extracting fake-gcs-server"/>
+                                <mkdir dir="${project.build.directory}/fake-gcs-server"/>
+                                <gunzip src="${project.build.directory}/fake-gcs-server_1.48.0_Linux_amd64.tar.gz"
+                                        dest="${project.build.directory}/fake-gcs-server_1.48.0_Linux_amd64.tar"/>
+                                <untar src="${project.build.directory}/fake-gcs-server_1.48.0_Linux_amd64.tar"
+                                       dest="${project.build.directory}/fake-gcs-server"/>
+                                <chmod file="${project.build.directory}/fake-gcs-server/fake-gcs-server" perm="ugo+rx"/>
+                            </target>
+                        </configuration>
+                        <goals>
+                            <goal>run</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>exec-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>fake-gcs-server</id>
+                        <phase>${gcs.stage}</phase>
+                        <goals>
+                            <goal>exec</goal>
+                        </goals>
+                        <configuration>
+                            <executable>${project.build.directory}/fake-gcs-server/fake-gcs-server</executable>
+                            <workingDirectory>${project.build.directory}/fake-gcs-server</workingDirectory>
+                            <arguments>
+                                <argument>-port</argument>
+                                <argument>4443</argument>
+                                <argument>-scheme</argument>
+                                <argument>http</argument>
+                                <argument>-host</argument>
+                                <argument>127.0.0.1</argument>
+                                <argument>-log-level</argument>
+                                <argument>error</argument>
+                                <argument>-filesystem-root</argument>
+                                <argument>${project.build.directory}/fake-gcs-server/storage</argument>
+                            </arguments>
+                            <async>true</async>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
 
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.hyracks</groupId>
-      <artifactId>hyracks-cloud</artifactId>
-      <version>${hyracks.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.asterix</groupId>
-      <artifactId>asterix-common</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.asterix</groupId>
-      <artifactId>asterix-external-data</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <!-- aws s3 start -->
-    <dependency>
-      <groupId>software.amazon.awssdk</groupId>
-      <artifactId>sdk-core</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>software.amazon.awssdk</groupId>
-      <artifactId>s3</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>software.amazon.awssdk</groupId>
-      <artifactId>regions</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>software.amazon.awssdk</groupId>
-      <artifactId>auth</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>software.amazon.awssdk</groupId>
-      <artifactId>s3-transfer-manager</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>software.amazon.awssdk.crt</groupId>
-      <artifactId>aws-crt</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>io.findify</groupId>
-      <artifactId>s3mock_2.12</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>com.typesafe.akka</groupId>
-      <artifactId>akka-http-core_2.12</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <!-- aws s3 end -->
-  </dependencies>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.hyracks</groupId>
+            <artifactId>hyracks-cloud</artifactId>
+            <version>${hyracks.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.asterix</groupId>
+            <artifactId>asterix-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.asterix</groupId>
+            <artifactId>asterix-external-data</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <!-- aws s3 start -->
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>sdk-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>s3</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>regions</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>auth</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>s3-transfer-manager</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>software.amazon.awssdk.crt</groupId>
+            <artifactId>aws-crt</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.findify</groupId>
+            <artifactId>s3mock_2.12</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.typesafe.akka</groupId>
+            <artifactId>akka-http-core_2.12</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <!-- aws s3 end -->
+
+        <dependency>
+            <groupId>com.azure</groupId>
+            <artifactId>azure-storage-blob-batch</artifactId>
+            <version>12.23.0</version>
+        </dependency>
+
+    </dependencies>
 </project>
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
index ee43a2c..c98c6b4 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
@@ -20,6 +20,8 @@
 
 import org.apache.asterix.cloud.clients.aws.s3.S3ClientConfig;
 import org.apache.asterix.cloud.clients.aws.s3.S3CloudClient;
+import org.apache.asterix.cloud.clients.azure.blobstorage.AzBlobStorageClientConfig;
+import org.apache.asterix.cloud.clients.azure.blobstorage.AzBlobStorageCloudClient;
 import org.apache.asterix.cloud.clients.google.gcs.GCSClientConfig;
 import org.apache.asterix.cloud.clients.google.gcs.GCSCloudClient;
 import org.apache.asterix.common.config.CloudProperties;
@@ -30,6 +32,7 @@
     private static final boolean UNSTABLE = isUnstable();
     public static final String S3 = "s3";
     public static final String GCS = "gs";
+    public static final String AZ_BLOB = "azblob";
 
     private CloudClientProvider() {
         throw new AssertionError("do not instantiate");
@@ -45,6 +48,9 @@
         } else if (GCS.equalsIgnoreCase(storageScheme)) {
             GCSClientConfig config = GCSClientConfig.of(cloudProperties);
             cloudClient = new GCSCloudClient(config, guardian);
+        } else if (AZ_BLOB.equalsIgnoreCase(storageScheme)) {
+            AzBlobStorageClientConfig config = AzBlobStorageClientConfig.of(cloudProperties);
+            cloudClient = new AzBlobStorageCloudClient(config, guardian);
         } else {
             throw new IllegalStateException("unsupported cloud storage scheme: " + storageScheme);
         }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageBufferedWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageBufferedWriter.java
new file mode 100644
index 0000000..2a79c86
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageBufferedWriter.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.cloud.clients.azure.blobstorage;
+
+import java.io.BufferedInputStream;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
+import org.apache.asterix.cloud.clients.ICloudGuardian;
+import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter;
+import org.apache.commons.io.IOUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import com.azure.core.util.BinaryData;
+import com.azure.storage.blob.BlobClient;
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.models.BlobStorageException;
+import com.azure.storage.blob.specialized.BlockBlobClient;
+
+public class AzBlobStorageBufferedWriter implements ICloudBufferedWriter {
+    private static final String PUT_UPLOAD_ID = "putUploadId";
+    private static final int MAX_RETRIES = 3;
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final List<String> blockIDArrayList;
+    private final ICloudGuardian guardian;
+    private int blockNumber;
+    private final String path;
+    private String uploadID;
+
+    private final BlobContainerClient blobContainerClient;
+
+    private final IRequestProfilerLimiter profiler;
+
+    private final String bucket;
+
+    public AzBlobStorageBufferedWriter(BlobContainerClient blobContainerClient, IRequestProfilerLimiter profiler,
+            ICloudGuardian guardian, String bucket, String path) {
+        this.blobContainerClient = blobContainerClient;
+        this.profiler = profiler;
+        this.guardian = guardian;
+        this.bucket = bucket;
+        this.path = path;
+        this.blockIDArrayList = new ArrayList<>();
+    }
+
+    @Override
+    public void upload(InputStream stream, int length) {
+        profiler.objectMultipartUpload();
+        if (length <= 0) {
+            String errMsg = String.format("A block with size %d cannot be staged for upload", length);
+            LOGGER.error(errMsg);
+            throw new IllegalArgumentException(errMsg);
+        }
+        guardian.checkIsolatedWriteAccess(bucket, path);
+        try {
+            BlockBlobClient blockBlobClient = blobContainerClient.getBlobClient(path).getBlockBlobClient();
+            BufferedInputStream bufferedInputStream = IOUtils.buffer(stream, length);
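+            // Azure requires block IDs to be Base64-encoded and, within a blob, of
+            // uniform length; encoding a random UUID satisfies both constraints.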
+            String blockID =
+                    Base64.getEncoder().encodeToString(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8));
+            initBlockBlobUploads(blockID);
+            blockIDArrayList.add(blockID);
+            blockBlobClient.stageBlock(blockID, bufferedInputStream, length);
+        } catch (Exception e) {
+            LOGGER.error("Error while uploading blocks of data: {}", e.getMessage());
+            throw new RuntimeException(e);
+        }
+        blockNumber++;
+    }
+
+    private void initBlockBlobUploads(String blockID) {
+        if (this.uploadID == null) {
+            this.uploadID = blockID;
+            this.blockNumber = 1;
+        }
+    }
+
+    @Override
+    public void uploadLast(InputStream stream, ByteBuffer buffer) throws HyracksDataException {
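+        // If no block has been staged yet, the data fits in a single buffer, so a
+        // one-shot put-blob is issued instead of committing a block list.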
+        if (uploadID == null) {
+            profiler.objectWrite();
+            BlobClient blobClient = blobContainerClient.getBlobClient(path);
+            BinaryData binaryData = BinaryData.fromBytes(getDataFromBuffer(buffer));
+            blobClient.upload(binaryData);
+            uploadID = PUT_UPLOAD_ID; // uploadID should be updated if the put-object operation succeeds
+        } else {
+            upload(stream, buffer.limit());
+        }
+    }
+
+    private byte[] getDataFromBuffer(ByteBuffer buffer) {
+        byte[] data = new byte[buffer.limit()];
+        buffer.get(data, 0, buffer.limit());
+        return data;
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return this.uploadID == null;
+    }
+
+    @Override
+    public void finish() throws HyracksDataException {
+        if (this.uploadID == null) {
+            throw new IllegalStateException("Cannot finish without writing any bytes");
+        } else if (PUT_UPLOAD_ID.equals(uploadID)) {
+            return;
+        }
+        int currRetryAttempt = 0;
+        BlockBlobClient blockBlobClient = blobContainerClient.getBlobClient(path).getBlockBlobClient();
+        while (true) {
+            try {
+                guardian.checkWriteAccess(bucket, path);
+                profiler.objectMultipartUpload();
+                blockBlobClient.commitBlockList(blockIDArrayList);
+                break;
+            } catch (BlobStorageException e) {
+                currRetryAttempt++;
+                if (currRetryAttempt == MAX_RETRIES) {
+                    throw HyracksDataException.create(e);
+                }
+                LOGGER.info(() -> "AzBlob storage write retry, encountered: " + e.getMessage());
+
+                // Back off 1 second after the first failed attempt, 2 seconds after subsequent failures
+                try {
+                    Thread.sleep(TimeUnit.SECONDS.toMillis(currRetryAttempt < 2 ? 1 : 2));
+                } catch (InterruptedException ex) {
+                    Thread.currentThread().interrupt();
+                    throw HyracksDataException.create(ex);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void abort() throws HyracksDataException {
+        // TODO: The current Azure Java SDK offers no way to abort staged but
+        // uncommitted blocks (https://github.com/Azure/azure-sdk-for-java/issues/31150),
+        // so this is effectively a no-op; uncommitted blocks are eventually
+        // garbage-collected by the service.
+        LOGGER.warn("Multipart upload for {} was aborted", path);
+    }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientConfig.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientConfig.java
new file mode 100644
index 0000000..9aedfc3
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientConfig.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.clients.azure.blobstorage;
+
+import java.util.Objects;
+
+import org.apache.asterix.common.config.CloudProperties;
+
+import com.azure.identity.DefaultAzureCredential;
+import com.azure.identity.DefaultAzureCredentialBuilder;
+
+public class AzBlobStorageClientConfig {
+    private final int writeBufferSize;
+    // Ref: https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=microsoft-entra-id
+    static final int DELETE_BATCH_SIZE = 256;
+    private final String region;
+    private final String endpoint;
+    private final String prefix;
+
+    private final boolean anonymousAuth;
+    private final long profilerLogInterval;
+    private final String bucket;
+    private final long tokenAcquireTimeout;
+    private final int writeMaxRequestsPerSeconds;
+    private final int readMaxRequestsPerSeconds;
+
+    public AzBlobStorageClientConfig(String region, String endpoint, String prefix, boolean anonymousAuth,
+            long profilerLogInterval, String bucket, long tokenAcquireTimeout, int writeMaxRequestsPerSeconds,
+            int readMaxRequestsPerSeconds, int writeBufferSize) {
+        this.region = Objects.requireNonNull(region, "region");
+        this.endpoint = endpoint;
+        this.prefix = Objects.requireNonNull(prefix, "prefix");
+        this.anonymousAuth = anonymousAuth;
+        this.profilerLogInterval = profilerLogInterval;
+        this.bucket = bucket;
+        this.tokenAcquireTimeout = tokenAcquireTimeout;
+        this.writeMaxRequestsPerSeconds = writeMaxRequestsPerSeconds;
+        this.readMaxRequestsPerSeconds = readMaxRequestsPerSeconds;
+        this.writeBufferSize = writeBufferSize;
+    }
+
+    public static AzBlobStorageClientConfig of(CloudProperties cloudProperties) {
+        return new AzBlobStorageClientConfig(cloudProperties.getStorageRegion(), cloudProperties.getStorageEndpoint(),
+                cloudProperties.getStoragePrefix(), cloudProperties.isStorageAnonymousAuth(),
+                cloudProperties.getProfilerLogInterval(), cloudProperties.getStorageBucket(),
+                cloudProperties.getTokenAcquireTimeout(), cloudProperties.getWriteMaxRequestsPerSecond(),
+                cloudProperties.getReadMaxRequestsPerSecond(), cloudProperties.getWriteBufferSize());
+    }
+
+    public String getRegion() {
+        return region;
+    }
+
+    public String getEndpoint() {
+        return endpoint;
+    }
+
+    public String getPrefix() {
+        return prefix;
+    }
+
+    public String getBucket() {
+        return bucket;
+    }
+
+    public long getProfilerLogInterval() {
+        return profilerLogInterval;
+    }
+
+    public boolean isAnonymousAuth() {
+        return anonymousAuth;
+    }
+
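+    // DefaultAzureCredential resolves credentials from the environment (env vars,
+    // managed identity, Azure CLI login, etc.), so no secrets live in this config.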
+    public DefaultAzureCredential createCredentialsProvider() {
+        return new DefaultAzureCredentialBuilder().build();
+    }
+
+    public long getTokenAcquireTimeout() {
+        return tokenAcquireTimeout;
+    }
+
+    public int getWriteMaxRequestsPerSeconds() {
+        return writeMaxRequestsPerSeconds;
+    }
+
+    public int getReadMaxRequestsPerSeconds() {
+        return readMaxRequestsPerSeconds;
+    }
+
+    public int getWriteBufferSize() {
+        return writeBufferSize;
+    }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java
new file mode 100644
index 0000000..b9f9421
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.cloud.clients.azure.blobstorage;
+
+import static org.apache.asterix.cloud.clients.azure.blobstorage.AzBlobStorageClientConfig.DELETE_BATCH_SIZE;
+
+import java.io.ByteArrayOutputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.nio.ReadOnlyBufferException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.asterix.cloud.CloudResettableInputStream;
+import org.apache.asterix.cloud.IWriteBufferProvider;
+import org.apache.asterix.cloud.clients.CloudFile;
+import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
+import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.asterix.cloud.clients.ICloudGuardian;
+import org.apache.asterix.cloud.clients.ICloudWriter;
+import org.apache.asterix.cloud.clients.IParallelDownloader;
+import org.apache.asterix.cloud.clients.profiler.CountRequestProfilerLimiter;
+import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter;
+import org.apache.asterix.cloud.clients.profiler.RequestLimiterNoOpProfiler;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import com.azure.core.http.rest.PagedIterable;
+import com.azure.core.util.BinaryData;
+import com.azure.storage.blob.BlobClient;
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobContainerClientBuilder;
+import com.azure.storage.blob.batch.BlobBatchClient;
+import com.azure.storage.blob.batch.BlobBatchClientBuilder;
+import com.azure.storage.blob.models.BlobErrorCode;
+import com.azure.storage.blob.models.BlobItem;
+import com.azure.storage.blob.models.BlobListDetails;
+import com.azure.storage.blob.models.BlobRange;
+import com.azure.storage.blob.models.BlobStorageException;
+import com.azure.storage.blob.models.ListBlobsOptions;
+import com.azure.storage.common.StorageSharedKeyCredential;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+public class AzBlobStorageCloudClient implements ICloudClient {
+    private static final String BUCKET_ROOT_PATH = "";
+    public static final String AZURITE_ENDPOINT = "http://127.0.0.1:15055/devstoreaccount1/";
+    private static final String AZURITE_ACCOUNT_NAME = "devstoreaccount1";
+    private static final String AZURITE_ACCOUNT_KEY =
+            "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==";
+    private final ICloudGuardian guardian;
+    private BlobContainerClient blobContainerClient;
+    private AzBlobStorageClientConfig config;
+    private IRequestProfilerLimiter profiler;
+    private final BlobBatchClient blobBatchClient;
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    public AzBlobStorageCloudClient(AzBlobStorageClientConfig config, ICloudGuardian guardian) {
+        this(config, buildClient(config), guardian);
+    }
+
+    public AzBlobStorageCloudClient(AzBlobStorageClientConfig config, BlobContainerClient blobContainerClient,
+            ICloudGuardian guardian) {
+        this.blobContainerClient = blobContainerClient;
+        this.config = config;
+        this.guardian = guardian;
+        long profilerInterval = config.getProfilerLogInterval();
+        AzureRequestRateLimiter limiter = new AzureRequestRateLimiter(config);
+        if (profilerInterval > 0) {
+            profiler = new CountRequestProfilerLimiter(profilerInterval, limiter);
+        } else {
+            profiler = new RequestLimiterNoOpProfiler(limiter);
+        }
+        guardian.setCloudClient(this);
+        blobBatchClient = new BlobBatchClientBuilder(blobContainerClient.getServiceClient()).buildClient();
+    }
+
+    @Override
+    public int getWriteBufferSize() {
+        return config.getWriteBufferSize();
+    }
+
+    @Override
+    public IRequestProfilerLimiter getProfilerLimiter() {
+        return profiler;
+    }
+
+    @Override
+    public ICloudWriter createWriter(String bucket, String path, IWriteBufferProvider bufferProvider) {
+        ICloudBufferedWriter bufferedWriter = new AzBlobStorageBufferedWriter(blobContainerClient, profiler, guardian,
+                bucket, config.getPrefix() + path);
+        return new CloudResettableInputStream(bufferedWriter, bufferProvider);
+    }
+
+    @Override
+    public Set<CloudFile> listObjects(String bucket, String path, FilenameFilter filter) {
+        guardian.checkReadAccess(bucket, path);
+        profiler.objectsList();
+        PagedIterable<BlobItem> blobItems = getBlobItems(bucket, config.getPrefix() + path);
+        Stream<CloudFile> cloudFileStream = mapBlobItemsToStreamOfCloudFiles(blobItems);
+        return filterCloudFiles(filter, cloudFileStream);
+    }
+
+    private Set<CloudFile> filterCloudFiles(FilenameFilter filter, Stream<CloudFile> cloudFileStream) {
+        if (filter == null) {
+            return cloudFileStream.map(this::removeCloudPrefixFromBlobName).collect(Collectors.toSet());
+        }
+        return cloudFileStream.filter(cloudFile -> filter.accept(null, cloudFile.getPath()))
+                .map(this::removeCloudPrefixFromBlobName).collect(Collectors.toSet());
+    }
+
+    private CloudFile removeCloudPrefixFromBlobName(CloudFile cloudFile) {
+        String fullyQualifiedBlobName = cloudFile.getPath();
+        fullyQualifiedBlobName = fullyQualifiedBlobName.substring(config.getPrefix().length());
+        return CloudFile.of(fullyQualifiedBlobName, cloudFile.getSize());
+    }
+
+    private Stream<CloudFile> mapBlobItemsToStreamOfCloudFiles(PagedIterable<BlobItem> blobItems) {
+        return blobItems.stream()
+                .map(blobItem -> CloudFile.of(blobItem.getName(), blobItem.getProperties().getContentLength()));
+    }
+
+    private PagedIterable<BlobItem> getBlobItems(String bucket, String path) {
+        ListBlobsOptions options =
+                new ListBlobsOptions().setPrefix(path).setDetails(new BlobListDetails().setRetrieveMetadata(true));
+        return blobContainerClient.listBlobs(options, null);
+    }
+
+    @Override
+    public int read(String bucket, String path, long offset, ByteBuffer buffer) throws HyracksDataException {
+        guardian.checkReadAccess(bucket, path);
+        profiler.objectGet();
+        BlobClient blobClient = blobContainerClient.getBlobClient(config.getPrefix() + path);
+        ByteArrayOutputStream blobStream = new ByteArrayOutputStream(buffer.capacity());
+        long rem = buffer.remaining();
+        BlobRange blobRange = new BlobRange(offset, rem);
+        downloadBlob(blobClient, blobStream, blobRange);
+        readBlobStreamIntoBuffer(buffer, blobStream);
+        if (buffer.remaining() != 0) {
+            throw new IllegalStateException("Expected buffer remaining = 0, found: " + buffer.remaining());
+        }
+        return (int) rem;
+    }
+
+    private void readBlobStreamIntoBuffer(ByteBuffer buffer, ByteArrayOutputStream byteArrayOutputStream)
+            throws HyracksDataException {
+        byte[] byteArray = byteArrayOutputStream.toByteArray();
+        try {
+            buffer.put(byteArray);
+            byteArrayOutputStream.close();
+        } catch (BufferOverflowException | ReadOnlyBufferException | IOException ex) {
+            throw HyracksDataException.create(ex);
+        }
+    }
+
+    private void downloadBlob(BlobClient blobClient, ByteArrayOutputStream byteArrayOutputStream, BlobRange blobRange)
+            throws HyracksDataException {
+        try {
+            blobClient.downloadStreamWithResponse(byteArrayOutputStream, blobRange, null, null, false, null, null);
+        } catch (BlobStorageException ex) {
+            throw HyracksDataException.create(ex);
+        }
+    }
+
+    @Override
+    public byte[] readAllBytes(String bucket, String path) throws HyracksDataException {
+        guardian.checkReadAccess(bucket, path);
+        profiler.objectGet();
+        BlobClient blobClient = blobContainerClient.getBlobClient(config.getPrefix() + path);
+        try {
+            BinaryData binaryData = blobClient.downloadContent();
+            return binaryData.toBytes();
+        } catch (BlobStorageException ex) {
+            BlobErrorCode errorCode = ex.getErrorCode();
+            if (errorCode.equals(BlobErrorCode.BLOB_NOT_FOUND)) {
+                LOGGER.warn("Blob not found on cloud: {}", path);
+                return null;
+            }
+            throw HyracksDataException.create(ex);
+        }
+    }
+
+    @Override
+    public InputStream getObjectStream(String bucket, String path, long offset, long length) {
+        guardian.checkReadAccess(bucket, path);
+        profiler.objectGet();
+        BlobRange blobRange = new BlobRange(offset, length);
+        BlobClient blobClient = blobContainerClient.getBlobClient(config.getPrefix() + path);
+        try {
+            return blobClient.openInputStream(blobRange, null);
+        } catch (BlobStorageException ex) {
+            LOGGER.error("error getting object stream for path: {}. Exception: {}", path, ex.getMessage());
+            throw new IllegalStateException(ex);
+        }
+    }
+
+    @Override
+    public void write(String bucket, String path, byte[] data) {
+        guardian.checkWriteAccess(bucket, path);
+        profiler.objectWrite();
+        BinaryData binaryData = BinaryData.fromBytes(data);
+        BlobClient blobClient = blobContainerClient.getBlobClient(config.getPrefix() + path);
+        blobClient.upload(binaryData, true);
+    }
+
+    @Override
+    public void copy(String bucket, String srcPath, FileReference destPath) {
+        guardian.checkReadAccess(bucket, srcPath);
+        profiler.objectGet();
+        BlobClient srcBlobClient = blobContainerClient.getBlobClient(config.getPrefix() + srcPath);
+        String srcBlobUrl = srcBlobClient.getBlobUrl();
+        profiler.objectCopy();
+        guardian.checkWriteAccess(bucket, destPath.getRelativePath());
+        BlobClient destBlobClient = blobContainerClient.getBlobClient(destPath.getFile().getPath());
+        destBlobClient.beginCopy(srcBlobUrl, null);
+    }
+
+    @Override
+    public void deleteObjects(String bucket, Collection<String> paths) {
+        if (paths.isEmpty())
+            return;
+        Set<BlobItem> blobsToDelete = getBlobsMatchingThesePaths(paths);
+        List<String> blobURLs = getBlobURLs(blobsToDelete);
+        if (blobURLs.isEmpty())
+            return;
+        Collection<List<String>> batchedBlobURLs = getBatchedBlobURLs(blobURLs);
+        for (List<String> batch : batchedBlobURLs) {
+            blobBatchClient.deleteBlobs(batch, null).stream().count();
+        }
+    }
+
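+    // The Blob Batch API caps each request at 256 subrequests (DELETE_BATCH_SIZE),
+    // so the blob URLs are chunked accordingly.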
+    private Collection<List<String>> getBatchedBlobURLs(List<String> blobURLs) {
+        Collection<List<String>> batchedBlobURLs = new ArrayList<>();
+        Iterator<String> iterator = blobURLs.iterator();
+        while (iterator.hasNext()) {
+            List<String> batch = new ArrayList<>();
+            int batchSize = 0;
+            while (batchSize < DELETE_BATCH_SIZE && iterator.hasNext()) {
+                batch.add(iterator.next());
+                batchSize++;
+            }
+            batchedBlobURLs.add(batch);
+        }
+        return batchedBlobURLs;
+    }
+
+    private Set<BlobItem> getBlobsMatchingThesePaths(Collection<String> paths) {
+        List<String> pathWithPrefix =
+                paths.stream().map(path -> config.getPrefix() + path).collect(Collectors.toList());
+        PagedIterable<BlobItem> blobItems = blobContainerClient.listBlobs();
+        return blobItems.stream().filter(blobItem -> pathWithPrefix.contains(blobItem.getName()))
+                .collect(Collectors.toSet());
+    }
+
+    @Override
+    public long getObjectSize(String bucket, String path) throws HyracksDataException {
+        guardian.checkReadAccess(bucket, path);
+        profiler.objectGet();
+        try {
+            BlobClient blobClient = blobContainerClient.getBlobClient(config.getPrefix() + path);
+            return blobClient.getProperties().getBlobSize();
+        } catch (BlobStorageException ex) {
+            BlobErrorCode errorCode = ex.getErrorCode();
+            if (errorCode.equals(BlobErrorCode.BLOB_NOT_FOUND)) {
+                LOGGER.error("error while getting blob size; no such blob found: {} ", config.getPrefix() + path);
+                return 0;
+            }
+            throw HyracksDataException.create(ex);
+        } catch (Exception ex) {
+            LOGGER.error("error getting size of the blob: {}. Exception: {}", path, ex.getMessage());
+            throw HyracksDataException.create(ex);
+        }
+    }
+
+    @Override
+    public boolean exists(String bucket, String path) throws HyracksDataException {
+        guardian.checkReadAccess(bucket, path);
+        profiler.objectGet();
+        try {
+            BlobClient blobClient = blobContainerClient.getBlobClient(config.getPrefix() + path);
+            return blobClient.exists();
+        } catch (BlobStorageException ex) {
+            BlobErrorCode errorCode = ex.getErrorCode();
+            if (errorCode.equals(BlobErrorCode.BLOB_NOT_FOUND)) {
+                return false;
+            }
+            throw HyracksDataException.create(ex);
+        } catch (Exception ex) {
+            throw HyracksDataException.create(ex);
+        }
+    }
+
+    @Override
+    public boolean isEmptyPrefix(String bucket, String path) throws HyracksDataException {
+        profiler.objectsList();
+        ListBlobsOptions listBlobsOptions = new ListBlobsOptions().setPrefix(config.getPrefix() + path);
+        // A null timeout disables the client-side timeout (as in getBlobItems);
+        // Duration.ofDays(Long.MAX_VALUE) would overflow and throw ArithmeticException.
+        PagedIterable<BlobItem> blobItems = blobContainerClient.listBlobs(listBlobsOptions, null);
+        return blobItems.stream().findAny().isEmpty();
+    }
+
+    @Override
+    public IParallelDownloader createParallelDownloader(String bucket, IOManager ioManager) {
+        return new AzureParallelDownloader(ioManager, blobContainerClient, profiler, config);
+    }
+
+    @Override
+    public JsonNode listAsJson(ObjectMapper objectMapper, String bucket) {
+        profiler.objectsList();
+        PagedIterable<BlobItem> blobItems = getBlobItems(bucket, BUCKET_ROOT_PATH);
+        List<BlobItem> blobs = blobItems.stream().distinct().collect(Collectors.toList());
+        blobs = sortBlobItemsByName(blobs);
+        return mapBlobItemsToJson(blobs, objectMapper);
+    }
+
+    private List<BlobItem> sortBlobItemsByName(List<BlobItem> blobs) {
+        return blobs.stream()
+                .sorted((blob1, blob2) -> String.CASE_INSENSITIVE_ORDER.compare(blob1.getName(), blob2.getName()))
+                .collect(Collectors.toList());
+    }
+
+    private ArrayNode mapBlobItemsToJson(List<BlobItem> blobs, ObjectMapper objectMapper) {
+        ArrayNode objectsInfo = objectMapper.createArrayNode();
+        for (BlobItem blob : blobs) {
+            ObjectNode objectInfo = objectsInfo.addObject();
+            objectInfo.put("path", blob.getName());
+            objectInfo.put("size", blob.getProperties().getContentLength());
+        }
+        return objectsInfo;
+    }
+
+    @Override
+    public void close() {
+        // Azure blob clients need no explicit close; the SDK manages the underlying
+        // Netty connection pool, so this implementation is a no-op.
+        // Ref: https://github.com/Azure/azure-sdk-for-java/issues/17903
+    }
+
+    private static BlobContainerClient buildClient(AzBlobStorageClientConfig config) {
+        BlobContainerClientBuilder blobContainerClientBuilder =
+                new BlobContainerClientBuilder().containerName(config.getBucket()).endpoint(getEndpoint(config));
+        configCredentialsToAzClient(blobContainerClientBuilder, config);
+        BlobContainerClient blobContainerClient = blobContainerClientBuilder.buildClient();
+        blobContainerClient.createIfNotExists();
+        return blobContainerClient;
+    }
+
+    private static void configCredentialsToAzClient(BlobContainerClientBuilder builder,
+            AzBlobStorageClientConfig config) {
+        if (config.isAnonymousAuth()) {
+            StorageSharedKeyCredential creds =
+                    new StorageSharedKeyCredential(AZURITE_ACCOUNT_NAME, AZURITE_ACCOUNT_KEY);
+            builder.credential(creds);
+        } else {
+            builder.credential(config.createCredentialsProvider());
+        }
+    }
+
+    private static String getEndpoint(AzBlobStorageClientConfig config) {
+        return config.isAnonymousAuth() ? AZURITE_ENDPOINT + config.getBucket()
+                : config.getEndpoint() + "/" + config.getBucket();
+    }
+
+    private List<String> getBlobURLs(Set<BlobItem> blobs) {
+        final String blobURLPrefix = blobContainerClient.getBlobContainerUrl() + "/";
+        return blobs.stream().map(BlobItem::getName).map(blobName -> blobURLPrefix + blobName)
+                .collect(Collectors.toList());
+    }
+}
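
For reference, the endpoint logic above differs between the two auth modes because Azurite addresses accounts by URL path rather than by host name, so the container name is appended after the account segment. A minimal standalone sketch of the same container wiring, assuming Azurite's default blob port; the endpoint URL and the `demo` container name are illustrative, not part of this patch:

```java
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobContainerClientBuilder;
import com.azure.storage.common.StorageSharedKeyCredential;

public class ContainerClientSketch {
    public static void main(String[] args) {
        // Azurite's well-known developer account; the account name is part of the URL path.
        StorageSharedKeyCredential azuriteCreds = new StorageSharedKeyCredential("devstoreaccount1",
                "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==");
        BlobContainerClient client = new BlobContainerClientBuilder()
                .endpoint("http://127.0.0.1:10000/devstoreaccount1/demo") // container appended to the path
                .containerName("demo")
                .credential(azuriteCreds)
                .buildClient();
        // Mirrors buildClient() above: create the container on first use.
        client.createIfNotExists();
        System.out.println("Container exists: " + client.exists());
    }
}
```

Against a production account the endpoint would instead identify the account by host name, matching the non-anonymous branch of getEndpoint().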
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java
new file mode 100644
index 0000000..4980587
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.cloud.clients.azure.blobstorage;
+
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.InvalidPathException;
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.asterix.cloud.clients.IParallelDownloader;
+import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import com.azure.core.http.rest.PagedIterable;
+import com.azure.storage.blob.BlobClient;
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.models.BlobItem;
+import com.azure.storage.blob.models.ListBlobsOptions;
+
+public class AzureParallelDownloader implements IParallelDownloader {
+    public static final String STORAGE_SUB_DIR = "storage";
+    private final IOManager ioManager;
+    private final BlobContainerClient blobContainerClient;
+    private final IRequestProfilerLimiter profiler;
+    private final AzBlobStorageClientConfig config;
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    public AzureParallelDownloader(IOManager ioManager, BlobContainerClient blobContainerClient,
+            IRequestProfilerLimiter profiler, AzBlobStorageClientConfig config) {
+        this.ioManager = ioManager;
+        this.blobContainerClient = blobContainerClient;
+        this.profiler = profiler;
+        this.config = config;
+    }
+
+    @Override
+    public void downloadFiles(Collection<FileReference> toDownload) throws HyracksDataException {
+        for (FileReference fileReference : toDownload) {
+            BlobClient blobClient =
+                    blobContainerClient.getBlobClient(config.getPrefix() + fileReference.getRelativePath());
+            Path absPath = Path.of(fileReference.getAbsolutePath());
+            Path parentPath = absPath.getParent();
+            try {
+                createDirectories(parentPath);
+                // try-with-resources closes the stream exactly once, even on failure
+                try (OutputStream fileOutputStream = Files.newOutputStream(absPath)) {
+                    blobClient.downloadStream(fileOutputStream);
+                }
+            } catch (IOException e) {
+                throw HyracksDataException.create(e);
+            }
+        }
+    }
+
+    @Override
+    public Collection<FileReference> downloadDirectories(Collection<FileReference> directories)
+            throws HyracksDataException {
+        Set<FileReference> failedFiles = new HashSet<>();
+        for (FileReference directory : directories) {
+            PagedIterable<BlobItem> blobsInDir = getBlobItems(directory);
+            for (BlobItem blobItem : blobsInDir) {
+                profiler.objectGet();
+                download(blobItem, failedFiles);
+            }
+        }
+        return failedFiles;
+    }
+
+    private void download(BlobItem blobItem, Set<FileReference> failedFiles) throws HyracksDataException {
+        BlobClient blobClient = blobContainerClient.getBlobClient(blobItem.getName());
+        FileReference diskDestFile = ioManager.resolve(createDiskSubPath(blobItem.getName()));
+        Path absDiskBlobPath = getDiskDestPath(diskDestFile);
+        Path parentDiskPath = absDiskBlobPath.getParent();
+        createDirectories(parentDiskPath);
+        // try-with-resources ensures the destination stream is closed even when the download fails
+        try (FileOutputStream outputStreamToDest = getOutputStreamToDest(diskDestFile)) {
+            blobClient.downloadStream(outputStreamToDest);
+        } catch (Exception e) {
+            LOGGER.warn("Failed to download blob {}", blobItem.getName(), e);
+            FileReference failedFile = ioManager.resolve(blobItem.getName());
+            failedFiles.add(failedFile);
+        }
+    }
+
+    private String createDiskSubPath(String blobName) {
+        // Strip any cloud prefix preceding the storage subdirectory so the returned
+        // path is relative to the local storage root.
+        if (!blobName.startsWith(STORAGE_SUB_DIR)) {
+            int storageIdx = blobName.indexOf(STORAGE_SUB_DIR);
+            if (storageIdx >= 0) {
+                blobName = blobName.substring(storageIdx);
+            }
+        }
+        return blobName;
+    }
+
+    private FileOutputStream getOutputStreamToDest(FileReference destFile) throws HyracksDataException {
+        try {
+            return new FileOutputStream(destFile.getAbsolutePath());
+        } catch (FileNotFoundException ex) {
+            throw HyracksDataException.create(ex);
+        }
+    }
+
+    private void createDirectories(Path parentPath) throws HyracksDataException {
+        if (Files.notExists(parentPath)) {
+            try {
+                Files.createDirectories(parentPath);
+            } catch (IOException ex) {
+                throw HyracksDataException.create(ex);
+            }
+        }
+    }
+
+    private Path getDiskDestPath(FileReference destFile) throws HyracksDataException {
+        try {
+            return Path.of(destFile.getAbsolutePath());
+        } catch (InvalidPathException ex) {
+            throw HyracksDataException.create(ex);
+        }
+    }
+
+    private PagedIterable<BlobItem> getBlobItems(FileReference directoryToDownload) {
+        ListBlobsOptions listBlobsOptions =
+                new ListBlobsOptions().setPrefix(config.getPrefix() + directoryToDownload.getRelativePath());
+        return blobContainerClient.listBlobs(listBlobsOptions, null);
+    }
+
+    @Override
+    public void close() {
+        // No-op: Azure Blob clients do not need to be closed explicitly; the SDK
+        // manages the underlying Netty connection pool for the application.
+        // Ref: https://github.com/Azure/azure-sdk-for-java/issues/17903
+    }
+}
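
Stripped of the IOManager path resolution and failure bookkeeping, the directory download above reduces to a prefix listing plus a streamed copy per blob. A self-contained sketch of that core loop under those assumptions; the class and method names here are illustrative, not from the patch:

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.models.BlobItem;
import com.azure.storage.blob.models.ListBlobsOptions;

public final class PrefixDownloadSketch {
    // Downloads every blob under `prefix` into `localRoot`, preserving relative paths.
    public static void downloadPrefix(BlobContainerClient container, String prefix, Path localRoot)
            throws IOException {
        ListBlobsOptions options = new ListBlobsOptions().setPrefix(prefix);
        for (BlobItem blobItem : container.listBlobs(options, null)) { // null timeout = no timeout
            Path dest = localRoot.resolve(blobItem.getName());
            Files.createDirectories(dest.getParent());
            // Stream directly to disk so large blobs are never buffered whole in memory.
            try (OutputStream out = Files.newOutputStream(dest)) {
                container.getBlobClient(blobItem.getName()).downloadStream(out);
            }
        }
    }
}
```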
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureRequestRateLimiter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureRequestRateLimiter.java
new file mode 100644
index 0000000..6a76952
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureRequestRateLimiter.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.clients.azure.blobstorage;
+
+import org.apache.asterix.cloud.clients.profiler.limiter.IRateLimiter;
+import org.apache.asterix.cloud.clients.profiler.limiter.IRequestRateLimiter;
+import org.apache.asterix.cloud.clients.profiler.limiter.NoOpRateLimiter;
+import org.apache.asterix.cloud.clients.profiler.limiter.TokenBasedRateLimiter;
+
+public final class AzureRequestRateLimiter implements IRequestRateLimiter {
+    private final IRateLimiter writeLimiter;
+    private final IRateLimiter readLimiter;
+
+    public AzureRequestRateLimiter(AzBlobStorageClientConfig config) {
+        long tokenAcquireTimeout = config.getTokenAcquireTimeout();
+        this.writeLimiter = createLimiter(config.getWriteMaxRequestsPerSeconds(), tokenAcquireTimeout);
+        this.readLimiter = createLimiter(config.getReadMaxRequestsPerSeconds(), tokenAcquireTimeout);
+    }
+
+    @Override
+    public void writeRequest() {
+        writeLimiter.acquire();
+    }
+
+    @Override
+    public void readRequest() {
+        readLimiter.acquire();
+    }
+
+    @Override
+    public void listRequest() {
+        readLimiter.acquire();
+    }
+
+    private static IRateLimiter createLimiter(int maxRequestsPerSecond, long tokenAcquireTimeout) {
+        if (maxRequestsPerSecond > 0) {
+            return new TokenBasedRateLimiter(maxRequestsPerSecond, tokenAcquireTimeout);
+        }
+        }
+        return NoOpRateLimiter.INSTANCE;
+    }
+}
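
TokenBasedRateLimiter itself is outside this diff; as a rough mental model only (an assumption about its behavior, not the project's implementation), acquire() can be pictured as taking a permit from a bucket that a background task refills once per second:

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical token-bucket sketch: up to maxPerSecond permits are available at
// any moment, and acquire() blocks until a permit is free or the timeout elapses.
public final class TokenBucketSketch {
    private final Semaphore tokens;
    private final int maxPerSecond;
    private final long acquireTimeoutMs;

    public TokenBucketSketch(int maxPerSecond, long acquireTimeoutMs) {
        this.maxPerSecond = maxPerSecond;
        this.acquireTimeoutMs = acquireTimeoutMs;
        this.tokens = new Semaphore(maxPerSecond);
        Thread refiller = new Thread(this::refill, "token-refiller");
        refiller.setDaemon(true);
        refiller.start();
    }

    public boolean acquire() {
        try {
            return tokens.tryAcquire(acquireTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    private void refill() {
        while (true) {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                return;
            }
            // Approximate top-up to capacity; a race with concurrent acquire()
            // may briefly overshoot, which is acceptable for a sketch.
            tokens.release(Math.max(0, maxPerSecond - tokens.availablePermits()));
        }
    }
}
```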
diff --git a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/azure/LSMAzBlobStorageTest.java b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/azure/LSMAzBlobStorageTest.java
new file mode 100644
index 0000000..1f49fd9
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/azure/LSMAzBlobStorageTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.cloud.azure;
+
+import org.apache.asterix.cloud.AbstractLSMTest;
+import org.apache.asterix.cloud.clients.ICloudGuardian;
+import org.apache.asterix.cloud.clients.azure.blobstorage.AzBlobStorageClientConfig;
+import org.apache.asterix.cloud.clients.azure.blobstorage.AzBlobStorageCloudClient;
+import org.apache.hyracks.util.StorageUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import com.azure.core.http.rest.PagedIterable;
+import com.azure.storage.blob.BlobClient;
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.BlobServiceClientBuilder;
+import com.azure.storage.blob.models.BlobItem;
+import com.azure.storage.blob.models.ListBlobsOptions;
+import com.azure.storage.common.StorageSharedKeyCredential;
+
+public class LSMAzBlobStorageTest extends AbstractLSMTest {
+    private static BlobContainerClient client;
+
+    private static BlobServiceClient blobServiceClient;
+    private static final int MOCK_SERVER_PORT = 15055;
+    private static final String MOCK_SERVER_HOSTNAME = "http://127.0.0.1:" + MOCK_SERVER_PORT;
+    private static final String MOCK_SERVER_REGION = "us-west-2";
+
+    @BeforeClass
+    public static void setup() throws Exception {
+        LOGGER.info("LSMAzBlobStorageTest setup");
+
+        String endpointString = MOCK_SERVER_HOSTNAME + "/devstoreaccount1/" + PLAYGROUND_CONTAINER;
+        final String accKey =
+                "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==";
+        final String accName = "devstoreaccount1";
+
+        blobServiceClient = new BlobServiceClientBuilder().endpoint(endpointString)
+                .credential(new StorageSharedKeyCredential(accName, accKey)).buildClient();
+
+        // Start the test clean by deleting any residual data from previous tests
+        blobServiceClient.deleteBlobContainerIfExists(PLAYGROUND_CONTAINER);
+        client = blobServiceClient.createBlobContainerIfNotExists(PLAYGROUND_CONTAINER);
+
+        LOGGER.info("Az Blob Client created successfully");
+        int writeBufferSize = StorageUtil.getIntSizeInBytes(5, StorageUtil.StorageUnit.MEGABYTE);
+        AzBlobStorageClientConfig config = new AzBlobStorageClientConfig(MOCK_SERVER_REGION, MOCK_SERVER_HOSTNAME, "",
+                true, 0, PLAYGROUND_CONTAINER, 1, 0, 0, writeBufferSize);
+        CLOUD_CLIENT = new AzBlobStorageCloudClient(config, ICloudGuardian.NoOpCloudGuardian.INSTANCE);
+    }
+
+    private static void cleanup() {
+        try {
+            PagedIterable<BlobItem> blobItems = client.listBlobs(new ListBlobsOptions().setPrefix(""), null);
+            // Delete all the contents of the container
+            for (BlobItem blobItem : blobItems) {
+                BlobClient blobClient = client.getBlobClient(blobItem.getName());
+                blobClient.delete();
+            }
+            // Delete the container
+            blobServiceClient.deleteBlobContainer(PLAYGROUND_CONTAINER);
+        } catch (Exception ex) {
+            // ignore
+        }
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        LOGGER.info("Shutdown Azurite");
+        // Azure clients do not need explicit closure.
+        cleanup();
+    }
+}
diff --git a/asterixdb/src/main/licenses/content/com.microsoft.azure--msal4j--1.16.1_MIT_License.txt b/asterixdb/src/main/licenses/content/com.microsoft.azure--msal4j--1.16.1_MIT_License.txt
new file mode 100644
index 0000000..d1ca00f
--- /dev/null
+++ b/asterixdb/src/main/licenses/content/com.microsoft.azure--msal4j--1.16.1_MIT_License.txt
@@ -0,0 +1,21 @@
+    MIT License
+
+    Copyright (c) Microsoft Corporation. All rights reserved.
+
+    Permission is hereby granted, free of charge, to any person obtaining a copy
+    of this software and associated documentation files (the "Software"), to deal
+    in the Software without restriction, including without limitation the rights
+    to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+    copies of the Software, and to permit persons to whom the Software is
+    furnished to do so, subject to the following conditions:
+
+    The above copyright notice and this permission notice shall be included in all
+    copies or substantial portions of the Software.
+
+    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+    AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+    OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+    SOFTWARE
\ No newline at end of file
diff --git a/asterixdb/src/main/licenses/content/com.microsoft.azure--msal4j-persistence-extension--1.3.0_MIT_License.txt b/asterixdb/src/main/licenses/content/com.microsoft.azure--msal4j-persistence-extension--1.3.0_MIT_License.txt
new file mode 100644
index 0000000..d1ca00f
--- /dev/null
+++ b/asterixdb/src/main/licenses/content/com.microsoft.azure--msal4j-persistence-extension--1.3.0_MIT_License.txt
@@ -0,0 +1,21 @@
+    MIT License
+
+    Copyright (c) Microsoft Corporation. All rights reserved.
+
+    Permission is hereby granted, free of charge, to any person obtaining a copy
+    of this software and associated documentation files (the "Software"), to deal
+    in the Software without restriction, including without limitation the rights
+    to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+    copies of the Software, and to permit persons to whom the Software is
+    furnished to do so, subject to the following conditions:
+
+    The above copyright notice and this permission notice shall be included in all
+    copies or substantial portions of the Software.
+
+    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+    AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+    OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+    SOFTWARE
\ No newline at end of file
diff --git a/asterixdb/src/main/licenses/content/opensource.org_licenses_BSD-2-Clause.txt b/asterixdb/src/main/licenses/content/opensource.org_licenses_BSD-2-Clause.txt
new file mode 100644
index 0000000..226190a
--- /dev/null
+++ b/asterixdb/src/main/licenses/content/opensource.org_licenses_BSD-2-Clause.txt
@@ -0,0 +1,24 @@
+BSD 2-Clause License
+
+Copyright (c) 2024, Couchbase, Inc.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+1. Redistributions of source code must retain the above copyright notice, this
+   list of conditions and the following disclaimer.
+
+2. Redistributions in binary form must reproduce the above copyright notice,
+   this list of conditions and the following disclaimer in the documentation
+   and/or other materials provided with the distribution.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
\ No newline at end of file