[ASTERIXDB-2722][EXT] Require minimum permissions + improve error reporting
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Creating an external dataset requires S3:ListBucket permission.
- Querying an external dataset requires S3:ListBucket and
S3:GetObject permissions.
- Improved error reporting.
- Added test cases for the above mentioned items (where applicable).
Change-Id: Idc266cf63b8f92a07af7341118d2636673913160
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/6463
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Dmitry Lychagin <dmitry.lychagin@couchbase.com>
diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml
index 219595b..e3702a6 100644
--- a/asterixdb/asterix-app/pom.xml
+++ b/asterixdb/asterix-app/pom.xml
@@ -703,22 +703,18 @@
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sdk-core</artifactId>
- <scope>test</scope>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
- <scope>test</scope>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>regions</artifactId>
- <scope>test</scope>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>auth</artifactId>
- <scope>test</scope>
</dependency>
<!-- Mock for AWS S3 -->
<dependency>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 0f540ff..ed3eb8b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -18,8 +18,6 @@
*/
package org.apache.asterix.app.translator;
-import static org.apache.asterix.common.exceptions.ErrorCode.UNKNOWN_DATAVERSE;
-
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
@@ -637,7 +635,7 @@
throw new CompilationException(ErrorCode.UNKNOWN_DATAVERSE, sourceLoc, dataverseName);
}
- IDatasetDetails datasetDetails = null;
+ IDatasetDetails datasetDetails;
Dataset ds = metadataProvider.findDataset(dataverseName, datasetName);
if (ds != null) {
if (dd.getIfNotExists()) {
@@ -738,7 +736,7 @@
createExternalDatasetProperties(dataverseName, dd, metadataProvider, mdTxnCtx);
ExternalDataUtils.normalize(properties);
ExternalDataUtils.validate(properties);
- validateExternalDatasetProperties(externalDetails, properties, dd.getSourceLocation());
+ validateExternalDatasetProperties(externalDetails, properties, dd.getSourceLocation(), mdTxnCtx);
datasetDetails = new ExternalDatasetDetails(externalDetails.getAdapter(), properties, new Date(),
TransactionState.COMMIT);
break;
@@ -3236,12 +3234,26 @@
}
protected void validateExternalDatasetProperties(ExternalDetailsDecl externalDetails,
- Map<String, String> properties, SourceLocation srcLoc) throws CompilationException {
+ Map<String, String> properties, SourceLocation srcLoc, MetadataTransactionContext mdTxnCtx)
+ throws AlgebricksException, HyracksDataException {
String adapter = externalDetails.getAdapter();
// "format" parameter is needed for "S3" data source
if (ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3.equals(adapter)
&& properties.get(ExternalDataConstants.KEY_FORMAT) == null) {
throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT);
}
+
+ Map<String, String> details = new HashMap<>(properties);
+ details.put(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE, adapter);
+ validateExternalSourceContainer(details);
+ }
+
+ /**
+ * Ensures that the external source container is present
+ *
+ * @param configuration external source properties
+ */
+ protected void validateExternalSourceContainer(Map<String, String> configuration) throws CompilationException {
+ ExternalDataUtils.validateExternalSourceContainer(configuration);
}
}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/bucket-does-not-exist/test.000.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/bucket-does-not-exist/test.000.ddl.sqlpp
new file mode 100644
index 0000000..b174162
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/bucket-does-not-exist/test.000.ddl.sqlpp
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use test;
+
+drop type test if exists;
+create type test as open {
+};
+
+drop dataset test if exists;
+create external dataset test(test) using S3 (
+("accessKeyId"="dummyAccessKey"),
+("secretAccessKey"="dummySecretKey"),
+("region"="us-west-2"),
+("serviceEndpoint"="http://localhost:8001"),
+("container"="bucket-does-not-exist"),
+("definition"="over-1000-objects"),
+("format"="json")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/bucket-does-not-exist/test.099.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/bucket-does-not-exist/test.099.ddl.sqlpp
new file mode 100644
index 0000000..548e632
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/bucket-does-not-exist/test.099.ddl.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse test if exists;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/definition-does-not-exist/test.000.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/definition-does-not-exist/test.000.ddl.sqlpp
new file mode 100644
index 0000000..7a86e69
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/definition-does-not-exist/test.000.ddl.sqlpp
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use test;
+
+drop type test if exists;
+create type test as open {
+};
+
+drop dataset test if exists;
+create external dataset test(test) using S3 (
+("accessKeyId"="dummyAccessKey"),
+("secretAccessKey"="dummySecretKey"),
+("region"="us-west-2"),
+("serviceEndpoint"="http://localhost:8001"),
+("container"="playground"),
+("definition"="definition-does-not-exist"),
+("format"="json")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/definition-does-not-exist/test.001.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/definition-does-not-exist/test.001.query.sqlpp
new file mode 100644
index 0000000..affdb87
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/definition-does-not-exist/test.001.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+select count(*) `count` from test;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/definition-does-not-exist/test.099.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/definition-does-not-exist/test.099.ddl.sqlpp
new file mode 100644
index 0000000..548e632
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/definition-does-not-exist/test.099.ddl.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse test if exists;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/invalid-endpoint/test.000.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/invalid-endpoint/test.000.ddl.sqlpp
new file mode 100644
index 0000000..da2b945
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/invalid-endpoint/test.000.ddl.sqlpp
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use test;
+
+drop type test if exists;
+create type test as open {
+};
+
+drop dataset test if exists;
+create external dataset test(test) using S3 (
+("accessKeyId"="dummyAccessKey"),
+("secretAccessKey"="dummySecretKey"),
+("region"="us-west-2"),
+("serviceEndpoint"="^invalid-endpoint^"),
+("container"="bucket-does-not-exist"),
+("definition"="over-1000-objects"),
+("format"="json")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/invalid-endpoint/test.099.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/invalid-endpoint/test.099.ddl.sqlpp
new file mode 100644
index 0000000..548e632
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/invalid-endpoint/test.099.ddl.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse test if exists;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/definition-does-not-exist/result.001.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/definition-does-not-exist/result.001.adm
new file mode 100644
index 0000000..c1a0ea2
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/definition-does-not-exist/result.001.adm
@@ -0,0 +1 @@
+{ "count": 0 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset.xml
index b0346f8..7c1aa48 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset.xml
@@ -82,5 +82,22 @@
<expected-error>Parsing error at malformed-data/malformed-jsonl-2.json line 11 field array_f: Unexpected character (']' (code 93)): expected a valid value (JSON String, Number, Array, Object or token 'null', 'true' or 'false')</expected-error>
</compilation-unit>
</test-case>
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="aws/s3/definition-does-not-exist">
+ <output-dir compare="Text">aws/s3/definition-does-not-exist</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="aws/s3/invalid-endpoint">
+ <output-dir compare="Text">aws/s3/invalid-endpoint</output-dir>
+ <expected-error>External source error. Invalid service endpoint ^invalid-endpoint^</expected-error>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="aws/s3/bucket-does-not-exist">
+ <output-dir compare="Text">aws/s3/bucket-does-not-exist</output-dir>
+ <expected-error>External source error. The specified bucket does not exist (Service: S3, Status Code: 404, Request ID: null)</expected-error>
+ </compilation-unit>
+ </test-case>
</test-group>
</test-suite>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 37043b2..6496c94 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -196,6 +196,8 @@
public static final int OPERATION_NOT_SUPPORTED_ON_PRIMARY_INDEX = 1105;
public static final int EXPECTED_CONSTANT_VALUE = 1106;
public static final int UNEXPECTED_HINT = 1107;
+ public static final int EXTERNAL_SOURCE_ERROR = 1108;
+ public static final int EXTERNAL_SOURCE_CONTAINER_NOT_FOUND = 1109;
// Feed errors
public static final int DATAFLOW_ILLEGAL_STATE = 3001;
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index d3a2215..7e75a51 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -191,6 +191,8 @@
1105 = Operation not supported on primary index %1$s
1106 = Expected constant value
1107 = Unexpected hint: %1$s. %2$s expected at this location
+1108 = External source error. %1$s
+1109 = External source container %1$s not found
# Feed Errors
3001 = Illegal state.
diff --git a/asterixdb/asterix-external-data/pom.xml b/asterixdb/asterix-external-data/pom.xml
index 30e7770..8270d71 100644
--- a/asterixdb/asterix-external-data/pom.xml
+++ b/asterixdb/asterix-external-data/pom.xml
@@ -437,6 +437,14 @@
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
+ <artifactId>http-client-spi</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>sdk-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
</dependency>
<dependency>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
index 448d3f5..bcbf540 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
@@ -18,23 +18,24 @@
*/
package org.apache.asterix.external.input.record.reader.aws;
-import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3Constants;
+import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3;
import java.io.IOException;
-import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.zip.GZIPInputStream;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.external.input.stream.AbstractMultipleInputStream;
import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.util.CleanupUtils;
-import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
-import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
-import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.s3.S3Client;
-import software.amazon.awssdk.services.s3.S3ClientBuilder;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
public class AwsS3InputStream extends AbstractMultipleInputStream {
@@ -48,7 +49,7 @@
private final List<String> filePaths;
private int nextFileIndex = 0;
- public AwsS3InputStream(Map<String, String> configuration, List<String> filePaths) {
+ public AwsS3InputStream(Map<String, String> configuration, List<String> filePaths) throws HyracksDataException {
this.configuration = configuration;
this.filePaths = filePaths;
this.s3Client = buildAwsS3Client(configuration);
@@ -74,13 +75,17 @@
CleanupUtils.close(in, null);
}
- String bucket = configuration.get(AwsS3Constants.CONTAINER_NAME_FIELD_NAME);
+ String bucket = configuration.get(AwsS3.CONTAINER_NAME_FIELD_NAME);
GetObjectRequest.Builder getObjectBuilder = GetObjectRequest.builder();
GetObjectRequest getObjectRequest = getObjectBuilder.bucket(bucket).key(filePaths.get(nextFileIndex)).build();
// Have a reference to the S3 stream to ensure that if GZipInputStream causes an IOException because of reading
// the header, then the S3 stream gets closed in the close method
- in = s3Client.getObject(getObjectRequest);
+ try {
+ in = s3Client.getObject(getObjectRequest);
+ } catch (SdkException ex) {
+ throw new RuntimeDataException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage());
+ }
// Use gzip stream if needed
String filename = filePaths.get(nextFileIndex).toLowerCase();
@@ -96,6 +101,14 @@
return true;
}
+ private S3Client buildAwsS3Client(Map<String, String> configuration) throws HyracksDataException {
+ try {
+ return ExternalDataUtils.AwsS3.buildAwsS3Client(configuration);
+ } catch (CompilationException ex) {
+ throw HyracksDataException.create(ex);
+ }
+ }
+
@Override
public boolean stop() {
return false;
@@ -126,33 +139,4 @@
private String getStreamNameAt(int fileIndex) {
return fileIndex < 0 || filePaths == null || filePaths.isEmpty() ? "" : filePaths.get(fileIndex);
}
-
- /**
- * Prepares and builds the Amazon S3 client with the provided configuration
- *
- * @param configuration S3 client configuration
- *
- * @return Amazon S3 client
- */
- private static S3Client buildAwsS3Client(Map<String, String> configuration) {
- S3ClientBuilder builder = S3Client.builder();
-
- // Credentials
- String accessKeyId = configuration.get(AwsS3Constants.ACCESS_KEY_ID_FIELD_NAME);
- String secretAccessKey = configuration.get(AwsS3Constants.SECRET_ACCESS_KEY_FIELD_NAME);
- AwsBasicCredentials credentials = AwsBasicCredentials.create(accessKeyId, secretAccessKey);
- builder.credentialsProvider(StaticCredentialsProvider.create(credentials));
-
- // Region
- String region = configuration.get(AwsS3Constants.REGION_FIELD_NAME);
- builder.region(Region.of(region));
-
- // Use user's endpoint if provided
- if (configuration.get(AwsS3Constants.SERVICE_END_POINT_FIELD_NAME) != null) {
- String endPoint = configuration.get(AwsS3Constants.SERVICE_END_POINT_FIELD_NAME);
- builder.endpointOverride(URI.create(endPoint));
- }
-
- return builder.build();
- }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
index 9158a57..e57b31a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
@@ -18,30 +18,30 @@
*/
package org.apache.asterix.external.input.record.reader.aws;
-import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3Constants;
+import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3;
import java.io.Serializable;
-import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.api.IInputStreamFactory;
import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.CleanupUtils;
-import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
-import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
-import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.s3.S3Client;
-import software.amazon.awssdk.services.s3.S3ClientBuilder;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.S3Object;
@@ -52,7 +52,8 @@
private Map<String, String> configuration;
// Files to read from
- private List<PartitionWorkLoadBasedOnSize> partitionWorkLoadsBasedOnSize = new ArrayList<>();
+ private final List<S3Object> filesOnly = new ArrayList<>();
+ private final List<PartitionWorkLoadBasedOnSize> partitionWorkLoadsBasedOnSize = new ArrayList<>();
private transient AlgebricksAbsolutePartitionConstraint partitionConstraint;
@@ -67,7 +68,7 @@
}
@Override
- public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) {
+ public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) throws HyracksDataException {
return new AwsS3InputStream(configuration, partitionWorkLoadsBasedOnSize.get(partition).getFilePaths());
}
@@ -81,51 +82,57 @@
this.configuration = configuration;
ICcApplicationContext ccApplicationContext = (ICcApplicationContext) ctx.getApplicationContext();
- String container = configuration.get(AwsS3Constants.CONTAINER_NAME_FIELD_NAME);
+ String container = configuration.get(AwsS3.CONTAINER_NAME_FIELD_NAME);
- S3Client s3Client = buildAwsS3Client(configuration);
+ S3Client s3Client = ExternalDataUtils.AwsS3.buildAwsS3Client(configuration);
// Get all objects in a bucket and extract the paths to files
ListObjectsV2Request.Builder listObjectsBuilder = ListObjectsV2Request.builder().bucket(container);
- String path = configuration.get(AwsS3Constants.DEFINITION_FIELD_NAME);
+ String path = configuration.get(AwsS3.DEFINITION_FIELD_NAME);
if (path != null) {
listObjectsBuilder.prefix(path + (!path.isEmpty() && !path.endsWith("/") ? "/" : ""));
}
ListObjectsV2Response listObjectsResponse;
- List<S3Object> s3Objects = new ArrayList<>();
boolean done = false;
String newMarker = null;
- while (!done) {
- // List the objects from the start, or from the last marker in case of truncated result
- if (newMarker == null) {
- listObjectsResponse = s3Client.listObjectsV2(listObjectsBuilder.build());
- } else {
- listObjectsResponse = s3Client.listObjectsV2(listObjectsBuilder.continuationToken(newMarker).build());
+ String fileFormat = configuration.get(ExternalDataConstants.KEY_FORMAT);
+
+ try {
+ while (!done) {
+ // List the objects from the start, or from the last marker in case of truncated result
+ if (newMarker == null) {
+ listObjectsResponse = s3Client.listObjectsV2(listObjectsBuilder.build());
+ } else {
+ listObjectsResponse =
+ s3Client.listObjectsV2(listObjectsBuilder.continuationToken(newMarker).build());
+ }
+
+ // Collect the paths to files only
+ collectFilesOnly(listObjectsResponse.contents(), fileFormat);
+
+ // Mark the flag as done if done, otherwise, get the marker of the previous response for the next request
+ if (!listObjectsResponse.isTruncated()) {
+ done = true;
+ } else {
+ newMarker = listObjectsResponse.nextContinuationToken();
+ }
}
-
- // Collect all the provided objects
- s3Objects.addAll(listObjectsResponse.contents());
-
- // Mark the flag as done if done, otherwise, get the marker of the previous response for the next request
- if (!listObjectsResponse.isTruncated()) {
- done = true;
- } else {
- newMarker = listObjectsResponse.nextContinuationToken();
+ } catch (SdkException ex) {
+ throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage());
+ } finally {
+ if (s3Client != null) {
+ CleanupUtils.close(s3Client, null);
}
}
- // Exclude the directories and get the files only
- String fileFormat = configuration.get(ExternalDataConstants.KEY_FORMAT);
- List<S3Object> fileObjects = getFilesOnly(s3Objects, fileFormat);
-
// Partition constraints
partitionConstraint = ccApplicationContext.getClusterStateManager().getClusterLocations();
int partitionsCount = partitionConstraint.getLocations().length;
// Distribute work load amongst the partitions
- distributeWorkLoad(fileObjects, partitionsCount);
+ distributeWorkLoad(filesOnly, partitionsCount);
}
/**
@@ -133,21 +140,17 @@
* a file if it does not end up with a "/" which is the separator in a folder structure.
*
* @param s3Objects List of returned objects
- *
- * @return A list of string paths that point to files only
+ * @param fileFormat The expected file format
*
* @throws AsterixException AsterixException
*/
- private List<S3Object> getFilesOnly(List<S3Object> s3Objects, String fileFormat) throws AsterixException {
- List<S3Object> filesOnly = new ArrayList<>();
+ private void collectFilesOnly(List<S3Object> s3Objects, String fileFormat) throws AsterixException {
String fileExtension = getFileExtension(fileFormat);
if (fileExtension == null) {
throw AsterixException.create(ErrorCode.PROVIDER_STREAM_RECORD_READER_UNKNOWN_FORMAT, fileFormat);
}
s3Objects.stream().filter(object -> isValidFile(object.key(), fileFormat)).forEach(filesOnly::add);
-
- return filesOnly;
}
/**
@@ -214,35 +217,6 @@
}
/**
- * Prepares and builds the Amazon S3 client with the provided configuration
- *
- * @param configuration S3 client configuration
- *
- * @return Amazon S3 client
- */
- private static S3Client buildAwsS3Client(Map<String, String> configuration) {
- S3ClientBuilder builder = S3Client.builder();
-
- // Credentials
- String accessKeyId = configuration.get(AwsS3Constants.ACCESS_KEY_ID_FIELD_NAME);
- String secretAccessKey = configuration.get(AwsS3Constants.SECRET_ACCESS_KEY_FIELD_NAME);
- AwsBasicCredentials credentials = AwsBasicCredentials.create(accessKeyId, secretAccessKey);
- builder.credentialsProvider(StaticCredentialsProvider.create(credentials));
-
- // Region
- String region = configuration.get(AwsS3Constants.REGION_FIELD_NAME);
- builder.region(Region.of(region));
-
- // Use user's endpoint if provided
- if (configuration.get(AwsS3Constants.SERVICE_END_POINT_FIELD_NAME) != null) {
- String endPoint = configuration.get(AwsS3Constants.SERVICE_END_POINT_FIELD_NAME);
- builder.endpointOverride(URI.create(endPoint));
- }
-
- return builder.build();
- }
-
- /**
* Returns the file extension for the provided file format.
*
* @param format file format
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 63f57b6..e93c3b9 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -82,7 +82,7 @@
public static final String KEY_WAIT_FOR_DATA = "wait-for-data";
public static final String KEY_FEED_NAME = "feed";
// a string representing external bucket name
- public static final String KEY_BUCKET = "bucket";
+ public static final String KEY_EXTERNAL_SOURCE_TYPE = "type";
// a comma delimited list of nodes
public static final String KEY_NODES = "nodes";
// a string representing the password used to authenticate with the external data source
@@ -276,8 +276,9 @@
public static final String EMPTY_FIELD = "empty value";
public static final String INVALID_VAL = "invalid value";
- public static class AwsS3Constants {
- private AwsS3Constants() {
+ public static class AwsS3 {
+ private AwsS3() {
+ throw new AssertionError("do not instantiate");
}
public static final String REGION_FIELD_NAME = "region";
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 2ae2838..1501287 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -24,10 +24,15 @@
import static org.apache.asterix.external.util.ExternalDataConstants.KEY_RECORD_END;
import static org.apache.asterix.external.util.ExternalDataConstants.KEY_RECORD_START;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.EnumMap;
+import java.util.List;
import java.util.Map;
+import java.util.Optional;
import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.common.library.ILibraryManager;
@@ -40,6 +45,7 @@
import org.apache.asterix.om.types.AUnionType;
import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.CleanupUtils;
import org.apache.hyracks.dataflow.common.data.parsers.BooleanParserFactory;
import org.apache.hyracks.dataflow.common.data.parsers.DoubleParserFactory;
import org.apache.hyracks.dataflow.common.data.parsers.FloatParserFactory;
@@ -48,6 +54,15 @@
import org.apache.hyracks.dataflow.common.data.parsers.LongParserFactory;
import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.exception.SdkException;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.S3ClientBuilder;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
+
public class ExternalDataUtils {
private static final Map<ATypeTag, IValueParserFactory> valueParserFactoryMap = new EnumMap<>(ATypeTag.class);
@@ -436,4 +451,101 @@
configuration.put(key, paramValue.toLowerCase().trim());
}
}
+
+ /**
+ * Ensures that the external source container is present
+ *
+ * @param configuration external source properties
+ */
+ public static void validateExternalSourceContainer(Map<String, String> configuration) throws CompilationException {
+ String type = configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE);
+
+ switch (type) {
+ case ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3:
+ ExternalDataUtils.AwsS3.validateExternalSourceContainer(configuration);
+ break;
+ default:
+ // Nothing needs to be done
+ break;
+ }
+ }
+
+ public static class AwsS3 {
+ private AwsS3() {
+ throw new AssertionError("do not instantiate");
+ }
+
+ public static S3Client buildAwsS3Client(Map<String, String> configuration) throws CompilationException {
+ // TODO(Hussain): Need to ensure that all required parameters are present in a previous step
+ String accessKeyId = configuration.get(ExternalDataConstants.AwsS3.ACCESS_KEY_ID_FIELD_NAME);
+ String secretAccessKey = configuration.get(ExternalDataConstants.AwsS3.SECRET_ACCESS_KEY_FIELD_NAME);
+ String regionId = configuration.get(ExternalDataConstants.AwsS3.REGION_FIELD_NAME);
+ String serviceEndpoint = configuration.get(ExternalDataConstants.AwsS3.SERVICE_END_POINT_FIELD_NAME);
+
+ S3ClientBuilder builder = S3Client.builder();
+
+ // Credentials
+ AwsBasicCredentials credentials = AwsBasicCredentials.create(accessKeyId, secretAccessKey);
+ builder.credentialsProvider(StaticCredentialsProvider.create(credentials));
+
+ // Validate the region
+ List<Region> supportedRegions = S3Client.serviceMetadata().regions();
+ Optional<Region> selectedRegion =
+ supportedRegions.stream().filter(region -> region.id().equalsIgnoreCase(regionId)).findFirst();
+
+ if (!selectedRegion.isPresent()) {
+ throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR,
+ String.format("region %s is not supported", regionId));
+ }
+ builder.region(selectedRegion.get());
+
+ // Validate the service endpoint if present
+ if (serviceEndpoint != null) {
+ try {
+ URI uri = new URI(serviceEndpoint);
+ try {
+ builder.endpointOverride(uri);
+ } catch (NullPointerException ex) {
+ throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage());
+ }
+ } catch (URISyntaxException ex) {
+ throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR,
+ String.format("Invalid service endpoint %s", serviceEndpoint));
+ }
+ }
+
+ return builder.build();
+ }
+
+ /**
+ * Validates if the container being used is available or not.
+ *
+ * @param configuration external datasource configuration
+ *
+ * @throws CompilationException Compilation exception
+ */
+ public static void validateExternalSourceContainer(Map<String, String> configuration)
+ throws CompilationException {
+ S3Client s3Client = null;
+
+ try {
+ String container = configuration.get(ExternalDataConstants.AwsS3.CONTAINER_NAME_FIELD_NAME);
+ s3Client = buildAwsS3Client(configuration);
+ ListObjectsV2Response response =
+ s3Client.listObjectsV2(ListObjectsV2Request.builder().bucket(container).maxKeys(1).build());
+
+ // Returns 200 only in case the bucket exists, however, otherwise, throws an exception. However, to
+ // ensure coverage, check if the result is successful as well and not only catch exceptions
+ if (!response.sdkHttpResponse().isSuccessful()) {
+ throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND, container);
+ }
+ } catch (SdkException ex) {
+ throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage());
+ } finally {
+ if (s3Client != null) {
+ CleanupUtils.close(s3Client, null);
+ }
+ }
+ }
+ }
}
diff --git a/asterixdb/pom.xml b/asterixdb/pom.xml
index cd5e060..9daf70f 100644
--- a/asterixdb/pom.xml
+++ b/asterixdb/pom.xml
@@ -1432,6 +1432,11 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>http-client-spi</artifactId>
+ <version>${awsjavasdk.version}</version>
+ </dependency>
<!-- Mock for AWS S3 -->
<dependency>
<groupId>io.findify</groupId>