Merge "Merge branch 'gerrit/trinity' into 'gerrit/goldfish'" into goldfish
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-table-not-exists/deltalake-table-not-exists.00.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-table-not-exists/deltalake-table-not-exists.00.ddl.sqlpp
new file mode 100644
index 0000000..c57de93
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-table-not-exists/deltalake-table-not-exists.00.ddl.sqlpp
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ DROP DATAVERSE test IF EXISTS;
+ CREATE DATAVERSE test;
+
+ USE test;
+
+ /* Type with no declared fields, referenced by the collection below. */
+ CREATE TYPE DeltalakeTableType as {
+ };
+ /* Negative test: the S3 test suite expects this statement to fail with a table-not-found error for playground/delta-data/s1. */
+ CREATE EXTERNAL COLLECTION DeltalakeDataset(DeltalakeTableType) USING %adapter%
+ (
+ %template%,
+ ("container"="playground"),
+ ("definition"="delta-data/s1"),
+ ("table-format" = "delta")
+ );
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
index d56c1a4..d840527 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
@@ -340,6 +340,13 @@
</compilation-unit>
</test-case>
<test-case FilePath="external-dataset">
+ <compilation-unit name="common/deltalake-table-not-exists">
+ <placeholder name="adapter" value="S3" />
+ <output-dir compare="Text">none</output-dir>
+ <expected-error>ASX1108: External source error. io.delta.kernel.exceptions.TableNotFoundException: Delta table at path `s3a://playground/delta-data/s1` is not found.</expected-error>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="external-dataset">
<compilation-unit name="common/avro/avro-types/avro-map">
<placeholder name="adapter" value="S3" />
<output-dir compare="Text">common/avro/avro-types/avro-map</output-dir>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
index 9d93fce..8dc820b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
@@ -19,6 +19,7 @@
package org.apache.asterix.external.input.record.reader.aws.delta;
import static org.apache.asterix.external.util.aws.s3.S3Constants.SERVICE_END_POINT_FIELD_NAME;
+import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
import java.io.IOException;
import java.io.Serializable;
@@ -33,6 +34,8 @@
import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.api.IExternalDataRuntimeContext;
import org.apache.asterix.external.api.IRecordReader;
@@ -60,6 +63,7 @@
import io.delta.kernel.data.Row;
import io.delta.kernel.defaults.engine.DefaultEngine;
import io.delta.kernel.engine.Engine;
+import io.delta.kernel.exceptions.KernelException;
import io.delta.kernel.internal.InternalScanFileUtils;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.CloseableIterator;
@@ -87,20 +91,7 @@
throws AlgebricksException, HyracksDataException {
this.configuration = configuration;
Configuration conf = new Configuration();
- conf.set(S3Constants.HADOOP_ACCESS_KEY_ID, configuration.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME));
- conf.set(S3Constants.HADOOP_SECRET_ACCESS_KEY, configuration.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME));
- if (configuration.get(S3Constants.SESSION_TOKEN_FIELD_NAME) != null) {
- conf.set(S3Constants.HADOOP_SESSION_TOKEN, configuration.get(S3Constants.SESSION_TOKEN_FIELD_NAME));
- }
- conf.set(S3Constants.HADOOP_REGION, configuration.get(S3Constants.REGION_FIELD_NAME));
- String serviceEndpoint = configuration.get(SERVICE_END_POINT_FIELD_NAME);
- if (serviceEndpoint != null) {
- conf.set(S3Constants.HADOOP_SERVICE_END_POINT, serviceEndpoint);
- }
- conf.set(ExternalDataConstants.KEY_REQUESTED_FIELDS,
- configuration.getOrDefault(ExternalDataConstants.KEY_REQUESTED_FIELDS, ""));
- conf.set(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION,
- configuration.getOrDefault(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION, ""));
+ configurationBuilder(configuration, conf);
String tableMetadataPath = S3Constants.HADOOP_S3_PROTOCOL + "://"
+ configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/'
+ configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME);
@@ -109,7 +100,13 @@
Engine engine = DefaultEngine.create(conf);
io.delta.kernel.Table table = io.delta.kernel.Table.forPath(engine, tableMetadataPath);
- Snapshot snapshot = table.getLatestSnapshot(engine);
+ Snapshot snapshot;
+ try {
+ snapshot = table.getLatestSnapshot(engine);
+ } catch (KernelException e) {
+ LOGGER.info("Failed to get latest snapshot for table: {}", tableMetadataPath, e);
+ throw RuntimeDataException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, e, getMessageOrToString(e));
+ }
List<Warning> warnings = new ArrayList<>();
DeltaConverterContext converterContext = new DeltaConverterContext(configuration, warnings);
@@ -192,6 +189,37 @@
partitionWorkLoadsBasedOnSize.addAll(workloadQueue);
}
+    /**
+     * Copies the S3 connection settings and the AsterixDB-specific keys from the external dataset
+     * {@code configuration} into {@code conf}.
+     * <p>
+     * Hadoop's {@code Configuration.set} rejects {@code null} values with an
+     * {@code IllegalArgumentException}, so credentials, session token, region and service endpoint are
+     * only copied when present (e.g. the access key pair is absent for anonymous access).
+     *
+     * @param configuration external dataset properties; only read, never modified
+     * @param conf Hadoop configuration to populate
+     */
+    public static void configurationBuilder(Map<String, String> configuration, Configuration conf) {
+        setIfPresent(conf, S3Constants.HADOOP_ACCESS_KEY_ID, configuration.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME));
+        setIfPresent(conf, S3Constants.HADOOP_SECRET_ACCESS_KEY,
+                configuration.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME));
+        setIfPresent(conf, S3Constants.HADOOP_SESSION_TOKEN, configuration.get(S3Constants.SESSION_TOKEN_FIELD_NAME));
+        setIfPresent(conf, S3Constants.HADOOP_REGION, configuration.get(S3Constants.REGION_FIELD_NAME));
+        setIfPresent(conf, S3Constants.HADOOP_SERVICE_END_POINT, configuration.get(SERVICE_END_POINT_FIELD_NAME));
+        conf.set(ExternalDataConstants.KEY_REQUESTED_FIELDS,
+                configuration.getOrDefault(ExternalDataConstants.KEY_REQUESTED_FIELDS, ""));
+        conf.set(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION,
+                configuration.getOrDefault(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION, ""));
+    }
+
+    /** Sets {@code key} on {@code conf} only when {@code value} is non-null. */
+    private static void setIfPresent(Configuration conf, String key, String value) {
+        if (value != null) {
+            conf.set(key, value);
+        }
+    }
+
@Override
public IRecordReader<?> createRecordReader(IExternalDataRuntimeContext context) throws HyracksDataException {
try {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
index 558f8a9..a5b21b6 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
@@ -133,9 +133,13 @@
scanFile = scanFiles.get(fileIndex);
fileStatus = InternalScanFileUtils.getAddFileStatus(scanFile);
physicalReadSchema = ScanStateRow.getPhysicalDataReadSchema(engine, scanState);
- physicalDataIter = engine.getParquetHandler().readParquetFiles(singletonCloseableIterator(fileStatus),
- physicalReadSchema, Optional.empty());
- dataIter = Scan.transformPhysicalData(engine, scanState, scanFile, physicalDataIter);
+ try {
+ physicalDataIter = engine.getParquetHandler().readParquetFiles(singletonCloseableIterator(fileStatus),
+ physicalReadSchema, Optional.empty());
+ dataIter = Scan.transformPhysicalData(engine, scanState, scanFile, physicalDataIter);
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
return this.hasNext();
} else {
return false;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index b311870..19c1979 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -35,6 +35,7 @@
import static org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE;
import static org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.EMPTY_TYPE;
import static org.apache.asterix.runtime.evaluators.functions.StringEvaluatorUtils.RESERVED_REGEX_CHARS;
+import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
import static org.msgpack.core.MessagePack.Code.ARRAY16;
import java.io.ByteArrayOutputStream;
@@ -71,6 +72,7 @@
import org.apache.asterix.external.api.IInputStreamFactory;
import org.apache.asterix.external.api.IRecordReaderFactory;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
+import org.apache.asterix.external.input.record.reader.aws.delta.AwsS3DeltaReaderFactory;
import org.apache.asterix.external.library.JavaLibrary;
import org.apache.asterix.external.library.msgpack.MessagePackUtils;
import org.apache.asterix.external.util.ExternalDataConstants.ParquetOptions;
@@ -109,6 +111,12 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.CloseableIterable;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import io.delta.kernel.defaults.engine.DefaultEngine;
+import io.delta.kernel.engine.Engine;
+import io.delta.kernel.exceptions.KernelException;
public class ExternalDataUtils {
@@ -117,6 +125,8 @@
private static final int DEFAULT_MAX_ARGUMENT_SZ = 1024 * 1024;
private static final int HEADER_FUDGE = 64;
+ private static final Logger LOGGER = LogManager.getLogger();
+
static {
valueParserFactoryMap.put(ATypeTag.INTEGER, IntegerParserFactory.INSTANCE);
valueParserFactoryMap.put(ATypeTag.FLOAT, FloatParserFactory.INSTANCE);
@@ -504,6 +514,38 @@
}
}
+    /**
+     * Checks that the Delta table described by {@code configuration} can be resolved by loading its
+     * latest snapshot.
+     * <p>
+     * Only the AWS S3 adapter is handled here; for any other (or missing) source type no table path
+     * can be derived, so the check is skipped rather than handing a {@code null} path to the Delta kernel.
+     *
+     * @param configuration external dataset properties (source type, container, definition, credentials)
+     * @throws CompilationException with {@link ErrorCode#EXTERNAL_SOURCE_ERROR} if the Delta kernel
+     *             fails to load the latest snapshot, e.g. because the table does not exist
+     */
+    public static void validateDeltaTableExists(Map<String, String> configuration) throws CompilationException {
+        // constant-first comparison: a missing source type must not cause a NullPointerException
+        if (!ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3
+                .equals(configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE))) {
+            return;
+        }
+        Configuration conf = new Configuration();
+        AwsS3DeltaReaderFactory.configurationBuilder(configuration, conf);
+        String tableMetadataPath = S3Constants.HADOOP_S3_PROTOCOL + "://"
+                + configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/'
+                + configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME);
+        Engine engine = DefaultEngine.create(conf);
+        io.delta.kernel.Table table = io.delta.kernel.Table.forPath(engine, tableMetadataPath);
+        try {
+            table.getLatestSnapshot(engine);
+        } catch (KernelException e) {
+            LOGGER.info("Failed to get latest snapshot for table: {}", tableMetadataPath, e);
+            throw CompilationException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, e, getMessageOrToString(e));
+        }
+    }
+
public static void prepareIcebergTableFormat(Map<String, String> configuration, Configuration conf,
String tableMetadataPath) throws AlgebricksException {
HadoopTables tables = new HadoopTables(conf);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
index 891d7f3..bf0938b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
@@ -24,6 +24,7 @@
import static org.apache.asterix.common.exceptions.ErrorCode.S3_REGION_NOT_SUPPORTED;
import static org.apache.asterix.external.util.ExternalDataUtils.getPrefix;
import static org.apache.asterix.external.util.ExternalDataUtils.isDeltaTable;
+import static org.apache.asterix.external.util.ExternalDataUtils.validateDeltaTableExists;
import static org.apache.asterix.external.util.ExternalDataUtils.validateDeltaTableProperties;
import static org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude;
import static org.apache.asterix.external.util.aws.s3.S3Constants.ACCESS_KEY_ID_FIELD_NAME;
@@ -281,7 +282,6 @@
else if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT);
}
-
// Both parameters should be passed, or neither should be passed (for anonymous/no auth)
String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
@@ -346,6 +346,9 @@
if (!response.sdkHttpResponse().isSuccessful()) {
throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND, container);
}
+ if (isDeltaTable(configuration)) {
+ validateDeltaTableExists(configuration);
+ }
}
/**