Merge commit 'a548307b9f' from 'gerrit/mad-hatter'
Change-Id: I57eb5d96af8f227e5a9550381418f9468eb400d8
diff --git a/asterixdb/asterix-app/data/json/malformed-json-2.json b/asterixdb/asterix-app/data/json/malformed-json-2.json
new file mode 100644
index 0000000..88ef9f2
--- /dev/null
+++ b/asterixdb/asterix-app/data/json/malformed-json-2.json
@@ -0,0 +1,7 @@
+{"field": 1,
+ "field2": {
+ "nested1": 9,
+ "array_f": [1, 2,],
+ "nested2": 10
+ }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/data/jsonl/malformed-jsonl-1.json b/asterixdb/asterix-app/data/jsonl/malformed-jsonl-1.json
new file mode 100644
index 0000000..7d15a9c
--- /dev/null
+++ b/asterixdb/asterix-app/data/jsonl/malformed-jsonl-1.json
@@ -0,0 +1,4 @@
+{"field": 1, "field2": true}
+{"field": 2, "field2": false}
+{"field": 3, "field2": truee}
+{"field": 4, "field2": true}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/data/jsonl/malformed-jsonl-2.json b/asterixdb/asterix-app/data/jsonl/malformed-jsonl-2.json
new file mode 100644
index 0000000..ebd3538
--- /dev/null
+++ b/asterixdb/asterix-app/data/jsonl/malformed-jsonl-2.json
@@ -0,0 +1,14 @@
+{"field": 1,
+ "field2": {
+ "nested1": 8,
+ "array_f": [1, 2],
+ "nested2": 9
+ }
+}
+{"field": 2,
+ "field2": {
+ "nested1": 88,
+ "array_f": [11, 22, ],
+ "nested2": 99
+ }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml
index a26a335..9342afd 100644
--- a/asterixdb/asterix-app/pom.xml
+++ b/asterixdb/asterix-app/pom.xml
@@ -836,22 +836,18 @@
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sdk-core</artifactId>
- <scope>test</scope>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
- <scope>test</scope>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>regions</artifactId>
- <scope>test</scope>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>auth</artifactId>
- <scope>test</scope>
</dependency>
<!-- Mock for AWS S3 -->
<dependency>
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index d40957f..1e4e15b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -18,8 +18,6 @@
*/
package org.apache.asterix.app.translator;
-import static org.apache.asterix.common.exceptions.ErrorCode.UNKNOWN_DATAVERSE;
-
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
@@ -679,7 +677,7 @@
throw new CompilationException(ErrorCode.UNKNOWN_DATAVERSE, sourceLoc, dataverseName);
}
- IDatasetDetails datasetDetails = null;
+ IDatasetDetails datasetDetails;
Dataset ds = metadataProvider.findDataset(dataverseName, datasetName);
if (ds != null) {
if (dd.getIfNotExists()) {
@@ -781,7 +779,7 @@
createExternalDatasetProperties(dataverseName, dd, metadataProvider, mdTxnCtx);
ExternalDataUtils.normalize(properties);
ExternalDataUtils.validate(properties);
- validateExternalDatasetProperties(externalDetails, properties, dd.getSourceLocation());
+ validateExternalDatasetProperties(externalDetails, properties, dd.getSourceLocation(), mdTxnCtx);
datasetDetails = new ExternalDatasetDetails(externalDetails.getAdapter(), properties, new Date(),
TransactionState.COMMIT);
break;
@@ -3481,12 +3479,26 @@
}
protected void validateExternalDatasetProperties(ExternalDetailsDecl externalDetails,
- Map<String, String> properties, SourceLocation srcLoc) throws CompilationException {
+ Map<String, String> properties, SourceLocation srcLoc, MetadataTransactionContext mdTxnCtx)
+ throws AlgebricksException, HyracksDataException {
String adapter = externalDetails.getAdapter();
// "format" parameter is needed for "S3" data source
if (ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3.equals(adapter)
&& properties.get(ExternalDataConstants.KEY_FORMAT) == null) {
throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT);
}
+
+ Map<String, String> details = new HashMap<>(properties);
+ details.put(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE, adapter);
+ validateExternalSourceContainer(details);
+ }
+
+ /**
+ * Ensures that the external source container is present
+ *
+ * @param configuration external source properties
+ */
+ protected void validateExternalSourceContainer(Map<String, String> configuration) throws CompilationException {
+ ExternalDataUtils.validateExternalSourceContainer(configuration);
}
}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/bucket-does-not-exist/test.000.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/bucket-does-not-exist/test.000.ddl.sqlpp
new file mode 100644
index 0000000..b174162
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/bucket-does-not-exist/test.000.ddl.sqlpp
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use test;
+
+drop type test if exists;
+create type test as open {
+};
+
+drop dataset test if exists;
+create external dataset test(test) using S3 (
+("accessKeyId"="dummyAccessKey"),
+("secretAccessKey"="dummySecretKey"),
+("region"="us-west-2"),
+("serviceEndpoint"="http://localhost:8001"),
+("container"="bucket-does-not-exist"),
+("definition"="over-1000-objects"),
+("format"="json")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/bucket-does-not-exist/test.099.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/bucket-does-not-exist/test.099.ddl.sqlpp
new file mode 100644
index 0000000..548e632
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/bucket-does-not-exist/test.099.ddl.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse test if exists;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/definition-does-not-exist/test.000.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/definition-does-not-exist/test.000.ddl.sqlpp
new file mode 100644
index 0000000..7a86e69
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/definition-does-not-exist/test.000.ddl.sqlpp
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use test;
+
+drop type test if exists;
+create type test as open {
+};
+
+drop dataset test if exists;
+create external dataset test(test) using S3 (
+("accessKeyId"="dummyAccessKey"),
+("secretAccessKey"="dummySecretKey"),
+("region"="us-west-2"),
+("serviceEndpoint"="http://localhost:8001"),
+("container"="playground"),
+("definition"="definition-does-not-exist"),
+("format"="json")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/definition-does-not-exist/test.001.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/definition-does-not-exist/test.001.query.sqlpp
new file mode 100644
index 0000000..affdb87
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/definition-does-not-exist/test.001.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+select count(*) `count` from test;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/definition-does-not-exist/test.099.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/definition-does-not-exist/test.099.ddl.sqlpp
new file mode 100644
index 0000000..548e632
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/definition-does-not-exist/test.099.ddl.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse test if exists;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/invalid-endpoint/test.000.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/invalid-endpoint/test.000.ddl.sqlpp
new file mode 100644
index 0000000..da2b945
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/invalid-endpoint/test.000.ddl.sqlpp
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use test;
+
+drop type test if exists;
+create type test as open {
+};
+
+drop dataset test if exists;
+create external dataset test(test) using S3 (
+("accessKeyId"="dummyAccessKey"),
+("secretAccessKey"="dummySecretKey"),
+("region"="us-west-2"),
+("serviceEndpoint"="^invalid-endpoint^"),
+("container"="bucket-does-not-exist"),
+("definition"="over-1000-objects"),
+("format"="json")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/invalid-endpoint/test.099.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/invalid-endpoint/test.099.ddl.sqlpp
new file mode 100644
index 0000000..548e632
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/invalid-endpoint/test.099.ddl.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse test if exists;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/malformed-json/malformed-json.000.s3bucket.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/malformed-json/malformed-json.000.s3bucket.sqlpp
new file mode 100644
index 0000000..edae2a6
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/malformed-json/malformed-json.000.s3bucket.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+// create S3 bucket with data
+playground malformed-data data/json/duplicate-fields.json
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/malformed-json/malformed-json.001.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/malformed-json/malformed-json.001.ddl.sqlpp
new file mode 100644
index 0000000..6ee0509
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/malformed-json/malformed-json.001.ddl.sqlpp
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+USE test;
+
+DROP TYPE t1 IF EXISTS;
+CREATE TYPE t1 AS {};
+
+DROP DATASET ds1 IF EXISTS;
+CREATE EXTERNAL DATASET ds1(t1) USING S3 (
+("accessKeyId"="dummyAccessKey"),
+("secretAccessKey"="dummySecretKey"),
+("region"="us-west-2"),
+("serviceEndpoint"="http://localhost:8001"),
+("container"="playground"),
+("definition"="malformed-data"),
+("format"="json")
+);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/malformed-json/malformed-json.002.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/malformed-json/malformed-json.002.query.sqlpp
new file mode 100644
index 0000000..3dedd2f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/malformed-json/malformed-json.002.query.sqlpp
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+USE test;
+
+FROM ds1 v SELECT VALUE v;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/malformed-json/malformed-json.003.s3bucket.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/malformed-json/malformed-json.003.s3bucket.sqlpp
new file mode 100644
index 0000000..ea73e7e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/malformed-json/malformed-json.003.s3bucket.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+// create S3 bucket with data
+playground malformed-data data/json/malformed-json.json
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/malformed-json/malformed-json.004.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/malformed-json/malformed-json.004.query.sqlpp
new file mode 100644
index 0000000..3dedd2f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/malformed-json/malformed-json.004.query.sqlpp
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+USE test;
+
+FROM ds1 v SELECT VALUE v;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/malformed-json/malformed-json.005.s3bucket.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/malformed-json/malformed-json.005.s3bucket.sqlpp
new file mode 100644
index 0000000..ef6d8df
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/malformed-json/malformed-json.005.s3bucket.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+// create S3 bucket with data
+playground malformed-data data/json/malformed-json-2.json
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/malformed-json/malformed-json.006.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/malformed-json/malformed-json.006.query.sqlpp
new file mode 100644
index 0000000..3dedd2f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/malformed-json/malformed-json.006.query.sqlpp
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+USE test;
+
+FROM ds1 v SELECT VALUE v;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/malformed-json/malformed-json.007.s3bucket.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/malformed-json/malformed-json.007.s3bucket.sqlpp
new file mode 100644
index 0000000..3c84eda
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/malformed-json/malformed-json.007.s3bucket.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+// create S3 bucket with data
+playground malformed-data data/jsonl/malformed-jsonl-1.json
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/malformed-json/malformed-json.008.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/malformed-json/malformed-json.008.query.sqlpp
new file mode 100644
index 0000000..3dedd2f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/malformed-json/malformed-json.008.query.sqlpp
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+USE test;
+
+FROM ds1 v SELECT VALUE v;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/malformed-json/malformed-json.009.s3bucket.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/malformed-json/malformed-json.009.s3bucket.sqlpp
new file mode 100644
index 0000000..25f0c8c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/malformed-json/malformed-json.009.s3bucket.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+// create S3 bucket with data
+playground malformed-data data/jsonl/malformed-jsonl-2.json
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/malformed-json/malformed-json.010.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/malformed-json/malformed-json.010.query.sqlpp
new file mode 100644
index 0000000..3dedd2f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/malformed-json/malformed-json.010.query.sqlpp
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+USE test;
+
+FROM ds1 v SELECT VALUE v;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/malformed-json/malformed-json.099.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/malformed-json/malformed-json.099.ddl.sqlpp
new file mode 100644
index 0000000..36b2bab
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/malformed-json/malformed-json.099.ddl.sqlpp
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATAVERSE test IF EXISTS;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/definition-does-not-exist/result.001.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/definition-does-not-exist/result.001.adm
new file mode 100644
index 0000000..c1a0ea2
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/definition-does-not-exist/result.001.adm
@@ -0,0 +1 @@
+{ "count": 0 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset.xml
index 5aa1326..7c1aa48 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset.xml
@@ -72,5 +72,32 @@
<output-dir compare="Text">aws/s3/over-1000-objects</output-dir>
</compilation-unit>
</test-case>
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="aws/s3/malformed-json">
+ <output-dir compare="Text">aws/s3/malformed-json</output-dir>
+ <expected-error>Parsing error at malformed-data/duplicate-fields.json line 1 field field: Duplicate field 'field'</expected-error>
+ <expected-error>Parsing error at malformed-data/malformed-json.json line 1 field field: Unexpected character ('}' (code 125)): was expecting double-quote to start field name</expected-error>
+ <expected-error>Parsing error at malformed-data/malformed-json-2.json line 4 field array_f: Unexpected character (']' (code 93)): expected a valid value (JSON String, Number, Array, Object or token 'null', 'true' or 'false')</expected-error>
+ <expected-error>Parsing error at malformed-data/malformed-jsonl-1.json line 3 field field2: Unrecognized token 'truee': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')</expected-error>
+ <expected-error>Parsing error at malformed-data/malformed-jsonl-2.json line 11 field array_f: Unexpected character (']' (code 93)): expected a valid value (JSON String, Number, Array, Object or token 'null', 'true' or 'false')</expected-error>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="aws/s3/definition-does-not-exist">
+ <output-dir compare="Text">aws/s3/definition-does-not-exist</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="aws/s3/invalid-endpoint">
+ <output-dir compare="Text">aws/s3/invalid-endpoint</output-dir>
+ <expected-error>External source error. Invalid service endpoint ^invalid-endpoint^</expected-error>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset">
+ <compilation-unit name="aws/s3/bucket-does-not-exist">
+ <output-dir compare="Text">aws/s3/bucket-does-not-exist</output-dir>
+ <expected-error>External source error. The specified bucket does not exist (Service: S3, Status Code: 404, Request ID: null)</expected-error>
+ </compilation-unit>
+ </test-case>
</test-group>
</test-suite>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 2663453..102b56a 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -5595,14 +5595,14 @@
<test-case FilePath="group-by">
<compilation-unit name="grouping-sets-3-negative">
<output-dir compare="Text">grouping-sets-2</output-dir>
- <expected-error>ASX1113: Unexpected alias: v21</expected-error>
- <expected-error>ASX1113: Unexpected alias: v22</expected-error>
- <expected-error>ASX1113: Unexpected alias: v23</expected-error>
+ <expected-error>ASX1115: Unexpected alias: v21</expected-error>
+ <expected-error>ASX1115: Unexpected alias: v22</expected-error>
+ <expected-error>ASX1115: Unexpected alias: v23</expected-error>
<expected-error>ASX1087: Invalid number of arguments for function grouping</expected-error>
- <expected-error>ASX1112: Invalid argument to grouping() function</expected-error>
- <expected-error>ASX1112: Invalid argument to grouping() function</expected-error>
- <expected-error>ASX1112: Invalid argument to grouping() function</expected-error>
- <expected-error>ASX1111: Too many grouping sets in group by clause: 512. Maximum allowed: 128.</expected-error>
+ <expected-error>ASX1114: Invalid argument to grouping() function</expected-error>
+ <expected-error>ASX1114: Invalid argument to grouping() function</expected-error>
+ <expected-error>ASX1114: Invalid argument to grouping() function</expected-error>
+ <expected-error>ASX1113: Too many grouping sets in group by clause: 512. Maximum allowed: 128.</expected-error>
</compilation-unit>
</test-case>
<test-case FilePath="group-by">
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index f689b5a..97d653d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -197,12 +197,14 @@
public static final int OPERATION_NOT_SUPPORTED_ON_PRIMARY_INDEX = 1105;
public static final int EXPECTED_CONSTANT_VALUE = 1106;
public static final int UNEXPECTED_HINT = 1107;
- public static final int SYNONYM_EXISTS = 1108;
- public static final int UNKNOWN_SYNONYM = 1109;
- public static final int UNKNOWN_LIBRARY = 1110;
- public static final int COMPILATION_GROUPING_SETS_OVERFLOW = 1111;
- public static final int COMPILATION_GROUPING_OPERATION_INVALID_ARG = 1112;
- public static final int COMPILATION_UNEXPECTED_ALIAS = 1113;
+ public static final int EXTERNAL_SOURCE_ERROR = 1108;
+ public static final int EXTERNAL_SOURCE_CONTAINER_NOT_FOUND = 1109;
+ public static final int SYNONYM_EXISTS = 1110;
+ public static final int UNKNOWN_SYNONYM = 1111;
+ public static final int UNKNOWN_LIBRARY = 1112;
+ public static final int COMPILATION_GROUPING_SETS_OVERFLOW = 1113;
+ public static final int COMPILATION_GROUPING_OPERATION_INVALID_ARG = 1114;
+ public static final int COMPILATION_UNEXPECTED_ALIAS = 1115;
// Feed errors
public static final int DATAFLOW_ILLEGAL_STATE = 3001;
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 033f377..2de5f48 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -192,12 +192,14 @@
1105 = Operation not supported on primary index %1$s
1106 = Expected constant value
1107 = Unexpected hint: %1$s. %2$s expected at this location
-1108 = A synonym with this name %1$s already exists
-1109 = Cannot find synonym with name %1$s
-1110 = Unknown library %1$s
-1111 = Too many grouping sets in group by clause: %1$s. Maximum allowed: %2$s.
-1112 = Invalid argument to grouping() function
-1113 = Unexpected alias: %1$s
+1108 = External source error. %1$s
+1109 = External source container %1$s not found
+1110 = A synonym with this name %1$s already exists
+1111 = Cannot find synonym with name %1$s
+1112 = Unknown library %1$s
+1113 = Too many grouping sets in group by clause: %1$s. Maximum allowed: %2$s.
+1114 = Invalid argument to grouping() function
+1115 = Unexpected alias: %1$s
# Feed Errors
3001 = Illegal state.
diff --git a/asterixdb/asterix-external-data/pom.xml b/asterixdb/asterix-external-data/pom.xml
index f6ebdba..3f1b434 100644
--- a/asterixdb/asterix-external-data/pom.xml
+++ b/asterixdb/asterix-external-data/pom.xml
@@ -458,6 +458,14 @@
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
+ <artifactId>http-client-spi</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>sdk-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
</dependency>
<dependency>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
index 448d3f5..bcbf540 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
@@ -18,23 +18,24 @@
*/
package org.apache.asterix.external.input.record.reader.aws;
-import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3Constants;
+import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3;
import java.io.IOException;
-import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.zip.GZIPInputStream;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.external.input.stream.AbstractMultipleInputStream;
import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.util.CleanupUtils;
-import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
-import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
-import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.s3.S3Client;
-import software.amazon.awssdk.services.s3.S3ClientBuilder;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
public class AwsS3InputStream extends AbstractMultipleInputStream {
@@ -48,7 +49,7 @@
private final List<String> filePaths;
private int nextFileIndex = 0;
- public AwsS3InputStream(Map<String, String> configuration, List<String> filePaths) {
+ public AwsS3InputStream(Map<String, String> configuration, List<String> filePaths) throws HyracksDataException {
this.configuration = configuration;
this.filePaths = filePaths;
this.s3Client = buildAwsS3Client(configuration);
@@ -74,13 +75,17 @@
CleanupUtils.close(in, null);
}
- String bucket = configuration.get(AwsS3Constants.CONTAINER_NAME_FIELD_NAME);
+ String bucket = configuration.get(AwsS3.CONTAINER_NAME_FIELD_NAME);
GetObjectRequest.Builder getObjectBuilder = GetObjectRequest.builder();
GetObjectRequest getObjectRequest = getObjectBuilder.bucket(bucket).key(filePaths.get(nextFileIndex)).build();
// Have a reference to the S3 stream to ensure that if GZipInputStream causes an IOException because of reading
// the header, then the S3 stream gets closed in the close method
- in = s3Client.getObject(getObjectRequest);
+ try {
+ in = s3Client.getObject(getObjectRequest);
+ } catch (SdkException ex) {
+ throw new RuntimeDataException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage());
+ }
// Use gzip stream if needed
String filename = filePaths.get(nextFileIndex).toLowerCase();
@@ -96,6 +101,14 @@
return true;
}
+ /**
+ * Builds the Amazon S3 client for this stream from the provided configuration by delegating to
+ * {@code ExternalDataUtils.AwsS3.buildAwsS3Client}.
+ *
+ * @param configuration S3 client configuration
+ *
+ * @return Amazon S3 client
+ *
+ * @throws HyracksDataException wrapping the CompilationException raised while building the client
+ */
+ private S3Client buildAwsS3Client(Map<String, String> configuration) throws HyracksDataException {
+ try {
+ return ExternalDataUtils.AwsS3.buildAwsS3Client(configuration);
+ } catch (CompilationException ex) {
+ // The constructor calling this only declares HyracksDataException, so the compilation error is rewrapped
+ throw HyracksDataException.create(ex);
+ }
+ }
+
@Override
public boolean stop() {
return false;
@@ -126,33 +139,4 @@
private String getStreamNameAt(int fileIndex) {
return fileIndex < 0 || filePaths == null || filePaths.isEmpty() ? "" : filePaths.get(fileIndex);
}
-
- /**
- * Prepares and builds the Amazon S3 client with the provided configuration
- *
- * @param configuration S3 client configuration
- *
- * @return Amazon S3 client
- */
- private static S3Client buildAwsS3Client(Map<String, String> configuration) {
- S3ClientBuilder builder = S3Client.builder();
-
- // Credentials
- String accessKeyId = configuration.get(AwsS3Constants.ACCESS_KEY_ID_FIELD_NAME);
- String secretAccessKey = configuration.get(AwsS3Constants.SECRET_ACCESS_KEY_FIELD_NAME);
- AwsBasicCredentials credentials = AwsBasicCredentials.create(accessKeyId, secretAccessKey);
- builder.credentialsProvider(StaticCredentialsProvider.create(credentials));
-
- // Region
- String region = configuration.get(AwsS3Constants.REGION_FIELD_NAME);
- builder.region(Region.of(region));
-
- // Use user's endpoint if provided
- if (configuration.get(AwsS3Constants.SERVICE_END_POINT_FIELD_NAME) != null) {
- String endPoint = configuration.get(AwsS3Constants.SERVICE_END_POINT_FIELD_NAME);
- builder.endpointOverride(URI.create(endPoint));
- }
-
- return builder.build();
- }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
index 9158a57..e57b31a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
@@ -18,30 +18,30 @@
*/
package org.apache.asterix.external.input.record.reader.aws;
-import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3Constants;
+import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3;
import java.io.Serializable;
-import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.api.IInputStreamFactory;
import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.CleanupUtils;
-import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
-import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
-import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.s3.S3Client;
-import software.amazon.awssdk.services.s3.S3ClientBuilder;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.S3Object;
@@ -52,7 +52,8 @@
private Map<String, String> configuration;
// Files to read from
- private List<PartitionWorkLoadBasedOnSize> partitionWorkLoadsBasedOnSize = new ArrayList<>();
+ private final List<S3Object> filesOnly = new ArrayList<>();
+ private final List<PartitionWorkLoadBasedOnSize> partitionWorkLoadsBasedOnSize = new ArrayList<>();
private transient AlgebricksAbsolutePartitionConstraint partitionConstraint;
@@ -67,7 +68,7 @@
}
@Override
- public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) {
+ public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) throws HyracksDataException {
return new AwsS3InputStream(configuration, partitionWorkLoadsBasedOnSize.get(partition).getFilePaths());
}
@@ -81,51 +82,57 @@
this.configuration = configuration;
ICcApplicationContext ccApplicationContext = (ICcApplicationContext) ctx.getApplicationContext();
- String container = configuration.get(AwsS3Constants.CONTAINER_NAME_FIELD_NAME);
+ String container = configuration.get(AwsS3.CONTAINER_NAME_FIELD_NAME);
- S3Client s3Client = buildAwsS3Client(configuration);
+ S3Client s3Client = ExternalDataUtils.AwsS3.buildAwsS3Client(configuration);
// Get all objects in a bucket and extract the paths to files
ListObjectsV2Request.Builder listObjectsBuilder = ListObjectsV2Request.builder().bucket(container);
- String path = configuration.get(AwsS3Constants.DEFINITION_FIELD_NAME);
+ String path = configuration.get(AwsS3.DEFINITION_FIELD_NAME);
if (path != null) {
listObjectsBuilder.prefix(path + (!path.isEmpty() && !path.endsWith("/") ? "/" : ""));
}
ListObjectsV2Response listObjectsResponse;
- List<S3Object> s3Objects = new ArrayList<>();
boolean done = false;
String newMarker = null;
- while (!done) {
- // List the objects from the start, or from the last marker in case of truncated result
- if (newMarker == null) {
- listObjectsResponse = s3Client.listObjectsV2(listObjectsBuilder.build());
- } else {
- listObjectsResponse = s3Client.listObjectsV2(listObjectsBuilder.continuationToken(newMarker).build());
+ String fileFormat = configuration.get(ExternalDataConstants.KEY_FORMAT);
+
+ try {
+ while (!done) {
+ // List the objects from the start, or from the last marker in case of truncated result
+ if (newMarker == null) {
+ listObjectsResponse = s3Client.listObjectsV2(listObjectsBuilder.build());
+ } else {
+ listObjectsResponse =
+ s3Client.listObjectsV2(listObjectsBuilder.continuationToken(newMarker).build());
+ }
+
+ // Collect the paths to files only
+ collectFilesOnly(listObjectsResponse.contents(), fileFormat);
+
+ // Stop once the listing is no longer truncated; otherwise keep the continuation token for the next request
+ if (!listObjectsResponse.isTruncated()) {
+ done = true;
+ } else {
+ newMarker = listObjectsResponse.nextContinuationToken();
+ }
}
-
- // Collect all the provided objects
- s3Objects.addAll(listObjectsResponse.contents());
-
- // Mark the flag as done if done, otherwise, get the marker of the previous response for the next request
- if (!listObjectsResponse.isTruncated()) {
- done = true;
- } else {
- newMarker = listObjectsResponse.nextContinuationToken();
+ } catch (SdkException ex) {
+ throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage());
+ } finally {
+ if (s3Client != null) {
+ CleanupUtils.close(s3Client, null);
}
}
- // Exclude the directories and get the files only
- String fileFormat = configuration.get(ExternalDataConstants.KEY_FORMAT);
- List<S3Object> fileObjects = getFilesOnly(s3Objects, fileFormat);
-
// Partition constraints
partitionConstraint = ccApplicationContext.getClusterStateManager().getClusterLocations();
int partitionsCount = partitionConstraint.getLocations().length;
// Distribute work load amongst the partitions
- distributeWorkLoad(fileObjects, partitionsCount);
+ distributeWorkLoad(filesOnly, partitionsCount);
}
/**
@@ -133,21 +140,17 @@
* a file if it does not end up with a "/" which is the separator in a folder structure.
*
* @param s3Objects List of returned objects
- *
- * @return A list of string paths that point to files only
+ * @param fileFormat The expected file format
*
* @throws AsterixException AsterixException
*/
- private List<S3Object> getFilesOnly(List<S3Object> s3Objects, String fileFormat) throws AsterixException {
- List<S3Object> filesOnly = new ArrayList<>();
+ private void collectFilesOnly(List<S3Object> s3Objects, String fileFormat) throws AsterixException {
String fileExtension = getFileExtension(fileFormat);
if (fileExtension == null) {
throw AsterixException.create(ErrorCode.PROVIDER_STREAM_RECORD_READER_UNKNOWN_FORMAT, fileFormat);
}
s3Objects.stream().filter(object -> isValidFile(object.key(), fileFormat)).forEach(filesOnly::add);
-
- return filesOnly;
}
/**
@@ -214,35 +217,6 @@
}
/**
- * Prepares and builds the Amazon S3 client with the provided configuration
- *
- * @param configuration S3 client configuration
- *
- * @return Amazon S3 client
- */
- private static S3Client buildAwsS3Client(Map<String, String> configuration) {
- S3ClientBuilder builder = S3Client.builder();
-
- // Credentials
- String accessKeyId = configuration.get(AwsS3Constants.ACCESS_KEY_ID_FIELD_NAME);
- String secretAccessKey = configuration.get(AwsS3Constants.SECRET_ACCESS_KEY_FIELD_NAME);
- AwsBasicCredentials credentials = AwsBasicCredentials.create(accessKeyId, secretAccessKey);
- builder.credentialsProvider(StaticCredentialsProvider.create(credentials));
-
- // Region
- String region = configuration.get(AwsS3Constants.REGION_FIELD_NAME);
- builder.region(Region.of(region));
-
- // Use user's endpoint if provided
- if (configuration.get(AwsS3Constants.SERVICE_END_POINT_FIELD_NAME) != null) {
- String endPoint = configuration.get(AwsS3Constants.SERVICE_END_POINT_FIELD_NAME);
- builder.endpointOverride(URI.create(endPoint));
- }
-
- return builder.build();
- }
-
- /**
* Returns the file extension for the provided file format.
*
* @param format file format
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
index dfc60bc..5f8d923 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/SemiStructuredRecordReader.java
@@ -142,6 +142,7 @@
if (state == State.ARRAY || state == State.AFTER_COMMA) {
state = State.NESTED_OBJECT;
}
+ beginLineNumber = lineNumber;
startPosn = bufferPosn;
hasStarted = true;
depth = 1;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
index 9e1b052..be6a331 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/LocalFSInputStream.java
@@ -18,11 +18,16 @@
*/
package org.apache.asterix.external.input.stream;
+import static org.apache.asterix.common.exceptions.ErrorCode.ASTERIX;
+import static org.apache.asterix.common.exceptions.ErrorCode.INPUT_RECORD_READER_CHAR_ARRAY_RECORD_TOO_LARGE;
+import static org.apache.asterix.common.exceptions.ErrorCode.RECORD_READER_MALFORMED_INPUT_STREAM;
+import static org.apache.hyracks.api.exceptions.ErrorCode.HYRACKS;
+import static org.apache.hyracks.api.exceptions.ErrorCode.PARSING_ERROR;
+
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
-import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.ExceptionUtils;
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
import org.apache.asterix.external.util.FeedLogManager;
@@ -130,28 +135,29 @@
if (root instanceof HyracksDataException) {
HyracksDataException r = (HyracksDataException) root;
String component = r.getComponent();
- if (ErrorCode.ASTERIX.equals(component)) {
- int errorCode = r.getErrorCode();
+ boolean advance = false;
+ int errorCode = r.getErrorCode();
+ if (ASTERIX.equals(component)) {
switch (errorCode) {
- case ErrorCode.RECORD_READER_MALFORMED_INPUT_STREAM:
- if (currentFile != null) {
- try {
- logManager.logRecord(currentFile.getAbsolutePath(), "Corrupted input file");
- } catch (IOException e) {
- LOGGER.log(Level.WARN, "Filed to write to feed log file", e);
- }
- LOGGER.log(Level.WARN, "Corrupted input file: " + currentFile.getAbsolutePath());
- }
- case ErrorCode.INPUT_RECORD_READER_CHAR_ARRAY_RECORD_TOO_LARGE:
- try {
- advance();
- return true;
- } catch (Exception e) {
- LOGGER.log(Level.WARN, "An exception was thrown while trying to skip a file", e);
- }
+ case RECORD_READER_MALFORMED_INPUT_STREAM:
+ logCorruptedInput();
+ case INPUT_RECORD_READER_CHAR_ARRAY_RECORD_TOO_LARGE:
+ advance = true;
+ break;
default:
break;
}
+ } else if (HYRACKS.equals(component) && errorCode == PARSING_ERROR) {
+ logCorruptedInput();
+ advance = true;
+ }
+ if (advance) {
+ try {
+ advance();
+ return true;
+ } catch (Exception e) {
+ LOGGER.log(Level.WARN, "An exception was thrown while trying to skip a file", e);
+ }
}
}
LOGGER.log(Level.WARN, "Failed to recover from failure", th);
@@ -167,4 +173,15 @@
public String getPreviousStreamName() {
return lastFileName;
}
+
+ /**
+ * Records that the file currently being read is corrupted, both in the feed log (through the log manager)
+ * and in the application log. Does nothing when there is no current file.
+ */
+ private void logCorruptedInput() {
+ if (currentFile != null) {
+ try {
+ logManager.logRecord(currentFile.getAbsolutePath(), "Corrupted input file");
+ } catch (IOException e) {
+ // Best effort: a failure to write the feed log must not prevent skipping the corrupted file
+ LOGGER.log(Level.WARN, "Failed to write to feed log file", e);
+ }
+ LOGGER.log(Level.WARN, "Corrupted input file: " + currentFile.getAbsolutePath());
+ }
+ }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/JSONDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/JSONDataParser.java
index 3216aef..8c518c4 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/JSONDataParser.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/JSONDataParser.java
@@ -22,6 +22,8 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.BitSet;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
import org.apache.asterix.builders.IARecordBuilder;
import org.apache.asterix.builders.IAsterixListBuilder;
@@ -33,6 +35,7 @@
import org.apache.asterix.external.parser.jackson.ADMToken;
import org.apache.asterix.external.parser.jackson.GeometryCoParser;
import org.apache.asterix.external.parser.jackson.ParserContext;
+import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.om.base.ABoolean;
import org.apache.asterix.om.base.ANull;
import org.apache.asterix.om.base.AUnorderedList;
@@ -45,10 +48,13 @@
import org.apache.asterix.om.utils.RecordUtil;
import org.apache.asterix.runtime.exceptions.UnsupportedTypeException;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.ExceptionUtils;
import org.apache.hyracks.data.std.api.IMutableValueStorage;
import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonStreamContext;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.TreeTraversingParser;
@@ -63,6 +69,8 @@
protected final JsonFactory jsonFactory;
protected final ARecordType rootType;
protected final GeometryCoParser geometryCoParser;
+ private Supplier<String> dataSourceName;
+ private LongSupplier lineNumber;
protected JsonParser jsonParser;
@@ -81,6 +89,8 @@
//GeometyCoParser to parse GeoJSON objects to AsterixDB internal spatial types.
geometryCoParser = new GeometryCoParser(jsonParser);
parserContext = new ParserContext();
+ this.dataSourceName = ExternalDataConstants.EMPTY_STRING;
+ this.lineNumber = ExternalDataConstants.NO_LINES;
}
/*
@@ -90,6 +100,12 @@
*/
@Override
+ public void configure(Supplier<String> dataSourceName, LongSupplier lineNumber) {
+ // A null supplier is replaced by the default from ExternalDataConstants, so the error-reporting code
+ // can call both suppliers without null checks
+ this.dataSourceName = dataSourceName == null ? ExternalDataConstants.EMPTY_STRING : dataSourceName;
+ this.lineNumber = lineNumber == null ? ExternalDataConstants.NO_LINES : lineNumber;
+ }
+
+ @Override
public final boolean parse(IRawRecord<? extends char[]> record, DataOutput out) throws HyracksDataException {
try {
//TODO(wyk): find a way to reset byte[] instead of creating a new parser for each record.
@@ -99,7 +115,7 @@
parseObject(rootType, out);
return true;
} catch (IOException e) {
- throw new RuntimeDataException(ErrorCode.RECORD_READER_MALFORMED_INPUT_STREAM, e);
+ throw createException(e);
}
}
@@ -460,4 +476,33 @@
}
}
+
+ /**
+ * Converts an IOException raised while parsing a record into the exception reported to the caller.
+ * When a JSON parser is available, the result is a PARSING_ERROR carrying the data source name, the line
+ * number, the closest enclosing field name and the parser's message; the original exception is not attached
+ * as a cause on that path. Otherwise the exception is wrapped as RECORD_READER_MALFORMED_INPUT_STREAM.
+ *
+ * @param e the exception thrown while parsing the record
+ *
+ * @return the exception to throw
+ */
+ private HyracksDataException createException(IOException e) {
+ if (jsonParser != null) {
+ String msg;
+ if (e instanceof JsonParseException) {
+ // getOriginalMessage() is used instead of getMessage() for parse errors
+ // NOTE(review): presumably to leave out Jackson's own location suffix -- confirm
+ msg = ((JsonParseException) e).getOriginalMessage();
+ } else {
+ msg = ExceptionUtils.getRootCause(e).getMessage();
+ }
+ if (msg == null) {
+ // No usable message: fall back to the generic malformed-input message
+ msg = ErrorCode.getErrorMessage(ErrorCode.RECORD_READER_MALFORMED_INPUT_STREAM);
+ }
+ // Combines the configured line supplier with the parser's line within the current record
+ // NOTE(review): the "- 1" presumably compensates for both values being 1-based -- confirm
+ long lineNum = lineNumber.getAsLong() + jsonParser.getCurrentLocation().getLineNr() - 1;
+ JsonStreamContext parsingContext = jsonParser.getParsingContext();
+ String fieldName = "N/A";
+ // Walk outwards through the parsing contexts until one of them has a current field name
+ while (parsingContext != null) {
+ String currentFieldName = parsingContext.getCurrentName();
+ if (currentFieldName != null) {
+ fieldName = currentFieldName;
+ break;
+ }
+ parsingContext = parsingContext.getParent();
+ }
+
+ // Fully qualified name: presumably the unqualified ErrorCode used in this file is a different class
+ return HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.PARSING_ERROR,
+ dataSourceName.get(), lineNum, fieldName, msg);
+ }
+ return new RuntimeDataException(ErrorCode.RECORD_READER_MALFORMED_INPUT_STREAM, e);
+ }
}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 63f57b6..e93c3b9 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -82,7 +82,7 @@
public static final String KEY_WAIT_FOR_DATA = "wait-for-data";
public static final String KEY_FEED_NAME = "feed";
- // a string representing external bucket name
+ // a string representing the external source type
- public static final String KEY_BUCKET = "bucket";
+ public static final String KEY_EXTERNAL_SOURCE_TYPE = "type";
// a comma delimited list of nodes
public static final String KEY_NODES = "nodes";
// a string representing the password used to authenticate with the external data source
@@ -276,8 +276,9 @@
public static final String EMPTY_FIELD = "empty value";
public static final String INVALID_VAL = "invalid value";
- public static class AwsS3Constants {
- private AwsS3Constants() {
+ public static class AwsS3 {
+ private AwsS3() {
+ throw new AssertionError("do not instantiate");
}
public static final String REGION_FIELD_NAME = "region";
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 3cf67e6..d19d949 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -24,10 +24,15 @@
import static org.apache.asterix.external.util.ExternalDataConstants.KEY_RECORD_END;
import static org.apache.asterix.external.util.ExternalDataConstants.KEY_RECORD_START;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.EnumMap;
+import java.util.List;
import java.util.Map;
+import java.util.Optional;
import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.common.functions.ExternalFunctionLanguage;
@@ -44,6 +49,7 @@
import org.apache.asterix.om.types.AUnionType;
import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.CleanupUtils;
import org.apache.hyracks.dataflow.common.data.parsers.BooleanParserFactory;
import org.apache.hyracks.dataflow.common.data.parsers.DoubleParserFactory;
import org.apache.hyracks.dataflow.common.data.parsers.FloatParserFactory;
@@ -52,6 +58,15 @@
import org.apache.hyracks.dataflow.common.data.parsers.LongParserFactory;
import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.exception.SdkException;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.S3ClientBuilder;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
+
public class ExternalDataUtils {
private static final Map<ATypeTag, IValueParserFactory> valueParserFactoryMap = new EnumMap<>(ATypeTag.class);
@@ -449,4 +464,101 @@
configuration.put(key, paramValue.toLowerCase().trim());
}
}
+
+ /**
+ * Ensures that the external source container is present. Only the AWS S3 source type has a container check; a
+ * configuration with any other source type, or with no source type at all, is accepted as is.
+ *
+ * @param configuration external source properties
+ * @throws CompilationException if the container check of the configured source type fails
+ */
+ public static void validateExternalSourceContainer(Map<String, String> configuration) throws CompilationException {
+ String type = configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE);
+
+ // Compare from the constant side: a switch on a null String throws NullPointerException, whereas this
+ // keeps a configuration without a source type on the "nothing to validate" path
+ if (ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3.equals(type)) {
+ ExternalDataUtils.AwsS3.validateExternalSourceContainer(configuration);
+ }
+ // Nothing needs to be done for any other source type
+ }
+
+ public static class AwsS3 {
+ private AwsS3() {
+ throw new AssertionError("do not instantiate");
+ }
+
+ /**
+ * Builds an S3 client from the credentials, region and optional service endpoint found in the configuration.
+ * The client is returned open; closing it is left to the caller.
+ *
+ * @param configuration external datasource configuration
+ * @return the S3 client built from the configuration
+ * @throws CompilationException if a credential is missing, the region is unsupported or the endpoint is invalid
+ */
+ public static S3Client buildAwsS3Client(Map<String, String> configuration) throws CompilationException {
+ // TODO(Hussain): Need to ensure that all required parameters are present in a previous step
+ String accessKeyId = configuration.get(ExternalDataConstants.AwsS3.ACCESS_KEY_ID_FIELD_NAME);
+ String secretAccessKey = configuration.get(ExternalDataConstants.AwsS3.SECRET_ACCESS_KEY_FIELD_NAME);
+ String regionId = configuration.get(ExternalDataConstants.AwsS3.REGION_FIELD_NAME);
+ String serviceEndpoint = configuration.get(ExternalDataConstants.AwsS3.SERVICE_END_POINT_FIELD_NAME);
+ S3ClientBuilder builder = S3Client.builder();
+
+ // Credentials: a missing key is reported as a compilation error here rather than handed to the SDK as null
+ if (accessKeyId == null || secretAccessKey == null) {
+ throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR,
+ "access key id and secret access key are required");
+ }
+ AwsBasicCredentials credentials = AwsBasicCredentials.create(accessKeyId, secretAccessKey);
+ builder.credentialsProvider(StaticCredentialsProvider.create(credentials));
+
+ // Validate the region: it must match (case-insensitively) one of the regions the S3 service reports
+ List<Region> supportedRegions = S3Client.serviceMetadata().regions();
+ Optional<Region> selectedRegion =
+ supportedRegions.stream().filter(region -> region.id().equalsIgnoreCase(regionId)).findFirst();
+ builder.region(selectedRegion.orElseThrow(() -> new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR,
+ String.format("region %s is not supported", regionId))));
+
+ // Validate the service endpoint if present
+ if (serviceEndpoint != null) {
+ try {
+ builder.endpointOverride(new URI(serviceEndpoint));
+ } catch (NullPointerException ex) {
+ // NOTE(review): presumably thrown by the builder for an endpoint it rejects (e.g. no scheme) - confirm
+ throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage());
+ } catch (URISyntaxException ex) {
+ throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR,
+ String.format("Invalid service endpoint %s", serviceEndpoint));
+ }
+ }
+ return builder.build();
+ }
+
+ /**
+ * Validates if the container being used is available or not.
+ *
+ * @param configuration external datasource configuration
+ * @throws CompilationException if the client cannot be built, the S3 request fails or the container is not found
+ */
+ public static void validateExternalSourceContainer(Map<String, String> configuration) throws CompilationException {
+ S3Client s3Client = null;
+ try {
+ String container = configuration.get(ExternalDataConstants.AwsS3.CONTAINER_NAME_FIELD_NAME);
+ s3Client = buildAwsS3Client(configuration);
+ ListObjectsV2Response response =
+ s3Client.listObjectsV2(ListObjectsV2Request.builder().bucket(container).maxKeys(1).build());
+ // The request is expected to throw when the bucket does not exist, so a response normally means success;
+ // the status is still checked explicitly instead of relying on exceptions alone
+ if (!response.sdkHttpResponse().isSuccessful()) {
+ throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND, container);
+ }
+ } catch (SdkException ex) {
+ throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage());
+ } finally {
+ if (s3Client != null) {
+ CleanupUtils.close(s3Client, null);
+ }
+ }
+ }
+ }
}
diff --git a/asterixdb/pom.xml b/asterixdb/pom.xml
index bb3a018..ffacc07 100644
--- a/asterixdb/pom.xml
+++ b/asterixdb/pom.xml
@@ -1449,6 +1449,11 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>http-client-spi</artifactId>
+ <version>${awsjavasdk.version}</version>
+ </dependency>
<!-- Mock for AWS S3 -->
<dependency>
<groupId>io.findify</groupId>
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java
index 65567b3..8e770a7 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java
@@ -33,6 +33,7 @@
import javax.net.ssl.SSLEngine;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.http.HttpHost;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.conn.util.InetAddressUtils;
@@ -116,13 +117,30 @@
}
public static String defaultPort(String maybeHostPort, int defaultPort) {
- String encodedInput = encodeIPv6LiteralHost(maybeHostPort);
- int lastColon = encodedInput.lastIndexOf(':');
- int closeBracket = encodedInput.lastIndexOf(']');
- if (lastColon > 0 && lastColon > closeBracket) {
- return maybeHostPort;
- } else {
- return encodedInput + ":" + defaultPort;
+ Pair<String, Integer> hostAndPort = extractHostPort(maybeHostPort, defaultPort); // port defaulted when absent
+ return encodeIPv6LiteralHost(hostAndPort.getLeft()) + ":" + hostAndPort.getRight();
+ }
+
+ /**
+ * Splits a "host" or "host:port" string into host and port, substituting defaultPort when no port is given.
+ * With both ':' and ']' present the host goes through decodeIPv6LiteralHost; failures become IllegalArgumentException.
+ */
+ public static Pair<String, Integer> extractHostPort(String maybeHostPort, int defaultPort) {
+ try {
+ final int colonIdx = maybeHostPort.lastIndexOf(':');
+ if (colonIdx < 0) {
+ return Pair.of(maybeHostPort, defaultPort);
+ }
+ final int bracketIdx = maybeHostPort.lastIndexOf(']');
+ final boolean bracketed = bracketIdx >= 0;
+ if (bracketed ? bracketIdx > colonIdx : InetAddressUtils.isIPv6Address(maybeHostPort)) {
+ return Pair.of(bracketed ? decodeIPv6LiteralHost(maybeHostPort) : maybeHostPort, defaultPort);
+ }
+ final String rawHost = maybeHostPort.substring(0, colonIdx);
+ final String host = bracketed ? decodeIPv6LiteralHost(rawHost) : rawHost;
+ return Pair.of(host, Integer.parseInt(maybeHostPort.substring(colonIdx + 1)));
+ } catch (Exception e) {
+ throw new IllegalArgumentException(e);
}
}