[ASTERIXDB-2697]: Implementing AWS S3 as an external data source
- user model changes: yes
- storage format changes: no
- interface changes: no
Details:
- Added an external reader for AWS S3.
- Updated the query translator to include the WITH
parameters in the dataset details when creating
an external dataset.
- Added a test case for AWS S3 that uses an S3 mock server
to avoid requiring real credentials.
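- Example DDL (illustrative; mirrors the test DDL added in this
  change, using dummy credentials and the local mock server endpoint):
    create external dataset test(test) using S3 (
    ("accessKey"="dummyAccessKey"),
    ("secretKey"="dummySecretKey"),
    ("region"="us-west-2"),
    ("serviceEndpoint"="http://localhost:8001"),
    ("container"="playground"),
    ("definition"="json-data/reviews"),
    ("format"="json")
    );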
Change-Id: I71d89116c0bb404c9621b16f21a6a31cbf7bb7f6
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/5025
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Hussain Towaileb <hussainht@gmail.com>
Reviewed-by: Dmitry Lychagin <dmitry.lychagin@couchbase.com>
diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml
index ac2c303..219595b 100644
--- a/asterixdb/asterix-app/pom.xml
+++ b/asterixdb/asterix-app/pom.xml
@@ -699,5 +699,38 @@
</exclusion>
</exclusions>
</dependency>
+ <!-- AWS -->
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>sdk-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>s3</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>regions</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>auth</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <!-- Mock for AWS S3 -->
+ <dependency>
+ <groupId>io.findify</groupId>
+ <artifactId>s3mock_2.12</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <!-- Needed for the s3 mock -->
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-http-core_2.12</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java
index a989941..c185340 100755
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java
@@ -34,9 +34,9 @@
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.external.IDataSourceAdapter;
import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.common.library.ILibraryManager;
-import org.apache.asterix.external.api.IDataSourceAdapter;
import org.apache.asterix.external.dataset.adapter.AdapterIdentifier;
import org.apache.asterix.external.library.ExternalLibrary;
import org.apache.asterix.external.library.LibraryAdapter;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index ce1a354..b8a048d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -155,6 +155,8 @@
import org.apache.asterix.metadata.utils.MetadataConstants;
import org.apache.asterix.metadata.utils.MetadataLockUtil;
import org.apache.asterix.metadata.utils.MetadataUtil;
+import org.apache.asterix.object.base.AdmObjectNode;
+import org.apache.asterix.object.base.AdmStringNode;
import org.apache.asterix.om.base.IAObject;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.ATypeTag;
@@ -646,9 +648,16 @@
keySourceIndicators, partitioningTypes, autogenerated, filterField);
break;
case EXTERNAL:
+ validateExternalDatasetRequirements(appCtx, metadataProvider, mdTxnCtx, dd);
String adapter = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getAdapter();
Map<String, String> properties = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getProperties();
+ // Add the withObjectNode items to the external dataset properties
+ if (!dd.getWithObjectNode().isEmpty()) {
+ AdmObjectNode withObjectNode = dd.getWithObjectNode();
+ dd.getWithObjectNode().getFieldNames().iterator().forEachRemaining(fieldName -> properties
+ .put(fieldName, ((AdmStringNode) withObjectNode.get(fieldName)).get()));
+ }
datasetDetails =
new ExternalDatasetDetails(adapter, properties, new Date(), TransactionState.COMMIT);
break;
@@ -3051,4 +3060,17 @@
throw new RuntimeDataException(ErrorCode.REQUEST_CANCELLED, clientRequest.getId());
}
}
+
+ /**
+ * Performs any required validation before creating an external dataset
+ *
+ * @param appContext {@link ICcApplicationContext} context
+ * @param metadataProvider {@link MetadataProvider} metadata provider
+ * @param mdTxnCtx {@link MetadataTransactionContext} metadata transaction context
+ * @param datasetDecl {@link DatasetDecl} dataset declaration statement
+ */
+ protected void validateExternalDatasetRequirements(ICcApplicationContext appContext,
+ MetadataProvider metadataProvider, MetadataTransactionContext mdTxnCtx, DatasetDecl datasetDecl)
+ throws Exception {
+ }
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 26c092f..fc912b0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -69,9 +69,11 @@
import org.apache.asterix.common.config.ReplicationProperties;
import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.external.IAdapterFactoryService;
import org.apache.asterix.common.library.ILibraryManager;
import org.apache.asterix.common.replication.INcLifecycleCoordinator;
import org.apache.asterix.common.utils.Servlets;
+import org.apache.asterix.external.adapter.factory.AdapterFactoryService;
import org.apache.asterix.external.library.ExternalLibraryManager;
import org.apache.asterix.file.StorageComponentProvider;
import org.apache.asterix.messaging.CCMessageBroker;
@@ -154,7 +156,7 @@
ccExtensionManager = new CCExtensionManager(new ArrayList<>(getExtensions()));
IGlobalRecoveryManager globalRecoveryManager = createGlobalRecoveryManager();
appCtx = createApplicationContext(libraryManager, globalRecoveryManager, lifecycleCoordinator,
- () -> new Receptionist("CC"), ConfigValidator::new, ccExtensionManager);
+ () -> new Receptionist("CC"), ConfigValidator::new, ccExtensionManager, new AdapterFactoryService());
final CCConfig ccConfig = controllerService.getCCConfig();
if (System.getProperty("java.rmi.server.hostname") == null) {
System.setProperty("java.rmi.server.hostname", ccConfig.getClusterPublicAddress());
@@ -182,10 +184,12 @@
protected ICcApplicationContext createApplicationContext(ILibraryManager libraryManager,
IGlobalRecoveryManager globalRecoveryManager, INcLifecycleCoordinator lifecycleCoordinator,
IReceptionistFactory receptionistFactory, IConfigValidatorFactory configValidatorFactory,
- CCExtensionManager ccExtensionManager) throws AlgebricksException, IOException {
+ CCExtensionManager ccExtensionManager, IAdapterFactoryService adapterFactoryService)
+ throws AlgebricksException, IOException {
return new CcApplicationContext(ccServiceCtx, getHcc(), libraryManager, () -> MetadataManager.INSTANCE,
globalRecoveryManager, lifecycleCoordinator, new ActiveNotificationHandler(), componentProvider,
- new MetadataLockManager(), receptionistFactory, configValidatorFactory, ccExtensionManager);
+ new MetadataLockManager(), receptionistFactory, configValidatorFactory, ccExtensionManager,
+ adapterFactoryService);
}
protected IGlobalRecoveryManager createGlobalRecoveryManager() throws Exception {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index b74f4c6..fc64d99 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -42,7 +42,7 @@
import org.apache.asterix.common.transactions.TxnId;
import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.asterix.compiler.provider.SqlppCompilationProvider;
-import org.apache.asterix.external.api.IAdapterFactory;
+import org.apache.asterix.external.api.ITypedAdapterFactory;
import org.apache.asterix.external.feed.management.FeedConnectionId;
import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
import org.apache.asterix.external.feed.watch.FeedActivityDetails;
@@ -135,14 +135,14 @@
private FeedOperations() {
}
- private static Pair<JobSpecification, IAdapterFactory> buildFeedIntakeJobSpec(Feed feed,
+ private static Pair<JobSpecification, ITypedAdapterFactory> buildFeedIntakeJobSpec(Feed feed,
MetadataProvider metadataProvider, FeedPolicyAccessor policyAccessor) throws Exception {
JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
spec.setFrameSize(metadataProvider.getApplicationContext().getCompilerProperties().getFrameSize());
- IAdapterFactory adapterFactory;
+ ITypedAdapterFactory adapterFactory;
IOperatorDescriptor feedIngestor;
AlgebricksPartitionConstraint ingesterPc;
- Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, IAdapterFactory> t =
+ Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, ITypedAdapterFactory> t =
metadataProvider.buildFeedIntakeRuntime(spec, feed, policyAccessor);
feedIngestor = t.first;
ingesterPc = t.second;
@@ -447,13 +447,13 @@
MetadataProvider metadataProvider, Feed feed, List<FeedConnection> feedConnections,
IStatementExecutor statementExecutor, IHyracksClientConnection hcc) throws Exception {
FeedPolicyAccessor fpa = new FeedPolicyAccessor(new HashMap<>());
- Pair<JobSpecification, IAdapterFactory> intakeInfo = buildFeedIntakeJobSpec(feed, metadataProvider, fpa);
+ Pair<JobSpecification, ITypedAdapterFactory> intakeInfo = buildFeedIntakeJobSpec(feed, metadataProvider, fpa);
List<JobSpecification> jobsList = new ArrayList<>();
// TODO: Figure out a better way to handle insert/upsert per conn instead of per feed
Boolean insertFeed = ExternalDataUtils.isInsertFeed(feed.getConfiguration());
// Construct the ingestion Job
JobSpecification intakeJob = intakeInfo.getLeft();
- IAdapterFactory ingestionAdaptorFactory = intakeInfo.getRight();
+ ITypedAdapterFactory ingestionAdaptorFactory = intakeInfo.getRight();
String[] ingestionLocations = ingestionAdaptorFactory.getPartitionConstraint().getLocations();
// Add metadata configs
metadataProvider.getConfig().put(FunctionUtil.IMPORT_PRIVATE_FUNCTIONS, Boolean.TRUE.toString());
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java
new file mode 100644
index 0000000..3b4cdf8
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.external_dataset.aws;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.test.runtime.ExecutionTestUtil;
+import org.apache.asterix.test.runtime.LangExecutionUtil;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import io.findify.s3mock.S3Mock;
+import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
+import software.amazon.awssdk.core.sync.RequestBody;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.S3ClientBuilder;
+import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+
+/**
+ * Runs an AWS S3 mock server and tests it as an external dataset
+ */
+@RunWith(Parameterized.class)
+public class AwsS3ExternalDatasetTest {
+
+ private static final Logger LOGGER = LogManager.getLogger();
+
+ protected static final String TEST_CONFIG_FILE_NAME = "src/main/resources/cc.conf";
+
+ // S3 mock server
+ private static S3Mock s3MockServer;
+
+ // IMPORTANT: The following values must be used in the AWS S3 test case
+ private static S3Client client;
+ private static final String S3_MOCK_SERVER_BUCKET = "playground";
+ private static final String S3_MOCK_SERVER_BUCKET_DEFINITION = "json-data/reviews/"; // data resides here
+ private static final String S3_MOCK_SERVER_REGION = "us-west-2";
+ private static final int S3_MOCK_SERVER_PORT = 8001;
+ private static final String S3_MOCK_SERVER_HOSTNAME = "http://localhost:" + S3_MOCK_SERVER_PORT;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ final TestExecutor testExecutor = new TestExecutor();
+ LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, testExecutor);
+ setNcEndpoints(testExecutor);
+ startAwsS3MockServer();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ LangExecutionUtil.tearDown();
+
+ // Shutting down S3 mock server
+ LOGGER.info("Shutting down S3 mock server and client");
+ if (client != null) {
+ client.close();
+ }
+ if (s3MockServer != null) {
+ s3MockServer.shutdown();
+ }
+ LOGGER.info("S3 mock down and client shut down successfully");
+ }
+
+ @Parameters(name = "SqlppExecutionTest {index}: {0}")
+ public static Collection<Object[]> tests() throws Exception {
+ return LangExecutionUtil.tests("only_external_dataset.xml", "testsuite_external_dataset.xml");
+ }
+
+ protected TestCaseContext tcCtx;
+
+ public AwsS3ExternalDatasetTest(TestCaseContext tcCtx) {
+ this.tcCtx = tcCtx;
+ }
+
+ @Test
+ public void test() throws Exception {
+ LangExecutionUtil.test(tcCtx);
+ }
+
+ private static void setNcEndpoints(TestExecutor testExecutor) {
+ final NodeControllerService[] ncs = ExecutionTestUtil.integrationUtil.ncs;
+ final Map<String, InetSocketAddress> ncEndPoints = new HashMap<>();
+ final String ip = InetAddress.getLoopbackAddress().getHostAddress();
+ for (NodeControllerService nc : ncs) {
+ final String nodeId = nc.getId();
+ final INcApplicationContext appCtx = (INcApplicationContext) nc.getApplicationContext();
+ int apiPort = appCtx.getExternalProperties().getNcApiPort();
+ ncEndPoints.put(nodeId, InetSocketAddress.createUnresolved(ip, apiPort));
+ }
+ testExecutor.setNcEndPoints(ncEndPoints);
+ }
+
+ /**
+ * Starts the AWS S3 mock server and loads some files for testing
+ */
+ private static void startAwsS3MockServer() {
+ // Starting S3 mock server to be used instead of real S3 server
+ LOGGER.info("Starting S3 mock server");
+ s3MockServer = new S3Mock.Builder().withPort(S3_MOCK_SERVER_PORT).withInMemoryBackend().build();
+ s3MockServer.start();
+ LOGGER.info("S3 mock server started successfully");
+
+ // Create a client and add some files to the S3 mock server
+ LOGGER.info("Creating S3 client to load initial files to S3 mock server");
+ S3ClientBuilder builder = S3Client.builder();
+ URI endpoint = URI.create(S3_MOCK_SERVER_HOSTNAME); // endpoint pointing to S3 mock server
+ builder.region(Region.of(S3_MOCK_SERVER_REGION)).credentialsProvider(AnonymousCredentialsProvider.create())
+ .endpointOverride(endpoint);
+ client = builder.build();
+ LOGGER.info("Client created successfully");
+
+ // Create the bucket and upload some json files
+ prepareS3Bucket();
+ }
+
+ /**
+ * Creates a bucket and fills it with some files for testing purposes.
+ */
+ private static void prepareS3Bucket() {
+ LOGGER.info("creating bucket " + S3_MOCK_SERVER_BUCKET);
+ client.createBucket(CreateBucketRequest.builder().bucket(S3_MOCK_SERVER_BUCKET).build());
+ LOGGER.info("bucket created successfully");
+
+ LOGGER.info("Adding JSON files to the bucket");
+ client.putObject(
+ PutObjectRequest.builder().bucket(S3_MOCK_SERVER_BUCKET)
+ .key(S3_MOCK_SERVER_BUCKET_DEFINITION + "0.json").build(),
+ RequestBody.fromString("{\"id\": 1, \"year\": null, \"quarter\": null, \"review\": \"good\"}"));
+ client.putObject(
+ PutObjectRequest.builder().bucket(S3_MOCK_SERVER_BUCKET)
+ .key(S3_MOCK_SERVER_BUCKET_DEFINITION + "1.json").build(),
+ RequestBody.fromString("{\"id\": 2, \"year\": null, \"quarter\": null, \"review\": \"good\"}"));
+ client.putObject(
+ PutObjectRequest.builder().bucket(S3_MOCK_SERVER_BUCKET)
+ .key(S3_MOCK_SERVER_BUCKET_DEFINITION + "2018/1.json").build(),
+ RequestBody.fromString("{\"id\": 3, \"year\": 2018, \"quarter\": null, \"review\": \"good\"}"));
+ client.putObject(
+ PutObjectRequest.builder().bucket(S3_MOCK_SERVER_BUCKET)
+ .key(S3_MOCK_SERVER_BUCKET_DEFINITION + "2018/2.json").build(),
+ RequestBody.fromString("{\"id\": 4, \"year\": 2018, \"quarter\": null, \"review\": \"bad\"}"));
+ client.putObject(
+ PutObjectRequest.builder().bucket(S3_MOCK_SERVER_BUCKET)
+ .key(S3_MOCK_SERVER_BUCKET_DEFINITION + "2018/q1/1.json").build(),
+ RequestBody.fromString("{\"id\": 5, \"year\": 2018, \"quarter\": 1, \"review\": \"good\"}"));
+ client.putObject(
+ PutObjectRequest.builder().bucket(S3_MOCK_SERVER_BUCKET)
+ .key(S3_MOCK_SERVER_BUCKET_DEFINITION + "2018/q1/2.json").build(),
+ RequestBody.fromString("{\"id\": 6, \"year\": 2018, \"quarter\": 1, \"review\": \"bad\"}"));
+ client.putObject(
+ PutObjectRequest.builder().bucket(S3_MOCK_SERVER_BUCKET)
+ .key(S3_MOCK_SERVER_BUCKET_DEFINITION + "2018/q2/1.json").build(),
+ RequestBody.fromString("{\"id\": 7, \"year\": 2018, \"quarter\": 2, \"review\": \"good\"}"));
+ client.putObject(
+ PutObjectRequest.builder().bucket(S3_MOCK_SERVER_BUCKET)
+ .key(S3_MOCK_SERVER_BUCKET_DEFINITION + "2018/q2/2.json").build(),
+ RequestBody.fromString("{\"id\": 8, \"year\": 2018, \"quarter\": 2, \"review\": \"bad\"}"));
+ client.putObject(
+ PutObjectRequest.builder().bucket(S3_MOCK_SERVER_BUCKET)
+ .key(S3_MOCK_SERVER_BUCKET_DEFINITION + "2019/1.json").build(),
+ RequestBody.fromString("{\"id\": 9, \"year\": 2019, \"quarter\": null, \"review\": \"good\"}"));
+ client.putObject(
+ PutObjectRequest.builder().bucket(S3_MOCK_SERVER_BUCKET)
+ .key(S3_MOCK_SERVER_BUCKET_DEFINITION + "2019/2.json").build(),
+ RequestBody.fromString("{\"id\": 10, \"year\": 2019, \"quarter\": null, \"review\": \"bad\"}"));
+ client.putObject(
+ PutObjectRequest.builder().bucket(S3_MOCK_SERVER_BUCKET)
+ .key(S3_MOCK_SERVER_BUCKET_DEFINITION + "2019/q1/1.json").build(),
+ RequestBody.fromString("{\"id\": 11, \"year\": 2019, \"quarter\": 1, \"review\": \"good\"}"));
+ client.putObject(
+ PutObjectRequest.builder().bucket(S3_MOCK_SERVER_BUCKET)
+ .key(S3_MOCK_SERVER_BUCKET_DEFINITION + "2019/q1/2.json").build(),
+ RequestBody.fromString("{\"id\": 12, \"year\": 2019, \"quarter\": 1, \"review\": \"bad\"}"));
+ client.putObject(
+ PutObjectRequest.builder().bucket(S3_MOCK_SERVER_BUCKET)
+ .key(S3_MOCK_SERVER_BUCKET_DEFINITION + "2019/q2/1.json").build(),
+ RequestBody.fromString("{\"id\": 13, \"year\": 2019, \"quarter\": 2, \"review\": \"good\"}"));
+ client.putObject(
+ PutObjectRequest.builder().bucket(S3_MOCK_SERVER_BUCKET)
+ .key(S3_MOCK_SERVER_BUCKET_DEFINITION + "2019/q2/2.json").build(),
+ RequestBody.fromString("{\"id\": 14, \"year\": 2019, \"quarter\": 2, \"review\": \"bad\"}"));
+ LOGGER.info("Files added successfully");
+ }
+}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/only_external_dataset.xml b/asterixdb/asterix-app/src/test/resources/runtimets/only_external_dataset.xml
new file mode 100644
index 0000000..334dd52
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/only_external_dataset.xml
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements. See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership. The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License. You may obtain a copy of the License at
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied. See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<test-suite xmlns="urn:xml.testframework.asterix.apache.org" ResultOffsetPath="results" QueryOffsetPath="queries_sqlpp" QueryFileExtension=".sqlpp">
+ <test-group name="failed">
+ </test-group>
+</test-suite>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/000/external_dataset.000.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/000/external_dataset.000.ddl.sqlpp
new file mode 100644
index 0000000..9c6a994
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/000/external_dataset.000.ddl.sqlpp
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use test;
+
+drop type test if exists;
+create type test as open {
+};
+
+drop dataset test if exists;
+create external dataset test(test) using S3 (
+("accessKey"="dummyAccessKey"),
+("secretKey"="dummySecretKey"),
+("region"="us-west-2"),
+("serviceEndpoint"="http://localhost:8001"),
+("container"="playground"),
+("definition"="json-data/reviews"),
+("format"="json")
+);
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/000/external_dataset.001.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/000/external_dataset.001.query.sqlpp
new file mode 100644
index 0000000..2dd9cc5
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/000/external_dataset.001.query.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+from test
+select value test
+order by id asc;
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/000/external_dataset.002.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/000/external_dataset.002.ddl.sqlpp
new file mode 100644
index 0000000..548e632
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/000/external_dataset.002.ddl.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse test if exists;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/000/external_dataset.001.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/000/external_dataset.001.adm
new file mode 100644
index 0000000..a7ce908
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/000/external_dataset.001.adm
@@ -0,0 +1,14 @@
+{ "id": 1, "year": null, "quarter": null, "review": "good" }
+{ "id": 2, "year": null, "quarter": null, "review": "good" }
+{ "id": 3, "year": 2018, "quarter": null, "review": "good" }
+{ "id": 4, "year": 2018, "quarter": null, "review": "bad" }
+{ "id": 5, "year": 2018, "quarter": 1, "review": "good" }
+{ "id": 6, "year": 2018, "quarter": 1, "review": "bad" }
+{ "id": 7, "year": 2018, "quarter": 2, "review": "good" }
+{ "id": 8, "year": 2018, "quarter": 2, "review": "bad" }
+{ "id": 9, "year": 2019, "quarter": null, "review": "good" }
+{ "id": 10, "year": 2019, "quarter": null, "review": "bad" }
+{ "id": 11, "year": 2019, "quarter": 1, "review": "good" }
+{ "id": 12, "year": 2019, "quarter": 1, "review": "bad" }
+{ "id": 13, "year": 2019, "quarter": 2, "review": "good" }
+{ "id": 14, "year": 2019, "quarter": 2, "review": "bad" }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_01/feeds_01.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_01/feeds_01.1.adm
index 85cd967..1dc31dc 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_01/feeds_01.1.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feeds_01/feeds_01.1.adm
@@ -1 +1 @@
-{ "DataverseName": "feeds", "FeedName": "TweetFeed", "AdapterConfiguration": {{ { "Name": "path", "Value": "asterix_nc1://data/twitter/obamatweets.adm" }, { "Name": "feed", "Value": "TweetFeed" }, { "Name": "adapter-name", "Value": "localfs" }, { "Name": "is-feed", "Value": "true" }, { "Name": "parser", "Value": "adm" }, { "Name": "reader", "Value": "localfs" }, { "Name": "format", "Value": "adm" }, { "Name": "tuple-interval", "Value": "10" }, { "Name": "type-name", "Value": "TweetType" }, { "Name": "dataverse", "Value": "feeds" } }}, "Timestamp": "Thu Dec 07 19:22:41 PST 2017" }
+{ "DataverseName": "feeds", "FeedName": "TweetFeed", "AdapterConfiguration": {{ { "Name": "path", "Value": "asterix_nc1://data/twitter/obamatweets.adm" }, { "Name": "feed", "Value": "TweetFeed" }, { "Name": "adapter-name", "Value": "localfs" }, { "Name": "is-feed", "Value": "true" }, { "Name": "parser", "Value": "adm" }, { "Name": "reader", "Value": "localfs" }, { "Name": "format", "Value": "adm" }, { "Name": "tuple-interval", "Value": "10" }, { "Name": "linkName", "Value": "localfs" }, { "Name": "type-name", "Value": "TweetType" }, { "Name": "dataverse", "Value": "feeds" } }}, "Timestamp": "Wed Feb 26 20:33:46 AST 2020" }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset.xml
new file mode 100644
index 0000000..cd1fb12
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset.xml
@@ -0,0 +1,28 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements. See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership. The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License. You may obtain a copy of the License at
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied. See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<test-suite xmlns="urn:xml.testframework.asterix.apache.org" ResultOffsetPath="results" QueryOffsetPath="queries_sqlpp">
+ <test-group name="external-dataset">
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="aws/s3/000">
+ <output-dir compare="Text">aws/s3/000</output-dir>
+ </compilation-unit>
+ </test-case>
+ </test-group>
+</test-suite>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
index 3389962..5fc1bb7 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
@@ -19,6 +19,7 @@
package org.apache.asterix.common.dataflow;
import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.api.ICoordinationService;
import org.apache.asterix.common.api.IMetadataLockManager;
import org.apache.asterix.common.api.INodeJobTracker;
import org.apache.asterix.common.api.IRequestTracker;
@@ -26,6 +27,7 @@
import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
import org.apache.asterix.common.config.ExtensionProperties;
import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.external.IAdapterFactoryService;
import org.apache.asterix.common.metadata.IMetadataBootstrap;
import org.apache.asterix.common.replication.INcLifecycleCoordinator;
import org.apache.asterix.common.storage.ICompressionManager;
@@ -127,4 +129,18 @@
* @return the request tracker.
*/
IRequestTracker getRequestTracker();
+
+ /**
+ * Gets the coordination service
+ *
+ * @return the coordination service
+ */
+ ICoordinationService getCoordinationService();
+
+ /**
+ * Gets the adapter factory service
+ *
+ * @return the adapter factory service
+ */
+ IAdapterFactoryService getAdapterFactoryService();
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IAdapterFactory.java
similarity index 82%
rename from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
rename to asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IAdapterFactory.java
index 40bc7d8..e2e7e3e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IAdapterFactory.java
@@ -16,12 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.external.api;
+package org.apache.asterix.common.external;
import java.io.Serializable;
import java.util.Map;
-import org.apache.asterix.om.types.ARecordType;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.application.IServiceContext;
@@ -60,8 +59,8 @@
/**
* Creates an instance of IDatasourceAdapter.
*
- * @param HyracksTaskContext
- * @param partition
+ * @param ctx HyracksTaskContext
+ * @param partition partition number
* @return An instance of IDatasourceAdapter.
* @throws Exception
*/
@@ -77,28 +76,4 @@
*/
void configure(IServiceContext serviceContext, Map<String, String> configuration)
throws HyracksDataException, AlgebricksException;
-
- /**
- * Set the expected record output type of the adapter
- *
- * @param outputType
- */
- void setOutputType(ARecordType outputType);
-
- /**
- * Set the expected meta output type of the adapter
- *
- * @param metaType
- */
- void setMetaType(ARecordType metaType);
-
- /**
- * @return the adapter record output type
- */
- ARecordType getOutputType();
-
- /**
- * @return the adapter meta output type
- */
- ARecordType getMetaType();
}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IAdapterFactoryService.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IAdapterFactoryService.java
new file mode 100644
index 0000000..55e25b7
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IAdapterFactoryService.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.external;
+
+@FunctionalInterface
+public interface IAdapterFactoryService {
+
+ /**
+ * Creates and returns an adapter factory
+ *
+ * @return adapter factory
+ */
+ IAdapterFactory createAdapterFactory();
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IDataSourceAdapter.java
similarity index 97%
rename from asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java
rename to asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IDataSourceAdapter.java
index 472cdae..18f59f2 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IDataSourceAdapter.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.external.api;
+package org.apache.asterix.common.external;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.exceptions.HyracksDataException;
diff --git a/asterixdb/asterix-external-data/pom.xml b/asterixdb/asterix-external-data/pom.xml
index 605fbe5..30e7770 100644
--- a/asterixdb/asterix-external-data/pom.xml
+++ b/asterixdb/asterix-external-data/pom.xml
@@ -435,5 +435,17 @@
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>s3</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>regions</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>auth</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/AdapterFactoryService.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/AdapterFactoryService.java
new file mode 100644
index 0000000..aaf2002
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/AdapterFactoryService.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.adapter.factory;
+
+import org.apache.asterix.common.external.IAdapterFactoryService;
+import org.apache.asterix.external.api.ITypedAdapterFactory;
+
+public class AdapterFactoryService implements IAdapterFactoryService {
+
+ /**
+ * Creates and returns an adapter factory
+ *
+ * @return adapter factory
+ */
+ @Override
+ public ITypedAdapterFactory createAdapterFactory() {
+ return new GenericAdapterFactory();
+ }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
index fc59f68..d081e56 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
@@ -25,14 +25,14 @@
import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.external.IDataSourceAdapter;
import org.apache.asterix.common.library.ILibraryManager;
-import org.apache.asterix.external.api.IAdapterFactory;
import org.apache.asterix.external.api.IDataFlowController;
import org.apache.asterix.external.api.IDataParserFactory;
-import org.apache.asterix.external.api.IDataSourceAdapter;
import org.apache.asterix.external.api.IExternalDataSourceFactory;
import org.apache.asterix.external.api.IIndexibleExternalDataSource;
import org.apache.asterix.external.api.IIndexingAdapterFactory;
+import org.apache.asterix.external.api.ITypedAdapterFactory;
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
import org.apache.asterix.external.dataset.adapter.FeedAdapter;
import org.apache.asterix.external.dataset.adapter.GenericAdapter;
@@ -59,7 +59,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-public class GenericAdapterFactory implements IIndexingAdapterFactory, IAdapterFactory {
+public class GenericAdapterFactory implements IIndexingAdapterFactory, ITypedAdapterFactory {
private static final long serialVersionUID = 1L;
private static final Logger LOGGER = LogManager.getLogger();
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java
index 37cc1cf..8d42046 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java
@@ -22,6 +22,6 @@
import org.apache.asterix.external.indexing.ExternalFile;
-public interface IIndexingAdapterFactory extends IAdapterFactory {
+public interface IIndexingAdapterFactory extends ITypedAdapterFactory {
public void setSnapshot(List<ExternalFile> files, boolean indexingOp);
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/ITypedAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/ITypedAdapterFactory.java
new file mode 100644
index 0000000..13e3b34
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/ITypedAdapterFactory.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.api;
+
+import org.apache.asterix.common.external.IAdapterFactory;
+import org.apache.asterix.om.types.ARecordType;
+
+/**
+ * Base interface for IGenericDatasetAdapterFactory and ITypedDatasetAdapterFactory.
+ * Acts as a marker interface indicating that the implementation provides functionality
+ * for creating an adapter.
+ */
+public interface ITypedAdapterFactory extends IAdapterFactory {
+
+ /**
+ * Set the expected record output type of the adapter
+ *
+ * @param outputType
+ */
+ void setOutputType(ARecordType outputType);
+
+ /**
+ * Set the expected meta output type of the adapter
+ *
+ * @param metaType
+ */
+ void setMetaType(ARecordType metaType);
+
+ /**
+ * @return the adapter record output type
+ */
+ ARecordType getOutputType();
+
+ /**
+ * @return the adapter meta output type
+ */
+ ARecordType getMetaType();
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
index 2a92d40..0ab59fe 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
@@ -21,7 +21,7 @@
import java.io.Closeable;
import java.io.IOException;
-import org.apache.asterix.external.api.IDataSourceAdapter;
+import org.apache.asterix.common.external.IDataSourceAdapter;
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.exceptions.HyracksDataException;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
index 916fe0a..0904384 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
@@ -18,8 +18,8 @@
*/
package org.apache.asterix.external.dataset.adapter;
+import org.apache.asterix.common.external.IDataSourceAdapter;
import org.apache.asterix.external.api.IDataFlowController;
-import org.apache.asterix.external.api.IDataSourceAdapter;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.exceptions.HyracksDataException;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
new file mode 100644
index 0000000..cfa1f6a
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws;
+
+import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3Constants;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.CleanupUtils;
+
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.S3ClientBuilder;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+
+public class AwsS3InputStream extends AsterixInputStream {
+
+ // Configuration
+ private final Map<String, String> configuration;
+
+ private final S3Client s3Client;
+
+ // File fields
+ private final List<String> filePaths;
+ private int nextFileIndex = 0;
+
+ // File reading fields
+ private InputStream inputStream;
+
+ public AwsS3InputStream(Map<String, String> configuration, List<String> filePaths) {
+ this.configuration = configuration;
+ this.filePaths = filePaths;
+
+ this.s3Client = buildAwsS3Client(configuration);
+ }
+
+ @Override
+ public int read() throws IOException {
+ throw new HyracksDataException(
+ "read() is not supported with this stream. use read(byte[] b, int off, int len)");
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ if (inputStream == null) {
+ if (!advance()) {
+ return -1;
+ }
+ }
+
+ int result = inputStream.read(b, off, len);
+
+ // If file reading is done, go to the next file, or finish up if no files are left
+ if (result < 0) {
+ if (advance()) {
+ result = inputStream.read(b, off, len);
+ } else {
+ return -1;
+ }
+ }
+
+ return result;
+ }
+
+ private boolean advance() throws IOException {
+ // No files to read for this partition
+ if (filePaths == null || filePaths.isEmpty()) {
+ return false;
+ }
+
+ // Finished reading all the files
+ if (nextFileIndex == filePaths.size()) {
+ if (inputStream != null) {
+ inputStream.close();
+ }
+ return false;
+ }
+
+ // Close the current stream before going to the next one
+ if (inputStream != null) {
+ inputStream.close();
+ }
+
+ String bucket = configuration.get(AwsS3Constants.CONTAINER_NAME_FIELD_NAME);
+ GetObjectRequest.Builder getObjectBuilder = GetObjectRequest.builder();
+ GetObjectRequest getObjectRequest = getObjectBuilder.bucket(bucket).key(filePaths.get(nextFileIndex)).build();
+ inputStream = s3Client.getObject(getObjectRequest);
+
+ // Current file ready, point to the next file
+ nextFileIndex++;
+ return true;
+ }
+
+ @Override
+ public boolean stop() {
+ return false;
+ }
+
+ @Override
+ public boolean handleException(Throwable th) {
+ return false;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (inputStream != null) {
+ CleanupUtils.close(inputStream, null);
+ }
+ }
+
+ /**
+ * Prepares and builds the Amazon S3 client with the provided configuration
+ *
+ * @param configuration S3 client configuration
+ *
+ * @return Amazon S3 client
+ */
+ private static S3Client buildAwsS3Client(Map<String, String> configuration) {
+ S3ClientBuilder builder = S3Client.builder();
+
+ // Credentials
+ String accessKey = configuration.get(AwsS3Constants.ACCESS_KEY_FIELD_NAME);
+ String secretKey = configuration.get(AwsS3Constants.SECRET_KEY_FIELD_NAME);
+ AwsBasicCredentials credentials = AwsBasicCredentials.create(accessKey, secretKey);
+ builder.credentialsProvider(StaticCredentialsProvider.create(credentials));
+
+ // Region
+ String region = configuration.get(AwsS3Constants.REGION_FIELD_NAME);
+ builder.region(Region.of(region));
+
+ // Use user's endpoint if provided
+ if (configuration.get(AwsS3Constants.SERVICE_END_POINT_FIELD_NAME) != null) {
+ String endPoint = configuration.get(AwsS3Constants.SERVICE_END_POINT_FIELD_NAME);
+ builder.endpointOverride(URI.create(endPoint));
+ }
+
+ return builder.build();
+ }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
new file mode 100644
index 0000000..a9f7898
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws;
+
+import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3Constants;
+
+import java.io.Serializable;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.api.IInputStreamFactory;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.S3ClientBuilder;
+import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
+import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
+import software.amazon.awssdk.services.s3.model.S3Object;
+
+public class AwsS3InputStreamFactory implements IInputStreamFactory {
+
+ private static final long serialVersionUID = 1L;
+ private Map<String, String> configuration;
+
+ // Files to read from
+ private List<PartitionWorkLoadBasedOnSize> partitionWorkLoadsBasedOnSize = new ArrayList<>();
+
+ private transient AlgebricksAbsolutePartitionConstraint partitionConstraint;
+
+ @Override
+ public DataSourceType getDataSourceType() {
+ return DataSourceType.STREAM;
+ }
+
+ @Override
+ public boolean isIndexible() {
+ return false;
+ }
+
+ @Override
+ public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) {
+ return new AwsS3InputStream(configuration, partitionWorkLoadsBasedOnSize.get(partition).getFilePaths());
+ }
+
+ @Override
+ public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
+ return partitionConstraint;
+ }
+
+ @Override
+ public void configure(IServiceContext ctx, Map<String, String> configuration)
+ throws AlgebricksException, HyracksDataException {
+ this.configuration = configuration;
+ ICcApplicationContext ccApplicationContext = (ICcApplicationContext) ctx.getApplicationContext();
+
+ String container = configuration.get(AwsS3Constants.CONTAINER_NAME_FIELD_NAME);
+
+ S3Client s3Client = buildAwsS3Client(configuration);
+
+ // Get all objects in a bucket and extract the paths to files
+ ListObjectsRequest.Builder listObjectsBuilder = ListObjectsRequest.builder().bucket(container);
+ String path = configuration.get(AwsS3Constants.DEFINITION_FIELD_NAME);
+ if (path != null) {
+ listObjectsBuilder.prefix(path + (path.endsWith("/") ? "" : "/"));
+ }
+ ListObjectsResponse listObjectsResponse = s3Client.listObjects(listObjectsBuilder.build());
+ List<S3Object> s3Objects = listObjectsResponse.contents();
+
+ // Exclude the directories and get the files only
+ String fileFormat = configuration.get(ExternalDataConstants.KEY_FORMAT);
+ List<S3Object> fileObjects = getFilesOnly(s3Objects, fileFormat);
+
+ // Partition constraints
+ partitionConstraint = ccApplicationContext.getClusterStateManager().getClusterLocations();
+ int partitionsCount = partitionConstraint.getLocations().length;
+
+ // Distribute the workload amongst the partitions
+ distributeWorkLoad(fileObjects, partitionsCount);
+ }
+
+ /**
+ * AWS S3 returns all the objects as paths, not differentiating between folders and files. A path is considered
+ * a file if it does not end with "/", which is the separator in a folder structure. Here, files are selected by
+ * matching the extension associated with the configured file format.
+ *
+ * @param s3Objects List of returned objects
+ * @param fileFormat the file format of the external dataset
+ *
+ * @return A list of S3 objects that point to files only
+ *
+ * @throws HyracksDataException if the file format has no known file extension
+ */
+ private List<S3Object> getFilesOnly(List<S3Object> s3Objects, String fileFormat) throws HyracksDataException {
+ List<S3Object> filesOnly = new ArrayList<>();
+ String fileExtension = getFileExtension(fileFormat);
+ if (fileExtension == null) {
+ throw HyracksDataException.create(ErrorCode.INVALID_FORMAT);
+ }
+
+ s3Objects.stream().filter(object -> object.key().endsWith(fileExtension)).forEach(filesOnly::add);
+
+ return filesOnly;
+ }
+
+ /**
+ * To utilize the parallelism efficiently, the workload is distributed amongst the partitions based on the file
+ * size: each file is assigned to the partition with the smallest total size so far.
+ *
+ * Example:
+ * File1 1mb, File2 300kb, File3 300kb, File4 300kb
+ *
+ * Distribution:
+ * Partition1: [File1]
+ * Partition2: [File2, File3, File4]
+ *
+ * @param fileObjects AWS S3 file objects
+ * @param partitionsCount Partitions count
+ */
+ private void distributeWorkLoad(List<S3Object> fileObjects, int partitionsCount) {
+ // Prepare the workloads based on the number of partitions
+ for (int i = 0; i < partitionsCount; i++) {
+ partitionWorkLoadsBasedOnSize.add(new PartitionWorkLoadBasedOnSize());
+ }
+
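+ // Greedily assign each file to the partition that currently has the smallest total size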
+ for (S3Object object : fileObjects) {
+ PartitionWorkLoadBasedOnSize smallest = getSmallestWorkLoad();
+ smallest.addFilePath(object.key(), object.size());
+ }
+ }
+
+ /**
+ * Finds the smallest workload and returns it
+ *
+ * @return the smallest workload
+ */
+ private PartitionWorkLoadBasedOnSize getSmallestWorkLoad() {
+ PartitionWorkLoadBasedOnSize smallest = partitionWorkLoadsBasedOnSize.get(0);
+ for (PartitionWorkLoadBasedOnSize partition : partitionWorkLoadsBasedOnSize) {
+ // If the current total size is 0, this partition has no files assigned yet, so pick it directly
+ if (partition.getTotalSize() == 0) {
+ smallest = partition;
+ break;
+ }
+ if (partition.getTotalSize() < smallest.getTotalSize()) {
+ smallest = partition;
+ }
+ }
+
+ return smallest;
+ }
+
+ /**
+ * Prepares and builds the Amazon S3 client with the provided configuration
+ *
+ * @param configuration S3 client configuration
+ *
+ * @return Amazon S3 client
+ */
+ private static S3Client buildAwsS3Client(Map<String, String> configuration) {
+ S3ClientBuilder builder = S3Client.builder();
+
+ // Credentials
+ String accessKey = configuration.get(AwsS3Constants.ACCESS_KEY_FIELD_NAME);
+ String secretKey = configuration.get(AwsS3Constants.SECRET_KEY_FIELD_NAME);
+ AwsBasicCredentials credentials = AwsBasicCredentials.create(accessKey, secretKey);
+ builder.credentialsProvider(StaticCredentialsProvider.create(credentials));
+
+ // Region
+ String region = configuration.get(AwsS3Constants.REGION_FIELD_NAME);
+ builder.region(Region.of(region));
+
+ // Use user's endpoint if provided
+ if (configuration.get(AwsS3Constants.SERVICE_END_POINT_FIELD_NAME) != null) {
+ String endPoint = configuration.get(AwsS3Constants.SERVICE_END_POINT_FIELD_NAME);
+ builder.endpointOverride(URI.create(endPoint));
+ }
+
+ return builder.build();
+ }
+
+ /**
+ * Returns the file extension for the provided file format.
+ *
+ * @param format file format
+ *
+ * @return the file extension for the provided file format, or null if the format is not supported.
+ */
+ private String getFileExtension(String format) {
+ switch (format.toLowerCase()) {
+ case "json":
+ return ".json";
+ default:
+ return null;
+ }
+ }
+
+ private static class PartitionWorkLoadBasedOnSize implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private List<String> filePaths = new ArrayList<>();
+ private long totalSize = 0;
+
+ PartitionWorkLoadBasedOnSize() {
+ }
+
+ public List<String> getFilePaths() {
+ return filePaths;
+ }
+
+ public void addFilePath(String filePath, long size) {
+ this.filePaths.add(filePath);
+ this.totalSize += size;
+ }
+
+ public long getTotalSize() {
+ return totalSize;
+ }
+
+ @Override
+ public String toString() {
+ return "Files: " + filePaths.size() + ", Total Size: " + totalSize;
+ }
+ }
+}
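For illustration only (not part of the patch): a minimal, self-contained Java sketch of the size-based distribution described in the javadoc above, reusing the example file sizes from that comment. The class and variable names here are made up for the sketch.

import java.util.ArrayList;
import java.util.List;

public class GreedyDistributionSketch {
    public static void main(String[] args) {
        // (name, size in bytes) pairs mirroring the javadoc example
        String[] names = { "File1", "File2", "File3", "File4" };
        long[] sizes = { 1_000_000L, 300_000L, 300_000L, 300_000L };

        int partitionsCount = 2;
        List<List<String>> partitions = new ArrayList<>();
        long[] totals = new long[partitionsCount];
        for (int i = 0; i < partitionsCount; i++) {
            partitions.add(new ArrayList<>());
        }

        // Greedy assignment: each file goes to the partition with the smallest running total
        for (int i = 0; i < names.length; i++) {
            int smallest = 0;
            for (int p = 1; p < partitionsCount; p++) {
                if (totals[p] < totals[smallest]) {
                    smallest = p;
                }
            }
            partitions.get(smallest).add(names[i]);
            totals[smallest] += sizes[i];
        }

        // Expected output: Partition1 gets [File1], Partition2 gets [File2, File3, File4]
        for (int p = 0; p < partitionsCount; p++) {
            System.out.println("Partition" + (p + 1) + ": " + partitions.get(p) + " (" + totals[p] + " bytes)");
        }
    }
}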
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3ReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3ReaderFactory.java
new file mode 100644
index 0000000..e78783a
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3ReaderFactory.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.input.record.reader.stream.StreamRecordReader;
+import org.apache.asterix.external.input.record.reader.stream.StreamRecordReaderFactory;
+import org.apache.asterix.external.provider.StreamRecordReaderProvider;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class AwsS3ReaderFactory extends StreamRecordReaderFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final List<String> recordReaderNames =
+ Collections.singletonList(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3);
+
+ @Override
+ public List<String> getRecordReaderNames() {
+ return recordReaderNames;
+ }
+
+ @Override
+ public DataSourceType getDataSourceType() {
+ return DataSourceType.RECORDS;
+ }
+
+ @Override
+ public Class<?> getRecordClass() {
+ return char[].class;
+ }
+
+ @Override
+ public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AlgebricksException {
+ return streamFactory.getPartitionConstraint();
+ }
+
+ @Override
+ public void configure(IServiceContext ctx, Map<String, String> configuration)
+ throws AlgebricksException, HyracksDataException {
+ this.configuration = configuration;
+
+ // Stream factory
+ streamFactory = new AwsS3InputStreamFactory();
+ streamFactory.configure(ctx, configuration);
+
+ // Record reader
+ recordReaderClazz = StreamRecordReaderProvider.getRecordReaderClazz(configuration);
+ }
+
+ @Override
+ public IRecordReader<? extends char[]> createRecordReader(IHyracksTaskContext ctx, int partition)
+ throws HyracksDataException {
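+ // Instantiate the configured stream record reader reflectively and attach it to this partition's S3 input stream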
+ try {
+ StreamRecordReader streamRecordReader =
+ (StreamRecordReader) recordReaderClazz.getConstructor().newInstance();
+ streamRecordReader.configure(streamFactory.createInputStream(ctx, partition), configuration);
+ return streamRecordReader;
+ } catch (InstantiationException | IllegalAccessException | InvocationTargetException
+ | NoSuchMethodException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalScanOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalScanOperatorDescriptor.java
index 081d49ec..4fd5151 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalScanOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalScanOperatorDescriptor.java
@@ -18,8 +18,8 @@
*/
package org.apache.asterix.external.operators;
-import org.apache.asterix.external.api.IAdapterFactory;
-import org.apache.asterix.external.api.IDataSourceAdapter;
+import org.apache.asterix.common.external.IDataSourceAdapter;
+import org.apache.asterix.external.api.ITypedAdapterFactory;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -37,10 +37,10 @@
private static final long serialVersionUID = 1L;
- private IAdapterFactory adapterFactory;
+ private ITypedAdapterFactory adapterFactory;
public ExternalScanOperatorDescriptor(JobSpecification spec, RecordDescriptor rDesc,
- IAdapterFactory dataSourceAdapterFactory) {
+ ITypedAdapterFactory dataSourceAdapterFactory) {
super(spec, 0, 1);
outRecDescs[0] = rDesc;
this.adapterFactory = dataSourceAdapterFactory;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
index 7a0341a..d63e8a8 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
@@ -25,7 +25,7 @@
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.common.library.ILibraryManager;
-import org.apache.asterix.external.api.IAdapterFactory;
+import org.apache.asterix.external.api.ITypedAdapterFactory;
import org.apache.asterix.external.feed.api.IFeed;
import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
import org.apache.asterix.om.types.ARecordType;
@@ -57,7 +57,7 @@
private final FeedPolicyAccessor policyAccessor;
private final ARecordType adapterOutputType;
/** The adaptor factory that is used to create an instance of the feed adaptor **/
- private IAdapterFactory adaptorFactory;
+ private ITypedAdapterFactory adaptorFactory;
/** The library that contains the adapter in use. **/
private String adaptorLibraryName;
/**
@@ -68,7 +68,7 @@
/** The configuration parameters associated with the adapter. **/
private Map<String, String> adaptorConfiguration;
- public FeedIntakeOperatorDescriptor(JobSpecification spec, IFeed primaryFeed, IAdapterFactory adapterFactory,
+ public FeedIntakeOperatorDescriptor(JobSpecification spec, IFeed primaryFeed, ITypedAdapterFactory adapterFactory,
ARecordType adapterOutputType, FeedPolicyAccessor policyAccessor, RecordDescriptor rDesc) {
super(spec, 0, 1);
this.feedId = new EntityId(FEED_EXTENSION_NAME, primaryFeed.getDataverseName(), primaryFeed.getFeedName());
@@ -100,15 +100,15 @@
return new FeedIntakeOperatorNodePushable(ctx, feedId, adaptorFactory, partition, recordDescProvider, this);
}
- private IAdapterFactory createExternalAdapterFactory(IHyracksTaskContext ctx) throws HyracksDataException {
- IAdapterFactory adapterFactory;
+ private ITypedAdapterFactory createExternalAdapterFactory(IHyracksTaskContext ctx) throws HyracksDataException {
+ ITypedAdapterFactory adapterFactory;
INcApplicationContext runtimeCtx =
(INcApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext();
ILibraryManager libraryManager = runtimeCtx.getLibraryManager();
ClassLoader classLoader = libraryManager.getLibraryClassLoader(feedId.getDataverse(), adaptorLibraryName);
if (classLoader != null) {
try {
- adapterFactory = (IAdapterFactory) (classLoader.loadClass(adaptorFactoryClassName).newInstance());
+ adapterFactory = (ITypedAdapterFactory) (classLoader.loadClass(adaptorFactoryClassName).newInstance());
adapterFactory.setOutputType(adapterOutputType);
adapterFactory.configure(ctx.getJobletContext().getServiceContext(), adaptorConfiguration);
} catch (Exception e) {
@@ -128,11 +128,11 @@
return feedId;
}
- public IAdapterFactory getAdaptorFactory() {
+ public ITypedAdapterFactory getAdaptorFactory() {
return this.adaptorFactory;
}
- public void setAdaptorFactory(IAdapterFactory factory) {
+ public void setAdaptorFactory(ITypedAdapterFactory factory) {
this.adaptorFactory = factory;
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
index 98f75df..7002a23 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
@@ -23,7 +23,7 @@
import org.apache.asterix.active.ActiveRuntimeId;
import org.apache.asterix.active.ActiveSourceOperatorNodePushable;
import org.apache.asterix.active.EntityId;
-import org.apache.asterix.external.api.IAdapterFactory;
+import org.apache.asterix.external.api.ITypedAdapterFactory;
import org.apache.asterix.external.dataset.adapter.FeedAdapter;
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.VSizeFrame;
@@ -50,7 +50,7 @@
private final FeedAdapter adapter;
private boolean poisoned = false;
- public FeedIntakeOperatorNodePushable(IHyracksTaskContext ctx, EntityId feedId, IAdapterFactory adapterFactory,
+ public FeedIntakeOperatorNodePushable(IHyracksTaskContext ctx, EntityId feedId, ITypedAdapterFactory adapterFactory,
int partition, IRecordDescriptorProvider recordDescProvider,
FeedIntakeOperatorDescriptor feedIntakeOperatorDescriptor) throws HyracksDataException {
super(ctx, new ActiveRuntimeId(feedId, FeedIntakeOperatorNodePushable.class.getSimpleName(), partition));
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
index 5740143..414c460 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
@@ -21,10 +21,11 @@
import java.util.List;
import java.util.Map;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.external.adapter.factory.GenericAdapterFactory;
import org.apache.asterix.external.adapter.factory.LookupAdapterFactory;
-import org.apache.asterix.external.api.IAdapterFactory;
import org.apache.asterix.external.api.IIndexingAdapterFactory;
+import org.apache.asterix.external.api.ITypedAdapterFactory;
import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.external.util.ExternalDataCompatibilityUtils;
import org.apache.asterix.om.types.ARecordType;
@@ -39,11 +40,13 @@
public class AdapterFactoryProvider {
// Adapters
- public static IAdapterFactory getAdapterFactory(IServiceContext serviceCtx, String adapterName,
+ public static ITypedAdapterFactory getAdapterFactory(IServiceContext serviceCtx, String adapterName,
Map<String, String> configuration, ARecordType itemType, ARecordType metaType)
throws HyracksDataException, AlgebricksException {
ExternalDataCompatibilityUtils.prepare(adapterName, configuration);
- GenericAdapterFactory adapterFactory = new GenericAdapterFactory();
+ ICcApplicationContext context = (ICcApplicationContext) serviceCtx.getApplicationContext();
+ ITypedAdapterFactory adapterFactory =
+ (ITypedAdapterFactory) context.getAdapterFactoryService().createAdapterFactory();
adapterFactory.setOutputType(itemType);
adapterFactory.setMetaType(metaType);
adapterFactory.configure(serviceCtx, configuration);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
index 8024dc4..2a2289c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
@@ -53,6 +53,7 @@
public static IExternalDataSourceFactory getExternalDataSourceFactory(ILibraryManager libraryManager,
Map<String, String> configuration) throws HyracksDataException, AsterixException {
+ // Take a copy of the configuration
if (ExternalDataUtils.getDataSourceType(configuration).equals(DataSourceType.RECORDS)) {
String reader = configuration.get(ExternalDataConstants.KEY_READER);
return DatasourceFactoryProvider.getRecordReaderFactory(libraryManager, reader, configuration);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataCompatibilityUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataCompatibilityUtils.java
index e222e99..77cbb96 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataCompatibilityUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataCompatibilityUtils.java
@@ -60,8 +60,11 @@
}
public static void prepare(String adapterName, Map<String, String> configuration) {
- if (!configuration.containsKey(ExternalDataConstants.KEY_READER)) {
- configuration.put(ExternalDataConstants.KEY_READER, adapterName);
+ // The adapter name can carry the link name for external datasets, so always add it to the configuration
+ configuration.put(ExternalDataConstants.KEY_LINK_NAME, adapterName);
+
+ if (!configuration.containsKey(ExternalDataConstants.KEY_READER)) {
+ configuration.put(ExternalDataConstants.KEY_READER, adapterName);
}
if (!configuration.containsKey(ExternalDataConstants.KEY_PARSER)) {
if (configuration.containsKey(ExternalDataConstants.KEY_FORMAT)) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 729215e..e44144a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -114,6 +114,7 @@
public static final String KEY_ADAPTER_NAME_SOCKET = "socket";
public static final String KEY_ALIAS_ADAPTER_NAME_SOCKET = "socket_adapter";
public static final String KEY_ADAPTER_NAME_HTTP = "http_adapter";
+ public static final String KEY_ADAPTER_NAME_AWS_S3 = "S3";
/**
* HDFS class names
@@ -229,4 +230,26 @@
public static final String FORMAT_CSV = "csv";
public static final String ERROR_PARSE_RECORD = "Parser failed to parse record";
+
+ // TODO(Hussain): Move link related items to a different place
+ /**
+ * Common external link fields
+ */
+ public static final String KEY_DATAVERSE_NAME = "dataverseName";
+ public static final String KEY_LINK_NAME = "linkName";
+ public static final String KEY_LINK_TYPE = "linkType";
+ public static final String[] KEY_EXTERNAL_DATASET_REQUIRED_CONNECTION_PARAMETERS =
+ new String[] { KEY_DATAVERSE_NAME, KEY_LINK_NAME, KEY_LINK_TYPE };
+
+ public static class AwsS3Constants {
+ public static final String REGION_FIELD_NAME = "region";
+ public static final String ACCESS_KEY_FIELD_NAME = "accessKey";
+ public static final String SECRET_KEY_FIELD_NAME = "secretKey";
+ public static final String CONTAINER_NAME_FIELD_NAME = "container";
+ public static final String DEFINITION_FIELD_NAME = "definition";
+ public static final String SERVICE_END_POINT_FIELD_NAME = "serviceEndpoint";
+ public static final String[] REQUIRED_LINK_PARAMETERS =
+ new String[] { ACCESS_KEY_FIELD_NAME, SECRET_KEY_FIELD_NAME, REGION_FIELD_NAME };
+ public static final String[] OPTIONAL_LINK_PARAMETERS = new String[] { SERVICE_END_POINT_FIELD_NAME };
+ }
}
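For illustration only (not part of the patch): a hedged sketch of the kind of configuration map the AwsS3InputStreamFactory above reads these field names from. The values are placeholders, not real credentials, and the exact keys accepted by the DDL's WITH clause may differ.

import java.util.HashMap;
import java.util.Map;

public class S3ConfigurationSketch {
    public static void main(String[] args) {
        Map<String, String> configuration = new HashMap<>();
        configuration.put("accessKey", "dummyAccessKey");              // ACCESS_KEY_FIELD_NAME
        configuration.put("secretKey", "dummySecretKey");              // SECRET_KEY_FIELD_NAME
        configuration.put("region", "us-west-2");                      // REGION_FIELD_NAME
        configuration.put("container", "my-bucket");                   // CONTAINER_NAME_FIELD_NAME
        configuration.put("definition", "json-data/reviews");          // DEFINITION_FIELD_NAME
        configuration.put("serviceEndpoint", "http://localhost:8001"); // optional, e.g. pointing at an S3 mock
        configuration.put("format", "json");                           // assumed value of ExternalDataConstants.KEY_FORMAT
        System.out.println(configuration);
    }
}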
diff --git a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
index 0d96658..fd3e473 100644
--- a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
+++ b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
@@ -20,3 +20,4 @@
org.apache.asterix.external.input.HDFSDataSourceFactory
org.apache.asterix.external.input.record.reader.stream.StreamRecordReaderFactory
org.apache.asterix.external.input.record.reader.http.HttpServerRecordReaderFactory
+org.apache.asterix.external.input.record.reader.aws.AwsS3ReaderFactory
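For context (illustration only): classes listed in a META-INF/services file like the one above are the kind of providers Java's ServiceLoader can discover. AsterixDB's own provider classes perform the actual lookup, so the interface below is a stand-in, not the real IRecordReaderFactory.

import java.util.List;
import java.util.ServiceLoader;

public class ReaderFactoryDiscoverySketch {
    // Stand-in for org.apache.asterix.external.api.IRecordReaderFactory (illustrative only)
    public interface RecordReaderFactory {
        List<String> getRecordReaderNames();
    }

    public static void main(String[] args) {
        // Every class listed in META-INF/services/<interface FQN> on the classpath is instantiated here;
        // without such a provider file for this stand-in interface, the loop simply prints nothing.
        for (RecordReaderFactory factory : ServiceLoader.load(RecordReaderFactory.class)) {
            System.out.println(factory.getClass().getName() + " -> " + factory.getRecordReaderNames());
        }
    }
}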
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
index 8ee8a57..a947c7e 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
@@ -25,9 +25,9 @@
import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.external.api.IAdapterFactory;
-import org.apache.asterix.external.api.IDataSourceAdapter;
+import org.apache.asterix.common.external.IDataSourceAdapter;
import org.apache.asterix.external.api.IExternalDataSourceFactory;
+import org.apache.asterix.external.api.ITypedAdapterFactory;
import org.apache.asterix.external.dataflow.TupleForwarder;
import org.apache.asterix.external.parser.ADMDataParser;
import org.apache.asterix.om.types.ARecordType;
@@ -41,7 +41,7 @@
import org.apache.hyracks.dataflow.std.file.ITupleParser;
import org.apache.hyracks.dataflow.std.file.ITupleParserFactory;
-public class TestTypedAdapterFactory implements IAdapterFactory {
+public class TestTypedAdapterFactory implements ITypedAdapterFactory {
private static final long serialVersionUID = 1L;
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DatasetDecl.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DatasetDecl.java
index 0a17b24..45fc33a 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DatasetDecl.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/DatasetDecl.java
@@ -44,7 +44,7 @@
protected final DatasetType datasetType;
protected final IDatasetDetailsDecl datasetDetailsDecl;
protected final Map<String, String> hints;
- private final AdmObjectNode withObjectNode;
+ private AdmObjectNode withObjectNode;
protected final boolean ifNotExists;
public DatasetDecl(Identifier dataverse, Identifier name, Identifier itemTypeDataverse, Identifier itemTypeName,
@@ -67,7 +67,7 @@
}
this.nodegroupName = nodeGroupName;
this.hints = hints;
- this.withObjectNode = DatasetDeclParametersUtil.validateAndGetWithObjectNode(withRecord);
+ this.withObjectNode = DatasetDeclParametersUtil.validateAndGetWithObjectNode(withRecord, datasetType);
this.ifNotExists = ifNotExists;
this.datasetType = datasetType;
this.datasetDetailsDecl = idd;
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/DatasetDeclParametersUtil.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/DatasetDeclParametersUtil.java
index a26a638..52285d9 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/DatasetDeclParametersUtil.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/DatasetDeclParametersUtil.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.lang.common.util;
+import org.apache.asterix.common.config.DatasetConfig;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.lang.common.expression.RecordConstructor;
import org.apache.asterix.object.base.AdmObjectNode;
@@ -60,14 +61,21 @@
private DatasetDeclParametersUtil() {
}
- public static AdmObjectNode validateAndGetWithObjectNode(RecordConstructor withRecord) throws CompilationException {
+ public static AdmObjectNode validateAndGetWithObjectNode(RecordConstructor withRecord,
+ DatasetConfig.DatasetType datasetType) throws CompilationException {
if (withRecord == null) {
return EMPTY_WITH_OBJECT;
}
- final ConfigurationTypeValidator validator = new ConfigurationTypeValidator();
- final AdmObjectNode node = ExpressionUtils.toNode(withRecord);
- validator.validateType(WITH_OBJECT_TYPE, node);
- return node;
+
+ // Handle based on dataset type
+ if (datasetType == DatasetConfig.DatasetType.INTERNAL) {
+ final ConfigurationTypeValidator validator = new ConfigurationTypeValidator();
+ final AdmObjectNode node = ExpressionUtils.toNode(withRecord);
+ validator.validateType(WITH_OBJECT_TYPE, node);
+ return node;
+ } else {
+ return ExpressionUtils.toNode(withRecord);
+ }
}
private static ARecordType getWithObjectType() {
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index 49fffe6..3412941 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -35,13 +35,13 @@
import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.exceptions.MetadataException;
+import org.apache.asterix.common.external.IDataSourceAdapter;
import org.apache.asterix.common.ioopcallbacks.LSMIndexIOOperationCallbackFactory;
import org.apache.asterix.common.ioopcallbacks.LSMIndexPageWriteCallbackFactory;
import org.apache.asterix.common.utils.StorageConstants;
import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.asterix.external.adapter.factory.GenericAdapterFactory;
-import org.apache.asterix.external.api.IAdapterFactory;
-import org.apache.asterix.external.api.IDataSourceAdapter;
+import org.apache.asterix.external.api.ITypedAdapterFactory;
import org.apache.asterix.external.dataset.adapter.AdapterIdentifier;
import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.metadata.IDatasetDetails;
@@ -294,7 +294,8 @@
private static DatasourceAdapter getAdapter(String adapterFactoryClassName) throws AlgebricksException {
try {
- String adapterName = ((IAdapterFactory) (Class.forName(adapterFactoryClassName).newInstance())).getAlias();
+ String adapterName =
+ ((ITypedAdapterFactory) (Class.forName(adapterFactoryClassName).newInstance())).getAlias();
return new DatasourceAdapter(new AdapterIdentifier(MetadataConstants.METADATA_DATAVERSE_NAME, adapterName),
adapterFactoryClassName, IDataSourceAdapter.AdapterType.INTERNAL);
} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
index 62cce05..07bbc57 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
@@ -23,7 +23,7 @@
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.external.api.IAdapterFactory;
+import org.apache.asterix.external.api.ITypedAdapterFactory;
import org.apache.asterix.metadata.IDatasetDetails;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.entities.Dataset;
@@ -111,7 +111,7 @@
externalDataset.getItemTypeDataverseName(), itemTypeName).getDatatype();
ExternalDatasetDetails edd = (ExternalDatasetDetails) externalDataset.getDatasetDetails();
- IAdapterFactory adapterFactory = metadataProvider.getConfiguredAdapterFactory(externalDataset,
+ ITypedAdapterFactory adapterFactory = metadataProvider.getConfiguredAdapterFactory(externalDataset,
edd.getAdapter(), edd.getProperties(), (ARecordType) itemType, null);
return metadataProvider.buildExternalDatasetDataScannerRuntime(jobSpec, itemType, adapterFactory);
case INTERNAL:
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
index 3460a46..c2983af 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
@@ -26,7 +26,7 @@
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.external.api.IAdapterFactory;
+import org.apache.asterix.external.api.ITypedAdapterFactory;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.entities.InternalDatasetDetails;
import org.apache.asterix.om.types.ARecordType;
@@ -137,7 +137,7 @@
}
LoadableDataSource alds = (LoadableDataSource) dataSource;
ARecordType itemType = (ARecordType) alds.getLoadedType();
- IAdapterFactory adapterFactory = metadataProvider.getConfiguredAdapterFactory(alds.getTargetDataset(),
+ ITypedAdapterFactory adapterFactory = metadataProvider.getConfiguredAdapterFactory(alds.getTargetDataset(),
alds.getAdapter(), alds.getAdapterProperties(), itemType, null);
RecordDescriptor rDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
return metadataProvider.buildLoadableDatasetScan(jobSpec, adapterFactory, rDesc);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 0a72ceb..5bdf2a7 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -39,6 +39,7 @@
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.dataflow.LSMTreeInsertDeleteOperatorDescriptor;
import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.external.IDataSourceAdapter;
import org.apache.asterix.common.metadata.LockList;
import org.apache.asterix.common.storage.ICompressionManager;
import org.apache.asterix.common.transactions.ITxnIdFactory;
@@ -48,8 +49,7 @@
import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory;
import org.apache.asterix.dataflow.data.nontagged.serde.SerializerDeserializerUtil;
import org.apache.asterix.external.adapter.factory.LookupAdapterFactory;
-import org.apache.asterix.external.api.IAdapterFactory;
-import org.apache.asterix.external.api.IDataSourceAdapter;
+import org.apache.asterix.external.api.ITypedAdapterFactory;
import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.external.indexing.IndexingConstants;
@@ -416,7 +416,7 @@
}
protected Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildLoadableDatasetScan(
- JobSpecification jobSpec, IAdapterFactory adapterFactory, RecordDescriptor rDesc)
+ JobSpecification jobSpec, ITypedAdapterFactory adapterFactory, RecordDescriptor rDesc)
throws AlgebricksException {
ExternalScanOperatorDescriptor dataScanner = new ExternalScanOperatorDescriptor(jobSpec, rDesc, adapterFactory);
try {
@@ -430,14 +430,14 @@
return MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
}
- public Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, IAdapterFactory> buildFeedIntakeRuntime(
+ public Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, ITypedAdapterFactory> buildFeedIntakeRuntime(
JobSpecification jobSpec, Feed feed, FeedPolicyAccessor policyAccessor) throws Exception {
- Triple<IAdapterFactory, RecordDescriptor, IDataSourceAdapter.AdapterType> factoryOutput;
+ Triple<ITypedAdapterFactory, RecordDescriptor, IDataSourceAdapter.AdapterType> factoryOutput;
factoryOutput =
FeedMetadataUtil.getFeedFactoryAndOutput(feed, policyAccessor, mdTxnCtx, getApplicationContext());
ARecordType recordType =
FeedMetadataUtil.getOutputType(feed, feed.getConfiguration().get(ExternalDataConstants.KEY_TYPE_NAME));
- IAdapterFactory adapterFactory = factoryOutput.first;
+ ITypedAdapterFactory adapterFactory = factoryOutput.first;
FeedIntakeOperatorDescriptor feedIngestor = null;
switch (factoryOutput.third) {
case INTERNAL:
@@ -775,11 +775,11 @@
return numElementsHint / numPartitions;
}
- protected IAdapterFactory getConfiguredAdapterFactory(Dataset dataset, String adapterName,
+ protected ITypedAdapterFactory getConfiguredAdapterFactory(Dataset dataset, String adapterName,
Map<String, String> configuration, ARecordType itemType, ARecordType metaType) throws AlgebricksException {
try {
configuration.put(ExternalDataConstants.KEY_DATAVERSE, dataset.getDataverseName());
- IAdapterFactory adapterFactory = AdapterFactoryProvider.getAdapterFactory(
+ ITypedAdapterFactory adapterFactory = AdapterFactoryProvider.getAdapterFactory(
getApplicationContext().getServiceContext(), adapterName, configuration, itemType, metaType);
// check to see if dataset is indexed
@@ -922,7 +922,7 @@
}
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDatasetDataScannerRuntime(
- JobSpecification jobSpec, IAType itemType, IAdapterFactory adapterFactory) throws AlgebricksException {
+ JobSpecification jobSpec, IAType itemType, ITypedAdapterFactory adapterFactory) throws AlgebricksException {
if (itemType.getTypeTag() != ATypeTag.OBJECT) {
throw new AlgebricksException("Can only scan datasets of records.");
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/DatasourceAdapter.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/DatasourceAdapter.java
index b72c058..c29fb93 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/DatasourceAdapter.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/DatasourceAdapter.java
@@ -18,7 +18,7 @@
*/
package org.apache.asterix.metadata.entities;
-import org.apache.asterix.external.api.IDataSourceAdapter.AdapterType;
+import org.apache.asterix.common.external.IDataSourceAdapter.AdapterType;
import org.apache.asterix.external.dataset.adapter.AdapterIdentifier;
import org.apache.asterix.metadata.MetadataCache;
import org.apache.asterix.metadata.api.IMetadataEntity;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java
index 8f630cf..9e65c08 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java
@@ -24,7 +24,7 @@
import java.io.DataInputStream;
import java.util.Calendar;
-import org.apache.asterix.external.api.IDataSourceAdapter;
+import org.apache.asterix.common.external.IDataSourceAdapter;
import org.apache.asterix.external.dataset.adapter.AdapterIdentifier;
import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
import org.apache.asterix.metadata.bootstrap.MetadataPrimaryIndexes;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
index 3ae0fec..7ed53e4 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
@@ -28,9 +28,9 @@
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.MetadataException;
-import org.apache.asterix.external.api.IAdapterFactory;
-import org.apache.asterix.external.api.IDataSourceAdapter;
-import org.apache.asterix.external.api.IDataSourceAdapter.AdapterType;
+import org.apache.asterix.common.external.IDataSourceAdapter;
+import org.apache.asterix.common.external.IDataSourceAdapter.AdapterType;
+import org.apache.asterix.external.api.ITypedAdapterFactory;
import org.apache.asterix.external.feed.api.IFeed;
import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
import org.apache.asterix.external.provider.AdapterFactoryProvider;
@@ -117,20 +117,20 @@
adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, feed.getDataverseName(), adapterName);
}
AdapterType adapterType;
- IAdapterFactory adapterFactory;
+ ITypedAdapterFactory adapterFactory;
if (adapterEntity != null) {
adapterType = adapterEntity.getType();
String adapterFactoryClassname = adapterEntity.getClassname();
switch (adapterType) {
case INTERNAL:
- adapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
+ adapterFactory = (ITypedAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
break;
case EXTERNAL:
String[] anameComponents = adapterName.split("#");
String libraryName = anameComponents[0];
ClassLoader cl =
appCtx.getLibraryManager().getLibraryClassLoader(feed.getDataverseName(), libraryName);
- adapterFactory = (IAdapterFactory) cl.loadClass(adapterFactoryClassname).newInstance();
+ adapterFactory = (ITypedAdapterFactory) cl.loadClass(adapterFactoryClassname).newInstance();
break;
default:
throw new AsterixException("Unknown Adapter type " + adapterType);
@@ -165,17 +165,17 @@
}
@SuppressWarnings("rawtypes")
- public static Triple<IAdapterFactory, RecordDescriptor, AdapterType> getFeedFactoryAndOutput(Feed feed,
+ public static Triple<ITypedAdapterFactory, RecordDescriptor, AdapterType> getFeedFactoryAndOutput(Feed feed,
FeedPolicyAccessor policyAccessor, MetadataTransactionContext mdTxnCtx, ICcApplicationContext appCtx)
throws AlgebricksException {
// This method needs to be re-visited
String adapterName = null;
DatasourceAdapter adapterEntity = null;
String adapterFactoryClassname = null;
- IAdapterFactory adapterFactory = null;
+ ITypedAdapterFactory adapterFactory = null;
ARecordType adapterOutputType = null;
ARecordType metaType = null;
- Triple<IAdapterFactory, RecordDescriptor, IDataSourceAdapter.AdapterType> feedProps = null;
+ Triple<ITypedAdapterFactory, RecordDescriptor, IDataSourceAdapter.AdapterType> feedProps = null;
IDataSourceAdapter.AdapterType adapterType = null;
try {
Map<String, String> configuration = feed.getConfiguration();
@@ -196,14 +196,14 @@
adapterFactoryClassname = adapterEntity.getClassname();
switch (adapterType) {
case INTERNAL:
- adapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
+ adapterFactory = (ITypedAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
break;
case EXTERNAL:
String[] anameComponents = adapterName.split("#");
String libraryName = anameComponents[0];
ClassLoader cl =
appCtx.getLibraryManager().getLibraryClassLoader(feed.getDataverseName(), libraryName);
- adapterFactory = (IAdapterFactory) cl.loadClass(adapterFactoryClassname).newInstance();
+ adapterFactory = (ITypedAdapterFactory) cl.loadClass(adapterFactoryClassname).newInstance();
break;
default:
throw new AsterixException("Unknown Adapter type " + adapterType);
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java
index 47db3b0..c1d8f42 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java
@@ -30,7 +30,7 @@
import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
import org.apache.asterix.common.config.DatasetConfig.TransactionState;
import org.apache.asterix.common.context.IStorageComponentProvider;
-import org.apache.asterix.external.api.IAdapterFactory;
+import org.apache.asterix.external.api.ITypedAdapterFactory;
import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.external.indexing.IndexingConstants;
import org.apache.asterix.external.operators.ExternalDatasetIndexesAbortOperatorDescriptor;
@@ -254,7 +254,7 @@
throws HyracksDataException, AlgebricksException {
ExternalDatasetDetails externalDatasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails();
Map<String, String> configuration = externalDatasetDetails.getProperties();
- IAdapterFactory adapterFactory = AdapterFactoryProvider.getIndexingAdapterFactory(
+ ITypedAdapterFactory adapterFactory = AdapterFactoryProvider.getIndexingAdapterFactory(
metadataProvider.getApplicationContext().getServiceContext(), externalDatasetDetails.getAdapter(),
configuration, (ARecordType) itemType, files, true, null);
ExternalScanOperatorDescriptor scanOp =
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
index a0b10c6..3366ac1 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
@@ -45,6 +45,7 @@
import org.apache.asterix.common.config.TransactionProperties;
import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.external.IAdapterFactoryService;
import org.apache.asterix.common.library.ILibraryManager;
import org.apache.asterix.common.metadata.IMetadataBootstrap;
import org.apache.asterix.common.replication.INcLifecycleCoordinator;
@@ -98,14 +99,15 @@
private final IReceptionist receptionist;
private final IRequestTracker requestTracker;
private final IConfigValidator configValidator;
+ private final IAdapterFactoryService adapterFactoryService;
public CcApplicationContext(ICCServiceContext ccServiceCtx, IHyracksClientConnection hcc,
ILibraryManager libraryManager, Supplier<IMetadataBootstrap> metadataBootstrapSupplier,
IGlobalRecoveryManager globalRecoveryManager, INcLifecycleCoordinator ftStrategy,
IJobLifecycleListener activeLifeCycleListener, IStorageComponentProvider storageComponentProvider,
IMetadataLockManager mdLockManager, IReceptionistFactory receptionistFactory,
- IConfigValidatorFactory configValidatorFactory, Object extensionManager)
- throws AlgebricksException, IOException {
+ IConfigValidatorFactory configValidatorFactory, Object extensionManager,
+ IAdapterFactoryService adapterFactoryService) throws AlgebricksException, IOException {
this.ccServiceCtx = ccServiceCtx;
this.hcc = hcc;
this.libraryManager = libraryManager;
@@ -139,6 +141,7 @@
receptionist = receptionistFactory.create();
requestTracker = new RequestTracker(this);
configValidator = configValidatorFactory.create();
+ this.adapterFactoryService = adapterFactoryService;
}
@Override
@@ -306,4 +309,9 @@
public IRequestTracker getRequestTracker() {
return requestTracker;
}
+
+ @Override
+ public IAdapterFactoryService getAdapterFactoryService() {
+ return adapterFactoryService;
+ }
}
diff --git a/asterixdb/asterix-server/pom.xml b/asterixdb/asterix-server/pom.xml
index 409b13d..7acfc04 100644
--- a/asterixdb/asterix-server/pom.xml
+++ b/asterixdb/asterix-server/pom.xml
@@ -172,6 +172,11 @@
<gav>io.netty:netty-all:4.1.46.Final</gav>
<noticeUrl>https://raw.githubusercontent.com/netty/netty/netty-4.1.46.Final/NOTICE.txt</noticeUrl>
</override>
+ <override>
+ <gav>org.reactivestreams:reactive-streams:1.0.2</gav>
+ <noticeUrl>https://raw.githubusercontent.com/reactive-streams/reactive-streams-jvm/v1.0.2/COPYING.txt</noticeUrl>
+ <url>https://raw.githubusercontent.com/reactive-streams/reactive-streams-jvm/v1.0.2/LICENSE.txt</url>
+ </override>
</overrides>
<licenses>
<license>
@@ -205,6 +210,7 @@
<aliasUrl>http://www.apache.org/licenses/LICENSE-2.0</aliasUrl>
<aliasUrl>https://www.apache.org/licenses/LICENSE-2.0.txt</aliasUrl>
<aliasUrl>http://www.apache.org/licenses/LICENSE-2.0.html</aliasUrl>
+ <aliasUrl>https://aws.amazon.com/apache2.0</aliasUrl>
</aliasUrls>
<metric>1</metric>
</license>
diff --git a/asterixdb/pom.xml b/asterixdb/pom.xml
index 25550bf..76ec3d2 100644
--- a/asterixdb/pom.xml
+++ b/asterixdb/pom.xml
@@ -81,6 +81,7 @@
<hyracks.version>0.3.5-SNAPSHOT</hyracks.version>
<hadoop.version>2.8.5</hadoop.version>
<jacoco.version>0.7.6.201602180812</jacoco.version>
+ <awsjavasdk.version>2.10.83</awsjavasdk.version>
<implementation.title>Apache AsterixDB - ${project.name}</implementation.title>
<implementation.url>https://asterixdb.apache.org/</implementation.url>
@@ -1340,6 +1341,84 @@
<artifactId>reflections</artifactId>
<version>0.9.12</version>
</dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>s3</artifactId>
+ <version>${awsjavasdk.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec-http</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec-http2</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-transport</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-common</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-buffer</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-handler</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-transport-native-epoll</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>regions</artifactId>
+ <version>${awsjavasdk.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>auth</artifactId>
+ <version>${awsjavasdk.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>sdk-core</artifactId>
+ <version>${awsjavasdk.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <!-- Mock for AWS S3 -->
+ <dependency>
+ <groupId>io.findify</groupId>
+ <artifactId>s3mock_2.12</artifactId>
+ <version>0.2.5</version>
+ </dependency>
+ <!-- Needed for the s3 mock -->
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-http-core_2.12</artifactId>
+ <version>10.1.0</version>
+ </dependency>
</dependencies>
</dependencyManagement>
diff --git a/asterixdb/src/main/licenses/content/raw.githubusercontent.com_reactive-streams_reactive-streams-jvm_v1.0.2_COPYING.txt b/asterixdb/src/main/licenses/content/raw.githubusercontent.com_reactive-streams_reactive-streams-jvm_v1.0.2_COPYING.txt
new file mode 100644
index 0000000..1625c17
--- /dev/null
+++ b/asterixdb/src/main/licenses/content/raw.githubusercontent.com_reactive-streams_reactive-streams-jvm_v1.0.2_COPYING.txt
@@ -0,0 +1,121 @@
+Creative Commons Legal Code
+
+CC0 1.0 Universal
+
+ CREATIVE COMMONS CORPORATION IS NOT A LAW FIRM AND DOES NOT PROVIDE
+ LEGAL SERVICES. DISTRIBUTION OF THIS DOCUMENT DOES NOT CREATE AN
+ ATTORNEY-CLIENT RELATIONSHIP. CREATIVE COMMONS PROVIDES THIS
+ INFORMATION ON AN "AS-IS" BASIS. CREATIVE COMMONS MAKES NO WARRANTIES
+ REGARDING THE USE OF THIS DOCUMENT OR THE INFORMATION OR WORKS
+ PROVIDED HEREUNDER, AND DISCLAIMS LIABILITY FOR DAMAGES RESULTING FROM
+ THE USE OF THIS DOCUMENT OR THE INFORMATION OR WORKS PROVIDED
+ HEREUNDER.
+
+Statement of Purpose
+
+The laws of most jurisdictions throughout the world automatically confer
+exclusive Copyright and Related Rights (defined below) upon the creator
+and subsequent owner(s) (each and all, an "owner") of an original work of
+authorship and/or a database (each, a "Work").
+
+Certain owners wish to permanently relinquish those rights to a Work for
+the purpose of contributing to a commons of creative, cultural and
+scientific works ("Commons") that the public can reliably and without fear
+of later claims of infringement build upon, modify, incorporate in other
+works, reuse and redistribute as freely as possible in any form whatsoever
+and for any purposes, including without limitation commercial purposes.
+These owners may contribute to the Commons to promote the ideal of a free
+culture and the further production of creative, cultural and scientific
+works, or to gain reputation or greater distribution for their Work in
+part through the use and efforts of others.
+
+For these and/or other purposes and motivations, and without any
+expectation of additional consideration or compensation, the person
+associating CC0 with a Work (the "Affirmer"), to the extent that he or she
+is an owner of Copyright and Related Rights in the Work, voluntarily
+elects to apply CC0 to the Work and publicly distribute the Work under its
+terms, with knowledge of his or her Copyright and Related Rights in the
+Work and the meaning and intended legal effect of CC0 on those rights.
+
+1. Copyright and Related Rights. A Work made available under CC0 may be
+protected by copyright and related or neighboring rights ("Copyright and
+Related Rights"). Copyright and Related Rights include, but are not
+limited to, the following:
+
+ i. the right to reproduce, adapt, distribute, perform, display,
+ communicate, and translate a Work;
+ ii. moral rights retained by the original author(s) and/or performer(s);
+iii. publicity and privacy rights pertaining to a person's image or
+ likeness depicted in a Work;
+ iv. rights protecting against unfair competition in regards to a Work,
+ subject to the limitations in paragraph 4(a), below;
+ v. rights protecting the extraction, dissemination, use and reuse of data
+ in a Work;
+ vi. database rights (such as those arising under Directive 96/9/EC of the
+ European Parliament and of the Council of 11 March 1996 on the legal
+ protection of databases, and under any national implementation
+ thereof, including any amended or successor version of such
+ directive); and
+vii. other similar, equivalent or corresponding rights throughout the
+ world based on applicable law or treaty, and any national
+ implementations thereof.
+
+2. Waiver. To the greatest extent permitted by, but not in contravention
+of, applicable law, Affirmer hereby overtly, fully, permanently,
+irrevocably and unconditionally waives, abandons, and surrenders all of
+Affirmer's Copyright and Related Rights and associated claims and causes
+of action, whether now known or unknown (including existing as well as
+future claims and causes of action), in the Work (i) in all territories
+worldwide, (ii) for the maximum duration provided by applicable law or
+treaty (including future time extensions), (iii) in any current or future
+medium and for any number of copies, and (iv) for any purpose whatsoever,
+including without limitation commercial, advertising or promotional
+purposes (the "Waiver"). Affirmer makes the Waiver for the benefit of each
+member of the public at large and to the detriment of Affirmer's heirs and
+successors, fully intending that such Waiver shall not be subject to
+revocation, rescission, cancellation, termination, or any other legal or
+equitable action to disrupt the quiet enjoyment of the Work by the public
+as contemplated by Affirmer's express Statement of Purpose.
+
+3. Public License Fallback. Should any part of the Waiver for any reason
+be judged legally invalid or ineffective under applicable law, then the
+Waiver shall be preserved to the maximum extent permitted taking into
+account Affirmer's express Statement of Purpose. In addition, to the
+extent the Waiver is so judged Affirmer hereby grants to each affected
+person a royalty-free, non transferable, non sublicensable, non exclusive,
+irrevocable and unconditional license to exercise Affirmer's Copyright and
+Related Rights in the Work (i) in all territories worldwide, (ii) for the
+maximum duration provided by applicable law or treaty (including future
+time extensions), (iii) in any current or future medium and for any number
+of copies, and (iv) for any purpose whatsoever, including without
+limitation commercial, advertising or promotional purposes (the
+"License"). The License shall be deemed effective as of the date CC0 was
+applied by Affirmer to the Work. Should any part of the License for any
+reason be judged legally invalid or ineffective under applicable law, such
+partial invalidity or ineffectiveness shall not invalidate the remainder
+of the License, and in such case Affirmer hereby affirms that he or she
+will not (i) exercise any of his or her remaining Copyright and Related
+Rights in the Work or (ii) assert any associated claims and causes of
+action with respect to the Work, in either case contrary to Affirmer's
+express Statement of Purpose.
+
+4. Limitations and Disclaimers.
+
+ a. No trademark or patent rights held by Affirmer are waived, abandoned,
+ surrendered, licensed or otherwise affected by this document.
+ b. Affirmer offers the Work as-is and makes no representations or
+ warranties of any kind concerning the Work, express, implied,
+ statutory or otherwise, including without limitation warranties of
+ title, merchantability, fitness for a particular purpose, non
+ infringement, or the absence of latent or other defects, accuracy, or
+ the present or absence of errors, whether or not discoverable, all to
+ the greatest extent permissible under applicable law.
+ c. Affirmer disclaims responsibility for clearing rights of other persons
+ that may apply to the Work or any use thereof, including without
+ limitation any person's Copyright and Related Rights in the Work.
+ Further, Affirmer disclaims responsibility for obtaining any necessary
+ consents, permissions or other rights required for any use of the
+ Work.
+ d. Affirmer understands and acknowledges that Creative Commons is not a
+ party to this document and has no duty or obligation with respect to
+ this CC0 or use of the Work.
\ No newline at end of file
diff --git a/asterixdb/src/main/licenses/content/raw.githubusercontent.com_reactive-streams_reactive-streams-jvm_v1.0.2_LICENSE.txt b/asterixdb/src/main/licenses/content/raw.githubusercontent.com_reactive-streams_reactive-streams-jvm_v1.0.2_LICENSE.txt
new file mode 100644
index 0000000..eadae05
--- /dev/null
+++ b/asterixdb/src/main/licenses/content/raw.githubusercontent.com_reactive-streams_reactive-streams-jvm_v1.0.2_LICENSE.txt
@@ -0,0 +1,8 @@
+Licensed under Public Domain (CC0)
+
+To the extent possible under law, the person who associated CC0 with
+this code has waived all copyright and related or neighboring
+rights to this code.
+
+You should have received a copy of the CC0 legalcode along with this
+work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.
\ No newline at end of file