[ASTERIXDB-2994][EXT]: Fix broken parquet format for azure datalake

Change-Id: I990c4b749d43ebafa4f25450b05c9f27dfd5e632
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/14363
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Wael Alkowaileet <wael.y.k@gmail.com>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java
index 3a9ab1c..bf904a4 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java
@@ -53,7 +53,7 @@
         ExternalDataUtils.validateIncludeExclude(configuration);
         IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration);
         BlobServiceClient blobServiceClient = ExternalDataUtils.Azure.buildAzureBlobClient(configuration);
-        List<BlobItem> filesOnly = ExternalDataUtils.Azure.listBlobItem(blobServiceClient, configuration,
+        List<BlobItem> filesOnly = ExternalDataUtils.Azure.listBlobItems(blobServiceClient, configuration,
                 includeExcludeMatcher, warningCollector);
 
         // Distribute work load amongst the partitions
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java
index 6bb103b..e145e1f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java
@@ -18,34 +18,21 @@
  */
 package org.apache.asterix.external.input.record.reader.azure.datalake;
 
-import static org.apache.asterix.external.util.ExternalDataConstants.Azure.RECURSIVE_FIELD_NAME;
-import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
-
-import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.PriorityQueue;
-import java.util.function.BiPredicate;
-import java.util.regex.Matcher;
 
-import org.apache.asterix.common.exceptions.CompilationException;
-import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
-import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.application.IServiceContext;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
-import org.apache.hyracks.api.exceptions.Warning;
 
-import com.azure.core.http.rest.PagedIterable;
-import com.azure.storage.file.datalake.DataLakeFileSystemClient;
 import com.azure.storage.file.datalake.DataLakeServiceClient;
-import com.azure.storage.file.datalake.models.ListPathsOptions;
 import com.azure.storage.file.datalake.models.PathItem;
 
 public class AzureDataLakeInputStreamFactory extends AbstractExternalInputStreamFactory {
@@ -62,66 +49,15 @@
             throws AlgebricksException {
         super.configure(ctx, configuration, warningCollector);
 
-        String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
-
-        List<PathItem> filesOnly = new ArrayList<>();
-
         // Ensure the validity of include/exclude
         ExternalDataUtils.validateIncludeExclude(configuration);
-
+        IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration);
         DataLakeServiceClient client = ExternalDataUtils.Azure.buildAzureDatalakeClient(configuration);
-        DataLakeFileSystemClient fileSystemClient;
-        try {
-            fileSystemClient = client.getFileSystemClient(container);
+        List<PathItem> filesOnly = ExternalDataUtils.Azure.listDatalakePathItems(client, configuration,
+                includeExcludeMatcher, warningCollector);
 
-            // Get all objects in a container and extract the paths to files
-            ListPathsOptions listOptions = new ListPathsOptions();
-            boolean recursive = Boolean.parseBoolean(configuration.get(RECURSIVE_FIELD_NAME));
-            listOptions.setRecursive(recursive);
-            listOptions.setPath(ExternalDataUtils.getPrefix(configuration, false));
-            PagedIterable<PathItem> pathItems = fileSystemClient.listPaths(listOptions, null);
-
-            // Collect the paths to files only
-            IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration);
-            collectAndFilterFiles(pathItems, includeExcludeMatcher.getPredicate(),
-                    includeExcludeMatcher.getMatchersList(), filesOnly);
-
-            // Warn if no files are returned
-            if (filesOnly.isEmpty() && warningCollector.shouldWarn()) {
-                Warning warning = Warning.of(null, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
-                warningCollector.warn(warning);
-            }
-
-            // Distribute work load amongst the partitions
-            distributeWorkLoad(filesOnly, getPartitionsCount());
-        } catch (Exception ex) {
-            throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
-        }
-    }
-
-    /**
-     * Collects and filters the files only, and excludes any folders
-     *
-     * @param items     storage items
-     * @param predicate predicate to test with for file filtration
-     * @param matchers  include/exclude matchers to test against
-     * @param filesOnly List containing the files only (excluding folders)
-     */
-    private void collectAndFilterFiles(Iterable<PathItem> items, BiPredicate<List<Matcher>, String> predicate,
-            List<Matcher> matchers, List<PathItem> filesOnly) {
-        for (PathItem item : items) {
-            String uri = item.getName();
-
-            // skip folders
-            if (uri.endsWith("/")) {
-                continue;
-            }
-
-            // No filter, add file
-            if (predicate.test(matchers, uri)) {
-                filesOnly.add(item);
-            }
-        }
+        // Distribute work load amongst the partitions
+        distributeWorkLoad(filesOnly, getPartitionsCount());
     }
 
     /**
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
index ee765ce..1f82dae 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureBlobParquetReaderFactory.java
@@ -92,7 +92,7 @@
     private static String buildPathURIs(Map<String, String> configuration, IWarningCollector warningCollector,
             BlobServiceClient blobServiceClient, String endPoint) throws CompilationException {
         IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration);
-        List<BlobItem> filesOnly = ExternalDataUtils.Azure.listBlobItem(blobServiceClient, configuration,
+        List<BlobItem> filesOnly = ExternalDataUtils.Azure.listBlobItems(blobServiceClient, configuration,
                 includeExcludeMatcher, warningCollector);
 
         StringBuilder builder = new StringBuilder();
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java
new file mode 100644
index 0000000..8474a74
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/parquet/AzureDataLakeParquetReaderFactory.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.azure.parquet;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.external.input.HDFSDataSourceFactory;
+import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+
+import com.azure.storage.file.datalake.DataLakeServiceClient;
+import com.azure.storage.file.datalake.models.PathItem;
+
+public class AzureDataLakeParquetReaderFactory extends HDFSDataSourceFactory {
+    private static final long serialVersionUID = -6140824803254158253L;
+    private static final List<String> recordReaderNames =
+            Collections.singletonList(ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_DATA_LAKE);
+
+    @Override
+    public void configure(IServiceContext serviceCtx, Map<String, String> configuration,
+            IWarningCollector warningCollector) throws AlgebricksException, HyracksDataException {
+        DataLakeServiceClient dataLakeServiceClient = ExternalDataUtils.Azure.buildAzureDatalakeClient(configuration);
+
+        //Get endpoint
+        String endPoint = extractEndPoint(dataLakeServiceClient.getAccountUrl());
+
+        //Get path
+        String path = buildPathURIs(configuration, warningCollector, dataLakeServiceClient, endPoint);
+
+        //Put Azure configurations to AsterixDB's Hadoop configuration
+        putAzureDataLakeConfToHadoopConf(configuration, path);
+
+        //Configure Hadoop Azure input splits
+        JobConf conf = createHdfsConf(serviceCtx, configuration);
+        ExternalDataUtils.Azure.configureAzureHdfsJobConf(conf, configuration, endPoint);
+        configureHdfsConf(conf, configuration);
+    }
+
+    @Override
+    public List<String> getRecordReaderNames() {
+        return recordReaderNames;
+    }
+
+    @Override
+    public Set<String> getReaderSupportedFormats() {
+        return Collections.singleton(ExternalDataConstants.FORMAT_PARQUET);
+    }
+
+    /**
+     * Prepare Hadoop configurations to read parquet files
+     *
+     * @param path Comma-delimited paths
+     */
+    private static void putAzureDataLakeConfToHadoopConf(Map<String, String> configuration, String path) {
+        configuration.put(ExternalDataConstants.KEY_PATH, path);
+        configuration.put(ExternalDataConstants.KEY_INPUT_FORMAT, ExternalDataConstants.INPUT_FORMAT_PARQUET);
+        configuration.put(ExternalDataConstants.KEY_PARSER, ExternalDataConstants.FORMAT_NOOP);
+    }
+
+    /**
+     * Build Azure Datalake Storage path-style for the requested files
+     *
+     * @param configuration    properties
+     * @param warningCollector warning collector
+     * @return Comma-delimited paths (e.g., "abfss://<container-name>@<accountName>.dfs.core.windows.net/file1.parquet,
+ * abfss://<container-name>@<accountName>.dfs.core.windows.net/file2.parquet")
+     * @throws CompilationException Compilation exception
+     */
+    private static String buildPathURIs(Map<String, String> configuration, IWarningCollector warningCollector,
+            DataLakeServiceClient dataLakeServiceClient, String endPoint) throws CompilationException {
+        IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration);
+        List<PathItem> filesOnly = ExternalDataUtils.Azure.listDatalakePathItems(dataLakeServiceClient, configuration,
+                includeExcludeMatcher, warningCollector);
+
+        StringBuilder builder = new StringBuilder();
+        String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+
+        if (!filesOnly.isEmpty()) {
+            appendFileURI(builder, container, endPoint, filesOnly.get(0));
+            for (int i = 1; i < filesOnly.size(); i++) {
+                builder.append(',');
+                appendFileURI(builder, container, endPoint, filesOnly.get(i));
+            }
+        }
+
+        return builder.toString();
+    }
+
+    private static String extractEndPoint(String uri) {
+        //The URI is in the form http(s)://<accountName>.dfs.core.windows.net
+        //We need to remove the protocol (i.e., http(s)://) from the URI
+        return uri.substring(uri.indexOf("//") + "//".length());
+    }
+
+    private static void appendFileURI(StringBuilder builder, String container, String endPoint, PathItem file) {
+        builder.append(ExternalDataConstants.Azure.HADOOP_AZURE_DATALAKE_PROTOCOL);
+        builder.append("://");
+        builder.append(container);
+        builder.append('@');
+        builder.append(endPoint);
+        builder.append('/');
+        builder.append(file.getName());
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 16a0f66..b462bd9 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -387,6 +387,7 @@
         //Used when a connectionString is provided
         public static final String HADOOP_AZURE_FS_SAS = "fs.azure.sas";
         public static final String HADOOP_AZURE_BLOB_PROTOCOL = "wasbs";
+        public static final String HADOOP_AZURE_DATALAKE_PROTOCOL = "abfss";
     }
 
     public static class GCS {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index d1f740e..b38f21d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -51,6 +51,7 @@
 import static org.apache.asterix.external.util.ExternalDataConstants.Azure.HADOOP_AZURE_FS_ACCOUNT_KEY;
 import static org.apache.asterix.external.util.ExternalDataConstants.Azure.HADOOP_AZURE_FS_SAS;
 import static org.apache.asterix.external.util.ExternalDataConstants.Azure.MANAGED_IDENTITY_ID_FIELD_NAME;
+import static org.apache.asterix.external.util.ExternalDataConstants.Azure.RECURSIVE_FIELD_NAME;
 import static org.apache.asterix.external.util.ExternalDataConstants.Azure.SHARED_ACCESS_SIGNATURE_FIELD_NAME;
 import static org.apache.asterix.external.util.ExternalDataConstants.Azure.TENANT_ID_FIELD_NAME;
 import static org.apache.asterix.external.util.ExternalDataConstants.GCS.JSON_CREDENTIALS_FIELD_NAME;
@@ -127,6 +128,7 @@
 import org.apache.hyracks.util.StorageUtil;
 
 import com.azure.core.credential.AzureSasCredential;
+import com.azure.core.http.rest.PagedIterable;
 import com.azure.identity.ClientCertificateCredentialBuilder;
 import com.azure.identity.ClientSecretCredentialBuilder;
 import com.azure.identity.ManagedIdentityCredentialBuilder;
@@ -1564,7 +1566,7 @@
             }
         }
 
