[ASTERIXDB-3503][EXT] Fixing Internal Error issue when Delta table does not exists.

- user model changes: yes
- storage format changes: no
- interface changes: no

Ext-ref: MB-64314

Change-Id: I8d403c8c0698d9d39dc8988eb8b57588a684dbed
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19098
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Tested-by: Murtadha Hubail <mhubail@apache.org>
Integration-Tests: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-table-not-exists/deltalake-table-not-exists.00.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-table-not-exists/deltalake-table-not-exists.00.ddl.sqlpp
new file mode 100644
index 0000000..c57de93
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-table-not-exists/deltalake-table-not-exists.00.ddl.sqlpp
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ DROP DATAVERSE test IF EXISTS;
+ CREATE DATAVERSE test;
+
+ USE test;
+
+
+ CREATE TYPE DeltalakeTableType as {
+ };
+
+ CREATE EXTERNAL COLLECTION DeltalakeDataset(DeltalakeTableType) USING %adapter%
+ (
+   %template%,
+   ("container"="playground"),
+   ("definition"="delta-data/s1"),
+   ("table-format" = "delta")
+ );
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
index d56c1a4..d840527 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
@@ -340,6 +340,13 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="external-dataset">
+      <compilation-unit name="common/deltalake-table-not-exists">
+        <placeholder name="adapter" value="S3" />
+        <output-dir compare="Text">none</output-dir>
+        <expected-error>ASX1108: External source error. io.delta.kernel.exceptions.TableNotFoundException: Delta table at path `s3a://playground/delta-data/s1` is not found.</expected-error>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="external-dataset">
       <compilation-unit name="common/avro/avro-types/avro-map">
         <placeholder name="adapter" value="S3" />
         <output-dir compare="Text">common/avro/avro-types/avro-map</output-dir>
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
index 9d93fce..8dc820b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.external.input.record.reader.aws.delta;
 
 import static org.apache.asterix.external.util.aws.s3.S3Constants.SERVICE_END_POINT_FIELD_NAME;
+import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -33,6 +34,8 @@
 import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
 import org.apache.asterix.external.api.IExternalDataRuntimeContext;
 import org.apache.asterix.external.api.IRecordReader;
@@ -60,6 +63,7 @@
 import io.delta.kernel.data.Row;
 import io.delta.kernel.defaults.engine.DefaultEngine;
 import io.delta.kernel.engine.Engine;
+import io.delta.kernel.exceptions.KernelException;
 import io.delta.kernel.internal.InternalScanFileUtils;
 import io.delta.kernel.types.StructType;
 import io.delta.kernel.utils.CloseableIterator;
@@ -87,20 +91,7 @@
             throws AlgebricksException, HyracksDataException {
         this.configuration = configuration;
         Configuration conf = new Configuration();
-        conf.set(S3Constants.HADOOP_ACCESS_KEY_ID, configuration.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME));
-        conf.set(S3Constants.HADOOP_SECRET_ACCESS_KEY, configuration.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME));
-        if (configuration.get(S3Constants.SESSION_TOKEN_FIELD_NAME) != null) {
-            conf.set(S3Constants.HADOOP_SESSION_TOKEN, configuration.get(S3Constants.SESSION_TOKEN_FIELD_NAME));
-        }
-        conf.set(S3Constants.HADOOP_REGION, configuration.get(S3Constants.REGION_FIELD_NAME));
-        String serviceEndpoint = configuration.get(SERVICE_END_POINT_FIELD_NAME);
-        if (serviceEndpoint != null) {
-            conf.set(S3Constants.HADOOP_SERVICE_END_POINT, serviceEndpoint);
-        }
-        conf.set(ExternalDataConstants.KEY_REQUESTED_FIELDS,
-                configuration.getOrDefault(ExternalDataConstants.KEY_REQUESTED_FIELDS, ""));
-        conf.set(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION,
-                configuration.getOrDefault(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION, ""));
+        configurationBuilder(configuration, conf);
         String tableMetadataPath = S3Constants.HADOOP_S3_PROTOCOL + "://"
                 + configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/'
                 + configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME);
@@ -109,7 +100,13 @@
 
         Engine engine = DefaultEngine.create(conf);
         io.delta.kernel.Table table = io.delta.kernel.Table.forPath(engine, tableMetadataPath);
-        Snapshot snapshot = table.getLatestSnapshot(engine);
+        Snapshot snapshot;
+        try {
+            snapshot = table.getLatestSnapshot(engine);
+        } catch (KernelException e) {
+            LOGGER.info("Failed to get latest snapshot for table: {}", tableMetadataPath, e);
+            throw RuntimeDataException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, e, getMessageOrToString(e));
+        }
 
         List<Warning> warnings = new ArrayList<>();
         DeltaConverterContext converterContext = new DeltaConverterContext(configuration, warnings);
@@ -192,6 +189,23 @@
         partitionWorkLoadsBasedOnSize.addAll(workloadQueue);
     }
 
