[ASTERIXDB-2994][EXT]: Fix broken parquet format for Azure Data Lake
Change-Id: I990c4b749d43ebafa4f25450b05c9f27dfd5e632
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/14363
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wael Alkowaileet <wael.y.k@gmail.com>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java
index 3a9ab1c..bf904a4 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java
@@ -53,7 +53,7 @@
ExternalDataUtils.validateIncludeExclude(configuration);
IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration);
BlobServiceClient blobServiceClient = ExternalDataUtils.Azure.buildAzureBlobClient(configuration);
- List<BlobItem> filesOnly = ExternalDataUtils.Azure.listBlobItem(blobServiceClient, configuration,
+ List<BlobItem> filesOnly = ExternalDataUtils.Azure.listBlobItems(blobServiceClient, configuration,
includeExcludeMatcher, warningCollector);
// Distribute work load amongst the partitions
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java
index 6bb103b..e145e1f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java
@@ -18,34 +18,21 @@
*/
package org.apache.asterix.external.input.record.reader.azure.datalake;
-import static org.apache.asterix.external.util.ExternalDataConstants.Azure.RECURSIVE_FIELD_NAME;
-import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
-
-import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
-import java.util.function.BiPredicate;
-import java.util.regex.Matcher;
-import org.apache.asterix.common.exceptions.CompilationException;
-import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
-import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
-import org.apache.hyracks.api.exceptions.Warning;
-import com.azure.core.http.rest.PagedIterable;
-import com.azure.storage.file.datalake.DataLakeFileSystemClient;
import com.azure.storage.file.datalake.DataLakeServiceClient;
-import com.azure.storage.file.datalake.models.ListPathsOptions;
import com.azure.storage.file.datalake.models.PathItem;
public class AzureDataLakeInputStreamFactory extends AbstractExternalInputStreamFactory {
@@ -62,66 +49,15 @@
throws AlgebricksException {
super.configure(ctx, configuration, warningCollector);
- String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
-
- List<PathItem> filesOnly = new ArrayList<>();
-
// Ensure the validity of include/exclude
ExternalDataUtils.validateIncludeExclude(configuration);
-
+ IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration);
DataLakeServiceClient client = ExternalDataUtils.Azure.buildAzureDatalakeClient(configuration);
- DataLakeFileSystemClient fileSystemClient;
- try {
- fileSystemClient = client.getFileSystemClient(container);
+ List<PathItem> filesOnly = ExternalDataUtils.Azure.listDatalakePathItems(client, configuration,
+ includeExcludeMatcher, warningCollector);
- // Get all objects in a container and extract the paths to files
- ListPathsOptions listOptions = new ListPathsOptions();
- boolean recursive = Boolean.parseBoolean(configuration.get(RECURSIVE_FIELD_NAME));
- listOptions.setRecursive(recursive);
- listOptions.setPath(ExternalDataUtils.getPrefix(configuration, false));
- PagedIterable<PathItem> pathItems = fileSystemClient.listPaths(listOptions, null);
-
- // Collect the paths to files only
- IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration);
- collectAndFilterFiles(pathItems, includeExcludeMatcher.getPredicate(),
- includeExcludeMatcher.getMatchersList(), filesOnly);
-
- // Warn if no files are returned
- if (filesOnly.isEmpty() && warningCollector.shouldWarn()) {
- Warning warning = Warning.of(null, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
- warningCollector.warn(warning);
- }
-
- // Distribute work load amongst the partitions
- distributeWorkLoad(filesOnly, getPartitionsCount());
- } catch (Exception ex) {
- throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
- }
- }
-
- /**
- * Collects and filters the files only, and excludes any folders
- *
- * @param items storage items
- * @param predicate predicate to test with for file filtration
- * @param matchers include/exclude matchers to test against
- * @param filesOnly List containing the files only (excluding folders)
- */
- private void collectAndFilterFiles(Iterable<PathItem> items, BiPredicate<List<Matcher>, String> predicate,
- List<Matcher> matchers, List<PathItem> filesOnly) {
- for (PathItem item : items) {
- String uri = item.getName();
-
- // skip folders
- if (uri.endsWith("/")) {
- continue;
- }
-
- // No filter, add file
- if (predicate.test(matchers, uri)) {
- filesOnly.add(item);
- }
- }
+ // Distribute work load amongst the partitions
+ distributeWorkLoad(filesOnly, getPartitionsCount());
}
/**
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
index ee765ce..1f82dae 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
@@ -92,7 +92,7 @@
private static String buildPathURIs(Map<String, String> configuration, IWarningCollector warningCollector,
BlobServiceClient blobServiceClient, String endPoint) throws CompilationException {
IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration);
- List<BlobItem> filesOnly = ExternalDataUtils.Azure.listBlobItem(blobServiceClient, configuration,
+ List<BlobItem> filesOnly = ExternalDataUtils.Azure.listBlobItems(blobServiceClient, configuration,
includeExcludeMatcher, warningCollector);
StringBuilder builder = new StringBuilder();
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java
new file mode 100644
index 0000000..8474a74
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.azure.parquet;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.external.input.HDFSDataSourceFactory;
+import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+
+import com.azure.storage.file.datalake.DataLakeServiceClient;
+import com.azure.storage.file.datalake.models.PathItem;
+
+public class AzureDataLakeParquetReaderFactory extends HDFSDataSourceFactory {
+ private static final long serialVersionUID = -6140824803254158253L;
+ private static final List<String> recordReaderNames =
+ Collections.singletonList(ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_DATA_LAKE);
+
+ @Override
+ public void configure(IServiceContext serviceCtx, Map<String, String> configuration,
+ IWarningCollector warningCollector) throws AlgebricksException, HyracksDataException {
+ DataLakeServiceClient dataLakeServiceClient = ExternalDataUtils.Azure.buildAzureDatalakeClient(configuration);
+
+ //Get endpoint
+ String endPoint = extractEndPoint(dataLakeServiceClient.getAccountUrl());
+
+ //Get path
+ String path = buildPathURIs(configuration, warningCollector, dataLakeServiceClient, endPoint);
+
+ //Put Azure configurations to AsterixDB's Hadoop configuration
+ putAzureDataLakeConfToHadoopConf(configuration, path);
+
+ //Configure Hadoop Azure input splits
+ JobConf conf = createHdfsConf(serviceCtx, configuration);
+ ExternalDataUtils.Azure.configureAzureHdfsJobConf(conf, configuration, endPoint);
+ configureHdfsConf(conf, configuration);
+ }
+
+ @Override
+ public List<String> getRecordReaderNames() {
+ return recordReaderNames;
+ }
+
+ @Override
+ public Set<String> getReaderSupportedFormats() {
+ return Collections.singleton(ExternalDataConstants.FORMAT_PARQUET);
+ }
+
+ /**
+ * Prepare Hadoop configurations to read parquet files
+ *
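+ * @param configuration data source configuration properties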
+ * @param path Comma-delimited paths
+ */
+ private static void putAzureDataLakeConfToHadoopConf(Map<String, String> configuration, String path) {
+ configuration.put(ExternalDataConstants.KEY_PATH, path);
+ configuration.put(ExternalDataConstants.KEY_INPUT_FORMAT, ExternalDataConstants.INPUT_FORMAT_PARQUET);
+ configuration.put(ExternalDataConstants.KEY_PARSER, ExternalDataConstants.FORMAT_NOOP);
+ }
+
+ /**
+ * Build Azure Datalake Storage path-style for the requested files
+ *
+ * @param configuration properties
+ * @param warningCollector warning collector
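+ * @param dataLakeServiceClient Azure Data Lake service client
+ * @param endPoint Azure Data Lake account endpoint (e.g., "<accountName>.dfs.core.windows.net")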
+ * @return Comma-delimited paths (e.g., "abfss://<container-name>@<accountName>.dfs.core.windows.net/file1.parquet,
+ * abfss://<container-name>@<accountName>.dfs.core.windows.net/file2.parquet")
+ * @throws CompilationException Compilation exception
+ */
+ private static String buildPathURIs(Map<String, String> configuration, IWarningCollector warningCollector,
+ DataLakeServiceClient dataLakeServiceClient, String endPoint) throws CompilationException {
+ IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration);
+ List<PathItem> filesOnly = ExternalDataUtils.Azure.listDatalakePathItems(dataLakeServiceClient, configuration,
+ includeExcludeMatcher, warningCollector);
+
+ StringBuilder builder = new StringBuilder();
+ String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+
+ if (!filesOnly.isEmpty()) {
+ appendFileURI(builder, container, endPoint, filesOnly.get(0));
+ for (int i = 1; i < filesOnly.size(); i++) {
+ builder.append(',');
+ appendFileURI(builder, container, endPoint, filesOnly.get(i));
+ }
+ }
+
+ return builder.toString();
+ }
+
+ private static String extractEndPoint(String uri) {
+ //The URI is in the form http(s)://<accountName>.dfs.core.windows.net
+ //We need to remove the protocol (i.e., http(s)://) from the URI
+ return uri.substring(uri.indexOf("//") + "//".length());
+ }
+
+ private static void appendFileURI(StringBuilder builder, String container, String endPoint, PathItem file) {
+ builder.append(ExternalDataConstants.Azure.HADOOP_AZURE_DATALAKE_PROTOCOL);
+ builder.append("://");
+ builder.append(container);
+ builder.append('@');
+ builder.append(endPoint);
+ builder.append('/');
+ builder.append(file.getName());
+ }
+}
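
For reference, a standalone sketch of the URI construction performed by extractEndPoint and appendFileURI above. The account, container, and file names are hypothetical placeholders, not values taken from this change:

public class AbfssUriSketch {
    public static void main(String[] args) {
        // DataLakeServiceClient#getAccountUrl() returns e.g. "https://myaccount.dfs.core.windows.net"
        String accountUrl = "https://myaccount.dfs.core.windows.net";
        String container = "mycontainer"; // from the "container" configuration property
        String fileName = "data/file1.parquet"; // PathItem#getName()

        // Mirrors extractEndPoint: strip the protocol from the account URL
        String endPoint = accountUrl.substring(accountUrl.indexOf("//") + "//".length());

        // Mirrors appendFileURI: abfss://<container>@<endPoint>/<file>
        StringBuilder builder = new StringBuilder();
        builder.append("abfss").append("://").append(container).append('@').append(endPoint).append('/')
                .append(fileName);

        // Prints: abfss://mycontainer@myaccount.dfs.core.windows.net/data/file1.parquet
        System.out.println(builder);
    }
}
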
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 16a0f66..b462bd9 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -387,6 +387,7 @@
//Used when a connectionString is provided
public static final String HADOOP_AZURE_FS_SAS = "fs.azure.sas";
public static final String HADOOP_AZURE_BLOB_PROTOCOL = "wasbs";
+ public static final String HADOOP_AZURE_DATALAKE_PROTOCOL = "abfss";
}
public static class GCS {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index d1f740e..b38f21d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -51,6 +51,7 @@
import static org.apache.asterix.external.util.ExternalDataConstants.Azure.HADOOP_AZURE_FS_ACCOUNT_KEY;
import static org.apache.asterix.external.util.ExternalDataConstants.Azure.HADOOP_AZURE_FS_SAS;
import static org.apache.asterix.external.util.ExternalDataConstants.Azure.MANAGED_IDENTITY_ID_FIELD_NAME;
+import static org.apache.asterix.external.util.ExternalDataConstants.Azure.RECURSIVE_FIELD_NAME;
import static org.apache.asterix.external.util.ExternalDataConstants.Azure.SHARED_ACCESS_SIGNATURE_FIELD_NAME;
import static org.apache.asterix.external.util.ExternalDataConstants.Azure.TENANT_ID_FIELD_NAME;
import static org.apache.asterix.external.util.ExternalDataConstants.GCS.JSON_CREDENTIALS_FIELD_NAME;
@@ -127,6 +128,7 @@
import org.apache.hyracks.util.StorageUtil;
import com.azure.core.credential.AzureSasCredential;
+import com.azure.core.http.rest.PagedIterable;
import com.azure.identity.ClientCertificateCredentialBuilder;
import com.azure.identity.ClientSecretCredentialBuilder;
import com.azure.identity.ManagedIdentityCredentialBuilder;
@@ -1564,7 +1566,7 @@
}
}
- public static List<BlobItem> listBlobItem(BlobServiceClient blobServiceClient,
+ public static List<BlobItem> listBlobItems(BlobServiceClient blobServiceClient,
Map<String, String> configuration, IncludeExcludeMatcher includeExcludeMatcher,
IWarningCollector warningCollector) throws CompilationException {
String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
@@ -1584,7 +1586,7 @@
Iterable<BlobItem> blobItems = blobContainer.listBlobs(listBlobsOptions, null);
// Collect the paths to files only
- collectAndFilterFiles(blobItems, includeExcludeMatcher.getPredicate(),
+ collectAndFilterBlobFiles(blobItems, includeExcludeMatcher.getPredicate(),
includeExcludeMatcher.getMatchersList(), filesOnly);
// Warn if no files are returned
@@ -1607,7 +1609,7 @@
* @param matchers include/exclude matchers to test against
* @param filesOnly List containing the files only (excluding folders)
*/
- private static void collectAndFilterFiles(Iterable<BlobItem> items,
+ private static void collectAndFilterBlobFiles(Iterable<BlobItem> items,
BiPredicate<List<Matcher>, String> predicate, List<Matcher> matchers, List<BlobItem> filesOnly) {
for (BlobItem item : items) {
String uri = item.getName();
@@ -1624,6 +1626,68 @@
}
}
+ public static List<PathItem> listDatalakePathItems(DataLakeServiceClient client,
+ Map<String, String> configuration, IncludeExcludeMatcher includeExcludeMatcher,
+ IWarningCollector warningCollector) throws CompilationException {
+ String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+
+ List<PathItem> filesOnly = new ArrayList<>();
+
+ // Ensure the validity of include/exclude
+ ExternalDataUtils.validateIncludeExclude(configuration);
+
+ DataLakeFileSystemClient fileSystemClient;
+ try {
+ fileSystemClient = client.getFileSystemClient(container);
+
+ // Get all objects in a container and extract the paths to files
+ ListPathsOptions listOptions = new ListPathsOptions();
+ boolean recursive = Boolean.parseBoolean(configuration.get(RECURSIVE_FIELD_NAME));
+ listOptions.setRecursive(recursive);
+ listOptions.setPath(ExternalDataUtils.getPrefix(configuration, false));
+ PagedIterable<PathItem> pathItems = fileSystemClient.listPaths(listOptions, null);
+
+ // Collect the paths to files only
+ collectAndFilterDatalakeFiles(pathItems, includeExcludeMatcher.getPredicate(),
+ includeExcludeMatcher.getMatchersList(), filesOnly);
+
+ // Warn if no files are returned
+ if (filesOnly.isEmpty() && warningCollector.shouldWarn()) {
+ Warning warning = Warning.of(null, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
+ warningCollector.warn(warning);
+ }
+ } catch (Exception ex) {
+ throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
+ }
+
+ return filesOnly;
+ }
+
+ /**
+ * Collects and filters the files only, and excludes any folders
+ *
+ * @param items storage items
+ * @param predicate predicate to test with for file filtration
+ * @param matchers include/exclude matchers to test against
+ * @param filesOnly List containing the files only (excluding folders)
+ */
+ private static void collectAndFilterDatalakeFiles(Iterable<PathItem> items,
+ BiPredicate<List<Matcher>, String> predicate, List<Matcher> matchers, List<PathItem> filesOnly) {
+ for (PathItem item : items) {
+ String uri = item.getName();
+
+ // skip folders
+ if (uri.endsWith("/")) {
+ continue;
+ }
+
+ // No filter, add file
+ if (predicate.test(matchers, uri)) {
+ filesOnly.add(item);
+ }
+ }
+ }
+
/**
* Validate external dataset properties
*
@@ -1677,12 +1741,6 @@
throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT);
}
- // parquet is not supported for azure datalake
- if (isParquetFormat(configuration)) {
- throw new CompilationException(INVALID_REQ_PARAM_VAL, srcLoc, KEY_FORMAT,
- configuration.get(KEY_FORMAT));
- }
-
validateIncludeExclude(configuration);
// Check if the bucket is present
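
To illustrate the filtering contract that collectAndFilterDatalakeFiles (and its blob counterpart) implement, here is a self-contained sketch. The regex pattern and the predicate wiring are illustrative stand-ins for IncludeExcludeMatcher's internals, not the actual implementation:

import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class FilterSketch {
    public static void main(String[] args) {
        // A single hypothetical include matcher: keep only parquet files
        List<Matcher> matchers = new ArrayList<>();
        matchers.add(Pattern.compile(".*\\.parquet").matcher(""));

        // Stand-in for IncludeExcludeMatcher#getPredicate() in the include case
        BiPredicate<List<Matcher>, String> predicate =
                (ms, path) -> ms.stream().anyMatch(m -> m.reset(path).matches());

        List<String> filesOnly = new ArrayList<>();
        for (String name : List.of("logs/", "logs/a.parquet", "logs/b.json")) {
            // Skip folders, exactly as collectAndFilterDatalakeFiles does
            if (name.endsWith("/")) {
                continue;
            }
            // No filter, add file
            if (predicate.test(matchers, name)) {
                filesOnly.add(name);
            }
        }
        System.out.println(filesOnly); // [logs/a.parquet]
    }
}
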
diff --git a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
index d551c5b..dceed82 100644
--- a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
+++ b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
@@ -25,4 +25,5 @@
org.apache.asterix.external.input.record.reader.gcs.GCSReaderFactory
org.apache.asterix.external.input.record.reader.azure.blob.AzureBlobReaderFactory
org.apache.asterix.external.input.record.reader.azure.datalake.AzureDataLakeReaderFactory
-org.apache.asterix.external.input.record.reader.azure.parquet.AzureBlobParquetReaderFactory
\ No newline at end of file
+org.apache.asterix.external.input.record.reader.azure.parquet.AzureBlobParquetReaderFactory
+org.apache.asterix.external.input.record.reader.azure.parquet.AzureDataLakeParquetReaderFactory
\ No newline at end of file
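
The new factory becomes visible through the standard JDK service-provider mechanism. A minimal sketch of how such a registration is discovered at runtime, assuming the asterix-external-data jar is on the classpath (this mirrors, but is not, AsterixDB's actual factory lookup code):

import java.util.ServiceLoader;

import org.apache.asterix.external.api.IRecordReaderFactory;

public class DiscoverySketch {
    public static void main(String[] args) {
        // Scans every META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
        // on the classpath and instantiates the listed factories, now including
        // AzureDataLakeParquetReaderFactory.
        for (IRecordReaderFactory<?> factory : ServiceLoader.load(IRecordReaderFactory.class)) {
            System.out.println(factory.getClass().getName() + " -> " + factory.getRecordReaderNames());
        }
    }
}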