[ASTERIXDB-2975][EXT] Add support to Azure Data Lake external dataset
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Add support for Azure Data Lake as external datasets
Change-Id: Idd6a414d6f412e541ad66cffe7a7d2e02abf3695
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/13643
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Hussain Towaileb <hussainht@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestConstants.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestConstants.java
index 875b0f6..f721aab 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestConstants.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestConstants.java
@@ -38,38 +38,38 @@
// Azure blob storage constants and placeholders
public static class Azure {
// account name
- public static final String ACCOUNT_NAME_PLACEHOLDER = "%azureblob-accountname%";
+ public static final String ACCOUNT_NAME_PLACEHOLDER = "%azure-accountname%";
public static final String AZURITE_ACCOUNT_NAME_DEFAULT = "devstoreaccount1";
// account key
- public static final String ACCOUNT_KEY_PLACEHOLDER = "%azureblob-accountkey%";
+ public static final String ACCOUNT_KEY_PLACEHOLDER = "%azure-accountkey%";
public static final String AZURITE_ACCOUNT_KEY_DEFAULT =
"Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==";
// SAS token: this is generated and assigned at runtime at the start of the test
- public static final String SAS_TOKEN_PLACEHOLDER = "%azureblob-sas%";
+ public static final String SAS_TOKEN_PLACEHOLDER = "%azure-sas%";
public static String sasToken = "";
// blob endpoint
- public static final String BLOB_ENDPOINT_PLACEHOLDER = "%azureblob-endpoint%";
+ public static final String BLOB_ENDPOINT_PLACEHOLDER = "%azure-endpoint%";
public static final String BLOB_ENDPOINT_DEFAULT = "http://localhost:10000/" + AZURITE_ACCOUNT_NAME_DEFAULT;
- public static final String MANAGED_IDENTITY_ID_PLACEHOLDER = "%azureblob-managedidentityid%";
+ public static final String MANAGED_IDENTITY_ID_PLACEHOLDER = "%azure-managedidentityid%";
public static final String MANAGED_IDENTITY_ID_DEFAULT = "myManagedIdentityId";
- public static final String CLIENT_ID_PLACEHOLDER = "%azureblob-clientid%";
+ public static final String CLIENT_ID_PLACEHOLDER = "%azure-clientid%";
public static final String CLIENT_ID_DEFAULT = "myClientId";
- public static final String CLIENT_SECRET_PLACEHOLDER = "%azureblob-clientsecret%";
+ public static final String CLIENT_SECRET_PLACEHOLDER = "%azure-clientsecret%";
public static final String CLIENT_SECRET_DEFAULT = "myClientSecret";
- public static final String CLIENT_CERTIFICATE_PLACEHOLDER = "%azureblob-clientcertificate%";
+ public static final String CLIENT_CERTIFICATE_PLACEHOLDER = "%azure-clientcertificate%";
public static final String CLIENT_CERTIFICATE_DEFAULT = "myClientCertificate";
- public static final String CLIENT_CERTIFICATE_PASSWORD_PLACEHOLDER = "%azureblob-clientcertificatepassword%";
+ public static final String CLIENT_CERTIFICATE_PASSWORD_PLACEHOLDER = "%azure-clientcertificatepassword%";
public static final String CLIENT_CERTIFICATE_PASSWORD_DEFAULT = "myClientCertificatePassword";
- public static final String TENANT_ID_PLACEHOLDER = "%azureblob-tenantid%";
+ public static final String TENANT_ID_PLACEHOLDER = "%azure-tenantid%";
public static final String TENANT_ID_DEFAULT = "myTenantId";
// azure template and default template
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/azure_blob_storage/auth-methods/anonymous-no-auth-public-access-allowed/test.000.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/azure_blob_storage/auth-methods/anonymous-no-auth-public-access-allowed/test.000.ddl.sqlpp
index 026696b..a665d05 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/azure_blob_storage/auth-methods/anonymous-no-auth-public-access-allowed/test.000.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/azure_blob_storage/auth-methods/anonymous-no-auth-public-access-allowed/test.000.ddl.sqlpp
@@ -27,7 +27,7 @@
drop dataset test if exists;
CREATE EXTERNAL DATASET test(test) USING AZUREBLOB (
-("endpoint"="%azureblob-endpoint%"),
+("endpoint"="%azure-endpoint%"),
("container"="public-access"),
("definition"="json-data/reviews/single-line/json"),
("format"="json")
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/azure_blob_storage/auth-methods/anonymous-no-auth-public-access-not-allowed/test.000.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/azure_blob_storage/auth-methods/anonymous-no-auth-public-access-not-allowed/test.000.ddl.sqlpp
index d8cbaa1..22385f2 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/azure_blob_storage/auth-methods/anonymous-no-auth-public-access-not-allowed/test.000.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/azure_blob_storage/auth-methods/anonymous-no-auth-public-access-not-allowed/test.000.ddl.sqlpp
@@ -28,7 +28,7 @@
// bad case: no auth method is provided
drop dataset test if exists;
CREATE EXTERNAL DATASET test(test) USING AZUREBLOB (
-("endpoint"="%azureblob-endpoint%"),
+("endpoint"="%azure-endpoint%"),
("container"="playground"),
("definition"="json-data/reviews/single-line/json"),
("format"="json")
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/azure_blob_storage/auth-methods/invalid-auth-methods/test.000.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/azure_blob_storage/auth-methods/invalid-auth-methods/test.000.ddl.sqlpp
index f0225aa..a2f680f 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/azure_blob_storage/auth-methods/invalid-auth-methods/test.000.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/azure_blob_storage/auth-methods/invalid-auth-methods/test.000.ddl.sqlpp
@@ -28,10 +28,10 @@
// bad case: more than one authentication method is provided at once
drop dataset test if exists;
CREATE EXTERNAL DATASET test(test) USING AZUREBLOB (
-("accountName"="%azureblob-accountname%"),
-("%azureblob-credentialsname-1%"="%azureblob-credentialsvalue-1%"),
-("%azureblob-credentialsname-2%"="%azureblob-credentialsvalue-2%"),
-("endpoint"="%azureblob-endpoint%"),
+("accountName"="%azure-accountname%"),
+("%azure-credentialsname-1%"="%azure-credentialsvalue-1%"),
+("%azure-credentialsname-2%"="%azure-credentialsvalue-2%"),
+("endpoint"="%azure-endpoint%"),
("container"="playground"),
("definition"="json-data/reviews/single-line/json"),
("format"="json")
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/azure_blob_storage/auth-methods/valid-auth-methods/test.000.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/azure_blob_storage/auth-methods/valid-auth-methods/test.000.ddl.sqlpp
index 01a2373..ca750fa 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/azure_blob_storage/auth-methods/valid-auth-methods/test.000.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/azure_blob_storage/auth-methods/valid-auth-methods/test.000.ddl.sqlpp
@@ -27,9 +27,9 @@
drop dataset test if exists;
CREATE EXTERNAL DATASET test(test) USING AZUREBLOB (
-("accountName"="%azureblob-accountname%"),
-("%azureblob-credentialsname%"="%azureblob-credentialsvalue%"),
-("endpoint"="%azureblob-endpoint%"),
+("accountName"="%azure-accountname%"),
+("%azure-credentialsname%"="%azure-credentialsvalue%"),
+("endpoint"="%azure-endpoint%"),
("container"="playground"),
("definition"="json-data/reviews/single-line/json"),
("format"="json")
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_azure_blob_storage.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_azure_blob_storage.xml
index 8010b57..8844368 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_azure_blob_storage.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_azure_blob_storage.xml
@@ -22,37 +22,37 @@
<test-case FilePath="external-dataset/azure_blob_storage/auth-methods">
<compilation-unit name="valid-auth-methods">
<placeholder name="azureblob-credentialsname" value="accountKey" />
- <placeholder name="azureblob-credentialsvalue" value="%azureblob-accountkey%" />
+ <placeholder name="azureblob-credentialsvalue" value="%azure-accountkey%" />
<output-dir compare="Text">valid-auth-methods</output-dir>
</compilation-unit>
</test-case>
<test-case FilePath="external-dataset/azure_blob_storage/auth-methods">
<compilation-unit name="valid-auth-methods">
<placeholder name="azureblob-credentialsname" value="sharedAccessSignature" />
- <placeholder name="azureblob-credentialsvalue" value="%azureblob-sas%" />
+ <placeholder name="azureblob-credentialsvalue" value="%azure-sas%" />
<output-dir compare="Text">valid-auth-methods</output-dir>
</compilation-unit>
</test-case>
<test-case FilePath="external-dataset/azure_blob_storage/auth-methods">
<compilation-unit name="valid-auth-methods">
<placeholder name="azureblob-credentialsname" value="connectionString" />
- <placeholder name="azureblob-credentialsvalue" value="%azureblob-connectionstringaccountkey%" />
+ <placeholder name="azureblob-credentialsvalue" value="%azure-connectionstringaccountkey%" />
<output-dir compare="Text">valid-auth-methods</output-dir>
</compilation-unit>
</test-case>
<test-case FilePath="external-dataset/azure_blob_storage/auth-methods">
<compilation-unit name="valid-auth-methods">
<placeholder name="azureblob-credentialsname" value="connectionString" />
- <placeholder name="azureblob-credentialsvalue" value="%azureblob-connectionstringsas%" />
+ <placeholder name="azureblob-credentialsvalue" value="%azure-connectionstringsas%" />
<output-dir compare="Text">valid-auth-methods</output-dir>
</compilation-unit>
</test-case>
<test-case FilePath="external-dataset/azure_blob_storage/auth-methods">
<compilation-unit name="invalid-auth-methods">
<placeholder name="azureblob-credentialsname-1" value="accountKey" />
- <placeholder name="azureblob-credentialsvalue-1" value="%azureblob-accountkey%" />
+ <placeholder name="azureblob-credentialsvalue-1" value="%azure-accountkey%" />
<placeholder name="azureblob-credentialsname-2" value="connectionString" />
- <placeholder name="azureblob-credentialsvalue-2" value="%azureblob-connectionstringaccountkey%" />
+ <placeholder name="azureblob-credentialsvalue-2" value="%azure-connectionstringaccountkey%" />
<output-dir compare="Text">invalid-auth-methods</output-dir>
<expected-error>ASX1138: Only a single authentication method is allowed: connectionString, accountName & accountKey, or accountName & sharedAccessSignature</expected-error>
</compilation-unit>
@@ -60,9 +60,9 @@
<test-case FilePath="external-dataset/azure_blob_storage/auth-methods">
<compilation-unit name="invalid-auth-methods">
<placeholder name="azureblob-credentialsname-1" value="sharedAccessSignature" />
- <placeholder name="azureblob-credentialsvalue-1" value="%azureblob-sas%" />
+ <placeholder name="azureblob-credentialsvalue-1" value="%azure-sas%" />
<placeholder name="azureblob-credentialsname-2" value="connectionString" />
- <placeholder name="azureblob-credentialsvalue-2" value="%azureblob-connectionstringaccountkey%" />
+ <placeholder name="azureblob-credentialsvalue-2" value="%azure-connectionstringaccountkey%" />
<output-dir compare="Text">invalid-auth-methods</output-dir>
<expected-error>ASX1138: Only a single authentication method is allowed: connectionString, accountName & accountKey, or accountName & sharedAccessSignature</expected-error>
</compilation-unit>
diff --git a/asterixdb/asterix-external-data/pom.xml b/asterixdb/asterix-external-data/pom.xml
index a6684d3..9a1f31c 100644
--- a/asterixdb/asterix-external-data/pom.xml
+++ b/asterixdb/asterix-external-data/pom.xml
@@ -461,6 +461,10 @@
</dependency>
<dependency>
<groupId>com.azure</groupId>
+ <artifactId>azure-storage-file-datalake</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.azure</groupId>
<artifactId>azure-identity</artifactId>
</dependency>
<dependency>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStream.java
similarity index 96%
rename from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStream.java
rename to asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStream.java
index c7bbcbf..b402f25 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStream.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.external.input.record.reader.azure;
+package org.apache.asterix.external.input.record.reader.azure.blob;
import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
@@ -83,7 +83,7 @@
private BlobServiceClient buildAzureClient(Map<String, String> configuration) throws HyracksDataException {
try {
- return ExternalDataUtils.Azure.buildAzureClient(configuration);
+ return ExternalDataUtils.Azure.buildAzureBlobClient(configuration);
} catch (CompilationException ex) {
throw HyracksDataException.create(ex);
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java
similarity index 97%
rename from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStreamFactory.java
rename to asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java
index b9e46a1..3a9ab1c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.external.input.record.reader.azure;
+package org.apache.asterix.external.input.record.reader.azure.blob;
import java.util.Comparator;
import java.util.List;
@@ -52,7 +52,7 @@
// Ensure the validity of include/exclude
ExternalDataUtils.validateIncludeExclude(configuration);
IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration);
- BlobServiceClient blobServiceClient = ExternalDataUtils.Azure.buildAzureClient(configuration);
+ BlobServiceClient blobServiceClient = ExternalDataUtils.Azure.buildAzureBlobClient(configuration);
List<BlobItem> filesOnly = ExternalDataUtils.Azure.listBlobItem(blobServiceClient, configuration,
includeExcludeMatcher, warningCollector);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobReaderFactory.java
similarity index 97%
rename from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobReaderFactory.java
rename to asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobReaderFactory.java
index 27e1b02..525ee63 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobReaderFactory.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.external.input.record.reader.azure;
+package org.apache.asterix.external.input.record.reader.azure.blob;
import java.util.Collections;
import java.util.List;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStream.java
similarity index 70%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStream.java
copy to asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStream.java
index c7bbcbf..f0c185e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStream.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.external.input.record.reader.azure;
+package org.apache.asterix.external.input.record.reader.azure.datalake;
import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
@@ -34,18 +34,19 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.util.LogRedactionUtil;
-import com.azure.storage.blob.BlobClient;
-import com.azure.storage.blob.BlobContainerClient;
-import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.models.BlobErrorCode;
-import com.azure.storage.blob.models.BlobStorageException;
+import com.azure.storage.file.datalake.DataLakeFileClient;
+import com.azure.storage.file.datalake.DataLakeFileSystemClient;
+import com.azure.storage.file.datalake.DataLakeServiceClient;
+import com.azure.storage.file.datalake.models.DataLakeStorageException;
-public class AzureBlobInputStream extends AbstractExternalInputStream {
+public class AzureDataLakeInputStream extends AbstractExternalInputStream {
- private final BlobServiceClient client;
+ private final DataLakeServiceClient client;
private final String container;
- public AzureBlobInputStream(Map<String, String> configuration, List<String> filePaths) throws HyracksDataException {
+ public AzureDataLakeInputStream(Map<String, String> configuration, List<String> filePaths)
+ throws HyracksDataException {
super(configuration, filePaths);
this.client = buildAzureClient(configuration);
this.container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
@@ -54,19 +55,20 @@
@Override
protected boolean getInputStream() throws IOException {
String fileName = filePaths.get(nextFileIndex);
- BlobContainerClient blobContainerClient;
- BlobClient blob;
+ DataLakeFileSystemClient fileSystemClient;
+ DataLakeFileClient fileClient;
try {
- blobContainerClient = client.getBlobContainerClient(container);
- blob = blobContainerClient.getBlobClient(filePaths.get(nextFileIndex));
- in = blob.openInputStream();
+ fileSystemClient = client.getFileSystemClient(container);
+ fileClient = fileSystemClient.getFileClient(filePaths.get(nextFileIndex));
+ in = fileClient.openInputStream().getInputStream();
// Use gzip stream if needed
String lowerCaseFileName = fileName.toLowerCase();
if (lowerCaseFileName.endsWith(".gz") || lowerCaseFileName.endsWith(".gzip")) {
in = new GZIPInputStream(in, ExternalDataConstants.DEFAULT_BUFFER_SIZE);
}
- } catch (BlobStorageException ex) {
+ } catch (DataLakeStorageException ex) {
+ // TODO(htowaileb): need to find the right error for Azure Data Lake
if (ex.getErrorCode().equals(BlobErrorCode.BLOB_NOT_FOUND)) {
LOGGER.debug(() -> "Key " + LogRedactionUtil.userData(filePaths.get(nextFileIndex)) + " was not "
+ "found in container " + container);
@@ -81,9 +83,9 @@
return true;
}
- private BlobServiceClient buildAzureClient(Map<String, String> configuration) throws HyracksDataException {
+ private DataLakeServiceClient buildAzureClient(Map<String, String> configuration) throws HyracksDataException {
try {
- return ExternalDataUtils.Azure.buildAzureClient(configuration);
+ return ExternalDataUtils.Azure.buildAzureDatalakeClient(configuration);
} catch (CompilationException ex) {
throw HyracksDataException.create(ex);
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java
new file mode 100644
index 0000000..6bb103b
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.azure.datalake;
+
+import static org.apache.asterix.external.util.ExternalDataConstants.Azure.RECURSIVE_FIELD_NAME;
+import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.function.BiPredicate;
+import java.util.regex.Matcher;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.Warning;
+
+import com.azure.core.http.rest.PagedIterable;
+import com.azure.storage.file.datalake.DataLakeFileSystemClient;
+import com.azure.storage.file.datalake.DataLakeServiceClient;
+import com.azure.storage.file.datalake.models.ListPathsOptions;
+import com.azure.storage.file.datalake.models.PathItem;
+
+public class AzureDataLakeInputStreamFactory extends AbstractExternalInputStreamFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) throws HyracksDataException {
+ return new AzureDataLakeInputStream(configuration, partitionWorkLoadsBasedOnSize.get(partition).getFilePaths());
+ }
+
+ @Override
+ public void configure(IServiceContext ctx, Map<String, String> configuration, IWarningCollector warningCollector)
+ throws AlgebricksException {
+ super.configure(ctx, configuration, warningCollector);
+
+ String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+
+ List<PathItem> filesOnly = new ArrayList<>();
+
+ // Ensure the validity of include/exclude
+ ExternalDataUtils.validateIncludeExclude(configuration);
+
+ DataLakeServiceClient client = ExternalDataUtils.Azure.buildAzureDatalakeClient(configuration);
+ DataLakeFileSystemClient fileSystemClient;
+ try {
+ fileSystemClient = client.getFileSystemClient(container);
+
+ // Get all objects in a container and extract the paths to files
+ ListPathsOptions listOptions = new ListPathsOptions();
+ boolean recursive = Boolean.parseBoolean(configuration.get(RECURSIVE_FIELD_NAME));
+ listOptions.setRecursive(recursive);
+ listOptions.setPath(ExternalDataUtils.getPrefix(configuration, false));
+ PagedIterable<PathItem> pathItems = fileSystemClient.listPaths(listOptions, null);
+
+ // Collect the paths to files only
+ IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration);
+ collectAndFilterFiles(pathItems, includeExcludeMatcher.getPredicate(),
+ includeExcludeMatcher.getMatchersList(), filesOnly);
+
+ // Warn if no files are returned
+ if (filesOnly.isEmpty() && warningCollector.shouldWarn()) {
+ Warning warning = Warning.of(null, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
+ warningCollector.warn(warning);
+ }
+
+ // Distribute work load amongst the partitions
+ distributeWorkLoad(filesOnly, getPartitionsCount());
+ } catch (Exception ex) {
+ throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
+ }
+ }
+
+ /**
+ * Collects and filters the files only, and excludes any folders
+ *
+ * @param items storage items
+ * @param predicate predicate to test with for file filtration
+ * @param matchers include/exclude matchers to test against
+ * @param filesOnly List containing the files only (excluding folders)
+ */
+ private void collectAndFilterFiles(Iterable<PathItem> items, BiPredicate<List<Matcher>, String> predicate,
+ List<Matcher> matchers, List<PathItem> filesOnly) {
+ for (PathItem item : items) {
+ String uri = item.getName();
+
+ // skip folders
+ if (uri.endsWith("/")) {
+ continue;
+ }
+
+ // No filter, add file
+ if (predicate.test(matchers, uri)) {
+ filesOnly.add(item);
+ }
+ }
+ }
+
+ /**
+ * To efficiently utilize the parallelism, work load will be distributed amongst the partitions based on the file
+ * size.
+ * <p>
+ * Example:
+ * File1 1mb, File2 300kb, File3 300kb, File4 300kb
+ * <p>
+ * Distribution:
+ * Partition1: [File1]
+ * Partition2: [File2, File3, File4]
+ *
+ * @param items items
+ * @param partitionsCount Partitions count
+ */
+ private void distributeWorkLoad(List<PathItem> items, int partitionsCount) {
+ PriorityQueue<PartitionWorkLoadBasedOnSize> workloadQueue = new PriorityQueue<>(partitionsCount,
+ Comparator.comparingLong(PartitionWorkLoadBasedOnSize::getTotalSize));
+
+ // Prepare the workloads based on the number of partitions
+ for (int i = 0; i < partitionsCount; i++) {
+ workloadQueue.add(new PartitionWorkLoadBasedOnSize());
+ }
+
+ for (PathItem object : items) {
+ PartitionWorkLoadBasedOnSize workload = workloadQueue.poll();
+ workload.addFilePath(object.getName(), object.getContentLength());
+ workloadQueue.add(workload);
+ }
+ partitionWorkLoadsBasedOnSize.addAll(workloadQueue);
+ }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeReaderFactory.java
similarity index 91%
copy from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobReaderFactory.java
copy to asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeReaderFactory.java
index 27e1b02..594bacf 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeReaderFactory.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.external.input.record.reader.azure;
+package org.apache.asterix.external.input.record.reader.azure.datalake;
import java.util.Collections;
import java.util.List;
@@ -31,12 +31,12 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
-public class AzureBlobReaderFactory extends StreamRecordReaderFactory {
+public class AzureDataLakeReaderFactory extends StreamRecordReaderFactory {
private static final long serialVersionUID = 1L;
private static final List<String> recordReaderNames =
- Collections.singletonList(ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_BLOB);
+ Collections.singletonList(ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_DATA_LAKE);
@Override
public List<String> getRecordReaderNames() {
@@ -64,7 +64,7 @@
this.configuration = configuration;
// Stream factory
- streamFactory = new AzureBlobInputStreamFactory();
+ streamFactory = new AzureDataLakeInputStreamFactory();
streamFactory.configure(ctx, configuration, warningCollector);
// record reader
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
index b2b667a..0f9f484 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
@@ -45,8 +45,7 @@
@Override
public void configure(IServiceContext serviceCtx, Map<String, String> configuration,
IWarningCollector warningCollector) throws AlgebricksException, HyracksDataException {
- //We need to the client to parse connectionString
- BlobServiceClient blobServiceClient = ExternalDataUtils.Azure.buildAzureClient(configuration);
+ BlobServiceClient blobServiceClient = ExternalDataUtils.Azure.buildAzureBlobClient(configuration);
//Get endpoint
String endPoint = extractEndPoint(blobServiceClient.getAccountUrl());
//Get path
@@ -117,7 +116,7 @@
}
private static void appendFileURI(StringBuilder builder, String container, String endPoint, BlobItem file) {
- builder.append(ExternalDataConstants.AzureBlob.HADOOP_AZURE_BLOB_PROTOCOL);
+ builder.append(ExternalDataConstants.Azure.HADOOP_AZURE_BLOB_PROTOCOL);
builder.append("://");
builder.append(container);
builder.append('@');
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 9589212..ec25997 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -142,6 +142,7 @@
public static final String KEY_ADAPTER_NAME_HTTP = "http_adapter";
public static final String KEY_ADAPTER_NAME_AWS_S3 = "S3";
public static final String KEY_ADAPTER_NAME_AZURE_BLOB = "AZUREBLOB";
+ public static final String KEY_ADAPTER_NAME_AZURE_DATA_LAKE = "AZUREDATALAKE";
public static final String KEY_ADAPTER_NAME_GCS = "GCS";
/**
@@ -344,8 +345,12 @@
}
- public static class AzureBlob {
- private AzureBlob() {
+ /*
+ * Note: Azure Blob and Azure Data Lake use identical authentication, so they share the same properties.
+ * If they end up diverging, then properties for AzureBlob and AzureDataLake need to be created.
+ */
+ public static class Azure {
+ private Azure() {
throw new AssertionError("do not instantiate");
}
@@ -363,6 +368,18 @@
public static final String CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME = "clientCertificatePassword";
public static final String ENDPOINT_FIELD_NAME = "endpoint";
+ // Specific Azure data lake property
+ /*
+ The behavior of Data Lake (true file system) is to read the files of the specified prefix only, example:
+ storage/myData/personal/file1.json
+ storage/myData/personal/file2.json
+ storage/myData/file3.json
+ If the prefix used is "myData", then only file3.json is read. However, if the property "recursive"
+ is set to "true" when creating the external dataset, then it goes recursively over all the paths, and the
+ result is file1.json, file2.json and file3.json.
+ */
+ public static final String RECURSIVE_FIELD_NAME = "recursive";
+
/*
* Hadoop-Azure
*/
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 09cf433..6be2896 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -38,19 +38,19 @@
import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.HADOOP_SESSION_TOKEN;
import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.HADOOP_TEMP_ACCESS;
import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.SECRET_ACCESS_KEY_FIELD_NAME;
-import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.ACCOUNT_KEY_FIELD_NAME;
-import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.ACCOUNT_NAME_FIELD_NAME;
-import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.CLIENT_CERTIFICATE_FIELD_NAME;
-import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME;
-import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.CLIENT_ID_FIELD_NAME;
-import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.CLIENT_SECRET_FIELD_NAME;
-import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.ENDPOINT_FIELD_NAME;
-import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.HADOOP_AZURE_BLOB_PROTOCOL;
-import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.HADOOP_AZURE_FS_ACCOUNT_KEY;
-import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.HADOOP_AZURE_FS_SAS;
-import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.MANAGED_IDENTITY_ID_FIELD_NAME;
-import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.SHARED_ACCESS_SIGNATURE_FIELD_NAME;
-import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.TENANT_ID_FIELD_NAME;
+import static org.apache.asterix.external.util.ExternalDataConstants.Azure.ACCOUNT_KEY_FIELD_NAME;
+import static org.apache.asterix.external.util.ExternalDataConstants.Azure.ACCOUNT_NAME_FIELD_NAME;
+import static org.apache.asterix.external.util.ExternalDataConstants.Azure.CLIENT_CERTIFICATE_FIELD_NAME;
+import static org.apache.asterix.external.util.ExternalDataConstants.Azure.CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME;
+import static org.apache.asterix.external.util.ExternalDataConstants.Azure.CLIENT_ID_FIELD_NAME;
+import static org.apache.asterix.external.util.ExternalDataConstants.Azure.CLIENT_SECRET_FIELD_NAME;
+import static org.apache.asterix.external.util.ExternalDataConstants.Azure.ENDPOINT_FIELD_NAME;
+import static org.apache.asterix.external.util.ExternalDataConstants.Azure.HADOOP_AZURE_BLOB_PROTOCOL;
+import static org.apache.asterix.external.util.ExternalDataConstants.Azure.HADOOP_AZURE_FS_ACCOUNT_KEY;
+import static org.apache.asterix.external.util.ExternalDataConstants.Azure.HADOOP_AZURE_FS_SAS;
+import static org.apache.asterix.external.util.ExternalDataConstants.Azure.MANAGED_IDENTITY_ID_FIELD_NAME;
+import static org.apache.asterix.external.util.ExternalDataConstants.Azure.SHARED_ACCESS_SIGNATURE_FIELD_NAME;
+import static org.apache.asterix.external.util.ExternalDataConstants.Azure.TENANT_ID_FIELD_NAME;
import static org.apache.asterix.external.util.ExternalDataConstants.GCS.JSON_CREDENTIALS_FIELD_NAME;
import static org.apache.asterix.external.util.ExternalDataConstants.KEY_ADAPTER_NAME_GCS;
import static org.apache.asterix.external.util.ExternalDataConstants.KEY_DELIMITER;
@@ -133,6 +133,11 @@
import com.azure.storage.blob.models.BlobItem;
import com.azure.storage.blob.models.ListBlobsOptions;
import com.azure.storage.common.StorageSharedKeyCredential;
+import com.azure.storage.file.datalake.DataLakeFileSystemClient;
+import com.azure.storage.file.datalake.DataLakeServiceClient;
+import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
+import com.azure.storage.file.datalake.models.ListPathsOptions;
+import com.azure.storage.file.datalake.models.PathItem;
import com.google.api.gax.paging.Page;
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.cloud.storage.Blob;
@@ -594,7 +599,10 @@
AwsS3.validateProperties(configuration, srcLoc, collector);
break;
case ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_BLOB:
- Azure.validateProperties(configuration, srcLoc, collector);
+ Azure.validateAzureBlobProperties(configuration, srcLoc, collector);
+ break;
+ case ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_DATA_LAKE:
+ Azure.validateAzureDataLakeProperties(configuration, srcLoc, collector);
break;
case KEY_ADAPTER_NAME_GCS:
GCS.validateProperties(configuration, srcLoc, collector);
@@ -715,9 +723,13 @@
* @param configuration configuration
*/
public static String getPrefix(Map<String, String> configuration) {
+ return getPrefix(configuration, true);
+ }
+
+ public static String getPrefix(Map<String, String> configuration, boolean appendSlash) {
String definition = configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME);
if (definition != null && !definition.isEmpty()) {
- return definition + (!definition.endsWith("/") ? "/" : "");
+ return appendSlash ? definition + (!definition.endsWith("/") ? "/" : "") : definition;
}
return "";
}
@@ -1229,6 +1241,10 @@
}
}
+ /*
+ * Note: Azure Blob and Azure Data Lake use identical authentication, so they share the same properties.
+ * If they end up diverging, then properties for AzureBlob and AzureDataLake need to be created.
+ */
public static class Azure {
private Azure() {
throw new AssertionError("do not instantiate");
@@ -1240,7 +1256,7 @@
* @param configuration properties
* @return client
*/
- public static BlobServiceClient buildAzureClient(Map<String, String> configuration)
+ public static BlobServiceClient buildAzureBlobClient(Map<String, String> configuration)
throws CompilationException {
String managedIdentityId = configuration.get(MANAGED_IDENTITY_ID_FIELD_NAME);
String accountName = configuration.get(ACCOUNT_NAME_FIELD_NAME);
@@ -1361,6 +1377,157 @@
pemCertificate.invoke(certificate, certificateContent, clientCertificatePassword);
}
} catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException ex) {
+ throw new CompilationException(EXTERNAL_SOURCE_ERROR, ex.getMessage());
+ }
+ builder.credential(certificate.build());
+ }
+ }
+
+ // If client id is not present, ensure client secret, certificate, tenant id and client certificate
+ // password are not present
+ if (clientId == null) {
+ Optional<String> provided = getFirstNotNull(configuration, CLIENT_SECRET_FIELD_NAME,
+ CLIENT_CERTIFICATE_FIELD_NAME, CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, TENANT_ID_FIELD_NAME);
+ if (provided.isPresent()) {
+ throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, provided.get(),
+ SHARED_ACCESS_SIGNATURE_FIELD_NAME);
+ }
+ }
+
+ try {
+ return builder.buildClient();
+ } catch (Exception ex) {
+ throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
+ }
+ }
+
+ /**
+ * Builds the Azure Data Lake storage client using the provided configuration
+ *
+ * @param configuration properties
+ * @return client
+ */
+ public static DataLakeServiceClient buildAzureDatalakeClient(Map<String, String> configuration)
+ throws CompilationException {
+ String managedIdentityId = configuration.get(MANAGED_IDENTITY_ID_FIELD_NAME);
+ String accountName = configuration.get(ACCOUNT_NAME_FIELD_NAME);
+ String accountKey = configuration.get(ACCOUNT_KEY_FIELD_NAME);
+ String sharedAccessSignature = configuration.get(SHARED_ACCESS_SIGNATURE_FIELD_NAME);
+ String tenantId = configuration.get(TENANT_ID_FIELD_NAME);
+ String clientId = configuration.get(CLIENT_ID_FIELD_NAME);
+ String clientSecret = configuration.get(CLIENT_SECRET_FIELD_NAME);
+ String clientCertificate = configuration.get(CLIENT_CERTIFICATE_FIELD_NAME);
+ String clientCertificatePassword = configuration.get(CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME);
+ String endpoint = configuration.get(ENDPOINT_FIELD_NAME);
+
+ // Client builder
+ DataLakeServiceClientBuilder builder = new DataLakeServiceClientBuilder();
+
+ // Endpoint is required
+ if (endpoint == null) {
+ throw new CompilationException(PARAMETERS_REQUIRED, ENDPOINT_FIELD_NAME);
+ }
+ builder.endpoint(endpoint);
+
+ // Shared Key
+ if (accountName != null || accountKey != null) {
+ if (accountName == null) {
+ throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCOUNT_NAME_FIELD_NAME,
+ ACCOUNT_KEY_FIELD_NAME);
+ }
+
+ if (accountKey == null) {
+ throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCOUNT_KEY_FIELD_NAME,
+ ACCOUNT_NAME_FIELD_NAME);
+ }
+
+ Optional<String> provided = getFirstNotNull(configuration, SHARED_ACCESS_SIGNATURE_FIELD_NAME,
+ MANAGED_IDENTITY_ID_FIELD_NAME, CLIENT_ID_FIELD_NAME, CLIENT_SECRET_FIELD_NAME,
+ CLIENT_CERTIFICATE_FIELD_NAME, CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, TENANT_ID_FIELD_NAME);
+ if (provided.isPresent()) {
+ throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, provided.get(),
+ ACCOUNT_KEY_FIELD_NAME);
+ }
+ StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName, accountKey);
+ builder.credential(credential);
+ }
+
+ // Shared access signature
+ if (sharedAccessSignature != null) {
+ Optional<String> provided = getFirstNotNull(configuration, MANAGED_IDENTITY_ID_FIELD_NAME,
+ CLIENT_ID_FIELD_NAME, CLIENT_SECRET_FIELD_NAME, CLIENT_CERTIFICATE_FIELD_NAME,
+ CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, TENANT_ID_FIELD_NAME);
+ if (provided.isPresent()) {
+ throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, provided.get(),
+ SHARED_ACCESS_SIGNATURE_FIELD_NAME);
+ }
+ AzureSasCredential credential = new AzureSasCredential(sharedAccessSignature);
+ builder.credential(credential);
+ }
+
+ // Managed Identity auth
+ if (managedIdentityId != null) {
+ Optional<String> provided = getFirstNotNull(configuration, CLIENT_ID_FIELD_NAME,
+ CLIENT_SECRET_FIELD_NAME, CLIENT_CERTIFICATE_FIELD_NAME, CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME,
+ TENANT_ID_FIELD_NAME);
+ if (provided.isPresent()) {
+ throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, provided.get(),
+ MANAGED_IDENTITY_ID_FIELD_NAME);
+ }
+ builder.credential(new ManagedIdentityCredentialBuilder().clientId(managedIdentityId).build());
+ }
+
+ // Client secret & certificate auth
+ if (clientId != null) {
+ // Both (or neither) client secret and client certificate were provided; only one is allowed
+ if ((clientSecret == null) == (clientCertificate == null)) {
+ if (clientSecret != null) {
+ throw new CompilationException(PARAMETERS_NOT_ALLOWED_AT_SAME_TIME, CLIENT_SECRET_FIELD_NAME,
+ CLIENT_CERTIFICATE_FIELD_NAME);
+ } else {
+ throw new CompilationException(REQUIRED_PARAM_OR_PARAM_IF_PARAM_IS_PRESENT,
+ CLIENT_SECRET_FIELD_NAME, CLIENT_CERTIFICATE_FIELD_NAME, CLIENT_ID_FIELD_NAME);
+ }
+ }
+
+ // Tenant ID is required
+ if (tenantId == null) {
+ throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, TENANT_ID_FIELD_NAME,
+ CLIENT_ID_FIELD_NAME);
+ }
+
+ // Client certificate password is not allowed if client secret is used
+ if (clientCertificatePassword != null && clientSecret != null) {
+ throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT,
+ CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME, CLIENT_SECRET_FIELD_NAME);
+ }
+
+ // Use AD authentication
+ if (clientSecret != null) {
+ ClientSecretCredentialBuilder secret = new ClientSecretCredentialBuilder();
+ secret.clientId(clientId);
+ secret.tenantId(tenantId);
+ secret.clientSecret(clientSecret);
+ builder.credential(secret.build());
+ } else {
+ // Certificate
+ ClientCertificateCredentialBuilder certificate = new ClientCertificateCredentialBuilder();
+ certificate.clientId(clientId);
+ certificate.tenantId(tenantId);
+ try {
+ InputStream certificateContent = new ByteArrayInputStream(clientCertificate.getBytes(UTF_8));
+ if (clientCertificatePassword == null) {
+ Method pemCertificate = ClientCertificateCredentialBuilder.class
+ .getDeclaredMethod("pemCertificate", InputStream.class);
+ pemCertificate.setAccessible(true);
+ pemCertificate.invoke(certificate, certificateContent);
+ } else {
+ Method pemCertificate = ClientCertificateCredentialBuilder.class
+ .getDeclaredMethod("pfxCertificate", InputStream.class, String.class);
+ pemCertificate.setAccessible(true);
+ pemCertificate.invoke(certificate, certificateContent, clientCertificatePassword);
+ }
+ } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException ex) {
throw new CompilationException(EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
}
builder.credential(certificate.build());
@@ -1451,7 +1618,7 @@
* @param configuration properties
* @throws CompilationException Compilation exception
*/
- public static void validateProperties(Map<String, String> configuration, SourceLocation srcLoc,
+ public static void validateAzureBlobProperties(Map<String, String> configuration, SourceLocation srcLoc,
IWarningCollector collector) throws CompilationException {
// check if the format property is present
@@ -1465,7 +1632,7 @@
BlobServiceClient blobServiceClient;
try {
String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
- blobServiceClient = buildAzureClient(configuration);
+ blobServiceClient = buildAzureBlobClient(configuration);
BlobContainerClient blobContainer = blobServiceClient.getBlobContainerClient(container);
// Get all objects in a container and extract the paths to files
@@ -1485,6 +1652,45 @@
}
/**
+ * Validates the properties of an Azure Data Lake external dataset
+ *
+ * @param configuration properties
+ * @throws CompilationException Compilation exception
+ */
+ public static void validateAzureDataLakeProperties(Map<String, String> configuration, SourceLocation srcLoc,
+ IWarningCollector collector) throws CompilationException {
+
+ // check if the format property is present
+ if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
+ throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT);
+ }
+
+ validateIncludeExclude(configuration);
+
+ // Check if the bucket is present
+ DataLakeServiceClient dataLakeServiceClient;
+ try {
+ String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+ dataLakeServiceClient = buildAzureDatalakeClient(configuration);
+ DataLakeFileSystemClient fileSystemClient = dataLakeServiceClient.getFileSystemClient(container);
+
+ // Get all objects in a container and extract the paths to files
+ ListPathsOptions listPathsOptions = new ListPathsOptions();
+ listPathsOptions.setPath(getPrefix(configuration));
+ Iterable<PathItem> blobItems = fileSystemClient.listPaths(listPathsOptions, null);
+
+ if (!blobItems.iterator().hasNext() && collector.shouldWarn()) {
+ Warning warning = Warning.of(srcLoc, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
+ collector.warn(warning);
+ }
+ } catch (CompilationException ex) {
+ throw ex;
+ } catch (Exception ex) {
+ throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
+ }
+ }
+
+ /**
* Builds the Azure Blob storage client using the provided configuration
*
* @param configuration properties
@@ -1492,10 +1698,8 @@
*/
public static void configureAzureHdfsJobConf(JobConf conf, Map<String, String> configuration, String endPoint) {
String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
- String accountName = configuration.get(ACCOUNT_NAME_FIELD_NAME);
String accountKey = configuration.get(ACCOUNT_KEY_FIELD_NAME);
String sharedAccessSignature = configuration.get(SHARED_ACCESS_SIGNATURE_FIELD_NAME);
- String endpoint = configuration.get(ENDPOINT_FIELD_NAME);
//Disable caching S3 FileSystem
HDFSUtils.disableHadoopFileSystemCache(conf, HADOOP_AZURE_BLOB_PROTOCOL);
diff --git a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
index 7d3f901..d551c5b 100644
--- a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
+++ b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
@@ -21,7 +21,8 @@
org.apache.asterix.external.input.record.reader.stream.StreamRecordReaderFactory
org.apache.asterix.external.input.record.reader.http.HttpServerRecordReaderFactory
org.apache.asterix.external.input.record.reader.aws.AwsS3ReaderFactory
-org.apache.asterix.external.input.record.reader.azure.AzureBlobReaderFactory
org.apache.asterix.external.input.record.reader.aws.parquet.AwsS3ParquetReaderFactory
org.apache.asterix.external.input.record.reader.gcs.GCSReaderFactory
-org.apache.asterix.external.input.record.reader.azure.parquet.AzureBlobParquetReaderFactory
+org.apache.asterix.external.input.record.reader.azure.blob.AzureBlobReaderFactory
+org.apache.asterix.external.input.record.reader.azure.datalake.AzureDataLakeReaderFactory
+org.apache.asterix.external.input.record.reader.azure.parquet.AzureBlobParquetReaderFactory
\ No newline at end of file
diff --git a/asterixdb/asterix-server/pom.xml b/asterixdb/asterix-server/pom.xml
index d33004f..1d08361 100644
--- a/asterixdb/asterix-server/pom.xml
+++ b/asterixdb/asterix-server/pom.xml
@@ -226,6 +226,7 @@
<gav>com.azure:azure-storage-blob:12.12.0</gav>
<gav>com.azure:azure-storage-common:12.12.0</gav>
<gav>com.azure:azure-storage-internal-avro:12.0.5</gav>
+ <gav>com.azure:azure-storage-file-datalake:12.7.0</gav>
</gavs>
<noticeUrl>https://raw.githubusercontent.com/Azure/azure-sdk-for-java/master/NOTICE.txt</noticeUrl>
<url>https://raw.githubusercontent.com/Azure/azure-sdk-for-java/master/LICENSE.txt</url>
diff --git a/asterixdb/pom.xml b/asterixdb/pom.xml
index b93138f..7d449fa 100644
--- a/asterixdb/pom.xml
+++ b/asterixdb/pom.xml
@@ -90,7 +90,8 @@
<awsjavasdk.version>2.10.83</awsjavasdk.version>
<parquet.version>1.12.0</parquet.version>
<hadoop-awsjavasdk.version>1.12.1</hadoop-awsjavasdk.version>
- <azurejavasdk.version>12.12.0</azurejavasdk.version>
+ <azureblobjavasdk.version>12.12.0</azureblobjavasdk.version>
+ <azuredatalakejavasdk.version>12.7.0</azuredatalakejavasdk.version>
<gcsjavasdk.version>1.114.0</gcsjavasdk.version>
<hadoop-azuresdk.version>8.6.6</hadoop-azuresdk.version>
@@ -1583,7 +1584,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-blob</artifactId>
- <version>${azurejavasdk.version}</version>
+ <version>${azureblobjavasdk.version}</version>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
@@ -1654,7 +1655,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-common</artifactId>
- <version>${azurejavasdk.version}</version>
+ <version>${azureblobjavasdk.version}</version>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
@@ -1730,6 +1731,13 @@
<version>${gcsjavasdk.version}</version>
</dependency>
<!-- Google Cloud Storage end -->
+ <!-- Azure Data Lake start -->
+ <dependency>
+ <groupId>com.azure</groupId>
+ <artifactId>azure-storage-file-datalake</artifactId>
+ <version>${azuredatalakejavasdk.version}</version>
+ </dependency>
+ <!-- Azure Data Lake end -->
<dependency>
<groupId>org.mindrot</groupId>
<artifactId>jbcrypt</artifactId>
diff --git a/asterixdb/src/main/appended-resources/supplemental-models.xml b/asterixdb/src/main/appended-resources/supplemental-models.xml
index 93fe3a0..1f1d4fc 100644
--- a/asterixdb/src/main/appended-resources/supplemental-models.xml
+++ b/asterixdb/src/main/appended-resources/supplemental-models.xml
@@ -652,6 +652,20 @@
</project>
</supplement>
+ <!-- com.azure does not contain any embedded LICENSE or NOTICE file -->
+ <!-- see https://github.com/Azure/azure-sdk-for-java -->
+ <supplement>
+ <project>
+ <groupId>com.azure</groupId>
+ <artifactId>azure-storage-file-datalake</artifactId>
+ <properties>
+ <license.ignoreMissingEmbeddedLicense>12.7.0</license.ignoreMissingEmbeddedLicense>
+ <license.ignoreMissingEmbeddedNotice>12.7.0</license.ignoreMissingEmbeddedNotice>
+ <license.ignoreLicenseOverride>12.7.0</license.ignoreLicenseOverride>
+ </properties>
+ </project>
+ </supplement>
+
<supplement>
<project>
<groupId>com.azure</groupId>