-        public static List<BlobItem> listBlobItem(BlobServiceClient blobServiceClient,
+        public static List<BlobItem> listBlobItems(BlobServiceClient blobServiceClient,
                 Map<String, String> configuration, IncludeExcludeMatcher includeExcludeMatcher,
                 IWarningCollector warningCollector) throws CompilationException {
             String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
@@ -1584,7 +1586,7 @@
                 Iterable<BlobItem> blobItems = blobContainer.listBlobs(listBlobsOptions, null);
 
                 // Collect the paths to files only
-                collectAndFilterFiles(blobItems, includeExcludeMatcher.getPredicate(),
+                collectAndFilterBlobFiles(blobItems, includeExcludeMatcher.getPredicate(),
                         includeExcludeMatcher.getMatchersList(), filesOnly);
 
                 // Warn if no files are returned
@@ -1607,7 +1609,7 @@
          * @param matchers  include/exclude matchers to test against
          * @param filesOnly List containing the files only (excluding folders)
          */
-        private static void collectAndFilterFiles(Iterable<BlobItem> items,
+        private static void collectAndFilterBlobFiles(Iterable<BlobItem> items,
                 BiPredicate<List<Matcher>, String> predicate, List<Matcher> matchers, List<BlobItem> filesOnly) {
             for (BlobItem item : items) {
                 String uri = item.getName();
@@ -1624,6 +1626,68 @@
             }
         }
 
+        public static List<PathItem> listDatalakePathItems(DataLakeServiceClient client,
+                Map<String, String> configuration, IncludeExcludeMatcher includeExcludeMatcher,
+                IWarningCollector warningCollector) throws CompilationException {
+            String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+
+            List<PathItem> filesOnly = new ArrayList<>();
+
+            // Ensure the validity of include/exclude
+            ExternalDataUtils.validateIncludeExclude(configuration);
+
+            DataLakeFileSystemClient fileSystemClient;
+            try {
+                fileSystemClient = client.getFileSystemClient(container);
+
+                // Get all objects in a container and extract the paths to files
+                ListPathsOptions listOptions = new ListPathsOptions();
+                boolean recursive = Boolean.parseBoolean(configuration.get(RECURSIVE_FIELD_NAME));
+                listOptions.setRecursive(recursive);
+                listOptions.setPath(ExternalDataUtils.getPrefix(configuration, false));
+                PagedIterable<PathItem> pathItems = fileSystemClient.listPaths(listOptions, null);
+
+                // Collect the paths to files only
+                collectAndFilterDatalakeFiles(pathItems, includeExcludeMatcher.getPredicate(),
+                        includeExcludeMatcher.getMatchersList(), filesOnly);
+
+                // Warn if no files are returned
+                if (filesOnly.isEmpty() && warningCollector.shouldWarn()) {
+                    Warning warning = Warning.of(null, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
+                    warningCollector.warn(warning);
+                }
+            } catch (Exception ex) {
+                throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
+            }
+
+            return filesOnly;
+        }
+
+        /**
+         * Collects and filters the files only, and excludes any folders
+         *
+         * @param items     storage items
+         * @param predicate predicate to test with for file filtration
+         * @param matchers  include/exclude matchers to test against
+         * @param filesOnly List containing the files only (excluding folders)
+         */
+        private static void collectAndFilterDatalakeFiles(Iterable<PathItem> items,
+                BiPredicate<List<Matcher>, String> predicate, List<Matcher> matchers, List<PathItem> filesOnly) {
+            for (PathItem item : items) {
+                String uri = item.getName();
+
+                // skip folders
+                if (uri.endsWith("/")) {
+                    continue;
+                }
+
+                // No filter, add file
+                if (predicate.test(matchers, uri)) {
+                    filesOnly.add(item);
+                }
+            }
+        }
+
         /**
          * Validate external dataset properties
          *
@@ -1677,12 +1741,6 @@
                 throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT);
             }
 
-            // parquet is not supported for azure datalake
-            if (isParquetFormat(configuration)) {
-                throw new CompilationException(INVALID_REQ_PARAM_VAL, srcLoc, KEY_FORMAT,
-                        configuration.get(KEY_FORMAT));
-            }
-
             validateIncludeExclude(configuration);
 
             // Check if the bucket is present
diff --git a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
index d551c5b..dceed82 100644
--- a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
+++ b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
@@ -25,4 +25,5 @@
 org.apache.asterix.external.input.record.reader.gcs.GCSReaderFactory
 org.apache.asterix.external.input.record.reader.azure.blob.AzureBlobReaderFactory
 org.apache.asterix.external.input.record.reader.azure.datalake.AzureDataLakeReaderFactory
-org.apache.asterix.external.input.record.reader.azure.parquet.AzureBlobParquetReaderFactory
\ No newline at end of file
+org.apache.asterix.external.input.record.reader.azure.parquet.AzureBlobParquetReaderFactory
+org.apache.asterix.external.input.record.reader.azure.parquet.AzureDataLakeParquetReaderFactory
\ No newline at end of file