[ASTERIXDB-2780][EXT] Add support for Azure Blob Storage as External Dataset
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Added support for Azure Blob Storage as an external dataset
  (an illustrative usage sketch is included below).
- Added test cases for the above-mentioned item.
- Refactored the S3 and Azure Blob Storage test cases so that both
  external datasets now reuse the same test files.
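
Illustrative usage sketch (not part of this change). It assumes the
AZUREBLOB adapter accepts the accountName/accountKey/blobEndpoint
parameters shown in the test template plus the container/definition/format
parameters already used by the S3 adapter; the dataset name, type,
container, and definition below are placeholders, and Azurite's default
development account and endpoint are used:

  CREATE TYPE ReviewType AS OPEN { id: int };

  CREATE EXTERNAL DATASET Reviews(ReviewType) USING AZUREBLOB (
    ("accountName"="devstoreaccount1"),
    ("accountKey"="Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="),
    ("blobEndpoint"="http://localhost:20000/devstoreaccount1"),
    ("container"="playground"),
    ("definition"="json-data/reviews"),
    ("format"="json")
  );
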
Change-Id: I93480bf25f534f212e93492cbb68db494ea20032
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/7524
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Hussain Towaileb <hussainht@gmail.com>
Reviewed-by: Dmitry Lychagin <dmitry.lychagin@couchbase.com>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml
index 4c5a882..7bf5127 100644
--- a/asterixdb/asterix-app/pom.xml
+++ b/asterixdb/asterix-app/pom.xml
@@ -855,6 +855,11 @@
<artifactId>akka-http-core_2.12</artifactId>
<scope>test</scope>
</dependency>
+ <!-- Azure -->
+ <dependency>
+ <groupId>com.azure</groupId>
+ <artifactId>azure-storage-blob</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-1.2-api</artifactId>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestConstants.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestConstants.java
index 71468dc..ef2ec6c 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestConstants.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestConstants.java
@@ -41,17 +41,13 @@
public static final String AZURE_ACCOUNT_KEY_PLACEHOLDER = "%accountKey%";
public static final String AZURE_AZURITE_ACCOUNT_KEY_DEFAULT =
"Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsu" + "Fq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==";
- public static final String AZURE_DEFAULT_ENDPOINTS_PROTOCOL_PLACEHOLDER = "%defaultEndpointsProtocol%";
- public static final String AZURE_DEFAULT_ENDPOINTS_PROTOCOL_DEFAULT = "http";
public static final String AZURE_BLOB_ENDPOINT_PLACEHOLDER = "%blobEndpoint%";
public static final String AZURE_BLOB_ENDPOINT_DEFAULT =
"http://localhost:20000/" + AZURE_AZURITE_ACCOUNT_NAME_DEFAULT;
- public static final String AZURE_TEMPLATE = "(\"defaultEndpointsProtocol\"=\""
- + AZURE_DEFAULT_ENDPOINTS_PROTOCOL_DEFAULT + "\"),\n" + "(\"accountName\"=\""
- + AZURE_AZURITE_ACCOUNT_NAME_DEFAULT + "\"),\n" + "(\"accountKey\"=\"" + AZURE_AZURITE_ACCOUNT_KEY_DEFAULT
- + "\"),\n" + "(\"blobEndpoint\"=\"" + AZURE_BLOB_ENDPOINT_PLACEHOLDER + "\")";
- public static final String AZURE_TEMPLATE_DEFAULT = "(\"defaultEndpointsProtocol\"=\""
- + AZURE_DEFAULT_ENDPOINTS_PROTOCOL_DEFAULT + "\"),\n" + "(\"accountName\"=\""
- + AZURE_AZURITE_ACCOUNT_NAME_DEFAULT + "\"),\n" + "(\"accountKey\"=\"" + AZURE_AZURITE_ACCOUNT_KEY_DEFAULT
- + "\"),\n" + "(\"blobEndpoint\"=\"" + AZURE_BLOB_ENDPOINT_DEFAULT + "\")";
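+ // Both constants render as ("accountName"=...),("accountKey"=...),("blobEndpoint"=...); AZURE_TEMPLATE keeps the
+ // %blobEndpoint% placeholder while AZURE_TEMPLATE_DEFAULT substitutes the local Azurite endpoint.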
+ public static final String AZURE_TEMPLATE = "(\"accountName\"=\"" + AZURE_AZURITE_ACCOUNT_NAME_DEFAULT + "\"),\n"
+ + "(\"accountKey\"=\"" + AZURE_AZURITE_ACCOUNT_KEY_DEFAULT + "\"),\n" + "(\"blobEndpoint\"=\""
+ + AZURE_BLOB_ENDPOINT_PLACEHOLDER + "\")";
+ public static final String AZURE_TEMPLATE_DEFAULT = "(\"accountName\"=\"" + AZURE_AZURITE_ACCOUNT_NAME_DEFAULT
+ + "\"),\n" + "(\"accountKey\"=\"" + AZURE_AZURITE_ACCOUNT_KEY_DEFAULT + "\"),\n" + "(\"blobEndpoint\"=\""
+ + AZURE_BLOB_ENDPOINT_DEFAULT + "\")";
}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index a21b718..f535f2c 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -2058,6 +2058,11 @@
if (placeholder.getName().equals("adapter")) {
str = str.replace("%adapter%", placeholder.getValue());
+ // Terminate early if there are no template placeholders to replace
+ if (noTemplateRequired(str)) {
+ return str;
+ }
+
if (placeholder.getValue().equalsIgnoreCase("S3")) {
return applyS3Substitution(str, placeholders);
} else if (placeholder.getValue().equalsIgnoreCase("AzureBlob")) {
@@ -2071,6 +2076,10 @@
return str;
}
+ protected boolean noTemplateRequired(String str) {
+ return !str.contains("%template%");
+ }
+
protected String applyS3Substitution(String str, List<Placeholder> placeholders) {
boolean isReplaced = false;
boolean hasRegion = false;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/microsoft/AzureBlobStorageExternalDatasetOnePartitionTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/microsoft/AzureBlobStorageExternalDatasetOnePartitionTest.java
new file mode 100644
index 0000000..9e66207
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/microsoft/AzureBlobStorageExternalDatasetOnePartitionTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.external_dataset.microsoft;
+
+import java.util.Collection;
+
+import org.apache.asterix.test.runtime.LangExecutionUtil;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.junit.FixMethodOrder;
+import org.junit.Ignore;
+import org.junit.runner.RunWith;
+import org.junit.runners.MethodSorters;
+import org.junit.runners.Parameterized;
+
+@Ignore
+@RunWith(Parameterized.class)
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class AzureBlobStorageExternalDatasetOnePartitionTest extends AzureBlobStorageExternalDatasetTest {
+
+ public AzureBlobStorageExternalDatasetOnePartitionTest(TestCaseContext tcCtx) {
+ super(tcCtx);
+ }
+
+ @Parameterized.Parameters(name = "AzureBlobStorageExternalDatasetOnePartitionTest {index}: {0}")
+ public static Collection<Object[]> tests() throws Exception {
+ SUITE_TESTS = "testsuite_external_dataset_azure_blob_storage_one_partition.xml";
+ ONLY_TESTS = "only_external_dataset.xml";
+ TEST_CONFIG_FILE_NAME = "src/test/resources/cc-single.conf";
+ PREPARE_PLAYGROUND_CONTAINER = AzureBlobStorageExternalDatasetOnePartitionTest::preparePlaygroundContainer;
+ PREPARE_FIXED_DATA_CONTAINER = AzureBlobStorageExternalDatasetOnePartitionTest::prepareFixedDataContainer;
+ PREPARE_MIXED_DATA_CONTAINER = AzureBlobStorageExternalDatasetOnePartitionTest::prepareMixedDataContainer;
+ return LangExecutionUtil.tests(ONLY_TESTS, SUITE_TESTS);
+ }
+
+ private static void preparePlaygroundContainer() {
+ }
+
+ private static void prepareFixedDataContainer() {
+ }
+
+ private static void prepareMixedDataContainer() {
+ }
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/microsoft/AzureBlobStorageExternalDatasetTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/microsoft/AzureBlobStorageExternalDatasetTest.java
new file mode 100644
index 0000000..da9f912
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/microsoft/AzureBlobStorageExternalDatasetTest.java
@@ -0,0 +1,594 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.external_dataset.microsoft;
+
+import static org.apache.hyracks.util.file.FileUtil.joinPath;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.zip.GZIPOutputStream;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.test.runtime.ExecutionTestUtil;
+import org.apache.asterix.test.runtime.LangExecutionUtil;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.asterix.testframework.context.TestFileContext;
+import org.apache.asterix.testframework.xml.TestCase;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.FixMethodOrder;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.MethodSorters;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.BlobServiceClientBuilder;
+
+@Ignore
+@RunWith(Parameterized.class)
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class AzureBlobStorageExternalDatasetTest {
+
+ private static final Logger LOGGER = LogManager.getLogger();
+
+ // Subclasses MUST assign these variables before using them to avoid unexpected behavior
+ static String SUITE_TESTS;
+ static String ONLY_TESTS;
+ static String TEST_CONFIG_FILE_NAME;
+ static Runnable PREPARE_PLAYGROUND_CONTAINER;
+ static Runnable PREPARE_FIXED_DATA_CONTAINER;
+ static Runnable PREPARE_MIXED_DATA_CONTAINER;
+
+ // Base directory paths for data files
+ private static final String JSON_DATA_PATH = joinPath("data", "json");
+ private static final String CSV_DATA_PATH = joinPath("data", "csv");
+ private static final String TSV_DATA_PATH = joinPath("data", "tsv");
+ private static final String MIXED_DATA_PATH = joinPath("data", "mixed");
+
+ // Service endpoint
+ private static final int BLOB_SERVICE_PORT = 20000;
+ private static final String BLOB_SERVICE_ENDPOINT = "http://localhost:" + BLOB_SERVICE_PORT;
+
+ // Region, container and definitions
+ private static final String PLAYGROUND_CONTAINER = "playground";
+ private static final String FIXED_DATA_CONTAINER = "fixed-data"; // Do not modify; holds fixed data used by a record-count test
+ private static final String INCLUDE_EXCLUDE_CONTAINER = "include-exclude";
+ private static final String JSON_DEFINITION = "json-data/reviews/";
+ private static final String CSV_DEFINITION = "csv-data/reviews/";
+ private static final String TSV_DEFINITION = "tsv-data/reviews/";
+
+ // Used by a test that requires more than 1000 objects
+ private static final String OVER_1000_OBJECTS_PATH = "over-1000-objects";
+ private static final int OVER_1000_OBJECTS_COUNT = 2999;
+
+ private static final Set<String> fileNames = new HashSet<>();
+
+ // Connection string (Azurite's default development storage account) used to build the BlobServiceClient, which creates container clients
+ private static final String connectionString = "AccountName=devstoreaccount1;"
+ + "AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;"
+ + "BlobEndpoint=" + BLOB_SERVICE_ENDPOINT + "/devstoreaccount1;";
+ private static BlobServiceClient blobServiceClient;
+ private static BlobContainerClient playgroundContainer;
+
+ protected TestCaseContext tcCtx;
+
+ public AzureBlobStorageExternalDatasetTest(TestCaseContext tcCtx) {
+ this.tcCtx = tcCtx;
+ }
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ final TestExecutor testExecutor = new AzureTestExecutor();
+ LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, testExecutor);
+ setNcEndpoints(testExecutor);
+ createBlobServiceClient();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ LangExecutionUtil.tearDown();
+ }
+
+ @Parameters(name = "AzureBlobStorageExternalDatasetTest {index}: {0}")
+ public static Collection<Object[]> tests() throws Exception {
+ SUITE_TESTS = "testsuite_external_dataset_azure_blob_storage.xml";
+ ONLY_TESTS = "only_external_dataset.xml";
+ TEST_CONFIG_FILE_NAME = "src/main/resources/cc.conf";
+ PREPARE_PLAYGROUND_CONTAINER = AzureBlobStorageExternalDatasetTest::preparePlaygroundContainer;
+ PREPARE_FIXED_DATA_CONTAINER = AzureBlobStorageExternalDatasetTest::prepareFixedDataContainer;
+ PREPARE_MIXED_DATA_CONTAINER = AzureBlobStorageExternalDatasetTest::prepareMixedDataContainer;
+ return LangExecutionUtil.tests(ONLY_TESTS, SUITE_TESTS);
+ }
+
+ @Test
+ public void test() throws Exception {
+ LangExecutionUtil.test(tcCtx);
+ }
+
+ private static void setNcEndpoints(TestExecutor testExecutor) {
+ final NodeControllerService[] ncs = ExecutionTestUtil.integrationUtil.ncs;
+ final Map<String, InetSocketAddress> ncEndPoints = new HashMap<>();
+ final String ip = InetAddress.getLoopbackAddress().getHostAddress();
+ for (NodeControllerService nc : ncs) {
+ final String nodeId = nc.getId();
+ final INcApplicationContext appCtx = (INcApplicationContext) nc.getApplicationContext();
+ int apiPort = appCtx.getExternalProperties().getNcApiPort();
+ ncEndPoints.put(nodeId, InetSocketAddress.createUnresolved(ip, apiPort));
+ }
+ testExecutor.setNcEndPoints(ncEndPoints);
+ }
+
+ private static void createBlobServiceClient() {
+ LOGGER.info("Creating Azurite Blob Service client");
+ blobServiceClient = new BlobServiceClientBuilder().connectionString(connectionString).buildClient();
+ LOGGER.info("Azurite Blob Service client created successfully");
+
+ // Create the container and upload some json files
+ PREPARE_PLAYGROUND_CONTAINER.run();
+ PREPARE_FIXED_DATA_CONTAINER.run();
+ PREPARE_MIXED_DATA_CONTAINER.run();
+ }
+
+ /**
+ * Creates a container and fills it with some files for testing purposes.
+ */
+ private static void preparePlaygroundContainer() {
+ deleteContainerSilently(PLAYGROUND_CONTAINER);
+
+ LOGGER.info("creating container " + PLAYGROUND_CONTAINER);
+ playgroundContainer = blobServiceClient.createBlobContainer(PLAYGROUND_CONTAINER);
+ LOGGER.info("container " + PLAYGROUND_CONTAINER + " created successfully");
+
+ LOGGER.info("Adding JSON files");
+ loadJsonFiles();
+ LOGGER.info("JSON Files added successfully");
+
+ LOGGER.info("Adding CSV files");
+ loadCsvFiles();
+ LOGGER.info("CSV Files added successfully");
+
+ LOGGER.info("Adding TSV files");
+ loadTsvFiles();
+ LOGGER.info("TSV Files added successfully");
+
+ LOGGER.info("Loading " + OVER_1000_OBJECTS_COUNT + " files into " + OVER_1000_OBJECTS_PATH);
+ loadLargeNumberOfFiles();
+ LOGGER.info("Added " + OVER_1000_OBJECTS_COUNT + " files into " + OVER_1000_OBJECTS_PATH + " successfully");
+ }
+
+ /**
+ * This container is filled with fixed data, and a test counts all of its records. If this container is
+ * changed, that test case will fail and its expected result will need to be updated.
+ */
+ private static void prepareFixedDataContainer() {
+ deleteContainerSilently(FIXED_DATA_CONTAINER);
+ LOGGER.info("creating container " + FIXED_DATA_CONTAINER);
+ BlobContainerClient fixedDataContainer = blobServiceClient.createBlobContainer(FIXED_DATA_CONTAINER);
+ LOGGER.info("container " + FIXED_DATA_CONTAINER + " created successfully");
+
+ LOGGER.info("Loading fixed data to " + FIXED_DATA_CONTAINER);
+
+ // Files data
+ Path filePath = Paths.get(JSON_DATA_PATH, "single-line", "20-records.json");
+ fixedDataContainer.getBlobClient("1.json").uploadFromFile(filePath.toString());
+ fixedDataContainer.getBlobClient("2.json").uploadFromFile(filePath.toString());
+ fixedDataContainer.getBlobClient("lvl1/3.json").uploadFromFile(filePath.toString());
+ fixedDataContainer.getBlobClient("lvl1/4.json").uploadFromFile(filePath.toString());
+ fixedDataContainer.getBlobClient("lvl1/lvl2/5.json").uploadFromFile(filePath.toString());
+ }
+
+ private static void loadJsonFiles() {
+ String dataBasePath = JSON_DATA_PATH;
+ String definition = JSON_DEFINITION;
+
+ // Normal format
+ String definitionSegment = "json";
+ loadData(dataBasePath, "single-line", "20-records.json", definition, definitionSegment, false);
+ loadData(dataBasePath, "multi-lines", "20-records.json", definition, definitionSegment, false);
+ loadData(dataBasePath, "multi-lines-with-arrays", "5-records.json", definition, definitionSegment, false);
+ loadData(dataBasePath, "multi-lines-with-nested-objects", "5-records.json", definition, definitionSegment,
+ false);
+
+ definitionSegment = "json-array-of-objects";
+ loadData(dataBasePath, "single-line", "array_of_objects.json", "json-data/", definitionSegment, false, false);
+
+ // gz compressed format
+ definitionSegment = "gz";
+ loadGzData(dataBasePath, "single-line", "20-records.json", definition, definitionSegment, false);
+ loadGzData(dataBasePath, "multi-lines", "20-records.json", definition, definitionSegment, false);
+ loadGzData(dataBasePath, "multi-lines-with-arrays", "5-records.json", definition, definitionSegment, false);
+ loadGzData(dataBasePath, "multi-lines-with-nested-objects", "5-records.json", definition, definitionSegment,
+ false);
+
+ // Mixed normal and gz compressed format
+ definitionSegment = "mixed";
+ loadData(dataBasePath, "single-line", "20-records.json", definition, definitionSegment, false);
+ loadData(dataBasePath, "multi-lines", "20-records.json", definition, definitionSegment, false);
+ loadData(dataBasePath, "multi-lines-with-arrays", "5-records.json", definition, definitionSegment, false);
+ loadData(dataBasePath, "multi-lines-with-nested-objects", "5-records.json", definition, definitionSegment,
+ false);
+ loadGzData(dataBasePath, "single-line", "20-records.json", definition, definitionSegment, false);
+ loadGzData(dataBasePath, "multi-lines", "20-records.json", definition, definitionSegment, false);
+ loadGzData(dataBasePath, "multi-lines-with-arrays", "5-records.json", definition, definitionSegment, false);
+ loadGzData(dataBasePath, "multi-lines-with-nested-objects", "5-records.json", definition, definitionSegment,
+ false);
+ }
+
+ private static void loadCsvFiles() {
+ String dataBasePath = CSV_DATA_PATH;
+ String definition = CSV_DEFINITION;
+
+ // Normal format
+ String definitionSegment = "csv";
+ loadData(dataBasePath, "", "01.csv", definition, definitionSegment, false);
+ loadData(dataBasePath, "", "02.csv", definition, definitionSegment, false);
+
+ // gz compressed format
+ definitionSegment = "gz";
+ loadGzData(dataBasePath, "", "01.csv", definition, definitionSegment, false);
+ loadGzData(dataBasePath, "", "02.csv", definition, definitionSegment, false);
+
+ // Mixed normal and gz compressed format
+ definitionSegment = "mixed";
+ loadData(dataBasePath, "", "01.csv", definition, definitionSegment, false);
+ loadData(dataBasePath, "", "02.csv", definition, definitionSegment, false);
+ loadGzData(dataBasePath, "", "01.csv", definition, definitionSegment, false);
+ loadGzData(dataBasePath, "", "02.csv", definition, definitionSegment, false);
+ }
+
+ private static void loadTsvFiles() {
+ String dataBasePath = TSV_DATA_PATH;
+ String definition = TSV_DEFINITION;
+
+ // Normal format
+ String definitionSegment = "tsv";
+ loadData(dataBasePath, "", "01.tsv", definition, definitionSegment, false);
+ loadData(dataBasePath, "", "02.tsv", definition, definitionSegment, false);
+
+ // gz compressed format
+ definitionSegment = "gz";
+ loadGzData(dataBasePath, "", "01.tsv", definition, definitionSegment, false);
+ loadGzData(dataBasePath, "", "02.tsv", definition, definitionSegment, false);
+
+ // Mixed normal and gz compressed format
+ definitionSegment = "mixed";
+ loadData(dataBasePath, "", "01.tsv", definition, definitionSegment, false);
+ loadData(dataBasePath, "", "02.tsv", definition, definitionSegment, false);
+ loadGzData(dataBasePath, "", "01.tsv", definition, definitionSegment, false);
+ loadGzData(dataBasePath, "", "02.tsv", definition, definitionSegment, false);
+ }
+
+ private static void loadData(String fileBasePath, String filePathSegment, String filename, String definition,
+ String definitionSegment, boolean removeExtension) {
+ loadData(fileBasePath, filePathSegment, filename, definition, definitionSegment, removeExtension, true);
+ }
+
+ private static void loadData(String fileBasePath, String filePathSegment, String filename, String definition,
+ String definitionSegment, boolean removeExtension, boolean copyToSubLevels) {
+ // Files data
+ Path filePath = Paths.get(fileBasePath, filePathSegment, filename);
+
+ // Keep or remove the file extension
+ Assert.assertFalse("Files with no extension are not supported yet for external datasets", removeExtension);
+ String finalFileName;
+ if (removeExtension) {
+ finalFileName = FilenameUtils.removeExtension(filename);
+ } else {
+ finalFileName = filename;
+ }
+
+ // Files base definition
+ filePathSegment = filePathSegment.isEmpty() ? "" : filePathSegment + "/";
+ definitionSegment = definitionSegment.isEmpty() ? "" : definitionSegment + "/";
+ String basePath = definition + filePathSegment + definitionSegment;
+
+ // Load the data
+ playgroundContainer.getBlobClient(basePath + finalFileName).uploadFromFile(filePath.toString());
+ if (copyToSubLevels) {
+ playgroundContainer.getBlobClient(basePath + "level1a/" + finalFileName)
+ .uploadFromFile(filePath.toString());
+ playgroundContainer.getBlobClient(basePath + "level1b/" + finalFileName)
+ .uploadFromFile(filePath.toString());
+ playgroundContainer.getBlobClient(basePath + "level1a/level2a/" + finalFileName)
+ .uploadFromFile(filePath.toString());
+ playgroundContainer.getBlobClient(basePath + "level1a/level2b/" + finalFileName)
+ .uploadFromFile(filePath.toString());
+ }
+ }
+
+ private static void loadGzData(String fileBasePath, String filePathSegment, String filename, String definition,
+ String definitionSegment, boolean removeExtension) {
+ try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream)) {
+
+ // Files data
+ Path filePath = Paths.get(fileBasePath, filePathSegment, filename);
+
+ // Get the compressed data
+ gzipOutputStream.write(Files.readAllBytes(filePath));
+ gzipOutputStream.close(); // Must close before reading the bytes, otherwise the gzip data is incomplete
+ byte[] gzipBytes = byteArrayOutputStream.toByteArray();
+
+ // Keep or remove the file extension
+ Assert.assertFalse("Files with no extension are not supported yet for external datasets", removeExtension);
+ String finalFileName;
+ if (removeExtension) {
+ finalFileName = FilenameUtils.removeExtension(filename);
+ } else {
+ finalFileName = filename;
+ }
+ finalFileName += ".gz";
+
+ // Files base definition
+ filePathSegment = filePathSegment.isEmpty() ? "" : filePathSegment + "/";
+ definitionSegment = definitionSegment.isEmpty() ? "" : definitionSegment + "/";
+ String basePath = definition + filePathSegment + definitionSegment;
+
+ // Load the data
+ ByteArrayInputStream inputStream = new ByteArrayInputStream(gzipBytes);
+ playgroundContainer.getBlobClient(basePath + finalFileName).upload(inputStream, inputStream.available());
+ inputStream.reset();
+ playgroundContainer.getBlobClient(basePath + "level1a/" + finalFileName).upload(inputStream,
+ inputStream.available());
+ inputStream.reset();
+ playgroundContainer.getBlobClient(basePath + "level1b/" + finalFileName).upload(inputStream,
+ inputStream.available());
+ inputStream.reset();
+ playgroundContainer.getBlobClient(basePath + "level1a/level2a/" + finalFileName).upload(inputStream,
+ inputStream.available());
+ inputStream.reset();
+ playgroundContainer.getBlobClient(basePath + "level1a/level2b/" + finalFileName).upload(inputStream,
+ inputStream.available());
+ closeInputStreamSilently(inputStream);
+ } catch (Exception ex) {
+ LOGGER.error(ex.getMessage());
+ }
+ }
+
+ private static void loadLargeNumberOfFiles() {
+ ByteArrayInputStream inputStream = null;
+ for (int i = 0; i < OVER_1000_OBJECTS_COUNT; i++) {
+ inputStream = new ByteArrayInputStream(("{\"id\":" + i + "}").getBytes());
+ playgroundContainer.getBlobClient(OVER_1000_OBJECTS_PATH + "/" + i + ".json").upload(inputStream,
+ inputStream.available());
+ }
+ closeInputStreamSilently(inputStream);
+ }
+
+ /**
+ * Loads a combination of different file formats in the same path
+ */
+ private static void prepareMixedDataContainer() {
+ deleteContainerSilently(INCLUDE_EXCLUDE_CONTAINER);
+ LOGGER.info("creating container " + INCLUDE_EXCLUDE_CONTAINER);
+ BlobContainerClient includeExcludeContainer = blobServiceClient.createBlobContainer(INCLUDE_EXCLUDE_CONTAINER);
+ LOGGER.info("container " + INCLUDE_EXCLUDE_CONTAINER + " created successfully");
+
+ // JSON
+ ByteArrayInputStream inputStream = new ByteArrayInputStream(("{\"id\":" + 1 + "}").getBytes());
+ includeExcludeContainer.getBlobClient(MIXED_DATA_PATH + "/json/extension/" + "hello-world-2018.json")
+ .upload(inputStream, inputStream.available());
+ inputStream = new ByteArrayInputStream(("{\"id\":" + 2 + "}").getBytes());
+ includeExcludeContainer.getBlobClient(MIXED_DATA_PATH + "/json/extension/" + "hello-world-2019.json")
+ .upload(inputStream, inputStream.available());
+ inputStream = new ByteArrayInputStream(("{\"id\":" + 3 + "}").getBytes());
+ includeExcludeContainer.getBlobClient(MIXED_DATA_PATH + "/json/extension/" + "hello-world-2020.json")
+ .upload(inputStream, inputStream.available());
+ inputStream = new ByteArrayInputStream(("{\"id\":" + 4 + "}").getBytes());
+ includeExcludeContainer.getBlobClient(MIXED_DATA_PATH + "/json/EXTENSION/" + "goodbye-world-2018.json")
+ .upload(inputStream, inputStream.available());
+ inputStream = new ByteArrayInputStream(("{\"id\":" + 5 + "}").getBytes());
+ includeExcludeContainer.getBlobClient(MIXED_DATA_PATH + "/json/EXTENSION/" + "goodbye-world-2019.json")
+ .upload(inputStream, inputStream.available());
+ inputStream = new ByteArrayInputStream(("{\"id\":" + 6 + "}").getBytes());
+ includeExcludeContainer.getBlobClient(MIXED_DATA_PATH + "/json/EXTENSION/" + "goodbye-world-2020.json")
+ .upload(inputStream, inputStream.available());
+
+ // CSV
+ inputStream = new ByteArrayInputStream(("7,\"good\"").getBytes());
+ includeExcludeContainer.getBlobClient(MIXED_DATA_PATH + "/csv/extension/" + "hello-world-2018.csv")
+ .upload(inputStream, inputStream.available());
+ inputStream = new ByteArrayInputStream(("8,\"good\"").getBytes());
+ includeExcludeContainer.getBlobClient(MIXED_DATA_PATH + "/csv/extension/" + "hello-world-2019.csv")
+ .upload(inputStream, inputStream.available());
+ inputStream = new ByteArrayInputStream(("9,\"good\"").getBytes());
+ includeExcludeContainer.getBlobClient(MIXED_DATA_PATH + "/csv/extension/" + "hello-world-2020.csv")
+ .upload(inputStream, inputStream.available());
+ inputStream = new ByteArrayInputStream(("10,\"good\"").getBytes());
+ includeExcludeContainer.getBlobClient(MIXED_DATA_PATH + "/csv/EXTENSION/" + "goodbye-world-2018.csv")
+ .upload(inputStream, inputStream.available());
+ inputStream = new ByteArrayInputStream(("11,\"good\"").getBytes());
+ includeExcludeContainer.getBlobClient(MIXED_DATA_PATH + "/csv/EXTENSION/" + "goodbye-world-2019.csv")
+ .upload(inputStream, inputStream.available());
+ inputStream = new ByteArrayInputStream(("12,\"good\"").getBytes());
+ includeExcludeContainer.getBlobClient(MIXED_DATA_PATH + "/csv/EXTENSION/" + "goodbye-world-2020.csv")
+ .upload(inputStream, inputStream.available());
+
+ // TSV
+ inputStream = new ByteArrayInputStream(("13\t\"good\"").getBytes());
+ includeExcludeContainer.getBlobClient(MIXED_DATA_PATH + "/tsv/extension/" + "hello-world-2018.tsv")
+ .upload(inputStream, inputStream.available());
+ inputStream = new ByteArrayInputStream(("14\t\"good\"").getBytes());
+ includeExcludeContainer.getBlobClient(MIXED_DATA_PATH + "/tsv/extension/" + "hello-world-2019.tsv")
+ .upload(inputStream, inputStream.available());
+ inputStream = new ByteArrayInputStream(("15\t\"good\"").getBytes());
+ includeExcludeContainer.getBlobClient(MIXED_DATA_PATH + "/tsv/extension/" + "hello-world-2020.tsv")
+ .upload(inputStream, inputStream.available());
+ inputStream = new ByteArrayInputStream(("16\t\"good\"").getBytes());
+ includeExcludeContainer.getBlobClient(MIXED_DATA_PATH + "/tsv/EXTENSION/" + "goodbye-world-2018.tsv")
+ .upload(inputStream, inputStream.available());
+ inputStream = new ByteArrayInputStream(("17\t\"good\"").getBytes());
+ includeExcludeContainer.getBlobClient(MIXED_DATA_PATH + "/tsv/EXTENSION/" + "goodbye-world-2019.tsv")
+ .upload(inputStream, inputStream.available());
+ inputStream = new ByteArrayInputStream(("18\t\"good\"").getBytes());
+ includeExcludeContainer.getBlobClient(MIXED_DATA_PATH + "/tsv/EXTENSION/" + "goodbye-world-2020.tsv")
+ .upload(inputStream, inputStream.available());
+
+ // JSON no extension
+ inputStream = new ByteArrayInputStream(("{\"id\":" + 1 + "}").getBytes());
+ includeExcludeContainer.getBlobClient(MIXED_DATA_PATH + "/json/no-extension/" + "hello-world-2018")
+ .upload(inputStream, inputStream.available());
+ inputStream = new ByteArrayInputStream(("{\"id\":" + 2 + "}").getBytes());
+ includeExcludeContainer.getBlobClient(MIXED_DATA_PATH + "/json/no-extension/" + "hello-world-2019")
+ .upload(inputStream, inputStream.available());
+ inputStream = new ByteArrayInputStream(("{\"id\":" + 3 + "}").getBytes());
+ includeExcludeContainer.getBlobClient(MIXED_DATA_PATH + "/json/no-extension/" + "hello-world-2020")
+ .upload(inputStream, inputStream.available());
+ inputStream = new ByteArrayInputStream(("{\"id\":" + 4 + "}").getBytes());
+ includeExcludeContainer.getBlobClient(MIXED_DATA_PATH + "/json/NO-EXTENSION/" + "goodbye-world-2018")
+ .upload(inputStream, inputStream.available());
+ inputStream = new ByteArrayInputStream(("{\"id\":" + 5 + "}").getBytes());
+ includeExcludeContainer.getBlobClient(MIXED_DATA_PATH + "/json/NO-EXTENSION/" + "goodbye-world-2019")
+ .upload(inputStream, inputStream.available());
+ inputStream = new ByteArrayInputStream(("{\"id\":" + 6 + "}").getBytes());
+ includeExcludeContainer.getBlobClient(MIXED_DATA_PATH + "/json/NO-EXTENSION/" + "goodbye-world-2020")
+ .upload(inputStream, inputStream.available());
+
+ closeInputStreamSilently(inputStream);
+ }
+
+ static class AzureTestExecutor extends TestExecutor {
+
+ public void executeTestFile(TestCaseContext testCaseCtx, TestFileContext ctx, Map<String, Object> variableCtx,
+ String statement, boolean isDmlRecoveryTest, ProcessBuilder pb, TestCase.CompilationUnit cUnit,
+ MutableInt queryCount, List<TestFileContext> expectedResultFileCtxs, File testFile, String actualPath,
+ BitSet expectedWarnings) throws Exception {
+ String[] lines;
+ switch (ctx.getType()) {
+ case "container":
+ // <container> <def> <sub-path:new_fname:src_file1,sub-path:new_fname:src_file2,sub-path:src_file3>
+ lines = TestExecutor.stripAllComments(statement).trim().split("\n");
+ String lastLine = lines[lines.length - 1];
+ String[] command = lastLine.trim().split(" ");
+ int length = command.length;
+ if (length != 3) {
+ throw new Exception("invalid create container format");
+ }
+ dropRecreateContainer(command[0], command[1], command[2]);
+ break;
+ default:
+ super.executeTestFile(testCaseCtx, ctx, variableCtx, statement, isDmlRecoveryTest, pb, cUnit,
+ queryCount, expectedResultFileCtxs, testFile, actualPath, expectedWarnings);
+ }
+ }
+ }
+
+ private static void dropRecreateContainer(String containerName, String definition, String files) {
+ String definitionPath = definition + (definition.endsWith("/") ? "" : "/");
+ String[] fileSplits = files.split(",");
+
+ LOGGER.info("Deleting container " + containerName);
+ try {
+ blobServiceClient.deleteBlobContainer(containerName);
+ } catch (Exception ex) {
+ // Ignore
+ }
+ LOGGER.info("Container " + containerName + " deleted successfully");
+
+ BlobContainerClient containerClient;
+ LOGGER.info("Creating container " + containerName);
+ containerClient = blobServiceClient.createBlobContainer(containerName);
+ LOGGER.info("Uploading to container " + containerName + " definition " + definitionPath);
+ fileNames.clear();
+ for (String fileSplit : fileSplits) {
+ String[] pathAndSourceFile = fileSplit.split(":");
+ int size = pathAndSourceFile.length;
+ String path;
+ String sourceFilePath;
+ String uploadedFileName;
+ if (size == 1) {
+ // case: playground json-data/reviews SOURCE_FILE1,SOURCE_FILE2
+ path = definitionPath;
+ sourceFilePath = pathAndSourceFile[0];
+ uploadedFileName = FilenameUtils.getName(pathAndSourceFile[0]);
+ } else if (size == 2) {
+ // case: playground json-data/reviews level1/sub-level:SOURCE_FILE1,level2/sub-level:SOURCE_FILE2
+ String subPathOrNewFileName = pathAndSourceFile[0];
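+ // A first segment prefixed with "$$" is a new file name (prefix stripped) rather than a sub-path;
+ // the blob is then uploaded directly under the definition path with that name.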
+ if (subPathOrNewFileName.startsWith("$$")) {
+ path = definitionPath;
+ sourceFilePath = pathAndSourceFile[1];
+ uploadedFileName = subPathOrNewFileName.substring(2);
+ } else {
+ path = definitionPath + subPathOrNewFileName + (subPathOrNewFileName.endsWith("/") ? "" : "/");
+ sourceFilePath = pathAndSourceFile[1];
+ uploadedFileName = FilenameUtils.getName(pathAndSourceFile[1]);
+ }
+ } else if (size == 3) {
+ path = definitionPath + pathAndSourceFile[0] + (pathAndSourceFile[0].endsWith("/") ? "" : "/");
+ uploadedFileName = pathAndSourceFile[1];
+ sourceFilePath = pathAndSourceFile[2];
+
+ } else {
+ throw new IllegalArgumentException();
+ }
+
+ String keyPath = path + uploadedFileName;
+ int k = 1;
+ while (fileNames.contains(keyPath)) {
+ keyPath = path + (k++) + uploadedFileName;
+ }
+ fileNames.add(keyPath);
+ containerClient.getBlobClient(keyPath).uploadFromFile(sourceFilePath);
+ }
+ LOGGER.info("Done creating container with data");
+ }
+
+ private static void deleteContainerSilently(String containerName) {
+ LOGGER.info("Deleting container " + containerName);
+ try {
+ blobServiceClient.deleteBlobContainer(containerName);
+ } catch (Exception ex) {
+ // Do nothing
+ }
+ LOGGER.info("Container " + containerName + " deleted successfully");
+ }
+
+ private static void closeInputStreamSilently(InputStream inputStream) {
+ try {
+ inputStream.close();
+ } catch (Exception ex) {
+ LOGGER.error(ex.getMessage());
+ }
+ }
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
index 20420ab..408882d 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
@@ -95,17 +95,26 @@
}
public static Collection<Object[]> tests(String onlyFilePath, String suiteFilePath) throws Exception {
- Collection<Object[]> testArgs = buildTestsInXml(onlyFilePath);
+ return tests(onlyFilePath, suiteFilePath, null);
+ }
+
+ public static Collection<Object[]> tests(String onlyFilePath, String suiteFilePath, String pathBase)
+ throws Exception {
+ Collection<Object[]> testArgs = buildTestsInXml(onlyFilePath, pathBase);
if (testArgs.size() == 0) {
- testArgs = buildTestsInXml(suiteFilePath);
+ testArgs = buildTestsInXml(suiteFilePath, pathBase);
}
return testArgs;
}
protected static Collection<Object[]> buildTestsInXml(String xmlfile) throws Exception {
+ return buildTestsInXml(xmlfile, null);
+ }
+
+ protected static Collection<Object[]> buildTestsInXml(String xmlfile, String pathBase) throws Exception {
Collection<Object[]> testArgs = new ArrayList<>();
TestCaseContext.Builder b = new TestCaseContext.Builder();
- for (TestCaseContext ctx : b.build(new File(PATH_BASE), xmlfile)) {
+ for (TestCaseContext ctx : b.build(new File(pathBase == null ? PATH_BASE : pathBase), xmlfile)) {
testArgs.add(new Object[] { ctx });
}
return testArgs;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_azure_blob_storage.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_azure_blob_storage.xml
new file mode 100644
index 0000000..a715e7e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_azure_blob_storage.xml
@@ -0,0 +1,292 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements. See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership. The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License. You may obtain a copy of the License at
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied. See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<test-suite xmlns="urn:xml.testframework.asterix.apache.org" ResultOffsetPath="results" QueryOffsetPath="queries_sqlpp" QueryFileExtension=".sqlpp">
+ <test-group name="external-dataset">
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="common/json/json">
+ <placeholder name="adapter" value="AZUREBLOB" />
+ <output-dir compare="Text">common/json/json</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="common/json/gz">
+ <placeholder name="adapter" value="AZUREBLOB" />
+ <output-dir compare="Text">common/json/gz</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="common/json/mixed">
+ <placeholder name="adapter" value="AZUREBLOB" />
+ <output-dir compare="Text">common/json/mixed</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="common/csv/csv">
+ <placeholder name="adapter" value="AZUREBLOB" />
+ <output-dir compare="Text">common/csv/csv</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="common/csv/gz">
+ <placeholder name="adapter" value="AZUREBLOB" />
+ <output-dir compare="Text">common/csv/gz</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="common/csv/mixed">
+ <placeholder name="adapter" value="AZUREBLOB" />
+ <output-dir compare="Text">common/csv/mixed</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="common/tsv/tsv">
+ <placeholder name="adapter" value="AZUREBLOB" />
+ <output-dir compare="Text">common/tsv/tsv</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="common/tsv/gz">
+ <placeholder name="adapter" value="AZUREBLOB" />
+ <output-dir compare="Text">common/tsv/gz</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="common/tsv/mixed">
+ <placeholder name="adapter" value="AZUREBLOB" />
+ <output-dir compare="Text">common/tsv/mixed</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="common/empty-string-definition">
+ <placeholder name="adapter" value="AZUREBLOB" />
+ <output-dir compare="Text">common/empty-string-definition</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="common/over-1000-objects">
+ <placeholder name="adapter" value="AZUREBLOB" />
+ <output-dir compare="Text">common/over-1000-objects</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="common/malformed-json">
+ <placeholder name="adapter" value="AZUREBLOB" />
+ <output-dir compare="Text">common/malformed-json</output-dir>
+ <expected-error>Parsing error at malformed-data/duplicate-fields.json line 1 field field: Duplicate field 'field'</expected-error>
+ <expected-error>Parsing error at malformed-data/malformed-json.json line 1 field field: Unexpected character ('}' (code 125)): was expecting double-quote to start field name</expected-error>
+ <expected-error>Parsing error at malformed-data/malformed-json-2.json line 4 field array_f: Unexpected character (']' (code 93)): expected a valid value (JSON String, Number, Array, Object or token 'null', 'true' or 'false')</expected-error>
+ <expected-error>Parsing error at malformed-data/malformed-jsonl-1.json line 3 field field2: Unrecognized token 'truee': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')</expected-error>
+ <expected-error>Parsing error at malformed-data/malformed-jsonl-2.json line 11 field array_f: Unexpected character (']' (code 93)): expected a valid value (JSON String, Number, Array, Object or token 'null', 'true' or 'false')</expected-error>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="common/definition-does-not-exist">
+ <placeholder name="adapter" value="AZUREBLOB" />
+ <output-dir compare="Text">common/definition-does-not-exist</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="common/invalid-endpoint">
+ <placeholder name="adapter" value="AZUREBLOB" />
+ <placeholder name="blobEndpoint" value="http://^invalid-endpoint^" />
+ <output-dir compare="Text">common/invalid-endpoint</output-dir>
+ <expected-error>External source error. java.net.URISyntaxException: Illegal character in authority at index 7: http://^invalid-endpoint^</expected-error>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="common/bucket-does-not-exist">
+ <placeholder name="adapter" value="AZUREBLOB" />
+ <output-dir compare="Text">common/bucket-does-not-exist</output-dir>
+ <expected-error>External source error. Status code 404</expected-error>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset" check-warnings="true">
+ <compilation-unit name="common/no-files-returned/definition-points-to-nothing">
+ <placeholder name="adapter" value="AZUREBLOB" />
+ <output-dir compare="Text">common/no-files-returned/definition-points-to-nothing</output-dir>
+ <expected-warn>The provided external dataset configuration returned no files from the external source</expected-warn>
+ <expected-warn>The provided external dataset configuration returned no files from the external source</expected-warn>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset" check-warnings="true">
+ <compilation-unit name="common/no-files-returned/exclude-all-files">
+ <placeholder name="adapter" value="AZUREBLOB" />
+ <output-dir compare="Text">common/no-files-returned/exclude-all-files</output-dir>
+ <expected-warn>The provided external dataset configuration returned no files from the external source</expected-warn>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset" check-warnings="true">
+ <compilation-unit name="common/no-files-returned/include-no-files">
+ <placeholder name="adapter" value="AZUREBLOB" />
+ <output-dir compare="Text">common/no-files-returned/include-no-files</output-dir>
+ <expected-warn>The provided external dataset configuration returned no files from the external source</expected-warn>
+ </compilation-unit>
+ </test-case>
+ </test-group>
+ <test-group name="include-exclude">
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="common/include-exclude/bad-name-1">
+ <placeholder name="adapter" value="AZUREBLOB" />
+ <output-dir compare="Text">common/include-exclude/bad-name-1</output-dir>
+ <expected-error>Invalid format for property "exclude1"</expected-error>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="common/include-exclude/bad-name-2">
+ <placeholder name="adapter" value="AZUREBLOB" />
+ <output-dir compare="Text">common/include-exclude/bad-name-2</output-dir>
+ <expected-error>Invalid format for property "exclude#"</expected-error>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="common/include-exclude/bad-name-3">
+ <placeholder name="adapter" value="AZUREBLOB" />
+ <output-dir compare="Text">common/include-exclude/bad-name-3</output-dir>
+ <expected-error>Invalid format for property "exclude#hello"</expected-error>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="common/include-exclude/both">
+ <placeholder name="adapter" value="AZUREBLOB" />
+ <output-dir compare="Text">common/include-exclude/both</output-dir>
+ <expected-error>The parameters "include" and "exclude" cannot be provided at the same time</expected-error>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="common/include-exclude/exclude-all">
+ <placeholder name="adapter" value="AZUREBLOB" />
+ <output-dir compare="Text">common/include-exclude/exclude-all</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="common/include-exclude/exclude-1">
+ <placeholder name="adapter" value="AZUREBLOB" />
+ <output-dir compare="Text">common/include-exclude/exclude-1</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="common/include-exclude/exclude-2">
+ <placeholder name="adapter" value="AZUREBLOB" />
+ <output-dir compare="Text">common/include-exclude/exclude-2</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="common/include-exclude/exclude-3">
+ <placeholder name="adapter" value="AZUREBLOB" />
+ <output-dir compare="Text">common/include-exclude/exclude-3</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="common/include-exclude/exclude-4">
+ <placeholder name="adapter" value="AZUREBLOB" />
+ <output-dir compare="Text">common/include-exclude/exclude-4</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="common/include-exclude/exclude-5">
+ <placeholder name="adapter" value="AZUREBLOB" />
+ <output-dir compare="Text">common/include-exclude/exclude-5</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="common/include-exclude/exclude-6">
+ <placeholder name="adapter" value="AZUREBLOB" />
+ <output-dir compare="Text">common/include-exclude/exclude-6</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="common/include-exclude/include-all">
+ <placeholder name="adapter" value="AZUREBLOB" />
+ <output-dir compare="Text">common/include-exclude/include-all</output-dir>
+ <expected-error>Malformed input stream</expected-error>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="common/include-exclude/include-1">
+ <placeholder name="adapter" value="AZUREBLOB" />
+ <output-dir compare="Text">common/include-exclude/include-1</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="common/include-exclude/include-2">
+ <placeholder name="adapter" value="AZUREBLOB" />
+ <output-dir compare="Text">common/include-exclude/include-2</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="common/include-exclude/include-3">
+ <placeholder name="adapter" value="AZUREBLOB" />
+ <output-dir compare="Text">common/include-exclude/include-3</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="common/include-exclude/include-4">
+ <placeholder name="adapter" value="AZUREBLOB" />
+ <output-dir compare="Text">common/include-exclude/include-4</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="common/include-exclude/include-5">
+ <placeholder name="adapter" value="AZUREBLOB" />
+ <output-dir compare="Text">common/include-exclude/include-5</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="common/include-exclude/include-6">
+ <placeholder name="adapter" value="AZUREBLOB" />
+ <output-dir compare="Text">common/include-exclude/include-6</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="common/include-exclude/include-7">
+ <placeholder name="adapter" value="AZUREBLOB" />
+ <output-dir compare="Text">common/include-exclude/include-7</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="common/include-exclude/include-8">
+ <placeholder name="adapter" value="AZUREBLOB" />
+ <output-dir compare="Text">common/include-exclude/include-8</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="common/include-exclude/include-9">
+ <placeholder name="adapter" value="AZUREBLOB" />
+ <output-dir compare="Text">common/include-exclude/include-9</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="common/include-exclude/include-10">
+ <placeholder name="adapter" value="AZUREBLOB" />
+ <output-dir compare="Text">common/include-exclude/include-10</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="common/include-exclude/include-11">
+ <placeholder name="adapter" value="AZUREBLOB" />
+ <output-dir compare="Text">common/include-exclude/include-11</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="common/include-exclude/include-12">
+ <placeholder name="adapter" value="AZUREBLOB" />
+ <output-dir compare="Text">common/include-exclude/include-12</output-dir>
+ </compilation-unit>
+ </test-case>
+ </test-group>
+</test-suite>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_azure_blob_storage_one_partition.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_azure_blob_storage_one_partition.xml
new file mode 100644
index 0000000..32a56b2
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_azure_blob_storage_one_partition.xml
@@ -0,0 +1,99 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements. See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership. The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License. You may obtain a copy of the License at
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied. See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<test-suite xmlns="urn:xml.testframework.asterix.apache.org" ResultOffsetPath="results" QueryOffsetPath="queries_sqlpp" QueryFileExtension=".sqlpp">
+ <test-group name="external-dataset">
+ <test-case FilePath="external-dataset" check-warnings="true">
+ <compilation-unit name="common/csv-header">
+ <placeholder name="adapter" value="AZUREBLOB" />
+ <output-dir compare="Text">common/csv-header</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset" check-warnings="true">
+ <compilation-unit name="common/csv-no-header">
+ <placeholder name="adapter" value="AZUREBLOB" />
+ <output-dir compare="Text">common/csv-no-header</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset" check-warnings="true">
+ <compilation-unit name="common/tsv-header">
+ <placeholder name="adapter" value="AZUREBLOB" />
+ <output-dir compare="Text">common/tsv-header</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset" check-warnings="true">
+ <compilation-unit name="common/tsv-no-header">
+ <placeholder name="adapter" value="AZUREBLOB" />
+ <output-dir compare="Text">common/tsv-no-header</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset" check-warnings="true">
+ <compilation-unit name="common/csv-warnings">
+ <placeholder name="adapter" value="AZUREBLOB" />
+ <output-dir compare="Text">common/csv-warnings</output-dir>
+ <expected-warn>Parsing error at data_dir/no_h_missing_fields.csv line 2 field 3: some fields are missing</expected-warn>
+ <expected-warn>Parsing error at data_dir/no_h_no_closing_q.csv line 2 field 0: malformed input record ended abruptly</expected-warn>
+ <expected-warn>Parsing error at line 2 field 0: malformed input record ended abruptly</expected-warn>
+
+ <expected-warn>Parsing error at line 5 field 3: invalid value</expected-warn>
+ <expected-warn>Parsing error at line 2 field 1: invalid value</expected-warn>
+ <expected-warn>Parsing error at line 11 field 1: invalid value</expected-warn>
+ <expected-warn>Parsing error at line 3 field 1: invalid value</expected-warn>
+ <expected-warn>Parsing error at line 4 field 1: invalid value</expected-warn>
+ <expected-warn>Parsing error at line 7 field 7: invalid value</expected-warn>
+ <expected-warn>Parsing error at line 13 field 7: invalid value</expected-warn>
+ <expected-warn>Parsing error at line 12 field 3: invalid value</expected-warn>
+ <expected-warn>Parsing error at line 9 field 6: a quote should be in the beginning</expected-warn>
+
+ <expected-warn>Parsing error at data_dir/h_invalid_values.csv line 5 field 3: invalid value</expected-warn>
+ <expected-warn>Parsing error at data_dir/h_invalid_values.csv line 2 field 1: invalid value</expected-warn>
+ <expected-warn>Parsing error at data_dir/h_invalid_values.csv line 11 field 1: invalid value</expected-warn>
+ <expected-warn>Parsing error at data_dir/h_invalid_values.csv line 3 field 1: invalid value</expected-warn>
+ <expected-warn>Parsing error at data_dir/h_invalid_values.csv line 4 field 1: invalid value</expected-warn>
+ <expected-warn>Parsing error at data_dir/h_invalid_values.csv line 7 field 7: invalid value</expected-warn>
+ <expected-warn>Parsing error at data_dir/h_invalid_values.csv line 13 field 7: invalid value</expected-warn>
+ <expected-warn>Parsing error at data_dir/h_invalid_values.csv line 12 field 3: invalid value</expected-warn>
+ <expected-warn>Parsing error at data_dir/h_invalid_values.csv line 9 field 6: a quote should be in the beginning</expected-warn>
+
+ <expected-warn>Parsing error at data_dir/error1_line_num.csv line 3 field 2: a quote enclosing a field needs to be followed by the delimiter</expected-warn>
+ <expected-warn>Parsing error at data_dir/error2_line_num.csv line 4 field 2: a quote enclosing a field needs to be followed by the delimiter</expected-warn>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset" check-warnings="true">
+ <compilation-unit name="common/tsv-warnings">
+ <placeholder name="adapter" value="AZUREBLOB" />
+ <output-dir compare="Text">common/tsv-warnings</output-dir>
+ <expected-warn>Parsing error at data_dir/no_h_missing_fields.tsv line 2 field 3: some fields are missing</expected-warn>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset" check-warnings="true">
+ <compilation-unit name="common/json-warnings">
+ <placeholder name="adapter" value="AZUREBLOB" />
+ <output-dir compare="Text">common/json-warnings</output-dir>
+ <expected-warn>Parsing error at data_dir/1.json line 3 field 0: malformed input record ended abruptly</expected-warn>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset" check-warnings="true">
+ <compilation-unit name="common/jsonl">
+ <placeholder name="adapter" value="AZUREBLOB" />
+ <output-dir compare="Text">common/jsonl</output-dir>
+ </compilation-unit>
+ </test-case>
+ </test-group>
+</test-suite>
diff --git a/asterixdb/asterix-external-data/pom.xml b/asterixdb/asterix-external-data/pom.xml
index 4fed101..37c8a61 100644
--- a/asterixdb/asterix-external-data/pom.xml
+++ b/asterixdb/asterix-external-data/pom.xml
@@ -461,6 +461,10 @@
<artifactId>auth</artifactId>
</dependency>
<dependency>
+ <groupId>com.azure</groupId>
+ <artifactId>azure-storage-blob</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.msgpack</groupId>
<artifactId>msgpack-core</artifactId>
</dependency>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStream.java
new file mode 100644
index 0000000..898f828
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStream.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.abstracts;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.external.input.stream.AbstractMultipleInputStream;
+import org.apache.hyracks.api.util.CleanupUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public abstract class AbstractExternalInputStream extends AbstractMultipleInputStream {
+
+ protected static final Logger LOGGER = LogManager.getLogger();
+
+ // Configuration
+ protected final Map<String, String> configuration;
+
+ // File fields
+ protected final List<String> filePaths;
+ protected int nextFileIndex = 0;
+
+ public AbstractExternalInputStream(Map<String, String> configuration, List<String> filePaths) {
+ this.configuration = configuration;
+ this.filePaths = filePaths;
+ }
+
+ @Override
+ protected boolean advance() throws IOException {
+ // No files to read for this partition
+ if (filePaths == null || filePaths.isEmpty()) {
+ return false;
+ }
+
+ // Finished reading all the files
+ if (nextFileIndex >= filePaths.size()) {
+ if (in != null) {
+ CleanupUtils.close(in, null);
+ }
+ return false;
+ }
+
+ // Close the current stream before going to the next one
+ if (in != null) {
+ CleanupUtils.close(in, null);
+ }
+
+ boolean isAvailableStream = getInputStream();
+ nextFileIndex++; // Always point to next file after getting the current stream
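+ // The current file could not be opened (e.g. it no longer exists); skip it and try the next one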
+ if (!isAvailableStream) {
+ return advance();
+ }
+
+ if (notificationHandler != null) {
+ notificationHandler.notifyNewSource();
+ }
+ return true;
+ }
+
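+ /**
+ * Opens an input stream for the file at filePaths.get(nextFileIndex) and assigns it to {@code in}.
+ *
+ * @return true if the stream was opened successfully, false if the file was not found (it will then be skipped)
+ * @throws IOException IO exception
+ */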
+ protected abstract boolean getInputStream() throws IOException;
+
+ @Override
+ public boolean stop() {
+ return false;
+ }
+
+ @Override
+ public boolean handleException(Throwable th) {
+ return false;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (in != null) {
+ CleanupUtils.close(in, null);
+ }
+ }
+
+ @Override
+ public String getStreamName() {
+ return getStreamNameAt(nextFileIndex - 1);
+ }
+
+ @Override
+ public String getPreviousStreamName() {
+ return getStreamNameAt(nextFileIndex - 2);
+ }
+
+ private String getStreamNameAt(int fileIndex) {
+ return fileIndex < 0 || filePaths == null || filePaths.isEmpty() ? "" : filePaths.get(fileIndex);
+ }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
new file mode 100644
index 0000000..ca55b6f
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.abstracts;
+
+import static org.apache.asterix.external.util.ExternalDataConstants.KEY_EXCLUDE;
+import static org.apache.asterix.external.util.ExternalDataConstants.KEY_INCLUDE;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiPredicate;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.api.IInputStreamFactory;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+
+public abstract class AbstractExternalInputStreamFactory implements IInputStreamFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ protected Map<String, String> configuration;
+ protected final List<PartitionWorkLoadBasedOnSize> partitionWorkLoadsBasedOnSize = new ArrayList<>();
+ protected transient AlgebricksAbsolutePartitionConstraint partitionConstraint;
+
+ @Override
+ public DataSourceType getDataSourceType() {
+ return DataSourceType.STREAM;
+ }
+
+ @Override
+ public boolean isIndexible() {
+ return false;
+ }
+
+ @Override
+ public abstract AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition)
+ throws HyracksDataException;
+
+ @Override
+ public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
+ return partitionConstraint;
+ }
+
+ @Override
+ public abstract void configure(IServiceContext ctx, Map<String, String> configuration,
+ IWarningCollector warningCollector) throws AlgebricksException;
+
+ /**
+ * Finds the smallest workload and returns it
+ *
+ * @return the smallest workload
+ */
+ protected PartitionWorkLoadBasedOnSize getSmallestWorkLoad() {
+ PartitionWorkLoadBasedOnSize smallest = partitionWorkLoadsBasedOnSize.get(0);
+ for (PartitionWorkLoadBasedOnSize partition : partitionWorkLoadsBasedOnSize) {
+ // If a partition has no files yet (total size is 0), pick it right away
+ if (partition.getTotalSize() == 0) {
+ smallest = partition;
+ break;
+ }
+ if (partition.getTotalSize() < smallest.getTotalSize()) {
+ smallest = partition;
+ }
+ }
+
+ return smallest;
+ }
+
+ protected IncludeExcludeMatcher getIncludeExcludeMatchers() throws CompilationException {
+ // Get and compile the patterns for include/exclude if provided
+ List<Matcher> includeMatchers = new ArrayList<>();
+ List<Matcher> excludeMatchers = new ArrayList<>();
+ String pattern = null;
+ try {
+ for (Map.Entry<String, String> entry : configuration.entrySet()) {
+ if (entry.getKey().startsWith(KEY_INCLUDE)) {
+ pattern = entry.getValue();
+ includeMatchers.add(Pattern.compile(ExternalDataUtils.patternToRegex(pattern)).matcher(""));
+ } else if (entry.getKey().startsWith(KEY_EXCLUDE)) {
+ pattern = entry.getValue();
+ excludeMatchers.add(Pattern.compile(ExternalDataUtils.patternToRegex(pattern)).matcher(""));
+ }
+ }
+ } catch (PatternSyntaxException ex) {
+ throw new CompilationException(ErrorCode.INVALID_REGEX_PATTERN, pattern);
+ }
+
+ IncludeExcludeMatcher includeExcludeMatcher;
+ if (!includeMatchers.isEmpty()) {
+ includeExcludeMatcher = new IncludeExcludeMatcher(includeMatchers,
+ (matchers1, key) -> ExternalDataUtils.matchPatterns(matchers1, key));
+ } else if (!excludeMatchers.isEmpty()) {
+ includeExcludeMatcher = new IncludeExcludeMatcher(excludeMatchers,
+ (matchers1, key) -> !ExternalDataUtils.matchPatterns(matchers1, key));
+ } else {
+ includeExcludeMatcher = new IncludeExcludeMatcher(Collections.emptyList(), (matchers1, key) -> true);
+ }
+
+ return includeExcludeMatcher;
+ }
+
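+ /**
+ * Holds the file paths assigned to a single partition along with their accumulated size in bytes
+ */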
+ public static class PartitionWorkLoadBasedOnSize implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private final List<String> filePaths = new ArrayList<>();
+ private long totalSize = 0;
+
+ public PartitionWorkLoadBasedOnSize() {
+ }
+
+ public List<String> getFilePaths() {
+ return filePaths;
+ }
+
+ public void addFilePath(String filePath, long size) {
+ this.filePaths.add(filePath);
+ this.totalSize += size;
+ }
+
+ public long getTotalSize() {
+ return totalSize;
+ }
+
+ @Override
+ public String toString() {
+ return "Files: " + filePaths.size() + ", Total Size: " + totalSize;
+ }
+ }
+
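+ /**
+ * Holds the compiled include/exclude matchers together with the predicate that decides whether a file path
+ * passes the filter
+ */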
+ public static class IncludeExcludeMatcher implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private final List<Matcher> matchersList;
+ private final BiPredicate<List<Matcher>, String> predicate;
+
+ public IncludeExcludeMatcher(List<Matcher> matchersList, BiPredicate<List<Matcher>, String> predicate) {
+ this.matchersList = matchersList;
+ this.predicate = predicate;
+ }
+
+ public List<Matcher> getMatchersList() {
+ return matchersList;
+ }
+
+ public BiPredicate<List<Matcher>, String> getPredicate() {
+ return predicate;
+ }
+ }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
index 9e10e6a..e3e53d5 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
@@ -28,59 +28,28 @@
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.asterix.external.input.stream.AbstractMultipleInputStream;
+import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStream;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.util.CleanupUtils;
import org.apache.hyracks.util.LogRedactionUtil;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
-public class AwsS3InputStream extends AbstractMultipleInputStream {
-
- private static final Logger LOGGER = LogManager.getLogger();
-
- // Configuration
- private final Map<String, String> configuration;
+public class AwsS3InputStream extends AbstractExternalInputStream {
private final S3Client s3Client;
- // File fields
- private final List<String> filePaths;
- private int nextFileIndex = 0;
-
public AwsS3InputStream(Map<String, String> configuration, List<String> filePaths) throws HyracksDataException {
- this.configuration = configuration;
- this.filePaths = filePaths;
+ super(configuration, filePaths);
this.s3Client = buildAwsS3Client(configuration);
}
@Override
- protected boolean advance() throws IOException {
- // No files to read for this partition
- if (filePaths == null || filePaths.isEmpty()) {
- return false;
- }
-
- // Finished reading all the files
- if (nextFileIndex >= filePaths.size()) {
- if (in != null) {
- CleanupUtils.close(in, null);
- }
- return false;
- }
-
- // Close the current stream before going to the next one
- if (in != null) {
- CleanupUtils.close(in, null);
- }
-
+ protected boolean getInputStream() throws IOException {
String bucket = configuration.get(AwsS3.CONTAINER_NAME_FIELD_NAME);
GetObjectRequest.Builder getObjectBuilder = GetObjectRequest.builder();
GetObjectRequest getObjectRequest = getObjectBuilder.bucket(bucket).key(filePaths.get(nextFileIndex)).build();
@@ -92,8 +61,7 @@
} catch (NoSuchKeyException ex) {
LOGGER.debug(() -> "Key " + LogRedactionUtil.userData(getObjectRequest.key()) + " was not found in bucket "
+ getObjectRequest.bucket());
- nextFileIndex++;
- return advance();
+ return false;
} catch (SdkException ex) {
throw new RuntimeDataException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage());
}
@@ -104,11 +72,6 @@
in = new GZIPInputStream(s3Client.getObject(getObjectRequest), ExternalDataConstants.DEFAULT_BUFFER_SIZE);
}
- // Current file ready, point to the next file
- nextFileIndex++;
- if (notificationHandler != null) {
- notificationHandler.notifyNewSource();
- }
return true;
}
@@ -119,35 +82,4 @@
throw HyracksDataException.create(ex);
}
}
-
- @Override
- public boolean stop() {
- return false;
- }
-
- @Override
- public boolean handleException(Throwable th) {
- return false;
- }
-
- @Override
- public void close() throws IOException {
- if (in != null) {
- CleanupUtils.close(in, null);
- }
- }
-
- @Override
- public String getStreamName() {
- return getStreamNameAt(nextFileIndex - 1);
- }
-
- @Override
- public String getPreviousStreamName() {
- return getStreamNameAt(nextFileIndex - 2);
- }
-
- private String getStreamNameAt(int fileIndex) {
- return fileIndex < 0 || filePaths == null || filePaths.isEmpty() ? "" : filePaths.get(fileIndex);
- }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
index f3a36ff..08f8dec 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
@@ -19,27 +19,20 @@
package org.apache.asterix.external.input.record.reader.aws;
import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3;
-import static org.apache.asterix.external.util.ExternalDataConstants.KEY_EXCLUDE;
-import static org.apache.asterix.external.util.ExternalDataConstants.KEY_INCLUDE;
-import java.io.Serializable;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;
import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.regex.PatternSyntaxException;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.WarningUtil;
import org.apache.asterix.external.api.AsterixInputStream;
-import org.apache.asterix.external.api.IInputStreamFactory;
+import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
import org.apache.asterix.external.util.ExternalDataUtils;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -54,35 +47,16 @@
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.S3Object;
-public class AwsS3InputStreamFactory implements IInputStreamFactory {
+public class AwsS3InputStreamFactory extends AbstractExternalInputStreamFactory {
private static final long serialVersionUID = 1L;
- private Map<String, String> configuration;
- private final List<PartitionWorkLoadBasedOnSize> partitionWorkLoadsBasedOnSize = new ArrayList<>();
- private transient AlgebricksAbsolutePartitionConstraint partitionConstraint;
-
- @Override
- public DataSourceType getDataSourceType() {
- return DataSourceType.STREAM;
- }
-
- @Override
- public boolean isIndexible() {
- return false;
- }
-
@Override
public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) throws HyracksDataException {
return new AwsS3InputStream(configuration, partitionWorkLoadsBasedOnSize.get(partition).getFilePaths());
}
@Override
- public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
- return partitionConstraint;
- }
-
- @Override
public void configure(IServiceContext ctx, Map<String, String> configuration, IWarningCollector warningCollector)
throws AlgebricksException {
this.configuration = configuration;
@@ -93,44 +67,13 @@
List<S3Object> filesOnly = new ArrayList<>();
// Ensure the validity of include/exclude
- ExternalDataUtils.AwsS3.validateIncludeExclude(configuration);
-
- // Get and compile the patterns for include/exclude if provided
- List<Matcher> includeMatchers = new ArrayList<>();
- List<Matcher> excludeMatchers = new ArrayList<>();
- String pattern = null;
- try {
- for (Map.Entry<String, String> entry : configuration.entrySet()) {
- if (entry.getKey().startsWith(KEY_INCLUDE)) {
- pattern = entry.getValue();
- includeMatchers.add(Pattern.compile(ExternalDataUtils.patternToRegex(pattern)).matcher(""));
- } else if (entry.getKey().startsWith(KEY_EXCLUDE)) {
- pattern = entry.getValue();
- excludeMatchers.add(Pattern.compile(ExternalDataUtils.patternToRegex(pattern)).matcher(""));
- }
- }
- } catch (PatternSyntaxException ex) {
- throw new CompilationException(ErrorCode.INVALID_REGEX_PATTERN, pattern);
- }
-
- List<Matcher> matchersList;
- BiPredicate<List<Matcher>, String> p;
- if (!includeMatchers.isEmpty()) {
- matchersList = includeMatchers;
- p = (matchers, key) -> ExternalDataUtils.matchPatterns(matchers, key);
- } else if (!excludeMatchers.isEmpty()) {
- matchersList = excludeMatchers;
- p = (matchers, key) -> !ExternalDataUtils.matchPatterns(matchers, key);
- } else {
- matchersList = Collections.emptyList();
- p = (matchers, key) -> true;
- }
+ ExternalDataUtils.validateIncludeExclude(configuration);
S3Client s3Client = ExternalDataUtils.AwsS3.buildAwsS3Client(configuration);
// Get all objects in a bucket and extract the paths to files
ListObjectsV2Request.Builder listObjectsBuilder = ListObjectsV2Request.builder().bucket(container);
- ExternalDataUtils.AwsS3.setPrefix(configuration, listObjectsBuilder);
+ listObjectsBuilder.prefix(ExternalDataUtils.getPrefix(configuration));
ListObjectsV2Response listObjectsResponse;
boolean done = false;
@@ -147,7 +90,9 @@
}
// Collect the paths to files only
- collectAndFilterFiles(listObjectsResponse.contents(), p, matchersList, filesOnly);
+ IncludeExcludeMatcher includeExcludeMatcher = getIncludeExcludeMatchers();
+ collectAndFilterFiles(listObjectsResponse.contents(), includeExcludeMatcher.getPredicate(),
+ includeExcludeMatcher.getMatchersList(), filesOnly);
// Mark the flag as done if done, otherwise, get the marker of the previous response for the next request
if (!listObjectsResponse.isTruncated()) {
@@ -224,52 +169,4 @@
smallest.addFilePath(object.key(), object.size());
}
}
-
- /**
- * Finds the smallest workload and returns it
- *
- * @return the smallest workload
- */
- private PartitionWorkLoadBasedOnSize getSmallestWorkLoad() {
- PartitionWorkLoadBasedOnSize smallest = partitionWorkLoadsBasedOnSize.get(0);
- for (PartitionWorkLoadBasedOnSize partition : partitionWorkLoadsBasedOnSize) {
- // If the current total size is 0, add the file directly as this is a first time partition
- if (partition.getTotalSize() == 0) {
- smallest = partition;
- break;
- }
- if (partition.getTotalSize() < smallest.getTotalSize()) {
- smallest = partition;
- }
- }
-
- return smallest;
- }
-
- private static class PartitionWorkLoadBasedOnSize implements Serializable {
- private static final long serialVersionUID = 1L;
- private final List<String> filePaths = new ArrayList<>();
- private long totalSize = 0;
-
- PartitionWorkLoadBasedOnSize() {
- }
-
- public List<String> getFilePaths() {
- return filePaths;
- }
-
- public void addFilePath(String filePath, long size) {
- this.filePaths.add(filePath);
- this.totalSize += size;
- }
-
- public long getTotalSize() {
- return totalSize;
- }
-
- @Override
- public String toString() {
- return "Files: " + filePaths.size() + ", Total Size: " + totalSize;
- }
- }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStream.java
new file mode 100644
index 0000000..358c412
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStream.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.azure;
+
+import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.zip.GZIPInputStream;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStream;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.util.LogRedactionUtil;
+
+import com.azure.storage.blob.BlobClient;
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.models.BlobErrorCode;
+import com.azure.storage.blob.models.BlobStorageException;
+
+public class AzureBlobInputStream extends AbstractExternalInputStream {
+
+ private final BlobServiceClient client;
+
+ public AzureBlobInputStream(Map<String, String> configuration, List<String> filePaths) throws HyracksDataException {
+ super(configuration, filePaths);
+ this.client = buildAzureClient(configuration);
+ }
+
+ @Override
+ protected boolean getInputStream() throws IOException {
+ String container = configuration.get(AzureBlob.CONTAINER_NAME_FIELD_NAME);
+ BlobContainerClient blobContainerClient;
+ BlobClient blob;
+ try {
+ blobContainerClient = client.getBlobContainerClient(container);
+ blob = blobContainerClient.getBlobClient(filePaths.get(nextFileIndex));
+ in = blob.openInputStream();
+
+ // Use gzip stream if needed
+ String filename = filePaths.get(nextFileIndex).toLowerCase();
+ if (filename.endsWith(".gz") || filename.endsWith(".gzip")) {
+ in = new GZIPInputStream(in, ExternalDataConstants.DEFAULT_BUFFER_SIZE);
+ }
+ } catch (BlobStorageException ex) {
+ if (ex.getErrorCode().equals(BlobErrorCode.BLOB_NOT_FOUND)) {
+ LOGGER.debug(() -> "Key " + LogRedactionUtil.userData(filePaths.get(nextFileIndex)) + " was not "
+ + "found in container " + container);
+ return false;
+ } else {
+ throw new RuntimeDataException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage());
+ }
+ } catch (Exception ex) {
+ throw new RuntimeDataException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage());
+ }
+
+ return true;
+ }
+
+ private BlobServiceClient buildAzureClient(Map<String, String> configuration) throws HyracksDataException {
+ try {
+ return ExternalDataUtils.Azure.buildAzureClient(configuration);
+ } catch (CompilationException ex) {
+ throw HyracksDataException.create(ex);
+ }
+ }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStreamFactory.java
new file mode 100644
index 0000000..9b41b08
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobInputStreamFactory.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.azure;
+
+import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiPredicate;
+import java.util.regex.Matcher;
+
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.WarningUtil;
+import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.Warning;
+
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.models.BlobItem;
+import com.azure.storage.blob.models.ListBlobsOptions;
+
+public class AzureBlobInputStreamFactory extends AbstractExternalInputStreamFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) throws HyracksDataException {
+ return new AzureBlobInputStream(configuration, partitionWorkLoadsBasedOnSize.get(partition).getFilePaths());
+ }
+
+ @Override
+ public void configure(IServiceContext ctx, Map<String, String> configuration, IWarningCollector warningCollector)
+ throws AlgebricksException {
+ this.configuration = configuration;
+ ICcApplicationContext ccApplicationContext = (ICcApplicationContext) ctx.getApplicationContext();
+
+ String container = configuration.get(AzureBlob.CONTAINER_NAME_FIELD_NAME);
+
+ List<BlobItem> filesOnly = new ArrayList<>();
+
+ // Ensure the validity of include/exclude
+ ExternalDataUtils.validateIncludeExclude(configuration);
+
+ BlobServiceClient blobServiceClient = ExternalDataUtils.Azure.buildAzureClient(configuration);
+ BlobContainerClient blobContainer;
+ try {
+ blobContainer = blobServiceClient.getBlobContainerClient(container);
+
+ // Get all objects in a container and extract the paths to files
+ ListBlobsOptions listBlobsOptions = new ListBlobsOptions();
+ listBlobsOptions.setPrefix(ExternalDataUtils.getPrefix(configuration));
+ Iterable<BlobItem> blobItems = blobContainer.listBlobs(listBlobsOptions, null);
+
+ // Collect the paths to files only
+ IncludeExcludeMatcher includeExcludeMatcher = getIncludeExcludeMatchers();
+ collectAndFilterFiles(blobItems, includeExcludeMatcher.getPredicate(),
+ includeExcludeMatcher.getMatchersList(), filesOnly);
+
+ // Warn if no files are returned
+ if (filesOnly.isEmpty() && warningCollector.shouldWarn()) {
+ Warning warning =
+ WarningUtil.forAsterix(null, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
+ warningCollector.warn(warning);
+ }
+
+ // Partition constraints
+ partitionConstraint = ccApplicationContext.getClusterStateManager().getClusterLocations();
+ int partitionsCount = partitionConstraint.getLocations().length;
+
+ // Distribute work load amongst the partitions
+ distributeWorkLoad(filesOnly, partitionsCount);
+ } catch (Exception ex) {
+ throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage());
+ }
+ }
+
+ /**
+ * Collects and filters the files only, and excludes any folders
+ *
+ * @param items storage items
+ * @param predicate predicate to test with for file filtration
+ * @param matchers include/exclude matchers to test against
+ * @param filesOnly List containing the files only (excluding folders)
+ */
+ private void collectAndFilterFiles(Iterable<BlobItem> items, BiPredicate<List<Matcher>, String> predicate,
+ List<Matcher> matchers, List<BlobItem> filesOnly) {
+ for (BlobItem item : items) {
+ String uri = item.getName();
+
+ // skip folders
+ if (uri.endsWith("/")) {
+ continue;
+ }
+
+ // No filter, add file
+ if (predicate.test(matchers, uri)) {
+ filesOnly.add(item);
+ }
+ }
+ }
+
+ /**
+ * To efficiently utilize the parallelism, the workload is distributed amongst the partitions based on the
+ * file size.
+ *
+ * Example:
+ * File1 1mb, File2 300kb, File3 300kb, File4 300kb
+ *
+ * Distribution:
+ * Partition1: [File1]
+ * Partition2: [File2, File3, File4]
+ *
+ * @param items blob items to distribute
+ * @param partitionsCount Partitions count
+ */
+ private void distributeWorkLoad(List<BlobItem> items, int partitionsCount) {
+ // Prepare the workloads based on the number of partitions
+ for (int i = 0; i < partitionsCount; i++) {
+ partitionWorkLoadsBasedOnSize.add(new PartitionWorkLoadBasedOnSize());
+ }
+
+ for (BlobItem item : items) {
+ PartitionWorkLoadBasedOnSize smallest = getSmallestWorkLoad();
+ smallest.addFilePath(item.getName(), item.getProperties().getContentLength());
+ }
+ }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobReaderFactory.java
new file mode 100644
index 0000000..27e1b02
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/AzureBlobReaderFactory.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.azure;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.external.input.record.reader.stream.StreamRecordReaderFactory;
+import org.apache.asterix.external.provider.StreamRecordReaderProvider;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+
+public class AzureBlobReaderFactory extends StreamRecordReaderFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final List<String> recordReaderNames =
+ Collections.singletonList(ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_BLOB);
+
+ @Override
+ public List<String> getRecordReaderNames() {
+ return recordReaderNames;
+ }
+
+ @Override
+ public DataSourceType getDataSourceType() {
+ return DataSourceType.RECORDS;
+ }
+
+ @Override
+ public Class<?> getRecordClass() {
+ return char[].class;
+ }
+
+ @Override
+ public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AlgebricksException {
+ return streamFactory.getPartitionConstraint();
+ }
+
+ @Override
+ public void configure(IServiceContext ctx, Map<String, String> configuration, IWarningCollector warningCollector)
+ throws AlgebricksException, HyracksDataException {
+ this.configuration = configuration;
+
+ // Stream factory
+ streamFactory = new AzureBlobInputStreamFactory();
+ streamFactory.configure(ctx, configuration, warningCollector);
+
+ // record reader
+ recordReaderClazz = StreamRecordReaderProvider.getRecordReaderClazz(configuration);
+ }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 252ed5b..e20270c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -131,6 +131,7 @@
public static final String KEY_ALIAS_ADAPTER_NAME_SOCKET = "socket_adapter";
public static final String KEY_ADAPTER_NAME_HTTP = "http_adapter";
public static final String KEY_ADAPTER_NAME_AWS_S3 = "S3";
+ public static final String KEY_ADAPTER_NAME_AZURE_BLOB = "AZUREBLOB";
/**
* HDFS class names
@@ -290,4 +291,25 @@
public static final String DEFINITION_FIELD_NAME = "definition";
public static final String SERVICE_END_POINT_FIELD_NAME = "serviceEndpoint";
}
+
+ public static class AzureBlob {
+ private AzureBlob() {
+ throw new AssertionError("do not instantiate");
+ }
+
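+ // Configuration field names (as supplied in the external dataset definition)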
+ public static final String ACCOUNT_NAME_FIELD_NAME = "accountName";
+ public static final String ACCOUNT_KEY_FIELD_NAME = "accountKey";
+ public static final String SHARED_ACCESS_SIGNATURE_FIELD_NAME = "sharedAccessSignature";
+ public static final String CONTAINER_NAME_FIELD_NAME = "container";
+ public static final String DEFINITION_FIELD_NAME = "definition";
+ public static final String BLOB_ENDPOINT_FIELD_NAME = "blobEndpoint";
+ public static final String ENDPOINT_SUFFIX_FIELD_NAME = "endpointSuffix";
+
+ // Connection string requires PascalCase (MyFieldFormat)
+ public static final String CONNECTION_STRING_ACCOUNT_NAME = "AccountName";
+ public static final String CONNECTION_STRING_ACCOUNT_KEY = "AccountKey";
+ public static final String CONNECTION_STRING_SHARED_ACCESS_SIGNATURE = "SharedAccessSignature";
+ public static final String CONNECTION_STRING_BLOB_ENDPOINT = "BlobEndpoint";
+ public static final String CONNECTION_STRING_ENDPOINT_SUFFIX = "EndpointSuffix";
+ }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 3462b76..86eb5e0 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -18,6 +18,10 @@
*/
package org.apache.asterix.external.util;
+import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.CONNECTION_STRING_ACCOUNT_KEY;
+import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.CONNECTION_STRING_ACCOUNT_NAME;
+import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.CONNECTION_STRING_BLOB_ENDPOINT;
+import static org.apache.asterix.external.util.ExternalDataConstants.AzureBlob.CONNECTION_STRING_ENDPOINT_SUFFIX;
import static org.apache.asterix.external.util.ExternalDataConstants.KEY_DELIMITER;
import static org.apache.asterix.external.util.ExternalDataConstants.KEY_ESCAPE;
import static org.apache.asterix.external.util.ExternalDataConstants.KEY_QUOTE;
@@ -67,6 +71,12 @@
import org.apache.hyracks.dataflow.common.data.parsers.LongParserFactory;
import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.BlobServiceClientBuilder;
+import com.azure.storage.blob.models.BlobItem;
+import com.azure.storage.blob.models.ListBlobsOptions;
+
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.exception.SdkException;
@@ -500,6 +510,9 @@
case ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3:
ExternalDataUtils.AwsS3.validateProperties(configuration, srcLoc, collector);
break;
+ case ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_BLOB:
+ ExternalDataUtils.Azure.validateProperties(configuration, srcLoc, collector);
+ break;
default:
// Nothing needs to be done
break;
@@ -612,6 +625,63 @@
return result.toString();
}
+ /**
+ * Adjusts the prefix (if needed) and returns it
+ *
+ * @param configuration configuration
+ * @return the prefix (definition path) to use when listing files, or an empty string if none is provided
+ */
+ public static String getPrefix(Map<String, String> configuration) {
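+ // Note: AwsS3 and AzureBlob define the same "definition" property name, so this helper is shared by both adapters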
+ String definition = configuration.get(ExternalDataConstants.AzureBlob.DEFINITION_FIELD_NAME);
+ if (definition != null && !definition.isEmpty()) {
+ return definition + (!definition.endsWith("/") ? "/" : "");
+ }
+ return "";
+ }
+
+ /**
+ * Validates that include and exclude filters are not provided at the same time and that their keys follow
+ * the accepted format (include, include#1, include#2, ..., and likewise for exclude)
+ *
+ * @param configuration configuration map
+ * @throws CompilationException Compilation exception
+ */
+ public static void validateIncludeExclude(Map<String, String> configuration) throws CompilationException {
+ // Ensure that include and exclude are not provided at the same time + ensure valid format or property
+ List<Map.Entry<String, String>> includes = new ArrayList<>();
+ List<Map.Entry<String, String>> excludes = new ArrayList<>();
+
+ // Accepted formats are include, include#1, include#2, ... etc, same for excludes
+ for (Map.Entry<String, String> entry : configuration.entrySet()) {
+ String key = entry.getKey();
+
+ if (key.equals(ExternalDataConstants.KEY_INCLUDE)) {
+ includes.add(entry);
+ } else if (key.equals(ExternalDataConstants.KEY_EXCLUDE)) {
+ excludes.add(entry);
+ } else if (key.startsWith(ExternalDataConstants.KEY_INCLUDE)
+ || key.startsWith(ExternalDataConstants.KEY_EXCLUDE)) {
+
+ // Split by the "#", length should be 2, left should be include/exclude, right should be integer
+ String[] splits = key.split("#");
+
+ if (key.startsWith(ExternalDataConstants.KEY_INCLUDE) && splits.length == 2
+ && splits[0].equals(ExternalDataConstants.KEY_INCLUDE)
+ && NumberUtils.isIntegerNumericString(splits[1])) {
+ includes.add(entry);
+ } else if (key.startsWith(ExternalDataConstants.KEY_EXCLUDE) && splits.length == 2
+ && splits[0].equals(ExternalDataConstants.KEY_EXCLUDE)
+ && NumberUtils.isIntegerNumericString(splits[1])) {
+ excludes.add(entry);
+ } else {
+ throw new CompilationException(ErrorCode.INVALID_PROPERTY_FORMAT, key);
+ }
+ }
+ }
+
+ // Ensure either include or exclude are provided, but not both of them
+ if (!includes.isEmpty() && !excludes.isEmpty()) {
+ throw new CompilationException(ErrorCode.PARAMETERS_NOT_ALLOWED_AT_SAME_TIME,
+ ExternalDataConstants.KEY_INCLUDE, ExternalDataConstants.KEY_EXCLUDE);
+ }
+ }
+
public static class AwsS3 {
private AwsS3() {
throw new AssertionError("do not instantiate");
@@ -667,19 +737,6 @@
}
/**
- * Sets the prefix for the list objects builder if it is available
- *
- * @param configuration configuration
- * @param builder builder
- */
- public static void setPrefix(Map<String, String> configuration, ListObjectsV2Request.Builder builder) {
- String definition = configuration.get(ExternalDataConstants.AwsS3.DEFINITION_FIELD_NAME);
- if (definition != null) {
- builder.prefix(definition + (!definition.isEmpty() && !definition.endsWith("/") ? "/" : ""));
- }
- }
-
- /**
* Validate external dataset properties
*
* @param configuration properties
@@ -702,7 +759,7 @@
String container = configuration.get(ExternalDataConstants.AwsS3.CONTAINER_NAME_FIELD_NAME);
s3Client = buildAwsS3Client(configuration);
ListObjectsV2Request.Builder listObjectsBuilder = ListObjectsV2Request.builder();
- setPrefix(configuration, listObjectsBuilder);
+ listObjectsBuilder.prefix(getPrefix(configuration));
ListObjectsV2Response response =
s3Client.listObjectsV2(listObjectsBuilder.bucket(container).maxKeys(1).build());
@@ -726,49 +783,82 @@
}
}
}
+ }
+
+ public static class Azure {
+ private Azure() {
+ throw new AssertionError("do not instantiate");
+ }
/**
- * @param configuration
- * @throws CompilationException
+ * Builds the Azure Blob service client using the provided configuration
+ *
+ * @param configuration properties
+ * @return the Azure Blob service client
*/
- public static void validateIncludeExclude(Map<String, String> configuration) throws CompilationException {
- // Ensure that include and exclude are not provided at the same time + ensure valid format or property
- List<Map.Entry<String, String>> includes = new ArrayList<>();
- List<Map.Entry<String, String>> excludes = new ArrayList<>();
+ public static BlobServiceClient buildAzureClient(Map<String, String> configuration)
+ throws CompilationException {
+ // TODO(Hussain): Need to ensure that all required parameters are present in a previous step
+ String accountName = configuration.get(ExternalDataConstants.AzureBlob.ACCOUNT_NAME_FIELD_NAME);
+ String accountKey = configuration.get(ExternalDataConstants.AzureBlob.ACCOUNT_KEY_FIELD_NAME);
+ String blobEndpoint = configuration.get(ExternalDataConstants.AzureBlob.BLOB_ENDPOINT_FIELD_NAME);
+ String endpointSuffix = configuration.get(ExternalDataConstants.AzureBlob.ENDPOINT_SUFFIX_FIELD_NAME);
- // Accepted formats are include, include#1, include#2, ... etc, same for excludes
- for (Map.Entry<String, String> entry : configuration.entrySet()) {
- String key = entry.getKey();
+ // format: name1=value1;name2=value2;....
+ // TODO(Hussain): This will be different when SAS (Shared Access Signature) is introduced
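+ // e.g. (hypothetical values): AccountName=myaccount;AccountKey=<base64Key>;BlobEndpoint=http://127.0.0.1:10000/myaccount;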
+ StringBuilder connectionString = new StringBuilder();
+ connectionString.append(CONNECTION_STRING_ACCOUNT_NAME).append("=").append(accountName).append(";");
+ connectionString.append(CONNECTION_STRING_ACCOUNT_KEY).append("=").append(accountKey).append(";");
+ connectionString.append(CONNECTION_STRING_BLOB_ENDPOINT).append("=").append(blobEndpoint).append(";");
- if (key.equals(ExternalDataConstants.KEY_INCLUDE)) {
- includes.add(entry);
- } else if (key.equals(ExternalDataConstants.KEY_EXCLUDE)) {
- excludes.add(entry);
- } else if (key.startsWith(ExternalDataConstants.KEY_INCLUDE)
- || key.startsWith(ExternalDataConstants.KEY_EXCLUDE)) {
-
- // Split by the "#", length should be 2, left should be include/exclude, right should be integer
- String[] splits = key.split("#");
-
- if (key.startsWith(ExternalDataConstants.KEY_INCLUDE) && splits.length == 2
- && splits[0].equals(ExternalDataConstants.KEY_INCLUDE)
- && NumberUtils.isIntegerNumericString(splits[1])) {
- includes.add(entry);
- } else if (key.startsWith(ExternalDataConstants.KEY_EXCLUDE) && splits.length == 2
- && splits[0].equals(ExternalDataConstants.KEY_EXCLUDE)
- && NumberUtils.isIntegerNumericString(splits[1])) {
- excludes.add(entry);
- } else {
- throw new CompilationException(ErrorCode.INVALID_PROPERTY_FORMAT, key);
- }
- }
+ if (endpointSuffix != null) {
+ connectionString.append(CONNECTION_STRING_ENDPOINT_SUFFIX).append("=").append(endpointSuffix)
+ .append(";");
}
- // TODO: Should include/exclude be a common check or S3 specific?
- // Ensure either include or exclude are provided, but not both of them
- if (!includes.isEmpty() && !excludes.isEmpty()) {
- throw new CompilationException(ErrorCode.PARAMETERS_NOT_ALLOWED_AT_SAME_TIME,
- ExternalDataConstants.KEY_INCLUDE, ExternalDataConstants.KEY_EXCLUDE);
+ try {
+ return new BlobServiceClientBuilder().connectionString(connectionString.toString()).buildClient();
+ } catch (Exception ex) {
+ throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage());
+ }
+ }
+
+ /**
+ * Validate external dataset properties
+ *
+ * @param configuration properties
+ *
+ * @throws CompilationException Compilation exception
+ */
+ public static void validateProperties(Map<String, String> configuration, SourceLocation srcLoc,
+ IWarningCollector collector) throws CompilationException {
+
+ // check if the format property is present
+ if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
+ throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT);
+ }
+
+ validateIncludeExclude(configuration);
+
+ // Check if the bucket is present
+ BlobServiceClient blobServiceClient;
+ try {
+ String container = configuration.get(ExternalDataConstants.AzureBlob.CONTAINER_NAME_FIELD_NAME);
+ blobServiceClient = buildAzureClient(configuration);
+ BlobContainerClient blobContainer = blobServiceClient.getBlobContainerClient(container);
+
+ // List the blobs in the container (with the configured prefix) to verify access and check that files exist
+ ListBlobsOptions listBlobsOptions = new ListBlobsOptions();
+ listBlobsOptions.setPrefix(getPrefix(configuration));
+ Iterable<BlobItem> blobItems = blobContainer.listBlobs(listBlobsOptions, null);
+
+ if (!blobItems.iterator().hasNext() && collector.shouldWarn()) {
+ Warning warning =
+ WarningUtil.forAsterix(srcLoc, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
+ collector.warn(warning);
+ }
+ } catch (Exception ex) {
+ throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage());
}
}
}
diff --git a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
index fd3e473..d66c44e 100644
--- a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
+++ b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
@@ -21,3 +21,4 @@
org.apache.asterix.external.input.record.reader.stream.StreamRecordReaderFactory
org.apache.asterix.external.input.record.reader.http.HttpServerRecordReaderFactory
org.apache.asterix.external.input.record.reader.aws.AwsS3ReaderFactory
+org.apache.asterix.external.input.record.reader.azure.AzureBlobReaderFactory
diff --git a/asterixdb/asterix-server/pom.xml b/asterixdb/asterix-server/pom.xml
index 2c12ab0..8c07792 100644
--- a/asterixdb/asterix-server/pom.xml
+++ b/asterixdb/asterix-server/pom.xml
@@ -202,6 +202,16 @@
<noticeUrl>https://raw.githubusercontent.com/awslabs/aws-eventstream-java/7be2dd80e12f8835674c8ffb0f4a2efb64c7b585/NOTICE</noticeUrl>
</override>
<override>
+ <gavs>
+ <gav>com.azure:azure-core:1.4.0</gav>
+ <gav>com.azure:azure-core-http-netty:1.5.0</gav>
+ <gav>com.azure:azure-storage-blob:12.6.0</gav>
+ <gav>com.azure:azure-storage-common:12.6.0</gav>
+ </gavs>
+ <noticeUrl>https://raw.githubusercontent.com/Azure/azure-sdk-for-java/master/NOTICE.txt</noticeUrl>
+ <url>https://raw.githubusercontent.com/Azure/azure-sdk-for-java/master/LICENSE.txt</url>
+ </override>
+ <override>
<gav>org.mindrot:jbcrypt:0.4</gav>
<url>http://www.mindrot.org/files/jBCrypt/LICENSE</url>
</override>
diff --git a/asterixdb/pom.xml b/asterixdb/pom.xml
index 712dcc1..d69d62b 100644
--- a/asterixdb/pom.xml
+++ b/asterixdb/pom.xml
@@ -85,6 +85,7 @@
<jacoco.version>0.7.6.201602180812</jacoco.version>
<log4j.version>2.13.3</log4j.version>
<awsjavasdk.version>2.10.83</awsjavasdk.version>
+ <azurejavasdk.version>12.6.0</azurejavasdk.version>
<implementation.title>Apache AsterixDB - ${project.name}</implementation.title>
<implementation.url>https://asterixdb.apache.org/</implementation.url>
@@ -1492,6 +1493,55 @@
<artifactId>akka-http-core_2.12</artifactId>
<version>10.1.0</version>
</dependency>
+ <!-- Azure Blob Storage start -->
+ <dependency>
+ <groupId>com.azure</groupId>
+ <artifactId>azure-storage-blob</artifactId>
+ <version>${azurejavasdk.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-handler</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-handler-proxy</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec-http</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec-http2</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-buffer</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-common</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-transport</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-transport-native-epoll</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-transport-native-unix-common</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-tcnative-boringssl-static</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <!-- Azure Blob Storage end -->
<dependency>
<groupId>org.mindrot</groupId>
<artifactId>jbcrypt</artifactId>
diff --git a/asterixdb/src/main/appended-resources/supplemental-models.xml b/asterixdb/src/main/appended-resources/supplemental-models.xml
index e457628..500a035 100644
--- a/asterixdb/src/main/appended-resources/supplemental-models.xml
+++ b/asterixdb/src/main/appended-resources/supplemental-models.xml
@@ -529,4 +529,118 @@
</properties>
</project>
</supplement>
+
+ <!-- Azure SDK for Java start -->
+ <!-- com.azure does not contain any embedded LICENSE or NOTICE file -->
+ <!-- see https://github.com/Azure/azure-sdk-for-java -->
+ <supplement>
+ <project>
+ <groupId>com.azure</groupId>
+ <artifactId>azure-storage-blob</artifactId>
+ <properties>
+ <license.ignoreMissingEmbeddedLicense>12.6.0</license.ignoreMissingEmbeddedLicense>
+ <license.ignoreMissingEmbeddedNotice>12.6.0</license.ignoreMissingEmbeddedNotice>
+ <license.ignoreLicenseOverride>12.6.0</license.ignoreLicenseOverride>
+ <license.ignoreNoticeOverride>12.6.0</license.ignoreNoticeOverride>
+ </properties>
+ </project>
+ </supplement>
+
+ <supplement>
+ <project>
+ <groupId>com.azure</groupId>
+ <artifactId>azure-storage-common</artifactId>
+ <properties>
+ <license.ignoreMissingEmbeddedLicense>12.6.0</license.ignoreMissingEmbeddedLicense>
+ <license.ignoreMissingEmbeddedNotice>12.6.0</license.ignoreMissingEmbeddedNotice>
+ <license.ignoreLicenseOverride>12.6.0</license.ignoreLicenseOverride>
+ <license.ignoreNoticeOverride>12.6.0</license.ignoreNoticeOverride>
+ </properties>
+ </project>
+ </supplement>
+
+ <supplement>
+ <project>
+ <groupId>com.azure</groupId>
+ <artifactId>azure-core</artifactId>
+ <properties>
+ <license.ignoreMissingEmbeddedLicense>1.4.0</license.ignoreMissingEmbeddedLicense>
+ <license.ignoreMissingEmbeddedNotice>1.4.0</license.ignoreMissingEmbeddedNotice>
+ <license.ignoreLicenseOverride>1.4.0</license.ignoreLicenseOverride>
+ <license.ignoreNoticeOverride>1.4.0</license.ignoreNoticeOverride>
+ </properties>
+ </project>
+ </supplement>
+
+ <supplement>
+ <project>
+ <groupId>com.azure</groupId>
+ <artifactId>azure-core-http-netty</artifactId>
+ <properties>
+ <license.ignoreMissingEmbeddedLicense>1.5.0</license.ignoreMissingEmbeddedLicense>
+ <license.ignoreMissingEmbeddedNotice>1.5.0</license.ignoreMissingEmbeddedNotice>
+ <license.ignoreLicenseOverride>1.5.0</license.ignoreLicenseOverride>
+ <license.ignoreNoticeOverride>1.5.0</license.ignoreNoticeOverride>
+ </properties>
+ </project>
+ </supplement>
+ <!-- Azure SDK for Java end -->
+
+ <!-- jackson-datatype-jsr310 contains embedded license but has no NOTICE -->
+ <!-- See https://github.com/FasterXML/jackson-modules-java8 -->
+ <supplement>
+ <project>
+ <groupId>com.fasterxml.jackson.datatype</groupId>
+ <artifactId>jackson-datatype-jsr310</artifactId>
+ <properties>
+ <license.ignoreMissingEmbeddedNotice>2.10.1</license.ignoreMissingEmbeddedNotice>
+ </properties>
+ </project>
+ </supplement>
+
+ <!-- com.fasterxml.woodstox contains embedded license but has no NOTICE -->
+ <!-- See https://github.com/FasterXML/woodstox -->
+ <supplement>
+ <project>
+ <groupId>com.fasterxml.woodstox</groupId>
+ <artifactId>woodstox-core</artifactId>
+ <properties>
+ <license.ignoreMissingEmbeddedNotice>6.0.2</license.ignoreMissingEmbeddedNotice>
+ </properties>
+ </project>
+ </supplement>
+
+ <!-- org.codehaus.woodstox contains an embedded license but has no NOTICE -->
+ <!-- See https://github.com/FasterXML/stax2-api -->
+ <supplement>
+ <project>
+ <groupId>org.codehaus.woodstox</groupId>
+ <artifactId>stax2-api</artifactId>
+ <properties>
+ <license.ignoreMissingEmbeddedNotice>4.2</license.ignoreMissingEmbeddedNotice>
+ </properties>
+ </project>
+ </supplement>
+
+ <supplement>
+ <project>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-core</artifactId>
+ <properties>
+ <license.ignoreMissingEmbeddedLicense>3.3.3.RELEASE</license.ignoreMissingEmbeddedLicense>
+ <license.ignoreMissingEmbeddedNotice>3.3.3.RELEASE</license.ignoreMissingEmbeddedNotice>
+ </properties>
+ </project>
+ </supplement>
+
+ <supplement>
+ <project>
+ <groupId>io.projectreactor.netty</groupId>
+ <artifactId>reactor-netty</artifactId>
+ <properties>
+ <license.ignoreMissingEmbeddedLicense>0.9.5.RELEASE</license.ignoreMissingEmbeddedLicense>
+ <license.ignoreMissingEmbeddedNotice>0.9.5.RELEASE</license.ignoreMissingEmbeddedNotice>
+ </properties>
+ </project>
+ </supplement>
</supplementalDataModels>
diff --git a/asterixdb/src/main/licenses/content/raw.githubusercontent.com_Azure_azure-sdk-for-java_master_LICENSE.txt b/asterixdb/src/main/licenses/content/raw.githubusercontent.com_Azure_azure-sdk-for-java_master_LICENSE.txt
new file mode 100644
index 0000000..49d2166
--- /dev/null
+++ b/asterixdb/src/main/licenses/content/raw.githubusercontent.com_Azure_azure-sdk-for-java_master_LICENSE.txt
@@ -0,0 +1,21 @@
+The MIT License (MIT)
+
+Copyright (c) 2015 Microsoft
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
\ No newline at end of file
diff --git a/asterixdb/src/main/licenses/content/raw.githubusercontent.com_Azure_azure-sdk-for-java_master_NOTICE.txt b/asterixdb/src/main/licenses/content/raw.githubusercontent.com_Azure_azure-sdk-for-java_master_NOTICE.txt
new file mode 100644
index 0000000..76791aa
--- /dev/null
+++ b/asterixdb/src/main/licenses/content/raw.githubusercontent.com_Azure_azure-sdk-for-java_master_NOTICE.txt
@@ -0,0 +1,159 @@
+NOTICES AND INFORMATION
+Do Not Translate or Localize
+
+This software incorporates material from third parties. Microsoft makes certain
+open source code available at https://3rdpartysource.microsoft.com, or you may
+send a check or money order for US $5.00, including the product name, the open
+source component name, and version number, to:
+
+Source Code Compliance Team
+Microsoft Corporation
+One Microsoft Way
+Redmond, WA 98052
+USA
+
+Notwithstanding any other terms, you may reverse engineer this software to the
+extent required to debug changes to any libraries licensed under the GNU Lesser
+General Public License.
+
+------------------------------------------------------------------------------
+
+Azure SDK for Java uses third-party libraries or other resources that may be
+distributed under licenses different than the Azure SDK for Java software.
+
+In the event that we accidentally failed to list a required notice, please
+bring it to our attention. Post an issue or email us:
+
+ azjavasdkhelp@microsoft.com
+
+The attached notices are provided for information only.
+
+License notice for Hamcrest
+------------------------------------------------------------------------------
+
+The 3-Clause BSD License
+
+Copyright (c) 2000-2015 www.hamcrest.org
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+Redistributions of source code must retain the above copyright notice, this list of
+conditions and the following disclaimer. Redistributions in binary form must reproduce
+the above copyright notice, this list of conditions and the following disclaimer in
+the documentation and/or other materials provided with the distribution.
+
+Neither the name of Hamcrest nor the names of its contributors may be used to endorse
+or promote products derived from this software without specific prior written
+permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
+EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
+SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
+TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY
+WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
+DAMAGE.
+
+License notice for Slf4j API
+------------------------------------------------------------------------------
+
+ Copyright (c) 2004-2017 QOS.ch
+ All rights reserved.
+
+ Permission is hereby granted, free of charge, to any person obtaining
+ a copy of this software and associated documentation files (the
+ "Software"), to deal in the Software without restriction, including
+ without limitation the rights to use, copy, modify, merge, publish,
+ distribute, sublicense, and/or sell copies of the Software, and to
+ permit persons to whom the Software is furnished to do so, subject to
+ the following conditions:
+
+ The above copyright notice and this permission notice shall be
+ included in all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+ LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+ OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+ WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+License notice for Slf4j Simple
+------------------------------------------------------------------------------
+
+ Copyright (c) 2004-2017 QOS.ch
+ All rights reserved.
+
+ Permission is hereby granted, free of charge, to any person obtaining
+ a copy of this software and associated documentation files (the
+ "Software"), to deal in the Software without restriction, including
+ without limitation the rights to use, copy, modify, merge, publish,
+ distribute, sublicense, and/or sell copies of the Software, and to
+ permit persons to whom the Software is furnished to do so, subject to
+ the following conditions:
+
+ The above copyright notice and this permission notice shall be
+ included in all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+ LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+ OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+ WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+License notice for Guava (https://github.com/google/guava)
+------------------------------------------------------------------------------
+
+Copyright (C) 2010 The Guava Authors
+
+Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+in compliance with the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software distributed under the License
+is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+or implied. See the License for the specific language governing permissions and limitations under
+the License.
+
+License notice for Netty
+------------------------------------------------------------------------------
+
+Copyright 2014 The Netty Project
+
+The Netty Project licenses this file to you under the Apache License,
+version 2.0 (the "License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at:
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+License for the specific language governing permissions and limitations
+under the License.
+
+License notice for JUG Java Uuid Generator
+------------------------------------------------------------------------------
+
+JUG Java Uuid Generator
+
+Copyright (c) 2002- Tatu Saloranta, tatu.saloranta@iki.fi
+
+Licensed under the License specified in the file LICENSE which is
+included with the source code.
+You may not use this file except in compliance with the License.
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
\ No newline at end of file