[ASTERIXDB-3247][EXT]: Push computed field evaluation to files listing
Change-Id: I36ba077a26fbb142945e7b5ea7298548263c1d67
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17730
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Hussain Towaileb <hussainht@gmail.com>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
index 50502da..ce906f6 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
@@ -233,6 +233,7 @@
<expected-warn>Failed to evaluate computed field. File: 'external-filter/department/accounting/0.json'. Computed Field Name: 'name'. Computed Field Type: 'bigint'. Computed Field Value: 'accounting'. Reason: 'For input string: "accounting"'</expected-warn>
<expected-warn>Failed to evaluate computed field. File: 'external-filter/department/engineering/0.json'. Computed Field Name: 'name'. Computed Field Type: 'bigint'. Computed Field Value: 'engineering'. Reason: 'For input string: "engineering"'</expected-warn>
<expected-warn>Failed to evaluate computed field. File: 'external-filter/department/hr/0.json'. Computed Field Name: 'name'. Computed Field Type: 'bigint'. Computed Field Value: 'hr'. Reason: 'For input string: "hr"'</expected-warn>
+ <expected-warn>The provided external dataset configuration returned no files from the external source</expected-warn>
</compilation-unit>
</test-case>
<test-case FilePath="external-dataset/s3/filter">
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
index 7ae992a..045b746 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
@@ -18,7 +18,6 @@
*/
package org.apache.asterix.external.input.record.reader.aws;
-import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
@@ -56,39 +55,22 @@
// Ensure the validity of include/exclude
ExternalDataUtils.validateIncludeExclude(configuration);
IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration);
+ IExternalFilterEvaluator evaluator = filterEvaluatorFactory.create(ctx, warningCollector);
- //Get a list of S3 objects
+ // prepare prefix for computed field calculations
ExternalDataPrefix externalDataPrefix = new ExternalDataPrefix(configuration, warningCollector);
configuration.put(ExternalDataPrefix.PREFIX_ROOT_FIELD_NAME, externalDataPrefix.getRoot());
// TODO(htowaileb): Since we're using the root to load the files then start filtering, it might end up being
// very expensive since at the root of the prefix we might load millions of files, we should consider (when
// possible) to get the value and add it
- List<S3Object> filesOnly = S3Utils.listS3Objects(configuration, includeExcludeMatcher, warningCollector);
- filesOnly = filterPrefixes(externalDataPrefix, filesOnly, filterEvaluatorFactory.create(ctx, warningCollector));
+ List<S3Object> filesOnly = S3Utils.listS3Objects(configuration, includeExcludeMatcher, warningCollector,
+ externalDataPrefix, evaluator);
// Distribute work load amongst the partitions
distributeWorkLoad(filesOnly, getPartitionsCount());
}
- private List<S3Object> filterPrefixes(ExternalDataPrefix prefix, List<S3Object> filesOnly,
- IExternalFilterEvaluator evaluator) throws HyracksDataException {
-
- // if no computed fields or empty files list, return the original list
- if (filesOnly.isEmpty() || !prefix.hasComputedFields() || evaluator.isEmpty()) {
- return filesOnly;
- }
-
- List<S3Object> filteredList = new ArrayList<>();
- for (S3Object file : filesOnly) {
- if (prefix.evaluate(file.key(), evaluator)) {
- filteredList.add(file);
- }
- }
-
- return filteredList;
- }
-
/**
* To efficiently utilize the parallelism, work load will be distributed amongst the partitions based on the file
* size.
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
index 66312bb..bccb6f8 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
@@ -28,10 +28,12 @@
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.common.external.IExternalFilterEvaluator;
import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.input.HDFSDataSourceFactory;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataPrefix;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.external.util.aws.s3.S3Constants;
import org.apache.asterix.external.util.aws.s3.S3Utils;
@@ -56,9 +58,16 @@
public void configure(IServiceContext serviceCtx, Map<String, String> configuration,
IWarningCollector warningCollector, IExternalFilterEvaluatorFactory filterEvaluatorFactory)
throws AlgebricksException, HyracksDataException {
+
+ // prepare prefix for computed field calculations
+ ExternalDataPrefix externalDataPrefix = new ExternalDataPrefix(configuration, warningCollector);
+ IExternalFilterEvaluator evaluator = filterEvaluatorFactory.create(serviceCtx, warningCollector);
+ configuration.put(ExternalDataPrefix.PREFIX_ROOT_FIELD_NAME, externalDataPrefix.getRoot());
+
//Get path
String path = configuration.containsKey(ExternalDataConstants.KEY_PATH)
- ? configuration.get(ExternalDataConstants.KEY_PATH) : buildPathURIs(configuration, warningCollector);
+ ? configuration.get(ExternalDataConstants.KEY_PATH)
+ : buildPathURIs(configuration, warningCollector, externalDataPrefix, evaluator);
//Put S3 configurations to AsterixDB's Hadoop configuration
putS3ConfToHadoopConf(configuration, path);
@@ -108,11 +117,13 @@
* @return Comma-delimited paths (e.g., "s3a://bucket/file1.parquet,s3a://bucket/file2.parquet")
* @throws CompilationException Compilation exception
*/
- private static String buildPathURIs(Map<String, String> configuration, IWarningCollector warningCollector)
- throws CompilationException {
+ private static String buildPathURIs(Map<String, String> configuration, IWarningCollector warningCollector,
+ ExternalDataPrefix externalDataPrefix, IExternalFilterEvaluator evaluator)
+ throws CompilationException, HyracksDataException {
String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration);
- List<S3Object> filesOnly = S3Utils.listS3Objects(configuration, includeExcludeMatcher, warningCollector);
+ List<S3Object> filesOnly = S3Utils.listS3Objects(configuration, includeExcludeMatcher, warningCollector,
+ externalDataPrefix, evaluator);
StringBuilder builder = new StringBuilder();
if (!filesOnly.isEmpty()) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataPrefix.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataPrefix.java
index 9d45fed..f6973ee 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataPrefix.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataPrefix.java
@@ -250,6 +250,11 @@
// TODO provide the List to avoid array creation
List<String> keySegments = extractPrefixSegments(key);
+ // no computed fields or no filter to evaluate, accept the key
+ if (!hasComputedFields() || evaluator.isEmpty()) {
+ return true;
+ }
+
// segments of object key have to be larger than segments of the prefix
if (keySegments.size() <= segments.size()) {
return false;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index e60190b..318716f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -45,6 +45,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.function.BiPredicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
@@ -54,6 +55,7 @@
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.common.external.IExternalFilterEvaluator;
import org.apache.asterix.common.functions.ExternalFunctionLanguage;
import org.apache.asterix.common.library.ILibrary;
import org.apache.asterix.common.library.ILibraryManager;
@@ -980,4 +982,20 @@
argHolder.getDataOutput().writeByte(ARRAY16);
argHolder.getDataOutput().writeShort((short) 0);
}
+
+ /**
+ * Accepts a key only if it does not end with "/", satisfies the predicate, and passes the prefix evaluation.
+ *
+ * @param key object key to test
+ * @param predicate predicate tested against the matchers and the key
+ * @param matchers matchers handed to the predicate
+ * @param externalDataPrefix external data prefix whose evaluation is the last check applied to the key
+ * @param evaluator filter evaluator forwarded to the external data prefix evaluation
+ *
+ * @return true if key passes all tests (checked in the order above, stopping at the first failure), false otherwise
+ */
+ public static boolean evaluate(String key, BiPredicate<List<Matcher>, String> predicate, List<Matcher> matchers,
+ ExternalDataPrefix externalDataPrefix, IExternalFilterEvaluator evaluator) throws HyracksDataException {
+ return !key.endsWith("/") && predicate.test(matchers, key) && externalDataPrefix.evaluate(key, evaluator);
+ }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
index a4f3a1e..1436e55 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
@@ -56,13 +56,16 @@
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.external.IExternalFilterEvaluator;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataPrefix;
+import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.external.util.HDFSUtils;
import org.apache.hadoop.fs.s3a.Constants;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.api.exceptions.Warning;
@@ -360,7 +363,8 @@
*/
public static List<S3Object> listS3Objects(Map<String, String> configuration,
AbstractExternalInputStreamFactory.IncludeExcludeMatcher includeExcludeMatcher,
- IWarningCollector warningCollector) throws CompilationException {
+ IWarningCollector warningCollector, ExternalDataPrefix externalDataPrefix,
+ IExternalFilterEvaluator evaluator) throws CompilationException, HyracksDataException {
// Prepare to retrieve the objects
List<S3Object> filesOnly;
String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
@@ -368,13 +372,15 @@
String prefix = getPrefix(configuration);
try {
- filesOnly = listS3Objects(s3Client, container, prefix, includeExcludeMatcher);
+ filesOnly =
+ listS3Objects(s3Client, container, prefix, includeExcludeMatcher, externalDataPrefix, evaluator);
} catch (S3Exception ex) {
// New API is not implemented, try falling back to old API
try {
// For error code, see https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
if (ex.awsErrorDetails().errorCode().equals(ERROR_METHOD_NOT_IMPLEMENTED)) {
- filesOnly = oldApiListS3Objects(s3Client, container, prefix, includeExcludeMatcher);
+ filesOnly = oldApiListS3Objects(s3Client, container, prefix, includeExcludeMatcher,
+ externalDataPrefix, evaluator);
} else {
throw ex;
}
@@ -407,7 +413,8 @@
* @param includeExcludeMatcher include/exclude matchers to apply
*/
private static List<S3Object> listS3Objects(S3Client s3Client, String container, String prefix,
- AbstractExternalInputStreamFactory.IncludeExcludeMatcher includeExcludeMatcher) {
+ AbstractExternalInputStreamFactory.IncludeExcludeMatcher includeExcludeMatcher,
+ ExternalDataPrefix externalDataPrefix, IExternalFilterEvaluator evaluator) throws HyracksDataException {
String newMarker = null;
List<S3Object> filesOnly = new ArrayList<>();
@@ -425,7 +432,7 @@
// Collect the paths to files only
collectAndFilterFiles(listObjectsResponse.contents(), includeExcludeMatcher.getPredicate(),
- includeExcludeMatcher.getMatchersList(), filesOnly);
+ includeExcludeMatcher.getMatchersList(), filesOnly, externalDataPrefix, evaluator);
// Mark the flag as done if done, otherwise, get the marker of the previous response for the next request
if (!listObjectsResponse.isTruncated()) {
@@ -447,7 +454,8 @@
* @param includeExcludeMatcher include/exclude matchers to apply
*/
private static List<S3Object> oldApiListS3Objects(S3Client s3Client, String container, String prefix,
- AbstractExternalInputStreamFactory.IncludeExcludeMatcher includeExcludeMatcher) {
+ AbstractExternalInputStreamFactory.IncludeExcludeMatcher includeExcludeMatcher,
+ ExternalDataPrefix externalDataPrefix, IExternalFilterEvaluator evaluator) throws HyracksDataException {
String newMarker = null;
List<S3Object> filesOnly = new ArrayList<>();
@@ -465,7 +473,7 @@
// Collect the paths to files only
collectAndFilterFiles(listObjectsResponse.contents(), includeExcludeMatcher.getPredicate(),
- includeExcludeMatcher.getMatchersList(), filesOnly);
+ includeExcludeMatcher.getMatchersList(), filesOnly, externalDataPrefix, evaluator);
// Mark the flag as done if done, otherwise, get the marker of the previous response for the next request
if (!listObjectsResponse.isTruncated()) {
@@ -479,21 +487,20 @@
}
/**
- * AWS S3 returns all the objects as paths, not differentiating between folder and files. The path is considered
- * a file if it does not end up with a "/" which is the separator in a folder structure.
+ * Collects only the objects whose key passes ExternalDataUtils.evaluate (folder keys ending with "/" are skipped)
*
- * @param s3Objects List of returned objects
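+ * @param s3Objects objects returned by the listing call
+ * @param predicate include/exclude predicate applied to each object key
+ * @param matchers matchers passed to the predicate
+ * @param filesOnly output list that accepted objects are appended to
+ * @param externalDataPrefix external data prefix used to evaluate each object key
+ * @param evaluator filter evaluator passed to the external data prefix evaluation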
*/
private static void collectAndFilterFiles(List<S3Object> s3Objects, BiPredicate<List<Matcher>, String> predicate,
- List<Matcher> matchers, List<S3Object> filesOnly) {
+ List<Matcher> matchers, List<S3Object> filesOnly, ExternalDataPrefix externalDataPrefix,
+ IExternalFilterEvaluator evaluator) throws HyracksDataException {
for (S3Object object : s3Objects) {
- // skip folders
- if (object.key().endsWith("/")) {
- continue;
- }
-
- // No filter, add file
- if (predicate.test(matchers, object.key())) {
+ if (ExternalDataUtils.evaluate(object.key(), predicate, matchers, externalDataPrefix, evaluator)) {
filesOnly.add(object);
}
}