[ASTERIXDB-2925][EXT] Support reading Parquet from Azure Blob Storage
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Add support for reading Parquet files from Azure Blob Storage
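- A minimal SQL++ usage sketch (the type, container, definition, and
  connection string values below are illustrative placeholders, not part
  of this change):
      CREATE TYPE ParquetType AS { };
      CREATE EXTERNAL DATASET ParquetDataset(ParquetType) USING AZUREBLOB (
        ("connectionString"="..."),
        ("container"="playground"),
        ("definition"="parquet-data"),
        ("format"="parquet")
      );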
Change-Id: If4e3843e5627aabb6da4d9c376d9d5447093725f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/12343
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Hussain Towaileb <hussainht@gmail.com>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestConstants.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestConstants.java
index 86bffd2..2d2a206 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestConstants.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestConstants.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.test.common;
+import org.apache.asterix.test.external_dataset.microsoft.AzureBlobStorageExternalDatasetTest;
+
public class TestConstants {
// AWS S3 constants and place holders
public static final String S3_ACCESS_KEY_ID_PLACEHOLDER = "%accessKeyId%";
@@ -52,7 +54,7 @@
// blob endpoint
public static final String AZURE_BLOB_ENDPOINT_PLACEHOLDER = "%azureblob-endpoint%";
public static final String AZURE_BLOB_ENDPOINT_DEFAULT =
- "http://localhost:20000/" + AZURE_AZURITE_ACCOUNT_NAME_DEFAULT;
+ "http://localhost:10000/" + AZURE_AZURITE_ACCOUNT_NAME_DEFAULT;
// connection string with account name & account key
public static final String AZURE_CONNECTION_STRING_ACCOUNT_KEY_PLACEHOLDER =
@@ -70,7 +72,6 @@
public static final String AZURE_TEMPLATE = "(\"accountName\"=\"" + AZURE_AZURITE_ACCOUNT_NAME_DEFAULT + "\"),\n"
+ "(\"accountKey\"=\"" + AZURE_AZURITE_ACCOUNT_KEY_DEFAULT + "\"),\n" + "(\"blobEndpoint\"=\""
+ AZURE_BLOB_ENDPOINT_PLACEHOLDER + "\")";
- public static final String AZURE_TEMPLATE_DEFAULT = "(\"accountName\"=\"" + AZURE_AZURITE_ACCOUNT_NAME_DEFAULT
- + "\"),\n" + "(\"accountKey\"=\"" + AZURE_AZURITE_ACCOUNT_KEY_DEFAULT + "\"),\n" + "(\"blobEndpoint\"=\""
- + AZURE_BLOB_ENDPOINT_DEFAULT + "\")";
+ public static final String AZURE_TEMPLATE_DEFAULT =
+ "(\"connectionString\"=\"" + AzureBlobStorageExternalDatasetTest.CONNECTION_STRING + "\")";
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/microsoft/AzureBlobStorageExternalDatasetTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/microsoft/AzureBlobStorageExternalDatasetTest.java
index 2d41686..9e8614c 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/microsoft/AzureBlobStorageExternalDatasetTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/microsoft/AzureBlobStorageExternalDatasetTest.java
@@ -18,6 +18,10 @@
*/
package org.apache.asterix.test.external_dataset.microsoft;
+import static com.azure.storage.common.implementation.Constants.ConnectionStringConstants.EMULATOR_ACCOUNT_KEY;
+import static com.azure.storage.common.implementation.Constants.ConnectionStringConstants.EMULATOR_ACCOUNT_NAME;
+import static org.apache.asterix.test.external_dataset.BinaryFileConverterUtil.BINARY_GEN_BASEDIR;
+import static org.apache.asterix.test.external_dataset.ExternalDatasetTestUtils.PARQUET_DEFINITION;
import static org.apache.hyracks.util.file.FileUtil.joinPath;
import java.io.ByteArrayInputStream;
@@ -42,6 +46,7 @@
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.test.common.TestConstants;
import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.test.external_dataset.ExternalDatasetTestUtils;
import org.apache.asterix.test.runtime.ExecutionTestUtil;
import org.apache.asterix.test.runtime.LangExecutionUtil;
import org.apache.asterix.testframework.context.TestCaseContext;
@@ -93,10 +98,11 @@
private static final String CSV_DATA_PATH = joinPath("data", "csv");
private static final String TSV_DATA_PATH = joinPath("data", "tsv");
private static final String MIXED_DATA_PATH = joinPath("data", "mixed");
+ private static final String PARQUET_RAW_DATA_PATH = joinPath("data", "hdfs", "parquet");
// Service endpoint
- private static final int BLOB_SERVICE_PORT = 20000;
- private static final String BLOB_SERVICE_ENDPOINT = "http://localhost:" + BLOB_SERVICE_PORT;
+ private static final int BLOB_SERVICE_PORT = 10000;
+ private static final String BLOB_SERVICE_ENDPOINT = "http://localhost:" + BLOB_SERVICE_PORT;
// Region, container and definitions
private static final String PLAYGROUND_CONTAINER = "playground";
@@ -114,9 +120,8 @@
private static final Set<String> fileNames = new HashSet<>();
// Create a BlobServiceClient object which will be used to create a container client
- private static final String connectionString = "AccountName=devstoreaccount1;"
- + "AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;"
- + "BlobEndpoint=" + BLOB_SERVICE_ENDPOINT + "/devstoreaccount1;";
+ public static final String CONNECTION_STRING = "AccountName=" + EMULATOR_ACCOUNT_NAME + ";" + "AccountKey="
+ + EMULATOR_ACCOUNT_KEY + ";" + "BlobEndpoint=" + BLOB_SERVICE_ENDPOINT + "/" + EMULATOR_ACCOUNT_NAME + ";";
private static BlobServiceClient blobServiceClient;
private static BlobContainerClient playgroundContainer;
private static BlobContainerClient publicAccessContainer;
@@ -130,6 +135,7 @@
@BeforeClass
public static void setUp() throws Exception {
final TestExecutor testExecutor = new AzureTestExecutor();
+ ExternalDatasetTestUtils.createBinaryFiles(PARQUET_RAW_DATA_PATH);
LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, testExecutor);
setNcEndpoints(testExecutor);
createBlobServiceClient();
@@ -171,7 +177,7 @@
private static void createBlobServiceClient() {
LOGGER.info("Creating Azurite Blob Service client");
- blobServiceClient = new BlobServiceClientBuilder().connectionString(connectionString).buildClient();
+ blobServiceClient = new BlobServiceClientBuilder().connectionString(CONNECTION_STRING).buildClient();
LOGGER.info("Azurite Blob Service client created successfully");
// Generate the SAS token for the SAS test cases
@@ -218,6 +224,10 @@
LOGGER.info("Loading " + OVER_1000_OBJECTS_COUNT + " into " + OVER_1000_OBJECTS_PATH);
loadLargeNumberOfFiles();
LOGGER.info("Added " + OVER_1000_OBJECTS_COUNT + " files into " + OVER_1000_OBJECTS_PATH + " successfully");
+
+ LOGGER.info("Adding Parquet files to the bucket");
+ loadParquetFiles();
+ LOGGER.info("Parquet files added successfully");
}
/**
@@ -322,6 +332,21 @@
loadGzData(dataBasePath, "", "02.tsv", definition, definitionSegment, false);
}
+ private static void loadParquetFiles() {
+ String dataBasePath = BINARY_GEN_BASEDIR;
+ String definition = PARQUET_DEFINITION;
+
+ // Normal format
+ String definitionSegment = "";
+ loadData(dataBasePath, "", "dummy_tweet.parquet", definition, definitionSegment, false, false);
+ loadData(dataBasePath, "", "id_age.parquet", definition, definitionSegment, false, false);
+ loadData(dataBasePath, "", "id_age-string.parquet", definition, definitionSegment, false, false);
+ loadData(dataBasePath, "", "id_name.parquet", definition, definitionSegment, false, false);
+ loadData(dataBasePath, "", "id_name_comment.parquet", definition, definitionSegment, false, false);
+ loadData(dataBasePath, "", "heterogeneous_1.parquet", definition, definitionSegment, false, false);
+ loadData(dataBasePath, "", "heterogeneous_2.parquet", definition, definitionSegment, false, false);
+ }
+
private static void loadData(String fileBasePath, String filePathSegment, String filename, String definition,
String definitionSegment, boolean removeExtension) {
loadData(fileBasePath, filePathSegment, filename, definition, definitionSegment, removeExtension, true);
@@ -514,6 +539,7 @@
static class AzureTestExecutor extends TestExecutor {
+ @Override
public void executeTestFile(TestCaseContext testCaseCtx, TestFileContext ctx, Map<String, Object> variableCtx,
String statement, boolean isDmlRecoveryTest, ProcessBuilder pb, TestCase.CompilationUnit cUnit,
MutableInt queryCount, List<TestFileContext> expectedResultFileCtxs, File testFile, String actualPath)
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_azure_blob_storage.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_azure_blob_storage.xml
index b732c55..d30d505 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_azure_blob_storage.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_azure_blob_storage.xml
@@ -138,6 +138,14 @@
<output-dir compare="Text">common/tsv/mixed</output-dir>
</compilation-unit>
</test-case>
+ <!-- Parquet Tests Start -->
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="common/parquet/object-concat">
+ <placeholder name="adapter" value="AZUREBLOB" />
+ <output-dir compare="Text">common/parquet/object-concat</output-dir>
+ </compilation-unit>
+ </test-case>
+ <!-- Parquet Tests End -->
<test-case FilePath="external-dataset">
<compilation-unit name="common/empty-string-definition">
<placeholder name="adapter" value="AZUREBLOB" />
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index ecec240..893fe49 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -247,6 +247,7 @@
VIEW_EXISTS(1160),
UNSUPPORTED_TYPE_FOR_PARQUET(1161),
INVALID_PRIMARY_KEY_DEFINITION(1162),
+ UNSUPPORTED_AUTH_METHOD(1163),
// Feed errors
DATAFLOW_ILLEGAL_STATE(3001),
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index e964531..6f673de 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -249,6 +249,7 @@
1160 = A view with this name %1$s already exists
1161 = Type '%1$s' contains declared fields, which is not supported for 'parquet' format
1162 = Invalid primary key definition
+1163 = Authenticating with '%1$s' is not supported for '%2$s' format
# Feed Errors
3001 = Illegal state.
diff --git a/asterixdb/asterix-external-data/pom.xml b/asterixdb/asterix-external-data/pom.xml
index ee8dcfd..a6684d3 100644
--- a/asterixdb/asterix-external-data/pom.xml
+++ b/asterixdb/asterix-external-data/pom.xml
@@ -495,5 +495,9 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-azure</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStreamFactory.java
index e71d954..b9e46a1 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStreamFactory.java
@@ -18,31 +18,22 @@
*/
package org.apache.asterix.external.input.record.reader.azure;
-import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
-import java.util.function.BiPredicate;
-import java.util.regex.Matcher;
-import org.apache.asterix.common.exceptions.CompilationException;
-import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
-import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
-import org.apache.hyracks.api.exceptions.Warning;
-import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.models.BlobItem;
-import com.azure.storage.blob.models.ListBlobsOptions;
public class AzureBlobInputStreamFactory extends AbstractExternalInputStreamFactory {
@@ -58,64 +49,15 @@
throws AlgebricksException {
super.configure(ctx, configuration, warningCollector);
- String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
-
- List<BlobItem> filesOnly = new ArrayList<>();
-
// Ensure the validity of include/exclude
ExternalDataUtils.validateIncludeExclude(configuration);
-
+ IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration);
BlobServiceClient blobServiceClient = ExternalDataUtils.Azure.buildAzureClient(configuration);
- BlobContainerClient blobContainer;
- try {
- blobContainer = blobServiceClient.getBlobContainerClient(container);
+ List<BlobItem> filesOnly = ExternalDataUtils.Azure.listBlobItem(blobServiceClient, configuration,
+ includeExcludeMatcher, warningCollector);
- // Get all objects in a container and extract the paths to files
- ListBlobsOptions listBlobsOptions = new ListBlobsOptions();
- listBlobsOptions.setPrefix(ExternalDataUtils.getPrefix(configuration));
- Iterable<BlobItem> blobItems = blobContainer.listBlobs(listBlobsOptions, null);
-
- // Collect the paths to files only
- IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration);
- collectAndFilterFiles(blobItems, includeExcludeMatcher.getPredicate(),
- includeExcludeMatcher.getMatchersList(), filesOnly);
-
- // Warn if no files are returned
- if (filesOnly.isEmpty() && warningCollector.shouldWarn()) {
- Warning warning = Warning.of(null, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
- warningCollector.warn(warning);
- }
-
- // Distribute work load amongst the partitions
- distributeWorkLoad(filesOnly, getPartitionsCount());
- } catch (Exception ex) {
- throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage());
- }
- }
-
- /**
- * Collects and filters the files only, and excludes any folders
- *
- * @param items storage items
- * @param predicate predicate to test with for file filtration
- * @param matchers include/exclude matchers to test against
- * @param filesOnly List containing the files only (excluding folders)
- */
- private void collectAndFilterFiles(Iterable<BlobItem> items, BiPredicate<List<Matcher>, String> predicate,
- List<Matcher> matchers, List<BlobItem> filesOnly) {
- for (BlobItem item : items) {
- String uri = item.getName();
-
- // skip folders
- if (uri.endsWith("/")) {
- continue;
- }
-
- // No filter, add file
- if (predicate.test(matchers, uri)) {
- filesOnly.add(item);
- }
- }
+ // Distribute work load amongst the partitions
+ distributeWorkLoad(filesOnly, getPartitionsCount());
}
/**
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
new file mode 100644
index 0000000..4d1311c
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.azure.parquet;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.external.input.HDFSDataSourceFactory;
+import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.models.BlobItem;
+
+public class AzureBlobParquetReaderFactory extends HDFSDataSourceFactory {
+ private static final long serialVersionUID = -6140824803254158253L;
+ private static final List<String> recordReaderNames =
+ Collections.singletonList(ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_BLOB);
+
+ @Override
+ public void configure(IServiceContext serviceCtx, Map<String, String> configuration,
+ IWarningCollector warningCollector) throws AlgebricksException, HyracksDataException {
+ //We need the client to parse the connectionString
+ BlobServiceClient blobServiceClient = ExternalDataUtils.Azure.buildAzureClient(configuration);
+ //Get endpoint
+ String endPoint = extractEndPoint(blobServiceClient.getAccountUrl());
+ //Get path
+ String path = buildPathURIs(configuration, warningCollector, blobServiceClient, endPoint);
+ //Put the Azure configuration into AsterixDB's Hadoop configuration
+ putAzureBlobConfToHadoopConf(configuration, path);
+
+ //Configure Hadoop Azure input splits
+ JobConf conf = createHdfsConf(serviceCtx, configuration, warningCollector.shouldWarn());
+ ExternalDataUtils.Azure.configureAzureHdfsJobConf(conf, configuration, endPoint);
+ configureHdfsConf(conf, configuration);
+ }
+
+ @Override
+ public List<String> getRecordReaderNames() {
+ return recordReaderNames;
+ }
+
+ @Override
+ public Set<String> getReaderSupportedFormats() {
+ return Collections.singleton(ExternalDataConstants.FORMAT_PARQUET);
+ }
+
+ /**
+ * Prepare the Hadoop configuration to read Parquet files
+ *
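+ * @param configuration external source configuration properties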
+ * @param path Comma-delimited paths
+ */
+ private static void putAzureBlobConfToHadoopConf(Map<String, String> configuration, String path) {
+ configuration.put(ExternalDataConstants.KEY_PATH, path);
+ configuration.put(ExternalDataConstants.KEY_INPUT_FORMAT, ExternalDataConstants.INPUT_FORMAT_PARQUET);
+ configuration.put(ExternalDataConstants.KEY_PARSER, ExternalDataConstants.FORMAT_NOOP);
+ }
+
+ /**
+ * Build the Azure Blob Storage file paths (as wasbs URIs) for the requested files
+ *
+ * @param configuration properties
+ * @param warningCollector warning collector
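+ * @param blobServiceClient Azure Blob service client used to list the blob items
+ * @param endPoint blob endpoint, which includes the account name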
+ * @return Comma-delimited paths (e.g., "wasbs://container@accountName.blob.core.windows.net/file1.parquet,
+ * wasbs://container@accountName.blob.core.windows.net/file2.parquet")
+ * @throws CompilationException Compilation exception
+ */
+ private static String buildPathURIs(Map<String, String> configuration, IWarningCollector warningCollector,
+ BlobServiceClient blobServiceClient, String endPoint) throws CompilationException {
+ IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration);
+ List<BlobItem> filesOnly = ExternalDataUtils.Azure.listBlobItem(blobServiceClient, configuration,
+ includeExcludeMatcher, warningCollector);
+
+ StringBuilder builder = new StringBuilder();
+ String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+
+ if (!filesOnly.isEmpty()) {
+ appendFileURI(builder, container, endPoint, filesOnly.get(0));
+ for (int i = 1; i < filesOnly.size(); i++) {
+ builder.append(',');
+ appendFileURI(builder, container, endPoint, filesOnly.get(i));
+ }
+ }
+
+ return builder.toString();
+ }
+
+ private static String extractEndPoint(String uri) {
+ //The URI is in the form http(s)://<accountName>.blob.core.windows.net
+ //We need to remove the protocol (i.e., http(s)://) from the URI
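+ //e.g., "https://myaccount.blob.core.windows.net" -> "myaccount.blob.core.windows.net" (account name is hypothetical)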
+ return uri.substring(uri.indexOf("//") + "//".length());
+ }
+
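+ //Note: an emulator (Azurite) account URL has the form http://<host>:<port>/<accountName>,
+ //so the account name path segment is stripped as well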
+ private String extractEndPointForEmulator(String uri) {
+ String emulatorURI = extractEndPoint(uri);
+ return emulatorURI.substring(0, emulatorURI.indexOf('/'));
+ }
+
+ private static void appendFileURI(StringBuilder builder, String container, String endPoint, BlobItem file) {
+ builder.append(ExternalDataConstants.AzureBlob.HADOOP_AZURE_BLOB_PROTOCOL);
+ builder.append("://");
+ builder.append(container);
+ builder.append('@');
+ builder.append(endPoint);
+ builder.append('/');
+ builder.append(file.getName());
+ }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index f7d9de2..546e390 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -349,12 +349,28 @@
throw new AssertionError("do not instantiate");
}
+ //ConnectionString prefixes
+ public static final String ACCOUNT_KEY_PREFIX = "AccountKey=";
+ public static final String SAS_KEY_PREFIX = "SharedAccessSignature=";
+
+ /*
+ * Asterix Configuration Keys
+ */
public static final String CONNECTION_STRING_FIELD_NAME = "connectionString";
public static final String TENANT_ID_FIELD_NAME = "tenantId";
public static final String CLIENT_ID_FIELD_NAME = "clientId";
public static final String CLIENT_SECRET_FIELD_NAME = "clientSecret";
public static final String CLIENT_CERTIFICATE_FIELD_NAME = "clientCertificate";
public static final String CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME = "clientCertificatePassword";
+
+ /*
+ * Hadoop-Azure
+ */
+ //Used when accountName and accessKey are provided
+ public static final String HADOOP_AZURE_FS_ACCOUNT_KEY = "fs.azure.account.key";
+ //Used when a connectionString is provided
+ public static final String HADOOP_AZURE_FS_SAS = "fs.azure.sas";
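+ //The full keys are built as fs.azure.account.key.<endpoint> and fs.azure.sas.<container>.<endpoint>
+ //(see ExternalDataUtils.Azure.configureAzureHdfsJobConf)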
+ public static final String HADOOP_AZURE_BLOB_PROTOCOL = "wasbs";
}
public static class GCS {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index dcd8f9a..754aecf 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -36,11 +36,16 @@
import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.HADOOP_SESSION_TOKEN;
import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.HADOOP_TEMP_ACCESS;
import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3.SECRET_ACCESS_KEY_FIELD_NAME;
+import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.ACCOUNT_KEY_PREFIX;
import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.CLIENT_CERTIFICATE_FIELD_NAME;
import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.CLIENT_CERTIFICATE_PASSWORD_FIELD_NAME;
import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.CLIENT_ID_FIELD_NAME;
import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.CLIENT_SECRET_FIELD_NAME;
import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.CONNECTION_STRING_FIELD_NAME;
+import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.HADOOP_AZURE_BLOB_PROTOCOL;
+import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.HADOOP_AZURE_FS_ACCOUNT_KEY;
+import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.HADOOP_AZURE_FS_SAS;
+import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.SAS_KEY_PREFIX;
import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.TENANT_ID_FIELD_NAME;
import static org.apache.asterix.external.util.ExternalDataConstants.GCS.JSON_CREDENTIALS_FIELD_NAME;
import static org.apache.asterix.external.util.ExternalDataConstants.KEY_ADAPTER_NAME_GCS;
@@ -1247,6 +1252,10 @@
} catch (Exception ex) {
throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage());
}
+ } else if (isParquetFormat(configuration)) {
+ //TODO(wail) support AAD for parquet
+ throw new CompilationException(ErrorCode.UNSUPPORTED_AUTH_METHOD, "Azure Active Directory",
+ ExternalDataConstants.FORMAT_PARQUET);
}
// Active Directory authentication
@@ -1326,6 +1335,66 @@
}
}
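+ /**
+ * Lists the blob items of the configured container, applying the include/exclude filters
+ *
+ * @return the filtered list of blob items (folders excluded)
+ */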
+ public static List<BlobItem> listBlobItem(BlobServiceClient blobServiceClient,
+ Map<String, String> configuration, IncludeExcludeMatcher includeExcludeMatcher,
+ IWarningCollector warningCollector) throws CompilationException {
+ String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+
+ List<BlobItem> filesOnly = new ArrayList<>();
+
+ // Ensure the validity of include/exclude
+ ExternalDataUtils.validateIncludeExclude(configuration);
+
+ BlobContainerClient blobContainer;
+ try {
+ blobContainer = blobServiceClient.getBlobContainerClient(container);
+
+ // Get all objects in a container and extract the paths to files
+ ListBlobsOptions listBlobsOptions = new ListBlobsOptions();
+ listBlobsOptions.setPrefix(ExternalDataUtils.getPrefix(configuration));
+ Iterable<BlobItem> blobItems = blobContainer.listBlobs(listBlobsOptions, null);
+
+ // Collect the paths to files only
+ collectAndFilterFiles(blobItems, includeExcludeMatcher.getPredicate(),
+ includeExcludeMatcher.getMatchersList(), filesOnly);
+
+ // Warn if no files are returned
+ if (filesOnly.isEmpty() && warningCollector.shouldWarn()) {
+ Warning warning = Warning.of(null, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
+ warningCollector.warn(warning);
+ }
+ } catch (Exception ex) {
+ throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage());
+ }
+
+ return filesOnly;
+ }
+
+ /**
+ * Collects and filters the files only, and excludes any folders
+ *
+ * @param items storage items
+ * @param predicate predicate to test with for file filtration
+ * @param matchers include/exclude matchers to test against
+ * @param filesOnly List containing the files only (excluding folders)
+ */
+ private static void collectAndFilterFiles(Iterable<BlobItem> items,
+ BiPredicate<List<Matcher>, String> predicate, List<Matcher> matchers, List<BlobItem> filesOnly) {
+ for (BlobItem item : items) {
+ String uri = item.getName();
+
+ // skip folders
+ if (uri.endsWith("/")) {
+ continue;
+ }
+
+ // No filter, add file
+ if (predicate.test(matchers, uri)) {
+ filesOnly.add(item);
+ }
+ }
+ }
+
/**
* Validate external dataset properties
*
@@ -1364,6 +1433,49 @@
throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage());
}
}
+
+ /**
+ * Configures the Hadoop job configuration for accessing Azure Blob Storage using the provided configuration
+ *
+ * @param configuration properties
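+ * @param conf Hadoop job configuration to receive the Azure access settings
+ * @param endPoint blob endpoint, which includes the account name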
+ * @see <a href="https://docs.microsoft.com/en-us/azure/databricks/data/data-sources/azure/azure-storage">Azure Blob storage</a>
+ */
+ public static void configureAzureHdfsJobConf(JobConf conf, Map<String, String> configuration, String endPoint) {
+ String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+ String connectionString = configuration.get(CONNECTION_STRING_FIELD_NAME);
+
+ //Disable caching the Azure Blob (wasbs) FileSystem
+ HDFSUtils.disableHadoopFileSystemCache(conf, HADOOP_AZURE_BLOB_PROTOCOL);
+
+ //Key for Hadoop configuration
+ StringBuilder hadoopKey = new StringBuilder();
+ //Value for Hadoop configuration
+ String hadoopValue;
+ if (connectionString != null) {
+ if (connectionString.contains(ACCOUNT_KEY_PREFIX)) {
+ hadoopKey.append(HADOOP_AZURE_FS_ACCOUNT_KEY).append('.');
+ //Set only the AccountKey
+ hadoopValue = extractKey(ACCOUNT_KEY_PREFIX, connectionString);
+ } else {
+ //Use SAS for Hadoop FS as no account key is present in the connectionString
+ hadoopKey.append(HADOOP_AZURE_FS_SAS).append('.');
+ //Setting the container is required for SAS
+ hadoopKey.append(container).append('.');
+ //Set only the SharedAccessSignature
+ hadoopValue = extractKey(SAS_KEY_PREFIX, connectionString);
+ }
+ //Set the endPoint, which includes the AccountName
+ hadoopKey.append(endPoint);
+ //Set the Azure Blob Storage credentials in the Hadoop configuration
+ conf.set(hadoopKey.toString(), hadoopValue);
+ }
+ }
+
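+ //Extracts the value following the given prefix, up to the next ';' (or the end of the string)
+ //e.g., extractKey("AccountKey=", "AccountName=a;AccountKey=k;") yields "k" (values are hypothetical)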
+ private static String extractKey(String prefix, String connectionString) {
+ int start = connectionString.indexOf(prefix) + prefix.length();
+ int end = connectionString.indexOf(';', start);
+ return connectionString.substring(start, end > 0 ? end : connectionString.length());
+ }
}
public static class GCS {
diff --git a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
index b2d634b..7d3f901 100644
--- a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
+++ b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
@@ -24,3 +24,4 @@
org.apache.asterix.external.input.record.reader.azure.AzureBlobReaderFactory
org.apache.asterix.external.input.record.reader.aws.parquet.AwsS3ParquetReaderFactory
org.apache.asterix.external.input.record.reader.gcs.GCSReaderFactory
+org.apache.asterix.external.input.record.reader.azure.parquet.AzureBlobParquetReaderFactory
diff --git a/asterixdb/asterix-server/pom.xml b/asterixdb/asterix-server/pom.xml
index b6f8f8a..d33004f 100644
--- a/asterixdb/asterix-server/pom.xml
+++ b/asterixdb/asterix-server/pom.xml
@@ -239,6 +239,10 @@
<url>https://raw.githubusercontent.com/AzureAD/microsoft-authentication-extensions-for-java/1.1.0/LICENSE</url>
</override>
<override>
+ <gav>com.microsoft.azure:azure-keyvault-core:1.2.4</gav>
+ <url>https://raw.githubusercontent.com/Azure/azure-sdk-for-java/main/sdk/keyvault/LICENSE</url>
+ </override>
+ <override>
<gav>xpp3:xpp3:1.1.3.3</gav>
<url>https://raw.githubusercontent.com/aslom/xpp3/master/LICENSE.txt</url>
</override>
diff --git a/asterixdb/pom.xml b/asterixdb/pom.xml
index 75a80b9..df93317 100644
--- a/asterixdb/pom.xml
+++ b/asterixdb/pom.xml
@@ -92,6 +92,8 @@
<hadoop-awsjavasdk.version>1.12.1</hadoop-awsjavasdk.version>
<azurejavasdk.version>12.12.0</azurejavasdk.version>
<gcsjavasdk.version>1.114.0</gcsjavasdk.version>
+ <hadoop-azuresdk.version>8.6.6</hadoop-azuresdk.version>
+
<implementation.title>Apache AsterixDB - ${project.name}</implementation.title>
<implementation.url>https://asterixdb.apache.org/</implementation.url>
<implementation.version>${project.version}</implementation.version>
@@ -1813,6 +1815,25 @@
</exclusions>
</dependency>
<!-- Hadoop AWS end -->
+ <!-- Hadoop Azure start -->
+ <dependency>
+ <!-- Pick a newer Azure connector -->
+ <groupId>com.microsoft.azure</groupId>
+ <artifactId>azure-storage</artifactId>
+ <version>${hadoop-azuresdk.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-azure</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <!-- Hadoop Azure end -->
</dependencies>
</dependencyManagement>
diff --git a/asterixdb/src/main/appended-resources/supplemental-models.xml b/asterixdb/src/main/appended-resources/supplemental-models.xml
index cd5ccf1..3f0d869 100644
--- a/asterixdb/src/main/appended-resources/supplemental-models.xml
+++ b/asterixdb/src/main/appended-resources/supplemental-models.xml
@@ -744,6 +744,32 @@
</supplement>
<!-- Azure SDK for Java end -->
+ <!-- Azure SDK for Hadoop start -->
+ <supplement>
+ <project>
+ <groupId>com.microsoft.azure</groupId>
+ <artifactId>azure-storage</artifactId>
+ <properties>
+ <license.ignoreMissingEmbeddedLicense>8.6.6</license.ignoreMissingEmbeddedLicense>
+ <license.ignoreMissingEmbeddedNotice>8.6.6</license.ignoreMissingEmbeddedNotice>
+ <license.ignoreLicenseOverride>8.6.6</license.ignoreLicenseOverride>
+ </properties>
+ </project>
+ </supplement>
+
+ <supplement>
+ <project>
+ <groupId>com.microsoft.azure</groupId>
+ <artifactId>azure-keyvault-core</artifactId>
+ <properties>
+ <license.ignoreMissingEmbeddedLicense>1.2.4</license.ignoreMissingEmbeddedLicense>
+ <license.ignoreMissingEmbeddedNotice>1.2.4</license.ignoreMissingEmbeddedNotice>
+ <license.ignoreLicenseOverride>1.2.4</license.ignoreLicenseOverride>
+ </properties>
+ </project>
+ </supplement>
+ <!-- Azure SDK for Hadoop end -->
+
<!-- jackson-dataformat-cbor does not contain embedded LICENSE and NOTICE -->
<!-- See https://github.com/FasterXML/jackson-modules-java8 -->
<supplement>
@@ -1638,5 +1664,4 @@
</properties>
</project>
</supplement>
-
</supplementalDataModels>
diff --git a/asterixdb/src/main/licenses/content/raw.githubusercontent.com_Azure_azure-sdk-for-java_main_sdk_keyvault_LICENSE.txt b/asterixdb/src/main/licenses/content/raw.githubusercontent.com_Azure_azure-sdk-for-java_main_sdk_keyvault_LICENSE.txt
new file mode 100644
index 0000000..d1ca00f
--- /dev/null
+++ b/asterixdb/src/main/licenses/content/raw.githubusercontent.com_Azure_azure-sdk-for-java_main_sdk_keyvault_LICENSE.txt
@@ -0,0 +1,21 @@
+ MIT License
+
+ Copyright (c) Microsoft Corporation. All rights reserved.
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"), to deal
+ in the Software without restriction, including without limitation the rights
+ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ copies of the Software, and to permit persons to whom the Software is
+ furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included in all
+ copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ SOFTWARE
\ No newline at end of file