+    public static void configurationBuilder(Map<String, String> configuration, Configuration conf) {
+        conf.set(S3Constants.HADOOP_ACCESS_KEY_ID, configuration.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME));
+        conf.set(S3Constants.HADOOP_SECRET_ACCESS_KEY, configuration.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME));
+        if (configuration.get(S3Constants.SESSION_TOKEN_FIELD_NAME) != null) {
+            conf.set(S3Constants.HADOOP_SESSION_TOKEN, configuration.get(S3Constants.SESSION_TOKEN_FIELD_NAME));
+        }
+        conf.set(S3Constants.HADOOP_REGION, configuration.get(S3Constants.REGION_FIELD_NAME));
+        String serviceEndpoint = configuration.get(SERVICE_END_POINT_FIELD_NAME);
+        if (serviceEndpoint != null) {
+            conf.set(S3Constants.HADOOP_SERVICE_END_POINT, serviceEndpoint);
+        }
+        conf.set(ExternalDataConstants.KEY_REQUESTED_FIELDS,
+                configuration.getOrDefault(ExternalDataConstants.KEY_REQUESTED_FIELDS, ""));
+        conf.set(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION,
+                configuration.getOrDefault(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION, ""));
+    }
+
     @Override
     public IRecordReader<?> createRecordReader(IExternalDataRuntimeContext context) throws HyracksDataException {
         try {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
index 558f8a9..a5b21b6 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
@@ -133,9 +133,13 @@
             scanFile = scanFiles.get(fileIndex);
             fileStatus = InternalScanFileUtils.getAddFileStatus(scanFile);
             physicalReadSchema = ScanStateRow.getPhysicalDataReadSchema(engine, scanState);
-            physicalDataIter = engine.getParquetHandler().readParquetFiles(singletonCloseableIterator(fileStatus),
-                    physicalReadSchema, Optional.empty());
-            dataIter = Scan.transformPhysicalData(engine, scanState, scanFile, physicalDataIter);
+            try {
+                physicalDataIter = engine.getParquetHandler().readParquetFiles(singletonCloseableIterator(fileStatus),
+                        physicalReadSchema, Optional.empty());
+                dataIter = Scan.transformPhysicalData(engine, scanState, scanFile, physicalDataIter);
+            } catch (IOException e) {
+                throw HyracksDataException.create(e);
+            }
             return this.hasNext();
         } else {
             return false;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index b311870..19c1979 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -35,6 +35,7 @@
 import static org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE;
 import static org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.EMPTY_TYPE;
 import static org.apache.asterix.runtime.evaluators.functions.StringEvaluatorUtils.RESERVED_REGEX_CHARS;
+import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
 import static org.msgpack.core.MessagePack.Code.ARRAY16;
 
 import java.io.ByteArrayOutputStream;
@@ -71,6 +72,7 @@
 import org.apache.asterix.external.api.IInputStreamFactory;
 import org.apache.asterix.external.api.IRecordReaderFactory;
 import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
+import org.apache.asterix.external.input.record.reader.aws.delta.AwsS3DeltaReaderFactory;
 import org.apache.asterix.external.library.JavaLibrary;
 import org.apache.asterix.external.library.msgpack.MessagePackUtils;
 import org.apache.asterix.external.util.ExternalDataConstants.ParquetOptions;
@@ -109,6 +111,12 @@
 import org.apache.iceberg.Table;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import io.delta.kernel.defaults.engine.DefaultEngine;
+import io.delta.kernel.engine.Engine;
+import io.delta.kernel.exceptions.KernelException;
 
 public class ExternalDataUtils {
 
@@ -117,6 +125,8 @@
     private static final int DEFAULT_MAX_ARGUMENT_SZ = 1024 * 1024;
     private static final int HEADER_FUDGE = 64;
 
+    private static final Logger LOGGER = LogManager.getLogger();
+
     static {
         valueParserFactoryMap.put(ATypeTag.INTEGER, IntegerParserFactory.INSTANCE);
         valueParserFactoryMap.put(ATypeTag.FLOAT, FloatParserFactory.INSTANCE);
@@ -504,6 +514,26 @@
         }
     }
 
+    public static void validateDeltaTableExists(Map<String, String> configuration) throws CompilationException {
+        Configuration conf = new Configuration();
+        String tableMetadataPath = null;
+        if (configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE)
+                .equals(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3)) {
+            AwsS3DeltaReaderFactory.configurationBuilder(configuration, conf);
+            tableMetadataPath = S3Constants.HADOOP_S3_PROTOCOL + "://"
+                    + configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/'
+                    + configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME);
+        }
+        Engine engine = DefaultEngine.create(conf);
+        io.delta.kernel.Table table = io.delta.kernel.Table.forPath(engine, tableMetadataPath);
+        try {
+            table.getLatestSnapshot(engine);
+        } catch (KernelException e) {
+            LOGGER.info("Failed to get latest snapshot for table: {}", tableMetadataPath, e);
+            throw CompilationException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, e, getMessageOrToString(e));
+        }
+    }
+
     public static void prepareIcebergTableFormat(Map<String, String> configuration, Configuration conf,
             String tableMetadataPath) throws AlgebricksException {
         HadoopTables tables = new HadoopTables(conf);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
index 891d7f3..bf0938b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
@@ -24,6 +24,7 @@
 import static org.apache.asterix.common.exceptions.ErrorCode.S3_REGION_NOT_SUPPORTED;
 import static org.apache.asterix.external.util.ExternalDataUtils.getPrefix;
 import static org.apache.asterix.external.util.ExternalDataUtils.isDeltaTable;
+import static org.apache.asterix.external.util.ExternalDataUtils.validateDeltaTableExists;
 import static org.apache.asterix.external.util.ExternalDataUtils.validateDeltaTableProperties;
 import static org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude;
 import static org.apache.asterix.external.util.aws.s3.S3Constants.ACCESS_KEY_ID_FIELD_NAME;
@@ -281,7 +282,6 @@
         else if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
             throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT);
         }
-
         // Both parameters should be passed, or neither should be passed (for anonymous/no auth)
         String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
         String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
@@ -346,6 +346,9 @@
         if (!response.sdkHttpResponse().isSuccessful()) {
             throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND, container);
         }
+        if (isDeltaTable(configuration)) {
+            validateDeltaTableExists(configuration);
+        }
     }
 
     /**