[ASTERIXDB-3248][EXT]: Refactor + use dynamic prefixes in azure + gcs
Change-Id: I5be18503f01808d1d31a0995c69b452f6896b9b3
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17732
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Hussain Towaileb <hussainht@gmail.com>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
index 045b746..e99c0ef 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
@@ -52,18 +52,15 @@
IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws AlgebricksException, HyracksDataException {
super.configure(ctx, configuration, warningCollector, filterEvaluatorFactory);
- // Ensure the validity of include/exclude
- ExternalDataUtils.validateIncludeExclude(configuration);
+ // get include/exclude matchers
IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration);
- IExternalFilterEvaluator evaluator = filterEvaluatorFactory.create(ctx, warningCollector);
// prepare prefix for computed field calculations
+ IExternalFilterEvaluator evaluator = filterEvaluatorFactory.create(ctx, warningCollector);
ExternalDataPrefix externalDataPrefix = new ExternalDataPrefix(configuration, warningCollector);
configuration.put(ExternalDataPrefix.PREFIX_ROOT_FIELD_NAME, externalDataPrefix.getRoot());
- // TODO(htowaileb): Since we're using the root to load the files then start filtering, it might end up being
- // very expensive since at the root of the prefix we might load millions of files, we should consider (when
- // possible) to get the value and add it
+ // get the items
List<S3Object> filesOnly = S3Utils.listS3Objects(configuration, includeExcludeMatcher, warningCollector,
externalDataPrefix, evaluator);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
index bccb6f8..88aeb1b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.external.input.record.reader.aws.parquet;
+import static org.apache.asterix.external.util.aws.s3.S3Utils.configureAwsS3HdfsJobConf;
+import static org.apache.asterix.external.util.aws.s3.S3Utils.listS3Objects;
import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
import java.util.Collections;
@@ -25,7 +27,6 @@
import java.util.Map;
import java.util.Set;
-import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.common.external.IExternalFilterEvaluator;
@@ -36,7 +37,6 @@
import org.apache.asterix.external.util.ExternalDataPrefix;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.external.util.aws.s3.S3Constants;
-import org.apache.asterix.external.util.aws.s3.S3Utils;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.application.IServiceContext;
@@ -59,23 +59,33 @@
IWarningCollector warningCollector, IExternalFilterEvaluatorFactory filterEvaluatorFactory)
throws AlgebricksException, HyracksDataException {
- // prepare prefix for computed field calculations
- ExternalDataPrefix externalDataPrefix = new ExternalDataPrefix(configuration, warningCollector);
- IExternalFilterEvaluator evaluator = filterEvaluatorFactory.create(serviceCtx, warningCollector);
- configuration.put(ExternalDataPrefix.PREFIX_ROOT_FIELD_NAME, externalDataPrefix.getRoot());
+ // get path
+ String path;
+ if (configuration.containsKey(ExternalDataConstants.KEY_PATH)) {
+ path = configuration.get(ExternalDataConstants.KEY_PATH);
+ } else {
+ // get include/exclude matchers
+ IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration);
- //Get path
- String path = configuration.containsKey(ExternalDataConstants.KEY_PATH)
- ? configuration.get(ExternalDataConstants.KEY_PATH)
- : buildPathURIs(configuration, warningCollector, externalDataPrefix, evaluator);
- //Put S3 configurations to AsterixDB's Hadoop configuration
+ // prepare prefix for computed field calculations
+ IExternalFilterEvaluator evaluator = filterEvaluatorFactory.create(serviceCtx, warningCollector);
+ ExternalDataPrefix externalDataPrefix = new ExternalDataPrefix(configuration, warningCollector);
+ configuration.put(ExternalDataPrefix.PREFIX_ROOT_FIELD_NAME, externalDataPrefix.getRoot());
+
+ String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+ List<S3Object> filesOnly = listS3Objects(configuration, includeExcludeMatcher, warningCollector,
+ externalDataPrefix, evaluator);
+ path = buildPathURIs(container, filesOnly);
+ }
+
+ // put S3 configurations to AsterixDB's Hadoop configuration
putS3ConfToHadoopConf(configuration, path);
//Configure Hadoop S3 input splits
try {
JobConf conf = createHdfsConf(serviceCtx, configuration);
int numberOfPartitions = getPartitionConstraint().getLocations().length;
- S3Utils.configureAwsS3HdfsJobConf(conf, configuration, numberOfPartitions);
+ configureAwsS3HdfsJobConf(conf, configuration, numberOfPartitions);
configureHdfsConf(conf, configuration);
} catch (SdkException | SdkBaseException ex) {
throw new RuntimeDataException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
@@ -112,18 +122,12 @@
/**
* Build S3 path-style for the requested files
*
- * @param configuration properties
- * @param warningCollector warning collector
+ * @param container name of the S3 bucket that holds the files
+ * @param filesOnly S3 objects whose URIs are joined into the returned path
+ *
* @return Comma-delimited paths (e.g., "s3a://bucket/file1.parquet,s3a://bucket/file2.parquet")
- * @throws CompilationException Compilation exception
*/
- private static String buildPathURIs(Map<String, String> configuration, IWarningCollector warningCollector,
- ExternalDataPrefix externalDataPrefix, IExternalFilterEvaluator evaluator)
- throws CompilationException, HyracksDataException {
- String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
- IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration);
- List<S3Object> filesOnly = S3Utils.listS3Objects(configuration, includeExcludeMatcher, warningCollector,
- externalDataPrefix, evaluator);
+ private static String buildPathURIs(String container, List<S3Object> filesOnly) {
StringBuilder builder = new StringBuilder();
if (!filesOnly.isEmpty()) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java
index 5cd396e..173cc15 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java
@@ -18,7 +18,6 @@
*/
package org.apache.asterix.external.input.record.reader.azure.blob;
-import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.buildAzureBlobClient;
import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.listBlobItems;
import java.util.Comparator;
@@ -27,9 +26,11 @@
import java.util.PriorityQueue;
import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.external.IExternalFilterEvaluator;
import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
+import org.apache.asterix.external.util.ExternalDataPrefix;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.application.IServiceContext;
@@ -37,7 +38,6 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
-import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.models.BlobItem;
public class AzureBlobInputStreamFactory extends AbstractExternalInputStreamFactory {
@@ -56,14 +56,18 @@
public void configure(IServiceContext ctx, Map<String, String> configuration, IWarningCollector warningCollector,
IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws AlgebricksException, HyracksDataException {
super.configure(ctx, configuration, warningCollector, filterEvaluatorFactory);
-
IApplicationContext appCtx = (IApplicationContext) ctx.getApplicationContext();
- // Ensure the validity of include/exclude
- ExternalDataUtils.validateIncludeExclude(configuration);
+
+ // get include/exclude matchers
IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration);
- BlobServiceClient blobServiceClient = buildAzureBlobClient(appCtx, configuration);
- List<BlobItem> filesOnly =
- listBlobItems(blobServiceClient, configuration, includeExcludeMatcher, warningCollector);
+
+ // prepare prefix for computed field calculations
+ IExternalFilterEvaluator evaluator = filterEvaluatorFactory.create(ctx, warningCollector);
+ ExternalDataPrefix externalDataPrefix = new ExternalDataPrefix(configuration, warningCollector);
+ configuration.put(ExternalDataPrefix.PREFIX_ROOT_FIELD_NAME, externalDataPrefix.getRoot());
+
+ List<BlobItem> filesOnly = listBlobItems(appCtx, configuration, includeExcludeMatcher, warningCollector,
+ externalDataPrefix, evaluator);
// Distribute work load amongst the partitions
distributeWorkLoad(filesOnly, getPartitionsCount());
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java
index bd2535d..89a1d80 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java
@@ -58,9 +58,10 @@
super.configure(ctx, configuration, warningCollector, filterEvaluatorFactory);
IApplicationContext appCtx = (IApplicationContext) ctx.getApplicationContext();
- // Ensure the validity of include/exclude
- ExternalDataUtils.validateIncludeExclude(configuration);
+
+ // get include/exclude matchers
IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration);
+
DataLakeServiceClient client = buildAzureDatalakeClient(appCtx, configuration);
List<PathItem> filesOnly =
listDatalakePathItems(client, configuration, includeExcludeMatcher, warningCollector);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
index 927e74e..cec29d4 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
@@ -29,11 +29,12 @@
import java.util.Set;
import org.apache.asterix.common.api.IApplicationContext;
-import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.external.IExternalFilterEvaluator;
import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.input.HDFSDataSourceFactory;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataPrefix;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -54,15 +55,30 @@
IWarningCollector warningCollector, IExternalFilterEvaluatorFactory filterEvaluatorFactory)
throws AlgebricksException, HyracksDataException {
IApplicationContext appCtx = (IApplicationContext) serviceCtx.getApplicationContext();
+
+ // get endpoint
BlobServiceClient blobServiceClient = buildAzureBlobClient(appCtx, configuration);
- //Get endpoint
String endPoint = extractEndPoint(blobServiceClient.getAccountUrl());
- //Get path
- String path = buildPathURIs(configuration, warningCollector, blobServiceClient, endPoint);
- //Put Azure configurations to AsterixDB's Hadoop configuration
+
+ // get include/exclude matchers
+ IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration);
+
+ // prepare prefix for computed field calculations
+ IExternalFilterEvaluator evaluator = filterEvaluatorFactory.create(serviceCtx, warningCollector);
+ ExternalDataPrefix externalDataPrefix = new ExternalDataPrefix(configuration, warningCollector);
+ configuration.put(ExternalDataPrefix.PREFIX_ROOT_FIELD_NAME, externalDataPrefix.getRoot());
+
+ List<BlobItem> filesOnly = listBlobItems(blobServiceClient, configuration, includeExcludeMatcher,
+ warningCollector, externalDataPrefix, evaluator);
+
+ // get path
+ String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+ String path = buildPathURIs(container, filesOnly, endPoint);
+
+ // put Azure configurations to AsterixDB's Hadoop configuration
putAzureBlobConfToHadoopConf(configuration, path);
- //Configure Hadoop Azure input splits
+ // configure Hadoop Azure input splits
JobConf conf = createHdfsConf(serviceCtx, configuration);
configureAzureHdfsJobConf(conf, configuration, endPoint);
configureHdfsConf(conf, configuration);
@@ -92,20 +108,15 @@
/**
* Build Azure Blob Storage path-style for the requested files
*
- * @param configuration properties
- * @param warningCollector warning collector
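+ * @param container name of the Azure Blob Storage container that holds the files
+ * @param filesOnly blob items whose URIs are joined into the returned path
+ * @param endPoint endpoint extracted from the storage account URL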
+ *
* @return Comma-delimited paths (e.g., "wasbs://container@accountName.blob.core.windows.net/file1.parquet,
* wasbs://container@accountName.blob.core.windows.net/file2.parquet")
- * @throws CompilationException Compilation exception
*/
- private static String buildPathURIs(Map<String, String> configuration, IWarningCollector warningCollector,
- BlobServiceClient blobServiceClient, String endPoint) throws CompilationException {
- IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration);
- List<BlobItem> filesOnly =
- listBlobItems(blobServiceClient, configuration, includeExcludeMatcher, warningCollector);
-
+ private static String buildPathURIs(String container, List<BlobItem> filesOnly, String endPoint) {
StringBuilder builder = new StringBuilder();
- String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
if (!filesOnly.isEmpty()) {
appendFileURI(builder, container, endPoint, filesOnly.get(0));
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStreamFactory.java
index 165f340..89ff24b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStreamFactory.java
@@ -25,9 +25,11 @@
import java.util.Map;
import java.util.PriorityQueue;
+import org.apache.asterix.common.external.IExternalFilterEvaluator;
import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
+import org.apache.asterix.external.util.ExternalDataPrefix;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.external.util.google.gcs.GCSUtils;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -52,12 +54,17 @@
IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws AlgebricksException, HyracksDataException {
super.configure(ctx, configuration, warningCollector, filterEvaluatorFactory);
- // Ensure the validity of include/exclude
- ExternalDataUtils.validateIncludeExclude(configuration);
- IncludeExcludeMatcher includeExcludeMatcher = getIncludeExcludeMatchers(configuration);
+ // get include/exclude matchers
+ IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration);
+
+ // prepare prefix for computed field calculations
+ IExternalFilterEvaluator evaluator = filterEvaluatorFactory.create(ctx, warningCollector);
+ ExternalDataPrefix externalDataPrefix = new ExternalDataPrefix(configuration, warningCollector);
+ configuration.put(ExternalDataPrefix.PREFIX_ROOT_FIELD_NAME, externalDataPrefix.getRoot());
// get the items
- List<Blob> filesOnly = GCSUtils.listItems(configuration, includeExcludeMatcher, warningCollector);
+ List<Blob> filesOnly = GCSUtils.listItems(configuration, includeExcludeMatcher, warningCollector,
+ externalDataPrefix, evaluator);
// Distribute work load amongst the partitions
distributeWorkLoad(filesOnly, getPartitionsCount());
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/parquet/GCSParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/parquet/GCSParquetReaderFactory.java
index 1de944b..bb98abe 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/parquet/GCSParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/parquet/GCSParquetReaderFactory.java
@@ -23,11 +23,12 @@
import java.util.Map;
import java.util.Set;
-import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.external.IExternalFilterEvaluator;
import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.input.HDFSDataSourceFactory;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataPrefix;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.external.util.google.gcs.GCSConstants;
import org.apache.asterix.external.util.google.gcs.GCSUtils;
@@ -48,8 +49,21 @@
public void configure(IServiceContext serviceCtx, Map<String, String> configuration,
IWarningCollector warningCollector, IExternalFilterEvaluatorFactory filterEvaluatorFactory)
throws AlgebricksException, HyracksDataException {
+
+ // get include/exclude matchers
+ IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration);
+
+ // prepare prefix for computed field calculations
+ IExternalFilterEvaluator evaluator = filterEvaluatorFactory.create(serviceCtx, warningCollector);
+ ExternalDataPrefix externalDataPrefix = new ExternalDataPrefix(configuration, warningCollector);
+ configuration.put(ExternalDataPrefix.PREFIX_ROOT_FIELD_NAME, externalDataPrefix.getRoot());
+
+ String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+ List<Blob> filesOnly = GCSUtils.listItems(configuration, includeExcludeMatcher, warningCollector,
+ externalDataPrefix, evaluator);
+
// get path
- String path = buildPathURIs(configuration, warningCollector);
+ String path = buildPathURIs(container, filesOnly);
// put GCS configurations to AsterixDB's Hadoop configuration
putGCSConfToHadoopConf(configuration, path);
@@ -85,21 +99,13 @@
/**
* Build Google Cloud Storage path-style for the requested files
*
- * @param configuration properties
- * @param warningCollector warning collector
+ * @param container name of the Google Cloud Storage bucket that holds the files
+ * @param filesOnly blobs whose URIs are joined into the returned path
+ *
* @return Comma-delimited paths (e.g., "gs://bucket/file1.parquet,gs://bucket/file2.parquet")
- * @throws CompilationException Compilation exception
*/
- private static String buildPathURIs(Map<String, String> configuration, IWarningCollector warningCollector)
- throws CompilationException {
- String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+ private static String buildPathURIs(String container, List<Blob> filesOnly) {
- // Ensure the validity of include/exclude
- ExternalDataUtils.validateIncludeExclude(configuration);
- IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration);
-
- // get the items
- List<Blob> filesOnly = GCSUtils.listItems(configuration, includeExcludeMatcher, warningCollector);
StringBuilder builder = new StringBuilder();
if (!filesOnly.isEmpty()) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataPrefix.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataPrefix.java
index f6973ee..f8076ef 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataPrefix.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataPrefix.java
@@ -151,12 +151,11 @@
// we need to keep track of the end position
StringBuilder expression = new StringBuilder();
- int end = 0;
for (int i = 0; i < segments.size(); i++) {
matcher.reset(segments.get(i));
expression.setLength(0);
- end = 0;
+ int end = 0;
while (matcher.find()) {
expression.append(segments.get(i), end, matcher.start());
@@ -207,14 +206,14 @@
* Returns the longest static path (root) before encountering the first computed field
*/
private void extractRoot() {
- StringBuilder builder = new StringBuilder();
-
// check if there are any computed fields before doing any testing
if (computedFieldNames.isEmpty()) {
root = original;
return;
}
+ StringBuilder builder = new StringBuilder();
+
// construct all static parts before encountering the first computed field
for (int i = 0; i < computedFieldSegmentIndexes.get(0); i++) {
builder.append(segments.get(i)).append("/");
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 318716f..2605fe7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -839,6 +839,9 @@
public static IncludeExcludeMatcher getIncludeExcludeMatchers(Map<String, String> configuration)
throws CompilationException {
+ // ensure validity of include/exclude matchers
+ validateIncludeExclude(configuration);
+
// Get and compile the patterns for include/exclude if provided
List<Matcher> includeMatchers = new ArrayList<>();
List<Matcher> excludeMatchers = new ArrayList<>();
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/blob_storage/AzureUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/blob_storage/AzureUtils.java
index 0dc9ad2..aced253 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/blob_storage/AzureUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/azure/blob_storage/AzureUtils.java
@@ -58,10 +58,14 @@
import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.external.IExternalFilterEvaluator;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataPrefix;
+import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.external.util.HDFSUtils;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.api.exceptions.Warning;
@@ -395,19 +399,24 @@
}
}
+ public static List<BlobItem> listBlobItems(IApplicationContext context, Map<String, String> configuration,
+ AbstractExternalInputStreamFactory.IncludeExcludeMatcher includeExcludeMatcher,
+ IWarningCollector warningCollector, ExternalDataPrefix externalDataPrefix,
+ IExternalFilterEvaluator evaluator) throws CompilationException {
+ BlobServiceClient blobServiceClient = buildAzureBlobClient(context, configuration);
+ return listBlobItems(blobServiceClient, configuration, includeExcludeMatcher, warningCollector,
+ externalDataPrefix, evaluator);
+ }
+
public static List<BlobItem> listBlobItems(BlobServiceClient blobServiceClient, Map<String, String> configuration,
AbstractExternalInputStreamFactory.IncludeExcludeMatcher includeExcludeMatcher,
- IWarningCollector warningCollector) throws CompilationException {
+ IWarningCollector warningCollector, ExternalDataPrefix externalDataPrefix,
+ IExternalFilterEvaluator evaluator) throws CompilationException {
String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
-
List<BlobItem> filesOnly = new ArrayList<>();
- // Ensure the validity of include/exclude
- validateIncludeExclude(configuration);
-
- BlobContainerClient blobContainer;
try {
- blobContainer = blobServiceClient.getBlobContainerClient(container);
+ BlobContainerClient blobContainer = blobServiceClient.getBlobContainerClient(container);
// Get all objects in a container and extract the paths to files
ListBlobsOptions listBlobsOptions = new ListBlobsOptions();
@@ -416,7 +425,7 @@
// Collect the paths to files only
collectAndFilterBlobFiles(blobItems, includeExcludeMatcher.getPredicate(),
- includeExcludeMatcher.getMatchersList(), filesOnly);
+ includeExcludeMatcher.getMatchersList(), filesOnly, externalDataPrefix, evaluator);
// Warn if no files are returned
if (filesOnly.isEmpty() && warningCollector.shouldWarn()) {
@@ -439,17 +448,10 @@
* @param filesOnly List containing the files only (excluding folders)
*/
private static void collectAndFilterBlobFiles(Iterable<BlobItem> items,
- BiPredicate<List<Matcher>, String> predicate, List<Matcher> matchers, List<BlobItem> filesOnly) {
+ BiPredicate<List<Matcher>, String> predicate, List<Matcher> matchers, List<BlobItem> filesOnly,
+ ExternalDataPrefix externalDataPrefix, IExternalFilterEvaluator evaluator) throws HyracksDataException {
for (BlobItem item : items) {
- String uri = item.getName();
-
- // skip folders
- if (uri.endsWith("/")) {
- continue;
- }
-
- // No filter, add file
- if (predicate.test(matchers, uri)) {
+ if (ExternalDataUtils.evaluate(item.getName(), predicate, matchers, externalDataPrefix, evaluator)) {
filesOnly.add(item);
}
}
@@ -462,9 +464,6 @@
List<PathItem> filesOnly = new ArrayList<>();
- // Ensure the validity of include/exclude
- validateIncludeExclude(configuration);
-
DataLakeFileSystemClient fileSystemClient;
try {
fileSystemClient = client.getFileSystemClient(container);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
index 6183a88..0029e6a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
@@ -46,11 +46,14 @@
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.external.IExternalFilterEvaluator;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataPrefix;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.external.util.HDFSUtils;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.api.exceptions.Warning;
@@ -154,7 +157,8 @@
}
public static List<Blob> listItems(Map<String, String> configuration, IncludeExcludeMatcher includeExcludeMatcher,
- IWarningCollector warningCollector) throws CompilationException {
+ IWarningCollector warningCollector, ExternalDataPrefix externalDataPrefix,
+ IExternalFilterEvaluator evaluator) throws CompilationException, HyracksDataException {
// Prepare to retrieve the objects
List<Blob> filesOnly = new ArrayList<>();
String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
@@ -170,7 +174,7 @@
// Collect the paths to files only
collectAndFilterFiles(items, includeExcludeMatcher.getPredicate(), includeExcludeMatcher.getMatchersList(),
- filesOnly);
+ filesOnly, externalDataPrefix, evaluator);
// Warn if no files are returned
if (filesOnly.isEmpty() && warningCollector.shouldWarn()) {
@@ -187,15 +191,10 @@
* @param items List of returned objects
*/
private static void collectAndFilterFiles(Page<Blob> items, BiPredicate<List<Matcher>, String> predicate,
- List<Matcher> matchers, List<Blob> filesOnly) {
+ List<Matcher> matchers, List<Blob> filesOnly, ExternalDataPrefix externalDataPrefix,
+ IExternalFilterEvaluator evaluator) throws HyracksDataException {
for (Blob item : items.iterateAll()) {
- // skip folders
- if (item.getName().endsWith("/")) {
- continue;
- }
-
- // No filter, add file
- if (predicate.test(matchers, item.getName())) {
+ if (ExternalDataUtils.evaluate(item.getName(), predicate, matchers, externalDataPrefix, evaluator)) {
filesOnly.add(item);
}
}