ASTERIXDB-3379 Provide Azure Blob Storage support for the Engine
- user model changes: no
- storage format changes: no
- interface changes: yes
Ext-ref: MB-63021
Details:
- In order to provide compute/storage separation, the engine is to support reading its data from, and writing it to, Azure Blob Storage.
Change-Id: Ie9deece8ba98a5b8be7c817baea286cd50d88e68
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18235
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml
index 63bb7da..ecd91d5 100644
--- a/asterixdb/asterix-app/pom.xml
+++ b/asterixdb/asterix-app/pom.xml
@@ -513,7 +513,7 @@
<profile>
<id>asterix-gerrit-asterix-app</id>
<properties>
- <test.excludes>**/CloudStorageTest.java,**/CloudStorageGCSTest.java,**/SqlppExecutionWithCancellationTest.java,**/DmlTest.java,**/RepeatedTest.java,**/SqlppExecutionTest.java,**/SqlppExecutionColumnTest.java,**/*StaticPartitioning*Test.java,**/*Ssl*Test.java,**/Podman*.java,**/*AnalyzedExecutionTest.java,**/SqlppProfiledExecutionTest.java,**/CloudPythonTest.java</test.excludes>
+ <test.excludes>**/CloudStorageTest.java,**/CloudStorageGCSTest.java,**/SqlppExecutionWithCancellationTest.java,**/DmlTest.java,**/RepeatedTest.java,**/SqlppExecutionTest.java,**/SqlppExecutionColumnTest.java,**/*StaticPartitioning*Test.java,**/*Ssl*Test.java,**/Podman*.java,**/*AnalyzedExecutionTest.java,**/SqlppProfiledExecutionTest.java,**/CloudPythonTest.java,**/CloudStorageAzTest.java</test.excludes>
<itest.excludes>**/*.java</itest.excludes>
</properties>
<build>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageAzTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageAzTest.java
new file mode 100644
index 0000000..aec3898
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageAzTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.cloud_storage;
+
+import static org.apache.asterix.api.common.LocalCloudUtil.CLOUD_STORAGE_BUCKET;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.test.runtime.LangExecutionUtil;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.asterix.testframework.xml.Description;
+import org.apache.asterix.testframework.xml.TestCase;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.MethodSorters;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.BlobServiceClientBuilder;
+import com.azure.storage.common.StorageSharedKeyCredential;
+
+/**
+ * Run tests in cloud deployment environment
+ */
+@RunWith(Parameterized.class)
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class CloudStorageAzTest {
+
+ private static final Logger LOGGER = LogManager.getLogger();
+
+ private final TestCaseContext tcCtx;
+ private static final String SUITE_TESTS = "testsuite_cloud_storage.xml";
+ private static final String ONLY_TESTS = "testsuite_cloud_storage_only.xml";
+ private static final String CONFIG_FILE_NAME = "src/test/resources/cc-cloud-storage-azblob.conf";
+ private static final String DELTA_RESULT_PATH = "results_cloud";
+ private static final String EXCLUDED_TESTS = "MP";
+
+ public CloudStorageAzTest(TestCaseContext tcCtx) {
+ this.tcCtx = tcCtx;
+ }
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ String endpointString = "http://127.0.0.1:15055/devstoreaccount1/" + CLOUD_STORAGE_BUCKET;
+ final String accKey =
+ "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==";
+        final String accName = "devstoreaccount1"; // well-known public Azurite emulator account/key pair — not a secret
+
+ BlobServiceClient blobServiceClient = new BlobServiceClientBuilder().endpoint(endpointString)
+ .credential(new StorageSharedKeyCredential(accName, accKey)).buildClient();
+
+ cleanup(blobServiceClient);
+ initialize(blobServiceClient);
+
+        // Note: unlike the GCS Storage client, BlobServiceClient does not implement Closeable, so no shutdown call is needed here.
+
+ TestExecutor testExecutor = new TestExecutor(DELTA_RESULT_PATH);
+ testExecutor.executorId = "cloud";
+ testExecutor.stripSubstring = "//DB:";
+ LangExecutionUtil.setUp(CONFIG_FILE_NAME, testExecutor);
+ System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, CONFIG_FILE_NAME);
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ LangExecutionUtil.tearDown();
+ }
+
+    @Parameters(name = "CloudStorageAzTest {index}: {0}")
+ public static Collection<Object[]> tests() throws Exception {
+ return LangExecutionUtil.tests(ONLY_TESTS, SUITE_TESTS);
+ }
+
+ @Test
+ public void test() throws Exception {
+ List<TestCase.CompilationUnit> cu = tcCtx.getTestCase().getCompilationUnit();
+ Assume.assumeTrue(cu.size() > 1 || !EXCLUDED_TESTS.equals(getText(cu.get(0).getDescription())));
+ LangExecutionUtil.test(tcCtx);
+ }
+
+ private static String getText(Description description) {
+ return description == null ? "" : description.getValue();
+ }
+
+ private static void cleanup(BlobServiceClient blobServiceClient) {
+ blobServiceClient.deleteBlobContainerIfExists(CLOUD_STORAGE_BUCKET);
+ }
+
+ private static void initialize(BlobServiceClient blobServiceClient) {
+ blobServiceClient.createBlobContainerIfNotExists(CLOUD_STORAGE_BUCKET);
+ }
+}
diff --git a/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-azblob.conf b/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-azblob.conf
new file mode 100644
index 0000000..d11cb5e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/cc-cloud-storage-azblob.conf
@@ -0,0 +1,72 @@
+; Licensed to the Apache Software Foundation (ASF) under one
+; or more contributor license agreements. See the NOTICE file
+; distributed with this work for additional information
+; regarding copyright ownership. The ASF licenses this file
+; to you under the Apache License, Version 2.0 (the
+; "License"); you may not use this file except in compliance
+; with the License. You may obtain a copy of the License at
+;
+; http://www.apache.org/licenses/LICENSE-2.0
+;
+; Unless required by applicable law or agreed to in writing,
+; software distributed under the License is distributed on an
+; "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+; KIND, either express or implied. See the License for the
+; specific language governing permissions and limitations
+; under the License.
+
+[nc/asterix_nc1]
+txn.log.dir=target/tmp/asterix_nc1/txnlog
+core.dump.dir=target/tmp/asterix_nc1/coredump
+iodevices=target/tmp/asterix_nc1/iodevice1
+iodevices=../asterix-server/target/tmp/asterix_nc1/iodevice2
+nc.api.port=19004
+#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5006
+
+[nc/asterix_nc2]
+ncservice.port=9091
+txn.log.dir=target/tmp/asterix_nc2/txnlog
+core.dump.dir=target/tmp/asterix_nc2/coredump
+iodevices=target/tmp/asterix_nc2/iodevice1,../asterix-server/target/tmp/asterix_nc2/iodevice2
+nc.api.port=19005
+#jvm.args=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5007
+
+[nc]
+credential.file=src/test/resources/security/passwd
+python.cmd.autolocate=true
+python.env=FOO=BAR=BAZ,BAR=BAZ
+address=127.0.0.1
+command=asterixnc
+app.class=org.apache.asterix.hyracks.bootstrap.NCApplication
+jvm.args=-Xmx4096m -Dnode.Resolver="org.apache.asterix.external.util.IdentitiyResolverFactory"
+storage.buffercache.size=128MB
+storage.memorycomponent.globalbudget=512MB
+
+[cc]
+address = 127.0.0.1
+app.class=org.apache.asterix.hyracks.bootstrap.CCApplication
+heartbeat.period=2000
+heartbeat.max.misses=25
+credential.file=src/test/resources/security/passwd
+
+[common]
+log.dir = logs/
+log.level = INFO
+compiler.framesize=32KB
+compiler.sortmemory=320KB
+compiler.groupmemory=160KB
+compiler.joinmemory=256KB
+compiler.textsearchmemory=160KB
+compiler.windowmemory=192KB
+compiler.internal.sanitycheck=true
+messaging.frame.size=4096
+messaging.frame.count=512
+cloud.deployment=true
+storage.buffercache.pagesize=32KB
+storage.partitioning=static
+cloud.storage.scheme=azblob
+cloud.storage.bucket=cloud-storage-container
+cloud.storage.region=us-east-2
+cloud.storage.endpoint=http://127.0.0.1:15055
+cloud.storage.anonymous.auth=true
+cloud.storage.cache.policy=lazy
diff --git a/asterixdb/asterix-cloud/pom.xml b/asterixdb/asterix-cloud/pom.xml
index 659a6d7..b10929f 100644
--- a/asterixdb/asterix-cloud/pom.xml
+++ b/asterixdb/asterix-cloud/pom.xml
@@ -16,181 +16,193 @@
! specific language governing permissions and limitations
! under the License.
!-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <artifactId>apache-asterixdb</artifactId>
- <groupId>org.apache.asterix</groupId>
- <version>0.9.10-SNAPSHOT</version>
- </parent>
- <artifactId>asterix-cloud</artifactId>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>apache-asterixdb</artifactId>
+ <groupId>org.apache.asterix</groupId>
+ <version>0.9.10-SNAPSHOT</version>
+ </parent>
+ <artifactId>asterix-cloud</artifactId>
- <licenses>
- <license>
- <name>Apache License, Version 2.0</name>
- <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
- <distribution>repo</distribution>
- <comments>A business-friendly OSS license</comments>
- </license>
- </licenses>
+ <licenses>
+ <license>
+ <name>Apache License, Version 2.0</name>
+ <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+ <distribution>repo</distribution>
+ <comments>A business-friendly OSS license</comments>
+ </license>
+ </licenses>
- <properties>
- <root.dir>${basedir}/..</root.dir>
- </properties>
+ <properties>
+ <root.dir>${basedir}/..</root.dir>
+ </properties>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.rat</groupId>
- <artifactId>apache-rat-plugin</artifactId>
- <executions>
- <execution>
- <id>default</id>
- <phase>validate</phase>
- <goals>
- <goal>check</goal>
- </goals>
- <configuration>
- <licenses>
- <license implementation="org.apache.rat.analysis.license.ApacheSoftwareLicense20"/>
- </licenses>
- <excludes combine.children="append">
- <exclude>src/test/resources/result/**</exclude>
- </excludes>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>com.googlecode.maven-download-plugin</groupId>
- <artifactId>download-maven-plugin</artifactId>
- <version>1.4.2</version>
- <executions>
- <execution>
- <id>install-fake-gcs</id>
- <phase>${gcs.download.stage}</phase>
- <goals>
- <goal>wget</goal>
- </goals>
- <configuration>
- <url>https://github.com/fsouza/fake-gcs-server/releases/download/v1.48.0/fake-gcs-server_1.48.0_Linux_amd64.tar.gz</url>
- <outputFileName>fake-gcs-server_1.48.0_Linux_amd64.tar.gz</outputFileName>
- <outputDirectory>${project.build.directory}</outputDirectory>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-antrun-plugin</artifactId>
- <executions>
- <execution>
- <id>extract-gcs</id>
- <phase>${gcs.install.stage}</phase>
- <configuration>
- <target>
- <echo message="Extracting fake-gcs-server" />
- <mkdir dir="${project.build.directory}/fake-gcs-server" />
- <gunzip src="${project.build.directory}/fake-gcs-server_1.48.0_Linux_amd64.tar.gz" dest="${project.build.directory}/fake-gcs-server_1.48.0_Linux_amd64.tar" />
- <untar src="${project.build.directory}/fake-gcs-server_1.48.0_Linux_amd64.tar" dest="${project.build.directory}/fake-gcs-server" />
- <chmod file="${project.build.directory}/fake-gcs-server/fake-gcs-server" perm="ugo+rx" />
- </target>
- </configuration>
- <goals>
- <goal>run</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>exec-maven-plugin</artifactId>
- <executions>
- <execution>
- <id>fake-gcs-server</id>
- <phase>${gcs.stage}</phase>
- <goals>
- <goal>exec</goal>
- </goals>
- <configuration>
- <executable>${project.build.directory}/fake-gcs-server/fake-gcs-server</executable>
- <workingDirectory>${project.build.directory}/fake-gcs-server</workingDirectory>
- <arguments>
- <argument>-port</argument>
- <argument>4443</argument>
- <argument>-scheme</argument>
- <argument>http</argument>
- <argument>-host</argument>
- <argument>127.0.0.1</argument>
- <argument>-log-level</argument>
- <argument>error</argument>
- <argument>-filesystem-root</argument>
- <argument>${project.build.directory}/fake-gcs-server/storage</argument>
- </arguments>
- <async>true</async>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>default</id>
+ <phase>validate</phase>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ <configuration>
+ <licenses>
+ <license implementation="org.apache.rat.analysis.license.ApacheSoftwareLicense20"/>
+ </licenses>
+ <excludes combine.children="append">
+ <exclude>src/test/resources/result/**</exclude>
+ </excludes>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>com.googlecode.maven-download-plugin</groupId>
+ <artifactId>download-maven-plugin</artifactId>
+ <version>1.4.2</version>
+ <executions>
+ <execution>
+ <id>install-fake-gcs</id>
+ <phase>${gcs.download.stage}</phase>
+ <goals>
+ <goal>wget</goal>
+ </goals>
+ <configuration>
+ <url>
+ https://github.com/fsouza/fake-gcs-server/releases/download/v1.48.0/fake-gcs-server_1.48.0_Linux_amd64.tar.gz
+ </url>
+ <outputFileName>fake-gcs-server_1.48.0_Linux_amd64.tar.gz</outputFileName>
+ <outputDirectory>${project.build.directory}</outputDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>extract-gcs</id>
+ <phase>${gcs.install.stage}</phase>
+ <configuration>
+ <target>
+ <echo message="Extracting fake-gcs-server"/>
+ <mkdir dir="${project.build.directory}/fake-gcs-server"/>
+ <gunzip src="${project.build.directory}/fake-gcs-server_1.48.0_Linux_amd64.tar.gz"
+ dest="${project.build.directory}/fake-gcs-server_1.48.0_Linux_amd64.tar"/>
+ <untar src="${project.build.directory}/fake-gcs-server_1.48.0_Linux_amd64.tar"
+ dest="${project.build.directory}/fake-gcs-server"/>
+ <chmod file="${project.build.directory}/fake-gcs-server/fake-gcs-server" perm="ugo+rx"/>
+ </target>
+ </configuration>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>fake-gcs-server</id>
+ <phase>${gcs.stage}</phase>
+ <goals>
+ <goal>exec</goal>
+ </goals>
+ <configuration>
+ <executable>${project.build.directory}/fake-gcs-server/fake-gcs-server</executable>
+ <workingDirectory>${project.build.directory}/fake-gcs-server</workingDirectory>
+ <arguments>
+ <argument>-port</argument>
+ <argument>4443</argument>
+ <argument>-scheme</argument>
+ <argument>http</argument>
+ <argument>-host</argument>
+ <argument>127.0.0.1</argument>
+ <argument>-log-level</argument>
+ <argument>error</argument>
+ <argument>-filesystem-root</argument>
+ <argument>${project.build.directory}/fake-gcs-server/storage</argument>
+ </arguments>
+ <async>true</async>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
- <dependencies>
- <dependency>
- <groupId>org.apache.hyracks</groupId>
- <artifactId>hyracks-cloud</artifactId>
- <version>${hyracks.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.asterix</groupId>
- <artifactId>asterix-common</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.asterix</groupId>
- <artifactId>asterix-external-data</artifactId>
- <version>${project.version}</version>
- </dependency>
- <!-- aws s3 start -->
- <dependency>
- <groupId>software.amazon.awssdk</groupId>
- <artifactId>sdk-core</artifactId>
- </dependency>
- <dependency>
- <groupId>software.amazon.awssdk</groupId>
- <artifactId>s3</artifactId>
- </dependency>
- <dependency>
- <groupId>software.amazon.awssdk</groupId>
- <artifactId>regions</artifactId>
- </dependency>
- <dependency>
- <groupId>software.amazon.awssdk</groupId>
- <artifactId>auth</artifactId>
- </dependency>
- <dependency>
- <groupId>software.amazon.awssdk</groupId>
- <artifactId>s3-transfer-manager</artifactId>
- </dependency>
- <dependency>
- <groupId>software.amazon.awssdk.crt</groupId>
- <artifactId>aws-crt</artifactId>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>io.findify</groupId>
- <artifactId>s3mock_2.12</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>com.typesafe.akka</groupId>
- <artifactId>akka-http-core_2.12</artifactId>
- <scope>test</scope>
- </dependency>
- <!-- aws s3 end -->
- </dependencies>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-cloud</artifactId>
+ <version>${hyracks.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.asterix</groupId>
+ <artifactId>asterix-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.asterix</groupId>
+ <artifactId>asterix-external-data</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <!-- aws s3 start -->
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>sdk-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>s3</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>regions</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>auth</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>s3-transfer-manager</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk.crt</groupId>
+ <artifactId>aws-crt</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.findify</groupId>
+ <artifactId>s3mock_2.12</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-http-core_2.12</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <!-- aws s3 end -->
+
+ <dependency>
+ <groupId>com.azure</groupId>
+ <artifactId>azure-storage-blob-batch</artifactId>
+ <version>12.23.0</version>
+ </dependency>
+
+ </dependencies>
</project>
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
index ee43a2c..c98c6b4 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
@@ -20,6 +20,8 @@
import org.apache.asterix.cloud.clients.aws.s3.S3ClientConfig;
import org.apache.asterix.cloud.clients.aws.s3.S3CloudClient;
+import org.apache.asterix.cloud.clients.azure.blobstorage.AzBlobStorageClientConfig;
+import org.apache.asterix.cloud.clients.azure.blobstorage.AzBlobStorageCloudClient;
import org.apache.asterix.cloud.clients.google.gcs.GCSClientConfig;
import org.apache.asterix.cloud.clients.google.gcs.GCSCloudClient;
import org.apache.asterix.common.config.CloudProperties;
@@ -30,6 +32,7 @@
private static final boolean UNSTABLE = isUnstable();
public static final String S3 = "s3";
public static final String GCS = "gs";
+ public static final String AZ_BLOB = "azblob";
private CloudClientProvider() {
throw new AssertionError("do not instantiate");
@@ -45,6 +48,9 @@
} else if (GCS.equalsIgnoreCase(storageScheme)) {
GCSClientConfig config = GCSClientConfig.of(cloudProperties);
cloudClient = new GCSCloudClient(config, guardian);
+ } else if (AZ_BLOB.equalsIgnoreCase(storageScheme)) {
+ AzBlobStorageClientConfig config = AzBlobStorageClientConfig.of(cloudProperties);
+ cloudClient = new AzBlobStorageCloudClient(config, guardian);
} else {
throw new IllegalStateException("unsupported cloud storage scheme: " + storageScheme);
}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageBufferedWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageBufferedWriter.java
new file mode 100644
index 0000000..2a79c86
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageBufferedWriter.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.cloud.clients.azure.blobstorage;
+
+import java.io.BufferedInputStream;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
+import org.apache.asterix.cloud.clients.ICloudGuardian;
+import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter;
+import org.apache.commons.io.IOUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import com.azure.core.util.BinaryData;
+import com.azure.storage.blob.BlobClient;
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.models.BlobStorageException;
+import com.azure.storage.blob.specialized.BlockBlobClient;
+
+public class AzBlobStorageBufferedWriter implements ICloudBufferedWriter {
+ private static final String PUT_UPLOAD_ID = "putUploadId";
+ private static final int MAX_RETRIES = 3;
+ private static final Logger LOGGER = LogManager.getLogger();
+ private final List<String> blockIDArrayList;
+ private final ICloudGuardian guardian;
+ private int blockNumber;
+ private final String path;
+ private String uploadID;
+
+ private final BlobContainerClient blobContainerClient;
+
+ private final IRequestProfilerLimiter profiler;
+
+ private final String bucket;
+
+ public AzBlobStorageBufferedWriter(BlobContainerClient blobContainerClient, IRequestProfilerLimiter profiler,
+ ICloudGuardian guardian, String bucket, String path) {
+ this.blobContainerClient = blobContainerClient;
+ this.profiler = profiler;
+ this.guardian = guardian;
+ this.bucket = bucket;
+ this.path = path;
+ this.blockIDArrayList = new ArrayList<>();
+ }
+
+ @Override
+ public void upload(InputStream stream, int length) {
+ profiler.objectMultipartUpload();
+ if (length <= 0) {
+ String errMsg = String.format("A block with size %d cannot be staged for upload", length);
+ LOGGER.error(errMsg);
+ throw new IllegalArgumentException(errMsg);
+ }
+ guardian.checkIsolatedWriteAccess(bucket, path);
+ try {
+ BlockBlobClient blockBlobClient = blobContainerClient.getBlobClient(path).getBlockBlobClient();
+ BufferedInputStream bufferedInputStream = IOUtils.buffer(stream, length);
+ String blockID =
+ Base64.getEncoder().encodeToString(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8));
+ initBlockBlobUploads(blockID);
+ blockIDArrayList.add(blockID);
+ blockBlobClient.stageBlock(blockID, bufferedInputStream, length);
+ } catch (Exception e) {
+ LOGGER.error("Error while uploading blocks of data: {}", e.getMessage());
+ throw new RuntimeException(e);
+ }
+ blockNumber++;
+ }
+
+ private void initBlockBlobUploads(String blockID) {
+ if (this.uploadID == null) {
+ this.uploadID = blockID;
+ this.blockNumber = 1;
+ }
+ }
+
+ @Override
+ public void uploadLast(InputStream stream, ByteBuffer buffer) throws HyracksDataException {
+ if (uploadID == null) {
+ profiler.objectWrite();
+ BlobClient blobClient = blobContainerClient.getBlobClient(path);
+ BinaryData binaryData = BinaryData.fromBytes(getDataFromBuffer(buffer));
+ blobClient.upload(binaryData);
+ uploadID = PUT_UPLOAD_ID; // uploadID should be updated if the put-object operation succeeds
+ } else {
+ upload(stream, buffer.limit());
+ }
+ }
+
+ private byte[] getDataFromBuffer(ByteBuffer buffer) {
+ byte[] data = new byte[buffer.limit()];
+ buffer.get(data, 0, buffer.limit());
+ return data;
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return this.uploadID == null;
+ }
+
+ @Override
+ public void finish() throws HyracksDataException {
+ if (this.uploadID == null) {
+ throw new IllegalStateException("Cannot finish without writing any bytes");
+ } else if (PUT_UPLOAD_ID.equals(uploadID)) {
+ return;
+ }
+ int currRetryAttempt = 0;
+ BlockBlobClient blockBlobClient = blobContainerClient.getBlobClient(path).getBlockBlobClient();
+ while (true) {
+ try {
+ guardian.checkWriteAccess(bucket, path);
+ profiler.objectMultipartUpload();
+ blockBlobClient.commitBlockList(blockIDArrayList);
+ break;
+ } catch (BlobStorageException e) {
+ currRetryAttempt++;
+ if (currRetryAttempt == MAX_RETRIES) {
+ throw HyracksDataException.create(e);
+ }
+ LOGGER.info(() -> "AzBlob storage write retry, encountered: " + e.getMessage());
+
+ // Backoff for 1 sec for the first 2 retries, and 2 seconds from there onward
+ try {
+ Thread.sleep(TimeUnit.SECONDS.toMillis(currRetryAttempt < 2 ? 1 : 2));
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ throw HyracksDataException.create(ex);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void abort() throws HyracksDataException {
+ // Todo: As of the current Azure Java SDK, it does not support aborting a staged or under-upload block.
+ // https://github.com/Azure/azure-sdk-for-java/issues/31150
+ LOGGER.warn("Multipart upload for {} was aborted", path);
+ }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientConfig.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientConfig.java
new file mode 100644
index 0000000..9aedfc3
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageClientConfig.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.clients.azure.blobstorage;
+
+import java.util.Objects;
+
+import org.apache.asterix.common.config.CloudProperties;
+
+import com.azure.identity.DefaultAzureCredential;
+import com.azure.identity.DefaultAzureCredentialBuilder;
+
+public class AzBlobStorageClientConfig {
+ private final int writeBufferSize;
+ // Ref: https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=microsoft-entra-id
+ static final int DELETE_BATCH_SIZE = 256;
+ private final String region;
+ private final String endpoint;
+ private final String prefix;
+
+ private final boolean anonymousAuth;
+ private final long profilerLogInterval;
+ private final String bucket;
+ private final long tokenAcquireTimeout;
+ private final int writeMaxRequestsPerSeconds;
+ private final int readMaxRequestsPerSeconds;
+
+ public AzBlobStorageClientConfig(String region, String endpoint, String prefix, boolean anonymousAuth,
+ long profilerLogInterval, String bucket, long tokenAcquireTimeout, int writeMaxRequestsPerSeconds,
+ int readMaxRequestsPerSeconds, int writeBufferSize) {
+ this.region = Objects.requireNonNull(region, "region");
+ this.endpoint = endpoint;
+ this.prefix = Objects.requireNonNull(prefix, "prefix");
+ this.anonymousAuth = anonymousAuth;
+ this.profilerLogInterval = profilerLogInterval;
+ this.bucket = bucket;
+ this.tokenAcquireTimeout = tokenAcquireTimeout;
+ this.writeMaxRequestsPerSeconds = writeMaxRequestsPerSeconds;
+ this.readMaxRequestsPerSeconds = readMaxRequestsPerSeconds;
+ this.writeBufferSize = writeBufferSize;
+ }
+
+ public static AzBlobStorageClientConfig of(CloudProperties cloudProperties) {
+ return new AzBlobStorageClientConfig(cloudProperties.getStorageRegion(), cloudProperties.getStorageEndpoint(),
+ cloudProperties.getStoragePrefix(), cloudProperties.isStorageAnonymousAuth(),
+ cloudProperties.getProfilerLogInterval(), cloudProperties.getStorageBucket(),
+ cloudProperties.getTokenAcquireTimeout(), cloudProperties.getWriteMaxRequestsPerSecond(),
+ cloudProperties.getReadMaxRequestsPerSecond(), cloudProperties.getWriteBufferSize());
+ }
+
+ public String getRegion() {
+ return region;
+ }
+
+ public String getEndpoint() {
+ return endpoint;
+ }
+
+ public String getPrefix() {
+ return prefix;
+ }
+
+ public String getBucket() {
+ return bucket;
+ }
+
+ public long getProfilerLogInterval() {
+ return profilerLogInterval;
+ }
+
+ public boolean isAnonymousAuth() {
+ return anonymousAuth;
+ }
+
+ public DefaultAzureCredential createCredentialsProvider() {
+ return new DefaultAzureCredentialBuilder().build();
+ }
+
+ public long getTokenAcquireTimeout() {
+ return tokenAcquireTimeout;
+ }
+
+ public int getWriteMaxRequestsPerSeconds() {
+ return writeMaxRequestsPerSeconds;
+ }
+
+ public int getReadMaxRequestsPerSeconds() {
+ return readMaxRequestsPerSeconds;
+ }
+
+ public int getWriteBufferSize() {
+ return writeBufferSize;
+ }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java
new file mode 100644
index 0000000..b9f9421
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.cloud.clients.azure.blobstorage;
+
+import static org.apache.asterix.cloud.clients.azure.blobstorage.AzBlobStorageClientConfig.DELETE_BATCH_SIZE;
+
+import java.io.ByteArrayOutputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.nio.ReadOnlyBufferException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.asterix.cloud.CloudResettableInputStream;
+import org.apache.asterix.cloud.IWriteBufferProvider;
+import org.apache.asterix.cloud.clients.CloudFile;
+import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
+import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.asterix.cloud.clients.ICloudGuardian;
+import org.apache.asterix.cloud.clients.ICloudWriter;
+import org.apache.asterix.cloud.clients.IParallelDownloader;
+import org.apache.asterix.cloud.clients.profiler.CountRequestProfilerLimiter;
+import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter;
+import org.apache.asterix.cloud.clients.profiler.RequestLimiterNoOpProfiler;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import com.azure.core.http.rest.PagedIterable;
+import com.azure.core.util.BinaryData;
+import com.azure.storage.blob.BlobClient;
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobContainerClientBuilder;
+import com.azure.storage.blob.batch.BlobBatchClient;
+import com.azure.storage.blob.batch.BlobBatchClientBuilder;
+import com.azure.storage.blob.models.BlobErrorCode;
+import com.azure.storage.blob.models.BlobItem;
+import com.azure.storage.blob.models.BlobListDetails;
+import com.azure.storage.blob.models.BlobRange;
+import com.azure.storage.blob.models.BlobStorageException;
+import com.azure.storage.blob.models.ListBlobsOptions;
+import com.azure.storage.common.StorageSharedKeyCredential;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+/**
+ * {@link ICloudClient} implementation backed by Azure Blob Storage.
+ * <p>
+ * All blob names are namespaced with {@code config.getPrefix()}; the prefix is
+ * stripped again before names are returned to callers (see
+ * {@link #listObjects}). Every operation first consults the
+ * {@link ICloudGuardian} for access control and the
+ * {@link IRequestProfilerLimiter} for rate limiting/profiling.
+ */
+public class AzBlobStorageCloudClient implements ICloudClient {
+    private static final String BUCKET_ROOT_PATH = "";
+    // Azurite (local Azure emulator) endpoint and its well-known dev-account credentials;
+    // used only when the config requests anonymous auth (i.e., in tests).
+    public static final String AZURITE_ENDPOINT = "http://127.0.0.1:15055/devstoreaccount1/";
+    private static final String AZURITE_ACCOUNT_NAME = "devstoreaccount1";
+    private static final String AZURITE_ACCOUNT_KEY =
+            "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==";
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final ICloudGuardian guardian;
+    private final BlobContainerClient blobContainerClient;
+    private final AzBlobStorageClientConfig config;
+    private final IRequestProfilerLimiter profiler;
+    private final BlobBatchClient blobBatchClient;
+
+    public AzBlobStorageCloudClient(AzBlobStorageClientConfig config, ICloudGuardian guardian) {
+        this(config, buildClient(config), guardian);
+    }
+
+    public AzBlobStorageCloudClient(AzBlobStorageClientConfig config, BlobContainerClient blobContainerClient,
+            ICloudGuardian guardian) {
+        this.blobContainerClient = blobContainerClient;
+        this.config = config;
+        this.guardian = guardian;
+        long profilerInterval = config.getProfilerLogInterval();
+        AzureRequestRateLimiter limiter = new AzureRequestRateLimiter(config);
+        if (profilerInterval > 0) {
+            this.profiler = new CountRequestProfilerLimiter(profilerInterval, limiter);
+        } else {
+            this.profiler = new RequestLimiterNoOpProfiler(limiter);
+        }
+        guardian.setCloudClient(this);
+        blobBatchClient = new BlobBatchClientBuilder(blobContainerClient.getServiceClient()).buildClient();
+    }
+
+    @Override
+    public int getWriteBufferSize() {
+        return config.getWriteBufferSize();
+    }
+
+    @Override
+    public IRequestProfilerLimiter getProfilerLimiter() {
+        return profiler;
+    }
+
+    @Override
+    public ICloudWriter createWriter(String bucket, String path, IWriteBufferProvider bufferProvider) {
+        ICloudBufferedWriter bufferedWriter = new AzBlobStorageBufferedWriter(blobContainerClient, profiler, guardian,
+                bucket, config.getPrefix() + path);
+        return new CloudResettableInputStream(bufferedWriter, bufferProvider);
+    }
+
+    /**
+     * Lists all blobs under {@code path}, optionally filtered by {@code filter}.
+     * Returned paths have the storage prefix stripped.
+     */
+    @Override
+    public Set<CloudFile> listObjects(String bucket, String path, FilenameFilter filter) {
+        guardian.checkReadAccess(bucket, path);
+        profiler.objectsList();
+        PagedIterable<BlobItem> blobItems = getBlobItems(bucket, config.getPrefix() + path);
+        Stream<CloudFile> cloudFileStream = mapBlobItemsToStreamOfCloudFiles(blobItems);
+        return filterCloudFiles(filter, cloudFileStream);
+    }
+
+    // Applies the FilenameFilter (against the fully-qualified blob name) before
+    // stripping the cloud prefix; a null filter accepts everything.
+    private Set<CloudFile> filterCloudFiles(FilenameFilter filter, Stream<CloudFile> cloudFileStream) {
+        if (filter == null) {
+            return cloudFileStream.map(this::removeCloudPrefixFromBlobName).collect(Collectors.toSet());
+        }
+        return cloudFileStream.filter(cloudFile -> filter.accept(null, cloudFile.getPath()))
+                .map(this::removeCloudPrefixFromBlobName).collect(Collectors.toSet());
+    }
+
+    private CloudFile removeCloudPrefixFromBlobName(CloudFile cloudFile) {
+        String fullyQualifiedBlobName = cloudFile.getPath();
+        fullyQualifiedBlobName = fullyQualifiedBlobName.substring(config.getPrefix().length());
+        return CloudFile.of(fullyQualifiedBlobName, cloudFile.getSize());
+    }
+
+    private Stream<CloudFile> mapBlobItemsToStreamOfCloudFiles(PagedIterable<BlobItem> blobItems) {
+        return blobItems.stream()
+                .map(blobItem -> CloudFile.of(blobItem.getName(), blobItem.getProperties().getContentLength()));
+    }
+
+    private PagedIterable<BlobItem> getBlobItems(String bucket, String path) {
+        ListBlobsOptions options =
+                new ListBlobsOptions().setPrefix(path).setDetails(new BlobListDetails().setRetrieveMetadata(true));
+        return blobContainerClient.listBlobs(options, null);
+    }
+
+    /**
+     * Reads {@code buffer.remaining()} bytes of the blob starting at
+     * {@code offset} into {@code buffer}.
+     *
+     * @return the number of bytes read (always the originally requested count;
+     *         a short read raises {@link IllegalStateException})
+     */
+    @Override
+    public int read(String bucket, String path, long offset, ByteBuffer buffer) throws HyracksDataException {
+        guardian.checkReadAccess(bucket, path);
+        profiler.objectGet();
+        BlobClient blobClient = blobContainerClient.getBlobClient(config.getPrefix() + path);
+        ByteArrayOutputStream blobStream = new ByteArrayOutputStream(buffer.capacity());
+        long requested = buffer.remaining();
+        BlobRange blobRange = new BlobRange(offset, requested);
+        downloadBlob(blobClient, blobStream, blobRange);
+        readBlobStreamIntoBuffer(buffer, blobStream);
+        if (buffer.remaining() != 0) {
+            throw new IllegalStateException("Expected buffer remaining = 0, found: " + buffer.remaining());
+        }
+        return (int) requested - buffer.remaining();
+    }
+
+    private void readBlobStreamIntoBuffer(ByteBuffer buffer, ByteArrayOutputStream byteArrayOutputStream)
+            throws HyracksDataException {
+        byte[] byteArray = byteArrayOutputStream.toByteArray();
+        try {
+            buffer.put(byteArray);
+            byteArrayOutputStream.close();
+        } catch (BufferOverflowException | ReadOnlyBufferException | IOException ex) {
+            throw HyracksDataException.create(ex);
+        }
+    }
+
+    private void downloadBlob(BlobClient blobClient, ByteArrayOutputStream byteArrayOutputStream, BlobRange blobRange)
+            throws HyracksDataException {
+        try {
+            blobClient.downloadStreamWithResponse(byteArrayOutputStream, blobRange, null, null, false, null, null);
+        } catch (BlobStorageException ex) {
+            throw HyracksDataException.create(ex);
+        }
+    }
+
+    /**
+     * Reads the entire blob content.
+     *
+     * @return the blob bytes, or {@code null} if the blob does not exist
+     */
+    @Override
+    public byte[] readAllBytes(String bucket, String path) throws HyracksDataException {
+        guardian.checkReadAccess(bucket, path);
+        profiler.objectGet();
+        BlobClient blobClient = blobContainerClient.getBlobClient(config.getPrefix() + path);
+        try {
+            BinaryData binaryData = blobClient.downloadContent();
+            return binaryData.toBytes();
+        } catch (BlobStorageException ex) {
+            BlobErrorCode errorCode = ex.getErrorCode();
+            if (errorCode.equals(BlobErrorCode.BLOB_NOT_FOUND)) {
+                LOGGER.warn("Blob not found on cloud: {}", path);
+                return null;
+            }
+            throw HyracksDataException.create(ex);
+        }
+    }
+
+    @Override
+    public InputStream getObjectStream(String bucket, String path, long offset, long length) {
+        guardian.checkReadAccess(bucket, path);
+        profiler.objectGet();
+        BlobRange blobRange = new BlobRange(offset, length);
+        BlobClient blobClient = blobContainerClient.getBlobClient(config.getPrefix() + path);
+        try {
+            return blobClient.openInputStream(blobRange, null);
+        } catch (BlobStorageException ex) {
+            LOGGER.error("error getting object stream for path: {}. Exception: {}", path, ex.getMessage());
+            throw new IllegalStateException(ex);
+        }
+    }
+
+    // Uploads data, overwriting any existing blob at the same (prefixed) path.
+    @Override
+    public void write(String bucket, String path, byte[] data) {
+        guardian.checkWriteAccess(bucket, path);
+        profiler.objectWrite();
+        BinaryData binaryData = BinaryData.fromBytes(data);
+        BlobClient blobClient = blobContainerClient.getBlobClient(config.getPrefix() + path);
+        blobClient.upload(binaryData, true);
+    }
+
+    // Server-side (asynchronous) copy; beginCopy is not awaited here.
+    // NOTE(review): the destination name uses destPath.getFile().getPath() and is NOT
+    // namespaced with config.getPrefix(), unlike every other write path — confirm intended.
+    @Override
+    public void copy(String bucket, String srcPath, FileReference destPath) {
+        guardian.checkReadAccess(bucket, srcPath);
+        profiler.objectGet();
+        BlobClient srcBlobClient = blobContainerClient.getBlobClient(config.getPrefix() + srcPath);
+        String srcBlobUrl = srcBlobClient.getBlobUrl();
+        profiler.objectCopy();
+        guardian.checkWriteAccess(bucket, destPath.getRelativePath());
+        BlobClient destBlobClient = blobContainerClient.getBlobClient(destPath.getFile().getPath());
+        destBlobClient.beginCopy(srcBlobUrl, null);
+    }
+
+    /**
+     * Deletes the blobs matching {@code paths} using batched batch-delete
+     * requests of at most {@code DELETE_BATCH_SIZE} blobs each.
+     */
+    @Override
+    public void deleteObjects(String bucket, Collection<String> paths) {
+        if (paths.isEmpty()) {
+            return;
+        }
+        Set<BlobItem> blobsToDelete = getBlobsMatchingThesePaths(paths);
+        List<String> blobURLs = getBlobURLs(blobsToDelete);
+        if (blobURLs.isEmpty()) {
+            return;
+        }
+        Collection<List<String>> batchedBlobURLs = getBatchedBlobURLs(blobURLs);
+        for (List<String> batch : batchedBlobURLs) {
+            // deleteBlobs is lazy; draining the response stream forces the batch to execute.
+            blobBatchClient.deleteBlobs(batch, null).stream().count();
+        }
+    }
+
+    // Splits the URL list into consecutive chunks of at most DELETE_BATCH_SIZE.
+    private Collection<List<String>> getBatchedBlobURLs(List<String> blobURLs) {
+        Collection<List<String>> batchedBlobURLs = new ArrayList<>();
+        Iterator<String> iterator = blobURLs.iterator();
+        while (iterator.hasNext()) {
+            List<String> batch = new ArrayList<>();
+            for (int batchSize = 0; batchSize < DELETE_BATCH_SIZE && iterator.hasNext(); batchSize++) {
+                batch.add(iterator.next());
+            }
+            batchedBlobURLs.add(batch);
+        }
+        return batchedBlobURLs;
+    }
+
+    // Lists the whole container and keeps only blobs whose (prefixed) names were requested.
+    private Set<BlobItem> getBlobsMatchingThesePaths(Collection<String> paths) {
+        List<String> pathWithPrefix =
+                paths.stream().map(path -> config.getPrefix() + path).collect(Collectors.toList());
+        PagedIterable<BlobItem> blobItems = blobContainerClient.listBlobs();
+        return blobItems.stream().filter(blobItem -> pathWithPrefix.contains(blobItem.getName()))
+                .collect(Collectors.toSet());
+    }
+
+    /**
+     * @return the blob size in bytes, or 0 if the blob does not exist
+     */
+    @Override
+    public long getObjectSize(String bucket, String path) throws HyracksDataException {
+        guardian.checkReadAccess(bucket, path);
+        profiler.objectGet();
+        try {
+            BlobClient blobClient = blobContainerClient.getBlobClient(config.getPrefix() + path);
+            return blobClient.getProperties().getBlobSize();
+        } catch (BlobStorageException ex) {
+            BlobErrorCode errorCode = ex.getErrorCode();
+            if (errorCode.equals(BlobErrorCode.BLOB_NOT_FOUND)) {
+                LOGGER.error("error while getting blob size; no such blob found: {} ", config.getPrefix() + path);
+                return 0;
+            }
+            throw HyracksDataException.create(ex);
+        } catch (Exception ex) {
+            LOGGER.error("error getting size of the blob: {}. Exception: {}", path, ex.getMessage());
+            throw HyracksDataException.create(ex);
+        }
+    }
+
+    @Override
+    public boolean exists(String bucket, String path) throws HyracksDataException {
+        guardian.checkReadAccess(bucket, path);
+        profiler.objectGet();
+        try {
+            BlobClient blobClient = blobContainerClient.getBlobClient(config.getPrefix() + path);
+            return blobClient.exists();
+        } catch (BlobStorageException ex) {
+            BlobErrorCode errorCode = ex.getErrorCode();
+            if (errorCode.equals(BlobErrorCode.BLOB_NOT_FOUND)) {
+                return false;
+            }
+            throw HyracksDataException.create(ex);
+        } catch (Exception ex) {
+            throw HyracksDataException.create(ex);
+        }
+    }
+
+    @Override
+    public boolean isEmptyPrefix(String bucket, String path) throws HyracksDataException {
+        profiler.objectsList();
+        ListBlobsOptions listBlobsOptions = new ListBlobsOptions().setPrefix(config.getPrefix() + path);
+        // A null timeout means "no timeout". The previous Duration.ofDays(Long.MAX_VALUE)
+        // overflowed in Duration's days-to-seconds conversion and threw ArithmeticException
+        // before the listing ever started.
+        PagedIterable<BlobItem> blobItems = blobContainerClient.listBlobs(listBlobsOptions, null);
+        return blobItems.stream().findAny().isEmpty();
+    }
+
+    @Override
+    public IParallelDownloader createParallelDownloader(String bucket, IOManager ioManager) {
+        return new AzureParallelDownloader(ioManager, blobContainerClient, profiler, config);
+    }
+
+    /**
+     * Produces a JSON array of {path, size} objects for every blob in the
+     * container, case-insensitively sorted by name (debug/inspection aid).
+     */
+    @Override
+    public JsonNode listAsJson(ObjectMapper objectMapper, String bucket) {
+        profiler.objectsList();
+        PagedIterable<BlobItem> blobItems = getBlobItems(bucket, BUCKET_ROOT_PATH);
+        List<BlobItem> blobs = blobItems.stream().distinct().collect(Collectors.toList());
+        blobs = sortBlobItemsByName(blobs);
+        return mapBlobItemsToJson(blobs, objectMapper);
+    }
+
+    private List<BlobItem> sortBlobItemsByName(List<BlobItem> blobs) {
+        return blobs.stream()
+                .sorted((blob1, blob2) -> String.CASE_INSENSITIVE_ORDER.compare(blob1.getName(), blob2.getName()))
+                .collect(Collectors.toList());
+    }
+
+    private ArrayNode mapBlobItemsToJson(List<BlobItem> blobs, ObjectMapper objectMapper) {
+        ArrayNode objectsInfo = objectMapper.createArrayNode();
+        for (BlobItem blob : blobs) {
+            ObjectNode objectInfo = objectsInfo.addObject();
+            objectInfo.put("path", blob.getName());
+            objectInfo.put("size", blob.getProperties().getContentLength());
+        }
+        return objectsInfo;
+    }
+
+    @Override
+    public void close() {
+        // Closing Azure Blob Clients is not required as the underlying netty connection pool
+        // handles the same for the apps.
+        // Ref: https://github.com/Azure/azure-sdk-for-java/issues/17903
+        // Hence this implementation is a no op.
+    }
+
+    // Builds a container client against Azurite (tests) or the configured endpoint,
+    // creating the container on first use.
+    private static BlobContainerClient buildClient(AzBlobStorageClientConfig config) {
+        BlobContainerClientBuilder blobContainerClientBuilder =
+                new BlobContainerClientBuilder().containerName(config.getBucket()).endpoint(getEndpoint(config));
+        configCredentialsToAzClient(blobContainerClientBuilder, config);
+        BlobContainerClient blobContainerClient = blobContainerClientBuilder.buildClient();
+        blobContainerClient.createIfNotExists();
+        return blobContainerClient;
+    }
+
+    private static void configCredentialsToAzClient(BlobContainerClientBuilder builder,
+            AzBlobStorageClientConfig config) {
+        if (config.isAnonymousAuth()) {
+            StorageSharedKeyCredential creds =
+                    new StorageSharedKeyCredential(AZURITE_ACCOUNT_NAME, AZURITE_ACCOUNT_KEY);
+            builder.credential(creds);
+        } else {
+            builder.credential(config.createCredentialsProvider());
+        }
+    }
+
+    private static String getEndpoint(AzBlobStorageClientConfig config) {
+        return config.isAnonymousAuth() ? AZURITE_ENDPOINT + config.getBucket()
+                : config.getEndpoint() + "/" + config.getBucket();
+    }
+
+    // Batch delete requires full blob URLs, not bare names.
+    private List<String> getBlobURLs(Set<BlobItem> blobs) {
+        final String blobURLPrefix = blobContainerClient.getBlobContainerUrl() + "/";
+        return blobs.stream().map(BlobItem::getName).map(blobName -> blobURLPrefix + blobName)
+                .collect(Collectors.toList());
+    }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java
new file mode 100644
index 0000000..4980587
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.cloud.clients.azure.blobstorage;
+
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.InvalidPathException;
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.asterix.cloud.clients.IParallelDownloader;
+import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import com.azure.core.http.rest.PagedIterable;
+import com.azure.storage.blob.BlobClient;
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.models.BlobItem;
+import com.azure.storage.blob.models.ListBlobsOptions;
+
+/**
+ * {@link IParallelDownloader} that copies blobs from an Azure Blob Storage
+ * container onto local disk, preserving the path structure under the
+ * "storage" subdirectory.
+ */
+public class AzureParallelDownloader implements IParallelDownloader {
+    public static final String STORAGE_SUB_DIR = "storage";
+    private final IOManager ioManager;
+    private final BlobContainerClient blobContainerClient;
+    private final IRequestProfilerLimiter profiler;
+    private final AzBlobStorageClientConfig config;
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    public AzureParallelDownloader(IOManager ioManager, BlobContainerClient blobContainerClient,
+            IRequestProfilerLimiter profiler, AzBlobStorageClientConfig config) {
+        this.ioManager = ioManager;
+        this.blobContainerClient = blobContainerClient;
+        this.profiler = profiler;
+        this.config = config;
+    }
+
+    /**
+     * Downloads each referenced blob to its corresponding local absolute path,
+     * creating parent directories as needed. Fails fast on the first I/O error.
+     */
+    @Override
+    public void downloadFiles(Collection<FileReference> toDownload) throws HyracksDataException {
+        for (FileReference fileReference : toDownload) {
+            BlobClient blobClient =
+                    blobContainerClient.getBlobClient(config.getPrefix() + fileReference.getRelativePath());
+            Path absPath = Path.of(fileReference.getAbsolutePath());
+            createDirectories(absPath.getParent());
+            // try-with-resources guarantees the stream is closed exactly once,
+            // on both the success and failure paths.
+            try (OutputStream fileOutputStream = Files.newOutputStream(absPath)) {
+                blobClient.downloadStream(fileOutputStream);
+            } catch (IOException e) {
+                throw HyracksDataException.create(e);
+            }
+        }
+    }
+
+    /**
+     * Downloads every blob under each given directory prefix.
+     *
+     * @return the set of files that failed to download (best-effort; the loop
+     *         continues past individual failures)
+     */
+    @Override
+    public Collection<FileReference> downloadDirectories(Collection<FileReference> directories)
+            throws HyracksDataException {
+        Set<FileReference> failedFiles = new HashSet<>();
+        for (FileReference directory : directories) {
+            PagedIterable<BlobItem> blobsInDir = getBlobItems(directory);
+            for (BlobItem blobItem : blobsInDir) {
+                profiler.objectGet();
+                download(blobItem, failedFiles);
+            }
+        }
+        return failedFiles;
+    }
+
+    // Downloads one blob to its resolved local destination; on any failure the
+    // file is recorded in failedFiles rather than aborting the whole directory.
+    private void download(BlobItem blobItem, Set<FileReference> failedFiles) throws HyracksDataException {
+        BlobClient blobClient = blobContainerClient.getBlobClient(blobItem.getName());
+        FileReference diskDestFile = ioManager.resolve(createDiskSubPath(blobItem.getName()));
+        Path absDiskBlobPath = getDiskDestPath(diskDestFile);
+        createDirectories(absDiskBlobPath.getParent());
+        // try-with-resources closes the destination stream (the original left it open, leaking
+        // a file descriptor per downloaded blob).
+        try (OutputStream outputStreamToDest = Files.newOutputStream(absDiskBlobPath)) {
+            blobClient.downloadStream(outputStreamToDest);
+        } catch (Exception e) {
+            LOGGER.warn("failed to download blob {}", blobItem.getName(), e);
+            FileReference failedFile = ioManager.resolve(blobItem.getName());
+            failedFiles.add(failedFile);
+        }
+    }
+
+    // Strips any cloud prefix preceding the "storage" subdirectory so the blob
+    // lands at the same relative location on disk.
+    private String createDiskSubPath(String blobName) {
+        if (!blobName.startsWith(STORAGE_SUB_DIR)) {
+            int storageIdx = blobName.indexOf(STORAGE_SUB_DIR);
+            if (storageIdx < 0) {
+                // Previously substring(-1) threw an opaque StringIndexOutOfBoundsException here.
+                throw new IllegalStateException(
+                        "blob name does not contain the '" + STORAGE_SUB_DIR + "' subdirectory: " + blobName);
+            }
+            blobName = blobName.substring(storageIdx);
+        }
+        return blobName;
+    }
+
+    // Creates parentPath (and any missing ancestors) if it does not already exist.
+    private void createDirectories(Path parentPath) throws HyracksDataException {
+        if (Files.notExists(parentPath)) {
+            try {
+                Files.createDirectories(parentPath);
+            } catch (IOException ex) {
+                throw HyracksDataException.create(ex);
+            }
+        }
+    }
+
+    private Path getDiskDestPath(FileReference destFile) throws HyracksDataException {
+        try {
+            return Path.of(destFile.getAbsolutePath());
+        } catch (InvalidPathException ex) {
+            throw HyracksDataException.create(ex);
+        }
+    }
+
+    private PagedIterable<BlobItem> getBlobItems(FileReference directoryToDownload) {
+        ListBlobsOptions listBlobsOptions =
+                new ListBlobsOptions().setPrefix(config.getPrefix() + directoryToDownload.getRelativePath());
+        return blobContainerClient.listBlobs(listBlobsOptions, null);
+    }
+
+    @Override
+    public void close() {
+        // Closing Azure Blob Clients is not required as the underlying netty connection pool
+        // handles the same for the apps.
+        // Ref: https://github.com/Azure/azure-sdk-for-java/issues/17903
+        // Hence this implementation is a no op.
+    }
+}
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureRequestRateLimiter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureRequestRateLimiter.java
new file mode 100644
index 0000000..6a76952
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureRequestRateLimiter.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.clients.azure.blobstorage;
+
+import org.apache.asterix.cloud.clients.profiler.limiter.IRateLimiter;
+import org.apache.asterix.cloud.clients.profiler.limiter.IRequestRateLimiter;
+import org.apache.asterix.cloud.clients.profiler.limiter.NoOpRateLimiter;
+import org.apache.asterix.cloud.clients.profiler.limiter.TokenBasedRateLimiter;
+
+/**
+ * Per-operation request rate limiter for the Azure Blob Storage client.
+ * Reads and lists share one limiter; writes use another. A non-positive
+ * configured rate disables limiting for that direction.
+ */
+public final class AzureRequestRateLimiter implements IRequestRateLimiter {
+    private final IRateLimiter writeLimiter;
+    private final IRateLimiter readLimiter;
+
+    public AzureRequestRateLimiter(AzBlobStorageClientConfig config) {
+        long acquireTimeout = config.getTokenAcquireTimeout();
+        writeLimiter = createLimiter(config.getWriteMaxRequestsPerSeconds(), acquireTimeout);
+        readLimiter = createLimiter(config.getReadMaxRequestsPerSeconds(), acquireTimeout);
+    }
+
+    @Override
+    public void writeRequest() {
+        writeLimiter.acquire();
+    }
+
+    @Override
+    public void readRequest() {
+        readLimiter.acquire();
+    }
+
+    // List operations are throttled against the read budget.
+    @Override
+    public void listRequest() {
+        readLimiter.acquire();
+    }
+
+    private static IRateLimiter createLimiter(int maxRequestsPerSecond, long tokenAcquireTimeout) {
+        return maxRequestsPerSecond > 0 ? new TokenBasedRateLimiter(maxRequestsPerSecond, tokenAcquireTimeout)
+                : NoOpRateLimiter.INSTANCE;
+    }
+}
diff --git a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/azure/LSMAzBlobStorageTest.java b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/azure/LSMAzBlobStorageTest.java
new file mode 100644
index 0000000..1f49fd9
--- /dev/null
+++ b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/azure/LSMAzBlobStorageTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.cloud.azure;
+
+import org.apache.asterix.cloud.AbstractLSMTest;
+import org.apache.asterix.cloud.clients.ICloudGuardian;
+import org.apache.asterix.cloud.clients.azure.blobstorage.AzBlobStorageClientConfig;
+import org.apache.asterix.cloud.clients.azure.blobstorage.AzBlobStorageCloudClient;
+import org.apache.hyracks.util.StorageUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import com.azure.core.http.rest.PagedIterable;
+import com.azure.storage.blob.BlobClient;
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.BlobServiceClientBuilder;
+import com.azure.storage.blob.models.BlobItem;
+import com.azure.storage.blob.models.ListBlobsOptions;
+import com.azure.storage.common.StorageSharedKeyCredential;
+
+// LSM cloud-storage test suite run against a local Azurite emulator on port 15055.
+// Inherits the actual test cases from AbstractLSMTest; this class only wires up
+// the Azure Blob Storage client and cleans the container before/after the run.
+public class LSMAzBlobStorageTest extends AbstractLSMTest {
+ private static BlobContainerClient client;
+
+ private static BlobServiceClient blobServiceClient;
+ private static final int MOCK_SERVER_PORT = 15055;
+ private static final String MOCK_SERVER_HOSTNAME = "http://127.0.0.1:" + MOCK_SERVER_PORT;
+ private static final String MOCK_SERVER_REGION = "us-west-2";
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ LOGGER.info("LSMAzBlobStorageTest setup");
+
+ // Azurite's well-known dev-account endpoint and credentials.
+ // NOTE(review): these duplicate the AZURITE_* constants in AzBlobStorageCloudClient —
+ // consider reusing them to keep the values in one place.
+ String endpointString = "http://127.0.0.1:15055/devstoreaccount1/" + PLAYGROUND_CONTAINER;
+ final String accKey =
+ "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==";
+ final String accName = "devstoreaccount1";
+
+ blobServiceClient = new BlobServiceClientBuilder().endpoint(endpointString)
+ .credential(new StorageSharedKeyCredential(accName, accKey)).buildClient();
+
+ // Start the test clean by deleting any residual data from previous tests
+ blobServiceClient.deleteBlobContainerIfExists(PLAYGROUND_CONTAINER);
+ client = blobServiceClient.createBlobContainerIfNotExists(PLAYGROUND_CONTAINER);
+
+ LOGGER.info("Az Blob Client created successfully");
+ int writeBufferSize = StorageUtil.getIntSizeInBytes(5, StorageUtil.StorageUnit.MEGABYTE);
+ // anonymousAuth=true routes the client to the Azurite endpoint; rate limits of 0 disable throttling.
+ AzBlobStorageClientConfig config = new AzBlobStorageClientConfig(MOCK_SERVER_REGION, MOCK_SERVER_HOSTNAME, "",
+ true, 0, PLAYGROUND_CONTAINER, 1, 0, 0, writeBufferSize);
+ CLOUD_CLIENT = new AzBlobStorageCloudClient(config, ICloudGuardian.NoOpCloudGuardian.INSTANCE);
+ }
+
+ // Best-effort removal of the test container; failures are deliberately ignored
+ // so teardown never masks a test result (e.g. when Azurite is already down).
+ private static void cleanup() {
+ try {
+ PagedIterable<BlobItem> blobItems = client.listBlobs(new ListBlobsOptions().setPrefix(""), null);
+ // Delete all the contents of the container
+ for (BlobItem blobItem : blobItems) {
+ BlobClient blobClient = client.getBlobClient(blobItem.getName());
+ blobClient.delete();
+ }
+ // Delete the container
+ blobServiceClient.deleteBlobContainer(PLAYGROUND_CONTAINER);
+ } catch (Exception ex) {
+ // ignore
+ }
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ LOGGER.info("Shutdown Azurite");
+ // Azure clients do not need explicit closure.
+ cleanup();
+ }
+}
diff --git a/asterixdb/src/main/licenses/content/com.microsoft.azure--msal4j--1.16.1_MIT_License.txt b/asterixdb/src/main/licenses/content/com.microsoft.azure--msal4j--1.16.1_MIT_License.txt
new file mode 100644
index 0000000..d1ca00f
--- /dev/null
+++ b/asterixdb/src/main/licenses/content/com.microsoft.azure--msal4j--1.16.1_MIT_License.txt
@@ -0,0 +1,21 @@
+ MIT License
+
+ Copyright (c) Microsoft Corporation. All rights reserved.
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"), to deal
+ in the Software without restriction, including without limitation the rights
+ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ copies of the Software, and to permit persons to whom the Software is
+ furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included in all
+ copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ SOFTWARE
\ No newline at end of file
diff --git a/asterixdb/src/main/licenses/content/com.microsoft.azure--msal4j-persistence-extension--1.3.0_MIT_License.txt b/asterixdb/src/main/licenses/content/com.microsoft.azure--msal4j-persistence-extension--1.3.0_MIT_License.txt
new file mode 100644
index 0000000..d1ca00f
--- /dev/null
+++ b/asterixdb/src/main/licenses/content/com.microsoft.azure--msal4j-persistence-extension--1.3.0_MIT_License.txt
@@ -0,0 +1,21 @@
+ MIT License
+
+ Copyright (c) Microsoft Corporation. All rights reserved.
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"), to deal
+ in the Software without restriction, including without limitation the rights
+ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ copies of the Software, and to permit persons to whom the Software is
+ furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included in all
+ copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ SOFTWARE.
\ No newline at end of file
diff --git a/asterixdb/src/main/licenses/content/opensource.org_licenses_BSD-2-Clause.txt b/asterixdb/src/main/licenses/content/opensource.org_licenses_BSD-2-Clause.txt
new file mode 100644
index 0000000..226190a
--- /dev/null
+++ b/asterixdb/src/main/licenses/content/opensource.org_licenses_BSD-2-Clause.txt
@@ -0,0 +1,24 @@
+BSD 2-Clause License
+
+Copyright (c) 2024, Couchbase, Inc.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+1. Redistributions of source code must retain the above copyright notice, this
+ list of conditions and the following disclaimer.
+
+2. Redistributions in binary form must reproduce the above copyright notice,
+ this list of conditions and the following disclaimer in the documentation
+ and/or other materials provided with the distribution.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
\ No newline at end of file