[ASTERIXDB-3247][EXT]: Push computed field evaluation to files listing

Change-Id: I36ba077a26fbb142945e7b5ea7298548263c1d67
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17730
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Hussain Towaileb <hussainht@gmail.com>
Reviewed-by: Wail Alkowaileet <wael.y.k@gmail.com>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
index 50502da..ce906f6 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
@@ -233,6 +233,7 @@
         <expected-warn>Failed to evaluate computed field. File: 'external-filter/department/accounting/0.json'. Computed Field Name: 'name'. Computed Field Type: 'bigint'. Computed Field Value: 'accounting'. Reason: 'For input string: "accounting"'</expected-warn>
         <expected-warn>Failed to evaluate computed field. File: 'external-filter/department/engineering/0.json'. Computed Field Name: 'name'. Computed Field Type: 'bigint'. Computed Field Value: 'engineering'. Reason: 'For input string: "engineering"'</expected-warn>
         <expected-warn>Failed to evaluate computed field. File: 'external-filter/department/hr/0.json'. Computed Field Name: 'name'. Computed Field Type: 'bigint'. Computed Field Value: 'hr'. Reason: 'For input string: "hr"'</expected-warn>
+        <expected-warn>The provided external dataset configuration returned no files from the external source</expected-warn>
       </compilation-unit>
     </test-case>
     <test-case FilePath="external-dataset/s3/filter">
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
index 7ae992a..045b746 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
@@ -18,7 +18,6 @@
  */
 package org.apache.asterix.external.input.record.reader.aws;
 
-import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
@@ -56,39 +55,22 @@
         // Ensure the validity of include/exclude
         ExternalDataUtils.validateIncludeExclude(configuration);
         IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration);
+        IExternalFilterEvaluator evaluator = filterEvaluatorFactory.create(ctx, warningCollector);
 
-        //Get a list of S3 objects
+        // prepare prefix for computed field calculations
         ExternalDataPrefix externalDataPrefix = new ExternalDataPrefix(configuration, warningCollector);
         configuration.put(ExternalDataPrefix.PREFIX_ROOT_FIELD_NAME, externalDataPrefix.getRoot());
 
         // TODO(htowaileb): Since we're using the root to load the files then start filtering, it might end up being
         // very expensive since at the root of the prefix we might load millions of files, we should consider (when
         // possible) to get the value and add it
-        List<S3Object> filesOnly = S3Utils.listS3Objects(configuration, includeExcludeMatcher, warningCollector);
-        filesOnly = filterPrefixes(externalDataPrefix, filesOnly, filterEvaluatorFactory.create(ctx, warningCollector));
+        List<S3Object> filesOnly = S3Utils.listS3Objects(configuration, includeExcludeMatcher, warningCollector,
+                externalDataPrefix, evaluator);
 
         // Distribute work load amongst the partitions
         distributeWorkLoad(filesOnly, getPartitionsCount());
     }
 
-    private List<S3Object> filterPrefixes(ExternalDataPrefix prefix, List<S3Object> filesOnly,
-            IExternalFilterEvaluator evaluator) throws HyracksDataException {
-
-        // if no computed fields or empty files list, return the original list
-        if (filesOnly.isEmpty() || !prefix.hasComputedFields() || evaluator.isEmpty()) {
-            return filesOnly;
-        }
-
-        List<S3Object> filteredList = new ArrayList<>();
-        for (S3Object file : filesOnly) {
-            if (prefix.evaluate(file.key(), evaluator)) {
-                filteredList.add(file);
-            }
-        }
-
-        return filteredList;
-    }
-
     /**
      * To efficiently utilize the parallelism, work load will be distributed amongst the partitions based on the file
      * size.
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
index 66312bb..bccb6f8 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
@@ -28,10 +28,12 @@
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.common.external.IExternalFilterEvaluator;
 import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
 import org.apache.asterix.external.input.HDFSDataSourceFactory;
 import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
 import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataPrefix;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.external.util.aws.s3.S3Constants;
 import org.apache.asterix.external.util.aws.s3.S3Utils;
@@ -56,9 +58,16 @@
     public void configure(IServiceContext serviceCtx, Map<String, String> configuration,
             IWarningCollector warningCollector, IExternalFilterEvaluatorFactory filterEvaluatorFactory)
             throws AlgebricksException, HyracksDataException {
+
+        // prepare prefix for computed field calculations
+        ExternalDataPrefix externalDataPrefix = new ExternalDataPrefix(configuration, warningCollector);
+        IExternalFilterEvaluator evaluator = filterEvaluatorFactory.create(serviceCtx, warningCollector);
+        configuration.put(ExternalDataPrefix.PREFIX_ROOT_FIELD_NAME, externalDataPrefix.getRoot());
+
         //Get path
         String path = configuration.containsKey(ExternalDataConstants.KEY_PATH)
-                ? configuration.get(ExternalDataConstants.KEY_PATH) : buildPathURIs(configuration, warningCollector);
+                ? configuration.get(ExternalDataConstants.KEY_PATH)
+                : buildPathURIs(configuration, warningCollector, externalDataPrefix, evaluator);
         //Put S3 configurations to AsterixDB's Hadoop configuration
         putS3ConfToHadoopConf(configuration, path);
 
@@ -108,11 +117,13 @@
      * @return Comma-delimited paths (e.g., "s3a://bucket/file1.parquet,s3a://bucket/file2.parquet")
      * @throws CompilationException Compilation exception
      */
-    private static String buildPathURIs(Map<String, String> configuration, IWarningCollector warningCollector)
-            throws CompilationException {
+    private static String buildPathURIs(Map<String, String> configuration, IWarningCollector warningCollector,
+            ExternalDataPrefix externalDataPrefix, IExternalFilterEvaluator evaluator)
+            throws CompilationException, HyracksDataException {
         String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
         IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration);
-        List<S3Object> filesOnly = S3Utils.listS3Objects(configuration, includeExcludeMatcher, warningCollector);
+        List<S3Object> filesOnly = S3Utils.listS3Objects(configuration, includeExcludeMatcher, warningCollector,
+                externalDataPrefix, evaluator);
         StringBuilder builder = new StringBuilder();
 
         if (!filesOnly.isEmpty()) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataPrefix.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataPrefix.java
index 9d45fed..f6973ee 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataPrefix.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataPrefix.java
@@ -250,6 +250,11 @@
         // TODO provide the List to avoid array creation
         List<String> keySegments = extractPrefixSegments(key);
 
+        // no computed fields filter, accept path
+        if (!hasComputedFields() || evaluator.isEmpty()) {
+            return true;
+        }
+
         // segments of object key have to be larger than segments of the prefix
         if (keySegments.size() <= segments.size()) {
             return false;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index e60190b..318716f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -45,6 +45,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.function.BiPredicate;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.regex.PatternSyntaxException;
@@ -54,6 +55,7 @@
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.common.external.IExternalFilterEvaluator;
 import org.apache.asterix.common.functions.ExternalFunctionLanguage;
 import org.apache.asterix.common.library.ILibrary;
 import org.apache.asterix.common.library.ILibraryManager;
@@ -980,4 +982,20 @@
         argHolder.getDataOutput().writeByte(ARRAY16);
         argHolder.getDataOutput().writeShort((short) 0);
     }
+
+    /**
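+     * Tests the provided key against all the provided predicates/evaluators and returns true if they all pass.
+     *
+     * @param key key to test; a key ending with "/" never passes
+     * @param predicate predicate applied to the matchers and the key
+     * @param matchers matchers handed to the predicate
+     * @param externalDataPrefix external data prefix that evaluates the key with the evaluator
+     * @param evaluator evaluator handed to the external data prefix
+     *
+     * @return true if key passes all tests, false otherwise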
+     */
+    public static boolean evaluate(String key, BiPredicate<List<Matcher>, String> predicate, List<Matcher> matchers,
+            ExternalDataPrefix externalDataPrefix, IExternalFilterEvaluator evaluator) throws HyracksDataException {
+        return !key.endsWith("/") && predicate.test(matchers, key) && externalDataPrefix.evaluate(key, evaluator);
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
index a4f3a1e..1436e55 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
@@ -56,13 +56,16 @@
 
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.external.IExternalFilterEvaluator;
 import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataPrefix;
+import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.external.util.HDFSUtils;
 import org.apache.hadoop.fs.s3a.Constants;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.api.exceptions.Warning;
@@ -360,7 +363,8 @@
      */
     public static List<S3Object> listS3Objects(Map<String, String> configuration,
             AbstractExternalInputStreamFactory.IncludeExcludeMatcher includeExcludeMatcher,
-            IWarningCollector warningCollector) throws CompilationException {
+            IWarningCollector warningCollector, ExternalDataPrefix externalDataPrefix,
+            IExternalFilterEvaluator evaluator) throws CompilationException, HyracksDataException {
         // Prepare to retrieve the objects
         List<S3Object> filesOnly;
         String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
@@ -368,13 +372,15 @@
         String prefix = getPrefix(configuration);
 
         try {
-            filesOnly = listS3Objects(s3Client, container, prefix, includeExcludeMatcher);
+            filesOnly =
+                    listS3Objects(s3Client, container, prefix, includeExcludeMatcher, externalDataPrefix, evaluator);
         } catch (S3Exception ex) {
             // New API is not implemented, try falling back to old API
             try {
                 // For error code, see https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
                 if (ex.awsErrorDetails().errorCode().equals(ERROR_METHOD_NOT_IMPLEMENTED)) {
-                    filesOnly = oldApiListS3Objects(s3Client, container, prefix, includeExcludeMatcher);
+                    filesOnly = oldApiListS3Objects(s3Client, container, prefix, includeExcludeMatcher,
+                            externalDataPrefix, evaluator);
                 } else {
                     throw ex;
                 }
@@ -407,7 +413,8 @@
      * @param includeExcludeMatcher include/exclude matchers to apply
      */
     private static List<S3Object> listS3Objects(S3Client s3Client, String container, String prefix,
-            AbstractExternalInputStreamFactory.IncludeExcludeMatcher includeExcludeMatcher) {
+            AbstractExternalInputStreamFactory.IncludeExcludeMatcher includeExcludeMatcher,
+            ExternalDataPrefix externalDataPrefix, IExternalFilterEvaluator evaluator) throws HyracksDataException {
         String newMarker = null;
         List<S3Object> filesOnly = new ArrayList<>();
 
@@ -425,7 +432,7 @@
 
             // Collect the paths to files only
             collectAndFilterFiles(listObjectsResponse.contents(), includeExcludeMatcher.getPredicate(),
-                    includeExcludeMatcher.getMatchersList(), filesOnly);
+                    includeExcludeMatcher.getMatchersList(), filesOnly, externalDataPrefix, evaluator);
 
             // Mark the flag as done if done, otherwise, get the marker of the previous response for the next request
             if (!listObjectsResponse.isTruncated()) {
@@ -447,7 +454,8 @@
      * @param includeExcludeMatcher include/exclude matchers to apply
      */
     private static List<S3Object> oldApiListS3Objects(S3Client s3Client, String container, String prefix,
-            AbstractExternalInputStreamFactory.IncludeExcludeMatcher includeExcludeMatcher) {
+            AbstractExternalInputStreamFactory.IncludeExcludeMatcher includeExcludeMatcher,
+            ExternalDataPrefix externalDataPrefix, IExternalFilterEvaluator evaluator) throws HyracksDataException {
         String newMarker = null;
         List<S3Object> filesOnly = new ArrayList<>();
 
@@ -465,7 +473,7 @@
 
             // Collect the paths to files only
             collectAndFilterFiles(listObjectsResponse.contents(), includeExcludeMatcher.getPredicate(),
-                    includeExcludeMatcher.getMatchersList(), filesOnly);
+                    includeExcludeMatcher.getMatchersList(), filesOnly, externalDataPrefix, evaluator);
 
             // Mark the flag as done if done, otherwise, get the marker of the previous response for the next request
             if (!listObjectsResponse.isTruncated()) {
@@ -479,21 +487,20 @@
     }
 
     /**
-     * AWS S3 returns all the objects as paths, not differentiating between folder and files. The path is considered
-     * a file if it does not end up with a "/" which is the separator in a folder structure.
+     * Collects only files that pass all tests
      *
-     * @param s3Objects List of returned objects
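+     * @param s3Objects s3 objects to filter
+     * @param predicate predicate applied to the matchers and each object key
+     * @param matchers matchers handed to the predicate
+     * @param filesOnly list that receives the objects passing all tests
+     * @param externalDataPrefix external data prefix that evaluates each object key with the evaluator
+     * @param evaluator evaluator handed to the external data prefix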
      */
     private static void collectAndFilterFiles(List<S3Object> s3Objects, BiPredicate<List<Matcher>, String> predicate,
-            List<Matcher> matchers, List<S3Object> filesOnly) {
+            List<Matcher> matchers, List<S3Object> filesOnly, ExternalDataPrefix externalDataPrefix,
+            IExternalFilterEvaluator evaluator) throws HyracksDataException {
         for (S3Object object : s3Objects) {
-            // skip folders
-            if (object.key().endsWith("/")) {
-                continue;
-            }
-
-            // No filter, add file
-            if (predicate.test(matchers, object.key())) {
+            if (ExternalDataUtils.evaluate(object.key(), predicate, matchers, externalDataPrefix, evaluator)) {
                 filesOnly.add(object);
             }
         }